import dask.array as da
# Create a large Dask array with chunks
x = da.random.random((10000, 10000), chunks=(1000, 1000))
# Compute the mean along an axis
result = x.mean(axis=0)
# Trigger computation
result = result.compute()
Dask
Key Concepts and Components
- Dask Arrays
Dask arrays extend NumPy arrays to support larger-than-memory computations by splitting the array into many smaller chunks, each one a NumPy array.
- Dask DataFrames
Dask DataFrames extend pandas DataFrames for parallel and distributed computing. They are composed of many smaller pandas DataFrames.
- Dask Bags
Dask Bags are like parallel lists that provide map, filter, and groupby operations on potentially larger-than-memory datasets.
- Dask Delayed
Dask Delayed allows you to build task graphs in a low-level way, specifying operations and dependencies between them. It’s useful for custom workflows.
- Dask Distributed
Dask Distributed is a distributed computing framework that scales Dask workflows to a cluster of machines. It includes a task scheduler and workers.
Installation
pip install "dask[complete]"  # Installs core Dask and extras
For the distributed scheduler, you might need:
pip install dask distributed
Dask Arrays
Dask arrays work similarly to NumPy arrays but can operate on data too large to fit into memory.
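A minimal sketch of the lazy-evaluation model, using arbitrary shapes and chunk sizes chosen for illustration:
import dask.array as da
# Build a chunked array; each 1000 x 1000 chunk is a plain NumPy array
y = da.random.random((4000, 4000), chunks=(1000, 1000))
# Arithmetic and reductions only build a task graph; nothing runs yet
z = (y + y.T).sum(axis=1)
# Computation happens when .compute() (or .persist()) is called
print(z.compute()[:5])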
Dask DataFrames
Dask DataFrames are similar to pandas DataFrames but designed for parallel processing on larger datasets.
import dask.dataframe as dd
import pandas as pd
# Materialize the Dask array from above into a pandas DataFrame (loads the full array into memory)
new = pd.DataFrame(x.compute())
# Partition the pandas DataFrame into a Dask DataFrame with 10 partitions
dask_df = dd.from_pandas(new, npartitions=10)
# Work with the Dask DataFrame created above
df = dask_df
# Perform some operations
result = df[df[0] > 0.5].mean()
# Compute the result
result = result.compute()
result
0 0.752109
1 0.498086
2 0.500595
3 0.501720
4 0.501788
...
9995 0.503008
9996 0.497020
9997 0.498570
9998 0.494503
9999 0.496511
Length: 10000, dtype: float64
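In practice, Dask DataFrames usually come straight from files on disk rather than from pandas. A hedged sketch, where the file pattern data/*.csv and the column name "value" are hypothetical:
import dask.dataframe as dd
# Lazily read many CSV files into a single Dask DataFrame
df = dd.read_csv("data/*.csv")
# Filter and aggregate with the familiar pandas API, then compute
result = df[df["value"] > 0.5]["value"].mean().compute()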
Dask Bags
Dask Bags are useful for working with unstructured or semi-structured data, such as text data.
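As a sketch of the text-data case (the sample lines are made up; real data might come from db.read_text):
import dask.bag as db
# A bag of text lines
lines = db.from_sequence(["the quick brown fox", "the lazy dog"])
# Split each line into words, flatten, and count word frequencies
word_counts = lines.map(lambda line: line.split()).flatten().frequencies()
print(word_counts.compute())
A simpler example with numbers: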
import dask.bag as db
# Create a Dask Bag from a list
data = db.from_sequence([1, 2, 3, 4, 5])
# Apply a function to each element
squares = data.map(lambda x: x ** 2)
# Compute the result
result = squares.compute()
Dask Delayed
Dask Delayed allows for parallel execution by specifying the dependencies between tasks.
from dask import delayed
@delayed
def add(x, y):
    return x + y

@delayed
def sum_list(lst):
    return sum(lst)
# Define a computation graph
x = add(1, 2)
y = add(3, 4)
total = sum_list([x, y])
total
Delayed('sum_list-bf70cda4-342a-46f4-8ebe-d863e28fea4e')
# Compute the result
result = total.compute()
Dask Distributed
Dask Distributed allows scaling computations to multiple machines. Start with a local cluster:
from dask.distributed import Client
# Start a local cluster
client = Client()
# Check cluster status
print(client)
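The same client also exposes a futures interface for scheduling individual function calls; a minimal sketch (square is a toy function defined here for illustration):
def square(n):
    return n * n
# Submit calls to the cluster; each returns a Future immediately
futures = [client.submit(square, i) for i in range(10)]
# gather blocks until all results have arrived
print(client.gather(futures))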
# Example with Dask DataFrame
import dask.dataframe as dd
# Reuse the Dask DataFrame created earlier and aggregate on the cluster
df = dask_df
result = df.groupby(0).mean().compute()
Monitoring and Debugging
Dask provides several tools for monitoring and debugging:
- Dask Dashboard: A web-based interface that shows the status of computations and the Dask cluster.
- Visualizing Task Graphs: Use dask.visualize to render task graphs and understand dependencies (see the sketch after this list).
- Logs and Errors: Dask provides detailed logging to help identify and fix issues.
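A minimal sketch of graph visualization, assuming the optional graphviz dependency is installed for rendering:
from dask import delayed, visualize
@delayed
def inc(n):
    return n + 1
total = delayed(sum)([inc(i) for i in range(3)])
# Writes the task graph to mydask.png (requires graphviz)
visualize(total)
# With a running distributed Client (as above), the dashboard URL is:
# print(client.dashboard_link)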