Python Dask で 並列 DataFrame 処理
はじめに
先日のエントリで少し記載した Dask
について、その使い方を書く。Dask
を使うと、NumPy
や pandas
の API を利用して並列計算/分散処理を行うことができる。また、Dask
は Out-Of-Core (データ量が多くメモリに乗らない場合) の処理も考慮した実装になっている。
上にも書いたが、Dask
は NumPy
や pandas
を置き換えるものではない。数値計算のためのバックエンドとして NumPy
や pandas
を利用するため、むしろこれらのパッケージが必須である。
Dask
は NumPy
や pandas
の API を完全にはサポートしていないため、並列 / Out-Of-Core 処理が必要な場面では Dask
を、他では NumPy
/ pandas
を使うのがよいと思う。pandas
とDask
のデータはそれぞれ簡単に相互変換できる。
補足 とはいえ都度の変換は手間なので、pandas
の処理実行時に Dask
を利用するオプションをつける という検討はされている。
インストール
pip
もしくは conda
で。
pip install dask
準備
まずは必要なパッケージを import
する。
import numpy as np import pandas as pd import dask.dataframe as dd np.__version__ # '1.9.3' pd.__version__ # '0.16.2' # バージョン表示のためにインポート。 import dask dask.__version__ # '0.7.1'
pandas
から Dask
への変換
サンプルデータは すでにメモリ上にある pd.DataFrame
とする。
df = pd.DataFrame({'X': np.arange(10), 'Y': np.arange(10, 20), 'Z': np.arange(20, 30)}, index=list('abcdefghij')) df # X Y Z # a 0 10 20 # b 1 11 21 # c 2 12 22 # d 3 13 23 # e 4 14 24 # f 5 15 25 # g 6 16 26 # h 7 17 27 # i 8 18 28 # j 9 19 29
pandas
のデータ構造から Dask
に変換するには dd.from_pandas
。2つめの引数で データをいくつのパーティションに分割するかを指定している。結果は dask.dataframe.DataFrame
( dd.DataFrame
) となる。
divisions
はデータがどこで分割されたかを示す。表示から、1つ目のパーティションには .index
が "a" 〜 "e" までのデータが、2つ目のには "f" 〜 "j" までのデータが含まれていることがわかる。
重要 dd.DataFrame
の処理全般について、行の順序は保証されない。各パーティションには divisions
で示される .index
を持つ行が含まれるが、パーティション内が常にソートされているとは限らない。
ddf = dd.from_pandas(df, 2) ddf # dd.DataFrame<from_pandas-b08addf72f0693a9fa1bb6c21d91f3d4, divisions=('a', 'f', 'j')> # DataFrame の列名 ddf.columns # ('X', 'Y', 'Z') # DataFrame の index ddf.index # dd.Index<from_pandas-b08addf72f0693a9fa1bb6c21d91f3d4-index, divisions=('a', 'f', 'j')> # DataFrame の divisions (パーティションの分割箇所) ddf.divisions # ('a', 'f', 'j') # DataFrame のパーティション数 ddf.npartitions # 2
dd.DataFrame
からデータを取得する (計算処理を実行する) には .compute()
。結果、元の pd.DataFrame
が得られる。
ddf.compute() # X Y Z # a 0 10 20 # b 1 11 21 # c 2 12 22 # d 3 13 23 # e 4 14 24 # f 5 15 25 # g 6 16 26 # h 7 17 27 # i 8 18 28 # j 9 19 29
内部処理
ここから、dd.DataFrame
でどういった処理ができるのか、内部動作とあわせて記載する。といっても難しいことは全くやっていない。
まず、データ全体について 1 加算する処理を考える。これは 各パーティションごとに 1 加算して連結するのと同じ。
ddf + 1 # dd.DataFrame<elemwise-5b9ae0407254158903343113fac41421, divisions=('a', 'f', 'j')> (ddf + 1).compute() # 略
次に、列ごとの合計をとる処理。これは、各パーティションごとに列の合計をとって連結し、もう一度 合計をとる処理と同じ。
列の合計をとるため、結果は Series
になる。ddf.sum()
の時点では .compute()
が呼ばれていないため実際の計算は行われていないが、Dask
が結果の型 や divisions
を推測し正しく表示している。
ddf.sum() # dd.Series<dataframe-sum--second-7ba12c9d58c17f61406b84b6c30d7a26, divisions=(None, None)> ddf.sum().compute() # X 45 # Y 145 # Z 245 # dtype: int64
Dask
ではこのような形で、計算処理をパーティションごとに並列 / Out-Of-Core 実行できる形に読み替えている。これらの処理は内部的には Computational Graph ( Dask Graph ) として表現され、.compute()
によって実行される。
各処理の Dask Graph は、.visualize()
メソッドを利用して確認できる。Graph 上で縦につながっていない処理同士は並列で実行できる。
ddf.sum().visualize()
各列の平均をとる場合、内部的には各列の .sum()
と 各列の .count()
をそれぞれ計算して除算。
ddf.mean().compute() # X 4.5 # Y 14.5 # Z 24.5 # dtype: float64 ddf.mean().visualize()
DataFrame
同士の演算や、演算をチェインすることもできる。互いのパーティションが異なる場合はそれらが一致するよう調整が行われる。
((ddf - (ddf * 2)) == - ddf).visualize()
また、累積関数 ( cumxxx
) や ウィンドウ関数 ( rolling_xxx
) なども利用できる。
ddf.cumsum().compute() # X Y Z # a 0 10 20 # b 1 21 41 # c 3 33 63 # d 6 46 86 # e 10 60 110 # f 15 75 135 # g 21 91 161 # h 28 108 188 # i 36 126 216 # j 45 145 245 ddf.cumsum().visualize()
concat
, join
などの 連結 / 結合もできる。通常の演算と同じく、dd.DataFrame
同士のパーティションは適当に調整される。
df2 = pd.DataFrame({'A': np.arange(5), 'B': np.arange(10, 15)}, index=list('adefg')) df2 # A B # a 0 10 # d 1 11 # e 2 12 # f 3 13 # g 4 14 ddf2 = dd.from_pandas(df2, 2) ddf2 # dd.DataFrame<from_pandas-667963fc37e22688843f02da80df5963, divisions=('a', 'f', 'g')> ddf.join(ddf2).compute() # X Y Z A B # a 0 10 20 0 10 # b 1 11 21 NaN NaN # c 2 12 22 NaN NaN # d 3 13 23 1 11 # e 4 14 24 2 12 # f 5 15 25 3 13 # g 6 16 26 4 14 # h 7 17 27 NaN NaN # i 8 18 28 NaN NaN # j 9 19 29 NaN NaN ddf.join(ddf2).visualize()
サポートされている処理の一覧は以下のAPIドキュメントを。一部利用できない引数が明記されていないが、次バージョンにて改訂。
9/26 追記 処理結果については、行の順序以外は pandas
の処理と一致するはず。例外は quantile
のような percentile をとる処理。これらは Out-Of-Core 処理のための近似アルゴリズムを使っており、正確な値とずれることがある。
実データでの利用例
こちらが良エントリ (英語)。
- Analyzing Reddit Comments with Dask and Castra
- Out-of-Core Dataframes in Python: Dask and OpenStreetMap
まとめ
Dask
を利用して DataFrame
を並列処理する方法を記載した。手順は、
dd.from_pandas
を利用してpd.DataFrame
をdd.DataFrame
へ変換。- 実行したいメソッド / 演算を
dd.DataFrame
に対して適用。 .compute()
で計算を実行し、結果を取得する。計算処理はDask
にて自動的に並列化される。
最後、pandas
0.16.2 時点では並列処理による速度向上は大きくはない。これは Python の GIL (Global Interpreter Lock ) により並列実行できる処理が限定されているため。今月中にリリース予定の pandas
0.17.0 では いくつかの処理で Cython から明示的に GIL 解放するよう実装を変更しており、並列化による速度向上は大きくなる。