Dask
Key Concepts and Components
- Dask Arrays
Dask arrays extend NumPy arrays to support larger-than-memory computations by splitting the array into many smaller chunks, each one a NumPy array.
- Dask DataFrames
Dask DataFrames extend pandas DataFrames for parallel and distributed computing. They are composed of many smaller pandas DataFrames.
- Dask Bags
Dask Bags are like parallel lists that provide map, filter, and groupby operations on potentially larger-than-memory datasets.
- Dask Delayed
Dask Delayed allows you to build task graphs in a low-level way, specifying operations and dependencies between them. It’s useful for custom workflows.
- Dask Distributed
Dask Distributed is a distributed computing framework that scales Dask workflows to a cluster of machines. It includes a task scheduler and workers.
Installation
pip install "dask[complete]"   # installs core Dask plus all optional extras
For the distributed scheduler, you might need:
pip install dask distributed
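To verify the install, a quick sanity check from Python (the printed version will vary by environment):
import dask
print(dask.__version__)  # prints the installed Dask version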
Dask Arrays
Dask arrays work similarly to NumPy arrays but can operate on data too large to fit into memory.
import dask.array as da
# Create a large Dask array with chunks
x = da.random.random((10000, 10000), chunks=(1000, 1000))
# Compute the mean along an axis
result = x.mean(axis=0)
# Trigger computation
result = result.compute()
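Chunks are the unit of parallelism, so it helps to inspect and adjust them. A minimal sketch (the rechunk sizes here are arbitrary choices, not recommendations):
import dask.array as da

x = da.random.random((10000, 10000), chunks=(1000, 1000))
print(x.numblocks)            # (10, 10): the array is split into 100 chunks
y = x.rechunk((2000, 2000))   # combine into fewer, larger chunks
print(y.numblocks)            # (5, 5)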
Dask DataFrames
Dask DataFrames are similar to pandas DataFrames but designed for parallel processing on larger datasets.
import dask.dataframe as dd
import pandas as pd
# Build a pandas DataFrame from the array (this computes the full array into memory)
new = pd.DataFrame(x)
dask_df = dd.from_pandas(new, npartitions=10)
# Use the Dask DataFrame (for a large CSV file you would use dd.read_csv instead)
df = dask_df
# Perform some operations
result = df[df[0] > 0.5].mean()
# Compute the result
result = result.compute()
result
0 0.752109
1 0.498086
2 0.500595
3 0.501720
4 0.501788
...
9995 0.503008
9996 0.497020
9997 0.498570
9998 0.494503
9999 0.496511
Length: 10000, dtype: float64
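In practice, large tabular data usually enters through dd.read_csv, which creates one partition per block of the file. A sketch assuming a hypothetical file large.csv with a column named value:
import dask.dataframe as dd

df = dd.read_csv('large.csv', blocksize='64MB')  # 'large.csv' is a placeholder
print(df.npartitions)                            # roughly one partition per 64 MB block
result = df['value'].mean().compute()            # 'value' is a placeholder column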
Dask Bags
Dask Bags are useful for working with unstructured or semi-structured data, such as text data.
import dask.bag as db
# Create a Dask Bag from a list
data = db.from_sequence([1, 2, 3, 4, 5])
# Apply a function to each element
squares = data.map(lambda x: x ** 2)
# Compute the result
result = squares.compute()
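Because Bags target semi-structured data, a common pattern is reading line-delimited JSON and parsing each line. A sketch assuming a hypothetical records.jsonl with one JSON object per line:
import json
import dask.bag as db

records = db.read_text('records.jsonl').map(json.loads)   # hypothetical file
adults = records.filter(lambda r: r.get('age', 0) >= 18)  # keep matching records
print(adults.count().compute())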
Dask Delayed
Dask Delayed allows for parallel execution by specifying the dependencies between tasks:
from dask import delayed
@delayed
def add(x, y):
return x + y
@delayed
def sum_list(lst):
return sum(lst)
# Define a computation graph
x = add(1, 2)
y = add(3, 4)
total = sum_list([x, y])
total
Delayed('sum_list-bf70cda4-342a-46f4-8ebe-d863e28fea4e')
# Compute the result
result = total.compute()
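A typical use of delayed is wrapping the body of an existing loop so that independent iterations can run in parallel. A minimal sketch (process is a stand-in for real work):
import dask
from dask import delayed

@delayed
def process(i):
    return i * 2                 # stand-in for an expensive task

tasks = [process(i) for i in range(10)]
results = dask.compute(*tasks)   # executes all independent tasks in parallel
print(results)                   # (0, 2, 4, ..., 18)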
Dask Distributed
Dask Distributed allows scaling computations to multiple machines. Start a local cluster:
from dask.distributed import Client
# Start a local cluster
client = Client()
# Check cluster status
print(client)
# Example with Dask DataFrame
import dask.dataframe as dd
# Reuse the Dask DataFrame from above and perform operations
df = dask_df
result = df.groupby(0).mean().compute()
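Besides the collection APIs, the client exposes a futures interface (client.submit, client.map, client.gather) for task-at-a-time workloads. A self-contained sketch that starts its own local cluster:
from dask.distributed import Client

client = Client()                      # local cluster by default

def inc(x):
    return x + 1

future = client.submit(inc, 10)        # run a single task on a worker
futures = client.map(inc, range(5))    # run many tasks at once
print(future.result())                 # 11
print(client.gather(futures))          # [1, 2, 3, 4, 5]
client.close()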
Monitoring and Debugging
Dask provides several tools for monitoring and debugging:
- Dask Dashboard: A web-based interface that shows the status of computations and the Dask cluster.
- Visualizing Task Graphs: Use dask.visualize (or a collection's .visualize method) to render task graphs and understand dependencies; see the sketch after this list.
- Logs and Errors: Dask provides detailed logging to help identify and fix issues.
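For example, any Dask collection can render its task graph to an image (this requires the optional graphviz dependency):
import dask.array as da

x = da.random.random((4, 4), chunks=(2, 2))
result = x.sum()
result.visualize(filename='graph.png')  # writes the task graph to graph.png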