今日はPython (Pandas)で高速にCSVを読むことに挑戦したいと思います。
Kaggleに参加するたびに、イライラしていたので各実装の白黒はっきりさせようと思います。
R使いが羨ましいなぁと思う第一位がCSV読込が簡単に並列出来て速いことなので、
なんとかGILのあるPythonでも高速に読み込みたいと思います。
ただ、この検証ではコーディング量が多いものは検証しません。
CSV読込は頻出するので、フットワークの軽さが重要です。(オレオレライブラリ嫌い)
Pickleは早いけど。。。
Kaggleでは、大規模なCSVを高速に読みたいということが頻発します。
シリアライズだけなら、Pickleでのread/writeが爆速なのですが、
- バイナリなので中身が確認できない
- 一度メモリに全展開が必要
と欠点もあるため、なるべくCSVで管理したいところです。
見てみるとこんな感じ。
Line # Mem usage Increment Line Contents ================================================ 8 88.887 MiB 0.000 MiB @profile 9 def main(): 10 4116.051 MiB 4027.164 MiB df = pd.read_csv('train_data.csv') # .values 11 4116.051 MiB 0.000 MiB with open('aaa', 'wb') as f: 12 4116.051 MiB 0.000 MiB pickle.dump(df, f, -1) 13 4116.051 MiB 0.000 MiB with open('aaa', 'rb') as f: 14 4116.051 MiB 0.000 MiB df = pickle.load(f)
pickleはもっとメモリ喰うと思っていたので、GNU版のtimeで調べると
$ /usr/bin/time -f "%M KB" python test.py 27827088 KB
約28GB… いやいやメモリ使いすぎでしょう。。。
個人的な感覚はオブジェクトサイズの2倍程度だと思ってたので驚きです。こいつはどうにかしないといけません。
皆様、memory_profilerは関数内部で消費されたメモリは表示されませんので気をつけて!
(実は最後の最後で気づいて全実験をやり直した。マジでmemory_profilerさん勘弁して下さい。)
結論はDask使おう!
今回の計測結果がこちらです。
読み込み後のオブジェクトが約2GBぐらいなのでそれを参考に見て下さい。
カッコが付いているところは計測したけど子プロセス分入ってるか自信がない箇所です。
実装 | 実行時間 | memory_profile最大使用メモリ | GNU版timeの最大使用メモリ |
---|---|---|---|
pandas.read_csv() | 39.2s | 4.0GB | 6.3GB |
pandas.read_csv() (dtype指定) | 37.2s | 2.0GB | 3.2GB |
pandas.read_csv() (gzip圧縮) | 48.5s | 4.0GB | 6.3GB |
numpy.genfromtxt() | 4min 41s | timeout | 22.8 GB |
pandas.read_csv() (chunksize指定 + multiprocessing) | 43.6s | 計測不可 | (4.6GB) |
pandas.read_csv() (chunksize指定) | 40.6s | 2.5GB | 4.4GB |
pandas.read_csv() (chunksize指定+GC) | 40.8s | 2.5GB | 4.4GB |
dask.dataframe.read_csv() (dask.multiprocessing.get) | 16.3s | 計測不可 | (4.2GB) |
ファイル分割(10ファイル・並列なし) | 49.1s | 2.3GB | 4.4GB |
ファイル分割(10ファイル・並列) | 8.89s | 計測不可 | (4.2GB) |
pickle | 2.18s | 2.1GB | 28GB |
計測した結果から言うと、daskを使うのが速くて実装が楽です! 、デフォルトread_csvはかなりメモリを使用します!
ファイル分割が一番効くのはそうなんですが、↑の結果は行での分割なのでKaggleとかの特徴量で管理したいときには微妙なんですよね。
daskは複数ファイル読込も対応しているので、2行で書けるdaskの記法に統一してしまっても良さげです。
16コアで2倍程度の高速化というのはちょっと物足りない気もしますが、1ファイルなのでしょうがないんですかね
列で分割した場合の結果も後述しますが、pd.mergeではcopy=Falseを指定した方が良いです。
検証環境
当初はBash on Windows環境でやってたのですが、こちらはマルチスレッドでの挙動が怪しくてボツにしました。
データ
- 1,000,000行, 263列
- 2.6GB (GZIP圧縮後0.54GB)
速度検証
pandas.read_csv()
まずは王道のチェック
In [2]: %time tmp = pd.read_csv('train_data.csv') CPU times: user 37.1 s, sys: 2.06 s, total: 39.2 s Wall time: 39.2 s
この40秒がベンチマーク
pandas.read_csv() (dtype指定)
dtypeを指定してみた。
In [4]: %time tmp = pd.read_csv('train_data.csv', dtype=np.float32) CPU times: user 36.7 s, sys: 564 ms, total: 37.2 s Wall time: 37.2 s
若干速くなった。
pandas.read_csv() (gzip圧縮)
SSDのIO性能がボトルネックの可能性があるのでgzip圧縮もテスト
In [2]: %time tmp = pd.read_csv('train_data.csv.gz') CPU times: user 47.5 s, sys: 984 ms, total: 48.5 s Wall time: 48.5 s
gzipは速くはならないみたい
numpy.genfromtxt()
numpy.loadtxtは欠損値があると読み込めないので、こちらを検証
In [3]: %time tmp = np.genfromtxt('train_data.csv', delimiter=',', skip_header=1) CPU times: user 4min 33s, sys: 7.98 s, total: 4min 41s Wall time: 4min 41s
4分ぐらいかかっているので、pandasの実装の方が良いみたいですね。
pandas.read_csv() (chunksize指定 + multiprocessing)
実装
import numpy as np import pandas as pd from multiprocessing import Pool def get_df(df): return df p = Pool() tmp = pd.concat(list(p.map(get_df, pd.read_csv('train_data.csv', chunksize=100000))), ignore_index=True) p.close() p.join()
結果
CPU times: user 39.4 s, sys: 3.9 s, total: 43.3 s Wall time: 43.6 s
CPUもちゃんと全部動いておらず、この実装は駄目みたいですね。
やるならファイルを分割して並列読込が良さそう。
pandas.read_csv() (chunksize指定)
メモリが少ないときは使う実装なので一応確認
実装
import numpy as np import pandas as pd from multiprocessing import Pool df = None for tmp in pd.read_csv('train_data.csv', chunksize=100000): if df is None: df = tmp else: df = df.append(tmp, ignore_index=True)
結果
In [1]: %time %run hoge.py CPU times: user 31.7 s, sys: 8.94 s, total: 40.6 s Wall time: 40.6 s
さほど時間が変わらず読み込めています。
pandas.read_csv() (chunksize指定 + GC)
さらにメモリに気を使ってGC入れると
import gc import numpy as np import pandas as pd from multiprocessing import Pool df = None for tmp in pd.read_csv('train_data.csv', chunksize=100000): if df is None: df = tmp else: df = df.append(tmp, ignore_index=True) del tmp gc.collect()
In [1]: %time %run test.py CPU times: user 37.5 s, sys: 3.32 s, total: 40.8 s Wall time: 40.8 s
ほぼ変わらない時間で実行出来ています。誤差レベルなのでメモリ無い時はGC入れときましょう
dask.dataframe.read_csv()
並列といえばdask! ということで試します。
import dask.dataframe as ddf import dask.multiprocessing df = ddf.read_csv('train_data.csv') df = df.compute(get=dask.multiprocessing.get)
In [1]: %time %run test.py CPU times: user 7.16 s, sys: 7.49 s, total: 14.6 s Wall time: 16.3 s
CPUが全部動いて、倍ぐらいは速くなりました!
ただ16コアで倍なので、もうすこし贅沢を言いたい気もする
あとCHUNKSIZEを指定すると、上手く並列しなかったのでデフォルトで良さげです。
ファイル分割
100,000行ごとに10ファイルに分割して読込してみる
import glob import numpy as np import pandas as pd from multiprocessing import Pool def get_df(path): return pd.read_csv(path) p = Pool() tmp = pd.concat(p.map(get_df, glob.glob('tmp/*csv')), ignore_index=True) p.close() p.join()
In [1]: %time %run test.py CPU times: user 1.73 s, sys: 1.92 s, total: 3.66 s Wall time: 8.89 s
流石に速い。10倍ぐらい速くなってますね。
あと、本来ならコア数の倍数で分割するのが良さげです
メモリ使用量
ただ速ければいいというのは不公平なのでメモリも測ります。
特にpd.concatの瞬間に凄いメモリを使ってる可能性があります。
memory_profilerで見てみます
pandas.read_csv()
Line # Mem usage Increment Line Contents ================================================ 8 88.660 MiB 0.000 MiB @profile 9 def main(): 10 4115.828 MiB 4027.168 MiB tmp = pd.read_csv('train_data.csv')
普通にread_csvすると4GBほど使用
GNU版time計測だと、6.3GB
pandas.read_csv() (dtype指定)
Line # Mem usage Increment Line Contents ================================================ 8 88.680 MiB 0.000 MiB @profile 9 def main(): 10 2109.586 MiB 2020.906 MiB tmp = pd.read_csv('train_data.csv', dtype=np.float32)
64bit -> 32bitで2GBになり半分に減少
GNU版time計測だと3.2GB 同様ですね。
pandas.read_csv() (chunksize指定)
Line # Mem usage Increment Line Contents ================================================ 11 88.633 MiB 0.000 MiB @profile 12 def main(): 13 88.633 MiB 0.000 MiB df = None 14 2497.973 MiB 2409.340 MiB for tmp in pd.read_csv('train_data.csv', chunksize=100000): 15 2297.340 MiB -200.633 MiB if df is None: 16 492.543 MiB -1804.797 MiB df = tmp 17 else: 18 2497.992 MiB 2005.449 MiB df = df.append(tmp, ignore_index=True)
確かにchunkで読み込むとメモリ使用が減少してますね。
GNU版time計測だと4.4GB 一応素のread_csvよりは減っています。
pandas.read_csv() (chunksize指定 + GC)
Line # Mem usage Increment Line Contents ================================================ 11 89.121 MiB 0.000 MiB @profile 12 def main(): 13 89.121 MiB 0.000 MiB df = None 14 2300.328 MiB 2211.207 MiB for tmp in pd.read_csv('train_data.csv', chunksize=100000): 15 2300.328 MiB 0.000 MiB if df is None: 16 493.230 MiB -1807.098 MiB df = tmp 17 else: 18 2500.980 MiB 2007.750 MiB df = df.append(tmp, ignore_index=True) 19 2300.328 MiB -200.652 MiB del tmp 20 2300.328 MiB 0.000 MiB gc.collect()
GNU版time計測も4.4GBと同じ。
今回の例ではGCは余り効いてないでが、処理が長くて複雑な参照があると効くことも
ファイル分割
Line # Mem usage Increment Line Contents ================================================ 8 88.434 MiB 0.000 MiB @profile 9 def main(): 10 2326.051 MiB 2237.617 MiB tmp = pd.concat(map(get_df, glob.glob('tmp/*csv')), ignore_index=True)
concatなのにメモリ使用が少ない。。。
GNU版time計測だと4.4GB
mapがイテレータになってる効果があるかもなのでlistにキャストしてみます。
Line # Mem usage Increment Line Contents ================================================ 11 88.668 MiB 0.000 MiB @profile 12 def main(): 13 2325.824 MiB 2237.156 MiB tmp = list(map(get_df, glob.glob('tmp/*csv'))) 14 2325.797 MiB -0.027 MiB tmp = pd.concat(tmp, ignore_index=True)
GNU版time計測だと4.4GB
listにキャストしても変わらないのでイテレータで渡す意味はなさそうです。
read_csvよりも複数ファイル読込の方がメモリ使用が少ないのは意外でした。read_csvはかなりバッファを持ってメモリ確保しているっぽいですね。
複数ファイル読込は、速度に加えてメモリ側にも恩恵がありそうです。
気になったので、copy=Falseも見てみます。
Line # Mem usage Increment Line Contents ================================================ 8 88.543 MiB 0.000 MiB @profile 9 def main(): 10 2327.086 MiB 2238.543 MiB tmp = pd.concat(map(get_df, glob.glob('tmp/*csv')), ignore_index=True, copy=False)
GNU版time計測も4.4GB
copy=Falseってあんま意味ないんかい。。。
列分割でのメモリ使用量
上の例では行でファイル分割しましたが、Kaggleでは特徴の出し入れを頻繁に行うので、
列分割での管理の方が理想的です。
以前こんなツイートをしたので合わせて検証します。
numpy.concatinateよりもpandas.DataFrame.mergeでくっつけたほうが数倍メモリ使用が抑えられるという知見が得られた
— Takami Sato (@tkm2261) 2017年7月10日
ファイルを前100列と残り163列に分割して検証します。
まずはpandas.merge()
Line # Mem usage Increment Line Contents ================================================ 5 88.871 MiB 0.000 MiB @profile 6 def main(): 7 1613.852 MiB 1524.980 MiB df1 = pd.read_csv('train_data_first.csv') 8 3381.113 MiB 1767.262 MiB df2 = pd.read_csv('train_data_last.csv') 9 10 5387.531 MiB 2006.418 MiB df = pd.merge(df1, df2, left_index=True, right_index=True) # パターン1 11 5387.777 MiB 0.246 MiB df = df1.merge(df2, left_index=True, right_index=True)) # パターン2 12 3381.242 MiB -2006.535 MiB df = pd.merge(df1, df2, left_index=True, right_index=True, copy=False) # パターン3
GNU版time計測は
- パターン1: 5.5GB
- パターン2: 5.5GB
- パターン3: 4.7GB
pd.merge()とpd.DataFrame.merge()ではメモリ使用量に差はないようです。
mergeの場合はcopy=Falseが有効です。参照で問題ない場合は指定しましょう。
次に、numpy.concatenate
Line # Mem usage Increment Line Contents ================================================ 6 88.859 MiB 0.000 MiB @profile 7 def main(): 8 1613.848 MiB 1524.988 MiB df1 = pd.read_csv('train_data_first.csv').values 9 3381.113 MiB 1767.266 MiB df2 = pd.read_csv('train_data_last.csv').values 10 11 5387.574 MiB 2006.461 MiB df = np.concatenate([df1, df2], axis=1) # パターン1 12 5387.645 MiB 0.070 MiB df = np.hstack([df1, df2]) # パターン2 13 5387.645 MiB 0.000 MiB df = np.c_[df1, df2] # パターン3
GNU版time計測は全て同じでした。
- パターン1: 5.5GB
- パターン2: 5.5GB
- パターン3: 5.5GB
copy指定しない場合は、numpy, pandasともに同じメモリ使用量でした。
私のツイートの数倍というのの再現は出来ませんでしたが、copy=Falseが良く効く場合だったのかもしれません。