Dask¶
Dask is a Python library for parallel and distributed computing. Dask is:
- Easy to use and set up (it’s just a Python library)
- Powerful at providing scale, and unlocking complex algorithms
- and Fun 🎉
How to Use Dask¶
Dask provides several APIs. Choose one that works best for you:
Dask Futures parallelize arbitrary for-loop style Python code, providing:
- Flexible tooling allowing you to construct custom pipelines and workflows
- Powerful scaling techniques, processing several thousand tasks per second
- Responsive feedback allowing for intuitive execution, and helpful dashboards
Dask futures form the foundation for other Dask work
Learn more at Futures Documentation or see an example at Futures Example
from dask.distributed import LocalCluster
client = LocalCluster().get_client()
# Submit work to happen in parallel
results = []
for filename in filenames:
    data = client.submit(load, filename)
    result = client.submit(process, data)
    results.append(result)
# Gather results back to local computer
results = client.gather(results)
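To make the dashboard and responsive-feedback points above concrete, here is a minimal sketch; the square function is just a stand-in for real work, and as_completed lets you handle results as soon as each task finishes.
from dask.distributed import LocalCluster, as_completed

client = LocalCluster().get_client()
print(client.dashboard_link)  # open this URL to watch tasks run live

def square(x):  # stand-in for real work
    return x ** 2

# Handle each result as soon as its task completes
futures = [client.submit(square, i) for i in range(100)]
for future in as_completed(futures):
    print(future.result())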
Dask Dataframes parallelize the popular pandas library, providing:
- Larger-than-memory execution for single machines, allowing you to process data that is larger than your available RAM
- Parallel execution for faster processing
- Distributed computation for terabyte-sized datasets
Dask Dataframes are similar in this regard to Apache Spark, but use the familiar pandas API and memory model. One Dask dataframe is simply a collection of pandas dataframes on different computers.
Learn more at DataFrame Documentation or see an example at DataFrame Example
import dask.dataframe as dd
# Read large datasets in parallel
df = dd.read_parquet("s3://mybucket/data.*.parquet")
df = df[df.value < 0]
result = df.groupby(df.name).amount.mean()
result = result.compute() # Compute to get pandas result
result.plot()
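As a small illustration of the "collection of pandas dataframes" idea, here is a hedged sketch using synthetic in-memory data rather than the Parquet files above:
import pandas as pd
import dask.dataframe as dd

# Split an ordinary pandas dataframe into 4 partitions;
# each partition is itself a pandas dataframe
pdf = pd.DataFrame({"name": list("abcd") * 25, "amount": range(100)})
df = dd.from_pandas(pdf, npartitions=4)

print(df.npartitions)                    # 4
print(type(df.partitions[0].compute()))  # <class 'pandas.core.frame.DataFrame'>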
Dask Arrays parallelize the popular NumPy library, providing:
- Larger-than-memory execution for single machines, allowing you to process data that is larger than your available RAM
- Parallel execution for faster processing
- Distributed computation for terabyte-sized datasets
Dask Arrays allow scientists and researchers to perform intuitive and sophisticated operations on large datasets but use the familiar NumPy API and memory model. One Dask array is simply a collection of NumPy arrays on different computers.
Learn more at Array Documentation or see an example at Array Example
import dask.array as da
x = da.random.random((10000, 10000))
y = (x + x.T) - x.mean(axis=1)
z = y.var(axis=0).compute()
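Similarly, a brief sketch of the "collection of NumPy arrays" idea; the chunk sizes here are illustrative, not a recommendation:
import dask.array as da

# A 10000x10000 Dask array made of 1000x1000 NumPy blocks
x = da.random.random((10000, 10000), chunks=(1000, 1000))

print(x.numblocks)                     # (10, 10)
print(type(x.blocks[0, 0].compute()))  # <class 'numpy.ndarray'>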
Xarray wraps Dask array and is a popular downstream project, providing labeled axes and simultaneously tracking many Dask arrays together, resulting in more intuitive analyses. Xarray is popular and accounts for the majority of Dask array use today, especially within the geospatial and imaging communities.
Learn more at Xarray Documentation or see an example at Xarray Example
import xarray as xr
ds = xr.open_mfdataset("data/*.nc")
ds.groupby("time.month").mean("time").compute()
Dask Bags are simple parallel Python lists, commonly used to process text or raw Python objects. They are:
- Simple, offering easy map and reduce functionality
- Low-memory, processing data in a streaming way that minimizes memory use
- Good for preprocessing, especially for text or JSON data prior to ingestion into dataframes
Dask bags are similar in this regard to Spark RDDs or vanilla Python data structures and iterators. One Dask bag is simply a collection of Python iterators processing in parallel on different computers.
Learn more at Bag Documentation or see an example at Bag Example
import json
import dask.bag as db
# Read large datasets in parallel
lines = db.read_text("s3://mybucket/data.*.json")
records = (lines
    .map(json.loads)
    .filter(lambda d: d["value"] > 0)
)
df = records.to_dask_dataframe()
How to Install Dask¶
Installing Dask is easy with pip or conda.
Learn more at Install Documentation
pip install "dask[complete]"
conda install dask
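A quick way to confirm the installation worked is to import Dask and print its version (this assumes the install completed without errors):
import dask
print(dask.__version__)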
How to Deploy Dask¶
You can use Dask on a single machine, or deploy it on distributed hardware.
Learn more at Deploy Documentation
Dask can set itself up easily in your Python session if you create a LocalCluster object, which sets everything up for you.
from dask.distributed import LocalCluster
cluster = LocalCluster()
client = cluster.get_client()
# Normal Dask work ...
Alternatively, you can skip this part, and Dask will operate within a thread pool contained entirely within your local process.
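For example, in a minimal sketch like the following, no cluster or client is created and the computation runs on Dask's default local threaded scheduler; the file path and column name are placeholders:
import dask.dataframe as dd

# No cluster or client: compute() uses the default local thread pool
df = dd.read_parquet("data.parquet")   # placeholder path
print(df.value.mean().compute())       # placeholder column name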
Coiled is a commercial SaaS product that deploys Dask clusters on cloud platforms like AWS, GCP, and Azure.
import coiled
cluster = coiled.Cluster(
    n_workers=100,
    region="us-east-2",
    worker_memory="16 GiB",
    spot_policy="spot_with_fallback",
)
client = cluster.get_client()
Learn more at Coiled Documentation
The Dask-Jobqueue project deploys Dask clusters on popular HPC job submission systems like SLURM, PBS, SGE, LSF, Torque, Condor, and others.
from dask_jobqueue import PBSCluster
cluster = PBSCluster(
    cores=24,
    memory="100GB",
    queue="regular",
    account="my-account",
)
cluster.scale(jobs=100)
client = cluster.get_client()
Learn more at Dask-Jobqueue Documentation
The Dask Kubernetes project provides a Dask Kubernetes Operator for deploying Dask on Kubernetes clusters.
from dask_kubernetes.operator import KubeCluster
cluster = KubeCluster(
name="my-dask-cluster",
image="ghcr.io/dask/dask:latest",
resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}},
)
cluster.scale(10)
client = cluster.get_client()
Learn more at Dask Kubernetes Documentation
Learn with Examples¶
Dask use is widespread, across all industries and scales. Dask is used anywhere Python is used and people experience pain due to large-scale data or intense computing.
You can learn more about Dask applications at the following sources:
Additionally, we encourage you to look through the reference documentation on this website related to the API that most closely matches your application.
Dask was designed to be easy to use and powerful. We hope that it’s able to help you have fun with your work.