This work is supported by Anaconda Inc.
Dask DataFrame works well with pandas’ new Extension Array interface, including third-party extension arrays. This lets Dask DataFrames hold columns of types that pandas itself doesn’t implement.
Pandas 0.23 introduced the ExtensionArray, a way to store things other than a simple NumPy array in a DataFrame or Series. Internally, pandas uses this for data types that aren’t handled natively by NumPy, like datetimes with timezones, Categorical, or the (new!) nullable integer arrays.
>>> s = pd.Series(pd.date_range('2000', periods=4, tz="US/Central"))
>>> s
0   2000-01-01 00:00:00-06:00
1   2000-01-02 00:00:00-06:00
2   2000-01-03 00:00:00-06:00
3   2000-01-04 00:00:00-06:00
dtype: datetime64[ns, US/Central]
dask.dataframe has always supported the extension types that pandas defines.
>>> import dask.dataframe as dd
>>> dd.from_pandas(s, npartitions=2)
Dask Series Structure:
npartitions=2
0    datetime64[ns, US/Central]
2                           ...
3                           ...
dtype: datetime64[ns, US/Central]
Dask Name: from_pandas, 2 tasks
Newer versions of pandas allow third-party libraries to write custom extension arrays. These arrays can be placed inside a DataFrame or Series, and work just as well as any extension array defined within pandas itself. However, third-party extension arrays pose a slight challenge for Dask.
Recall: dask.dataframe is lazy. We use a familiar pandas-like API to build up a task graph, rather than executing immediately. But if Dask DataFrame is lazy, then how do things like the following work?
>>> df = pd.DataFrame({"A": [1, 2], 'B': [3, 4]})
>>> ddf = dd.from_pandas(df, npartitions=2)
>>> ddf[['B']].columns
Index(['B'], dtype='object')
ddf[['B']] (lazily) selects the column 'B' from the dataframe. But accessing .columns immediately returns a pandas Index object with just the selected columns.
No real computation has happened (you could just as easily swap out the from_pandas for a dd.read_parquet on a larger-than-memory dataset, and the behavior would be the same). Dask is able to do these kinds of “metadata-only” computations, where the output depends only on the columns and the dtypes, without executing the task graph. Internally, Dask does this by keeping a pair of dummy pandas DataFrames on each Dask DataFrame.
>>> ddf._meta
Empty DataFrame
Columns: [A, B]
Index: []
>>> ddf._meta_nonempty
   A  B
0  1  1
1  1  1
We need the _meta_nonempty, since some operations in pandas behave differently on an empty DataFrame than on a non-empty one (either by design or, occasionally, a bug in pandas).
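Here’s one illustrative example of that (my own, not from the post): DataFrame.apply can’t actually call your function on an empty frame, so pandas falls back to inferring the shape of the result, and you can get back a different kind of object entirely.
>>> df = pd.DataFrame({"A": [1, 1]})
>>> df.apply(lambda col: col.iloc[0])   # reduces each column to a scalar
A    1
dtype: int64
>>> df.iloc[:0].apply(lambda col: col.iloc[0])   # empty: pandas has to guess
Empty DataFrame
Columns: [A]
Index: []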
The issue with third-party extension arrays is that Dask doesn’t know what values to put in the _meta_nonempty. We’re quite happy to do it for each NumPy dtype and each of pandas’ own extension dtypes. But any third-party library could create an ExtensionArray for any type, and Dask would have no way of knowing what’s a valid value for it.
Rather than Dask guessing what values to use for the _meta_nonempty, extension array authors (or users) can register their extension dtype with Dask. Once registered, Dask will be able to generate the _meta_nonempty, and things should work fine from there. For example, we can register the dummy DecimalArray that pandas uses for testing (this isn’t part of pandas’ public API) with Dask.
from decimal import Decimal

from pandas.tests.extension.decimal import DecimalArray, DecimalDtype

# The actual registration that would be done in the 3rd-party library
from dask.dataframe.extensions import make_array_nonempty


@make_array_nonempty.register(DecimalDtype)
def _(dtype):
    return DecimalArray._from_sequence([Decimal('0'), Decimal('NaN')],
                                       dtype=dtype)
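As a quick check of my own (not in the post), you can call the dispatch object directly with a dtype instance and get back the two-element dummy array the registered function builds:
>>> arr = make_array_nonempty(DecimalDtype())
>>> type(arr).__name__, len(arr)
('DecimalArray', 2)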
Now users of that extension type can place those arrays inside a Dask DataFrame or Series.
>>> df = pd.DataFrame({"A": DecimalArray([Decimal('1.0'), Decimal('2.0'),
... Decimal('3.0')])})
>>> ddf = dd.from_pandas(df, 2)
>>> ddf
Dask DataFrame Structure:
                     A
npartitions=1
0              decimal
2                  ...
Dask Name: from_pandas, 1 tasks
>>> ddf.dtypes
A    decimal
dtype: object
And from there, the usual operations work just as they would in pandas.
>>> import numpy as np
>>> from random import choices
>>> df = pd.DataFrame({"A": DecimalArray(choices([Decimal('1.0'),
...                                               Decimal('2.0')],
...                                              k=100)),
...                    "B": np.random.choice([0, 1, 2, 3], size=(100,))})
>>> ddf = dd.from_pandas(df, 2)
>>> ddf.groupby("A").B.mean().compute()
A
1.0    1.50
2.0    1.48
Name: B, dtype: float64
It’s neat that Dask now supports extension arrays. But to me, the exciting thing is just how little work this took. The PR implementing support for third-party extension arrays is quite short: it just defines the object that third parties register with, and uses it to generate the data when the dtype is detected. Supporting the three new extension arrays in pandas 0.24.0 (IntegerArray, PeriodArray, and IntervalArray) takes a handful of lines of code:
from pandas.core.arrays import IntervalArray, integer_array, period_array
from pandas.core.arrays.integer import _IntegerDtype


@make_array_nonempty.register(pd.IntervalDtype)
def _(dtype):
    return IntervalArray.from_breaks([0, 1, 2], closed=dtype.closed)


@make_array_nonempty.register(pd.PeriodDtype)
def _(dtype):
    return period_array([2000, 2001], freq=dtype.freq)


@make_array_nonempty.register(_IntegerDtype)
def _(dtype):
    return integer_array([0, None], dtype=dtype)
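For the curious, the dispatch object itself is tiny. Here’s a rough sketch of how it’s wired up, assuming dask’s dask.utils.Dispatch helper (the simple type-based registry dask also uses for things like sizeof); the real definition lives in dask.dataframe.extensions, so treat this as illustration rather than the exact source.
# A sketch of the mechanism, not dask's exact source:
# make_array_nonempty is a Dispatch object, a small registry
# mapping dtype classes to factory functions.
from dask.utils import Dispatch

make_array_nonempty = Dispatch('make_array_nonempty')

# Third parties add entries with @make_array_nonempty.register(MyDtype).
# When dask builds _meta_nonempty and hits an extension dtype, it calls
# make_array_nonempty(dtype), which looks up type(dtype) in the registry
# and invokes the registered factory to get a small, valid array.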
Dask benefits directly from improvements made to pandas. Dask didn’t have to build out a new parallel extension array interface, and reimplement all the new extension arrays using the parallel interface. We just re-used what pandas already did, and it fits into the existing Dask structure.
For third-party extension array authors, like cyberpandas, the work is similarly minimal. They don’t need to re-implement everything from the ground up just to play well with Dask.
This highlights the importance of one of the Dask project’s core values: working with the community. If you visit dask.org, you’ll see phrases like
Integrates with existing projects
and
Built with the broader community
At the start of Dask, the developers could have gone off and re-written pandas or NumPy from scratch to be parallel friendly (though we’d probably still be working on that part today, since that’s such a massive undertaking). Instead, the Dask developers worked with the community, occasionally nudging it in directions that would help out Dask. For example, many places in pandas held the GIL, preventing thread-based parallelism. Rather than abandoning pandas, the Dask and pandas developers worked together to release the GIL where possible when it was a bottleneck for dask.dataframe. This benefited Dask and anyone else trying to do thread-based parallelism with pandas DataFrames.
And now, when pandas introduces new features like nullable integers, dask.dataframe just needs to register them as extension types and immediately benefits from them. And third-party extension array authors can do the same for their extension arrays.
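For instance, here’s a quick sketch of my own (assuming pandas 0.24’s nullable 'Int64' dtype and the registration shown above):
>>> s = pd.Series(pd.array([1, 2, None], dtype='Int64'))
>>> dd.from_pandas(s, npartitions=2).sum().compute()
3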
If you’re writing an ExtensionArray, make sure to add it to the pandas ecosystem page, and register it with Dask!