tkm2261's blog

研究員(OR屋) → データ分析官 → MLエンジニア → ニート → 米国CS PhDが諸々書いてます

PythonでCSVを高速&省メモリに読みたい

今日はPython (Pandas)で高速にCSVを読むことに挑戦したいと思います。

Kaggleに参加するたびに、イライラしていたので各実装の白黒はっきりさせようと思います。

R使いが羨ましいなぁと思う第一位がCSV読込が簡単に並列出来て速いことなので、

なんとかGILのあるPythonでも高速に読み込みたいと思います。

ただ、この検証ではコーディング量が多いものは検証しません。

CSV読込は頻出するので、フットワークの軽さが重要です。(オレオレライブラリ嫌い)

Pickleは早いけど。。。

Kaggleでは、大規模なCSVを高速に読みたいということが頻発します。

シリアライズだけなら、Pickleでのread/writeが爆速なのですが、

  • バイナリなので中身が確認できない
  • 一度メモリに全展開が必要

と欠点もあるため、なるべくCSVで管理したいところです。

見てみるとこんな感じ。

Line #    Mem usage    Increment   Line Contents
================================================
     8   88.887 MiB    0.000 MiB   @profile
     9                             def main():
    10 4116.051 MiB 4027.164 MiB       df = pd.read_csv('train_data.csv')  # .values
    11 4116.051 MiB    0.000 MiB       with open('aaa', 'wb') as f:
    12 4116.051 MiB    0.000 MiB           pickle.dump(df, f, -1)
    13 4116.051 MiB    0.000 MiB       with open('aaa', 'rb') as f:
    14 4116.051 MiB    0.000 MiB           df = pickle.load(f)

pickleはもっとメモリ喰うと思っていたので、GNU版のtimeで調べると

$ /usr/bin/time -f "%M KB" python test.py
27827088 KB

約28GB… いやいやメモリ使いすぎでしょう。。。

個人的な感覚はオブジェクトサイズの2倍程度だと思ってたので驚きです。こいつはどうにかしないといけません。

皆様、memory_profilerは関数内部で消費されたメモリは表示されませんので気をつけて!

(実は最後の最後で気づいて全実験をやり直した。マジでmemory_profilerさん勘弁して下さい。)

結論はDask使おう!

今回の計測結果がこちらです。
読み込み後のオブジェクトが約2GBぐらいなのでそれを参考に見て下さい。

カッコが付いているところは計測したけど子プロセス分入ってるか自信がない箇所です。

実装 実行時間 memory_profile最大使用メモリ GNU版timeの最大使用メモリ
pandas.read_csv() 39.2s 4.0GB 6.3GB
pandas.read_csv() (dtype指定) 37.2s 2.0GB 3.2GB
pandas.read_csv() (gzip圧縮) 48.5s 4.0GB 6.3GB
numpy.genfromtxt() 4min 41s timeout 22.8 GB
pandas.read_csv() (chunksize指定 + multiprocessing) 43.6s 計測不可 (4.6GB)
pandas.read_csv() (chunksize指定) 40.6s 2.5GB 4.4GB
pandas.read_csv() (chunksize指定+GC) 40.8s 2.5GB 4.4GB
dask.dataframe.read_csv() (dask.multiprocessing.get) 16.3s 計測不可 (4.2GB)
ファイル分割(10ファイル・並列なし) 49.1s 2.3GB 4.4GB
ファイル分割(10ファイル・並列) 8.89s 計測不可 (4.2GB)
pickle 2.18s 2.1GB 28GB

計測した結果から言うと、daskを使うのが速くて実装が楽です! 、デフォルトread_csvはかなりメモリを使用します!

ファイル分割が一番効くのはそうなんですが、↑の結果は行での分割なのでKaggleとかの特徴量で管理したいときには微妙なんですよね。
daskは複数ファイル読込も対応しているので、2行で書けるdaskの記法に統一してしまっても良さげです。

16コアで2倍程度の高速化というのはちょっと物足りない気もしますが、1ファイルなのでしょうがないんですかね

列で分割した場合の結果も後述しますが、pd.mergeではcopy=Falseを指定した方が良いです。

検証環境

  • Ubuntu 16.04
  • c4.4xlarge (16 vCPU)
  • SSDストレージ
  • pandas==0.20.3
  • dask==0.15.0

当初はBash on Windows環境でやってたのですが、こちらはマルチスレッドでの挙動が怪しくてボツにしました。

データ

  • 1,000,000行, 263列
  • 2.6GB (GZIP圧縮後0.54GB)

速度検証

pandas.read_csv()

まずは王道のチェック

In [2]: %time tmp = pd.read_csv('train_data.csv')
CPU times: user 37.1 s, sys: 2.06 s, total: 39.2 s
Wall time: 39.2 s

この40秒ベンチマーク

pandas.read_csv() (dtype指定)

dtypeを指定してみた。

In [4]: %time tmp = pd.read_csv('train_data.csv', dtype=np.float32)
CPU times: user 36.7 s, sys: 564 ms, total: 37.2 s
Wall time: 37.2 s

若干速くなった。

pandas.read_csv() (gzip圧縮)

SSDのIO性能がボトルネックの可能性があるのでgzip圧縮もテスト

In [2]: %time tmp = pd.read_csv('train_data.csv.gz')
CPU times: user 47.5 s, sys: 984 ms, total: 48.5 s
Wall time: 48.5 s

gzipは速くはならないみたい

numpy.genfromtxt()

numpy.loadtxtは欠損値があると読み込めないので、こちらを検証

In [3]: %time tmp = np.genfromtxt('train_data.csv', delimiter=',', skip_header=1)
CPU times: user 4min 33s, sys: 7.98 s, total: 4min 41s
Wall time: 4min 41s

4分ぐらいかかっているので、pandasの実装の方が良いみたいですね。

pandas.read_csv() (chunksize指定 + multiprocessing)

実装

import numpy as np
import pandas as pd
from multiprocessing import Pool

def get_df(df):
    return df

p = Pool()
tmp = pd.concat(list(p.map(get_df, pd.read_csv('train_data.csv', chunksize=100000))), ignore_index=True)
p.close()
p.join()

結果

CPU times: user 39.4 s, sys: 3.9 s, total: 43.3 s
Wall time: 43.6 s

CPUもちゃんと全部動いておらず、この実装は駄目みたいですね。

やるならファイルを分割して並列読込が良さそう。

pandas.read_csv() (chunksize指定)

メモリが少ないときは使う実装なので一応確認

実装

import numpy as np
import pandas as pd
from multiprocessing import Pool

df = None

for tmp in  pd.read_csv('train_data.csv', chunksize=100000):
    if df is None:
        df = tmp
    else:
        df = df.append(tmp, ignore_index=True)

結果

In [1]: %time %run hoge.py
CPU times: user 31.7 s, sys: 8.94 s, total: 40.6 s
Wall time: 40.6 s

さほど時間が変わらず読み込めています。

pandas.read_csv() (chunksize指定 + GC)

さらにメモリに気を使ってGC入れると

import gc
import numpy as np
import pandas as pd
from multiprocessing import Pool

df = None

for tmp in  pd.read_csv('train_data.csv', chunksize=100000):
    if df is None:
        df = tmp
    else:
        df = df.append(tmp, ignore_index=True)
    del tmp
    gc.collect()
In [1]: %time %run test.py
CPU times: user 37.5 s, sys: 3.32 s, total: 40.8 s
Wall time: 40.8 s

ほぼ変わらない時間で実行出来ています。誤差レベルなのでメモリ無い時はGC入れときましょう

dask.dataframe.read_csv()

並列といえばdask! ということで試します。

import dask.dataframe as ddf
import dask.multiprocessing

df = ddf.read_csv('train_data.csv')
df = df.compute(get=dask.multiprocessing.get)
In [1]: %time %run test.py
CPU times: user 7.16 s, sys: 7.49 s, total: 14.6 s
Wall time: 16.3 s

CPUが全部動いて、倍ぐらいは速くなりました!

ただ16コアで倍なので、もうすこし贅沢を言いたい気もする

あとCHUNKSIZEを指定すると、上手く並列しなかったのでデフォルトで良さげです。

ファイル分割

100,000行ごとに10ファイルに分割して読込してみる

import glob
import numpy as np
import pandas as pd
from multiprocessing import Pool

def get_df(path):
    return pd.read_csv(path)

p = Pool()
tmp = pd.concat(p.map(get_df, glob.glob('tmp/*csv')), ignore_index=True)
p.close()
p.join()
In [1]: %time %run test.py
CPU times: user 1.73 s, sys: 1.92 s, total: 3.66 s
Wall time: 8.89 s

流石に速い。10倍ぐらい速くなってますね。

あと、本来ならコア数の倍数で分割するのが良さげです

メモリ使用量

ただ速ければいいというのは不公平なのでメモリも測ります。

特にpd.concatの瞬間に凄いメモリを使ってる可能性があります。

memory_profilerで見てみます

pandas.read_csv()

Line #    Mem usage    Increment   Line Contents
================================================
     8   88.660 MiB    0.000 MiB   @profile
     9                             def main():
    10 4115.828 MiB 4027.168 MiB       tmp = pd.read_csv('train_data.csv')

普通にread_csvすると4GBほど使用

GNU版time計測だと、6.3GB

pandas.read_csv() (dtype指定)

Line #    Mem usage    Increment   Line Contents
================================================
     8   88.680 MiB    0.000 MiB   @profile
     9                             def main():
    10 2109.586 MiB 2020.906 MiB       tmp = pd.read_csv('train_data.csv', dtype=np.float32)

64bit -> 32bitで2GBになり半分に減少

GNU版time計測だと3.2GB 同様ですね。

pandas.read_csv() (chunksize指定)

Line #    Mem usage    Increment   Line Contents
================================================
    11   88.633 MiB    0.000 MiB   @profile
    12                             def main():
    13   88.633 MiB    0.000 MiB       df = None
    14 2497.973 MiB 2409.340 MiB       for tmp in pd.read_csv('train_data.csv', chunksize=100000):
    15 2297.340 MiB -200.633 MiB           if df is None:
    16  492.543 MiB -1804.797 MiB               df = tmp
    17                                     else:
    18 2497.992 MiB 2005.449 MiB               df = df.append(tmp, ignore_index=True)

確かにchunkで読み込むとメモリ使用が減少してますね。

GNU版time計測だと4.4GB 一応素のread_csvよりは減っています。

pandas.read_csv() (chunksize指定 + GC)

Line #    Mem usage    Increment   Line Contents
================================================
    11   89.121 MiB    0.000 MiB   @profile
    12                             def main():
    13   89.121 MiB    0.000 MiB       df = None
    14 2300.328 MiB 2211.207 MiB       for tmp in pd.read_csv('train_data.csv', chunksize=100000):
    15 2300.328 MiB    0.000 MiB           if df is None:
    16  493.230 MiB -1807.098 MiB               df = tmp
    17                                     else:
    18 2500.980 MiB 2007.750 MiB               df = df.append(tmp, ignore_index=True)
    19 2300.328 MiB -200.652 MiB           del tmp
    20 2300.328 MiB    0.000 MiB           gc.collect()

GNU版time計測も4.4GBと同じ。

今回の例ではGCは余り効いてないでが、処理が長くて複雑な参照があると効くことも

ファイル分割

Line #    Mem usage    Increment   Line Contents
================================================
     8   88.434 MiB    0.000 MiB   @profile
     9                             def main():
    10 2326.051 MiB 2237.617 MiB       tmp = pd.concat(map(get_df, glob.glob('tmp/*csv')), ignore_index=True)

concatなのにメモリ使用が少ない。。。

GNU版time計測だと4.4GB

mapがイテレータになってる効果があるかもなのでlistにキャストしてみます。

Line #    Mem usage    Increment   Line Contents
================================================
    11   88.668 MiB    0.000 MiB   @profile
    12                             def main():
    13 2325.824 MiB 2237.156 MiB       tmp = list(map(get_df, glob.glob('tmp/*csv')))
    14 2325.797 MiB   -0.027 MiB       tmp = pd.concat(tmp, ignore_index=True)

GNU版time計測だと4.4GB

listにキャストしても変わらないのでイテレータで渡す意味はなさそうです。

read_csvよりも複数ファイル読込の方がメモリ使用が少ないのは意外でした。read_csvはかなりバッファを持ってメモリ確保しているっぽいですね。

複数ファイル読込は、速度に加えてメモリ側にも恩恵がありそうです。

気になったので、copy=Falseも見てみます。

Line #    Mem usage    Increment   Line Contents
================================================
     8   88.543 MiB    0.000 MiB   @profile
     9                             def main():
    10 2327.086 MiB 2238.543 MiB       tmp = pd.concat(map(get_df, glob.glob('tmp/*csv')), ignore_index=True, copy=False)

GNU版time計測も4.4GB

copy=Falseってあんま意味ないんかい。。。

列分割でのメモリ使用量

上の例では行でファイル分割しましたが、Kaggleでは特徴の出し入れを頻繁に行うので、

列分割での管理の方が理想的です。

以前こんなツイートをしたので合わせて検証します。

ファイルを前100列と残り163列に分割して検証します。

まずはpandas.merge()

Line #    Mem usage    Increment   Line Contents
================================================
     5   88.871 MiB    0.000 MiB   @profile
     6                             def main():
     7 1613.852 MiB 1524.980 MiB       df1 = pd.read_csv('train_data_first.csv')
     8 3381.113 MiB 1767.262 MiB       df2 = pd.read_csv('train_data_last.csv')
     9
    10 5387.531 MiB 2006.418 MiB       df = pd.merge(df1, df2, left_index=True, right_index=True) # パターン1
    11 5387.777 MiB    0.246 MiB       df = df1.merge(df2, left_index=True, right_index=True)) # パターン2
    12 3381.242 MiB -2006.535 MiB      df = pd.merge(df1, df2, left_index=True, right_index=True, copy=False) # パターン3

GNU版time計測は

  • パターン1: 5.5GB
  • パターン2: 5.5GB
  • パターン3: 4.7GB

pd.merge()とpd.DataFrame.merge()ではメモリ使用量に差はないようです。

mergeの場合はcopy=Falseが有効です。参照で問題ない場合は指定しましょう。

次に、numpy.concatenate

Line #    Mem usage    Increment   Line Contents
================================================
     6   88.859 MiB    0.000 MiB   @profile
     7                             def main():
     8 1613.848 MiB 1524.988 MiB       df1 = pd.read_csv('train_data_first.csv').values
     9 3381.113 MiB 1767.266 MiB       df2 = pd.read_csv('train_data_last.csv').values
    10
    11 5387.574 MiB 2006.461 MiB       df = np.concatenate([df1, df2], axis=1) # パターン1
    12 5387.645 MiB    0.070 MiB       df = np.hstack([df1, df2]) # パターン2
    13 5387.645 MiB    0.000 MiB       df = np.c_[df1, df2] # パターン3

GNU版time計測は全て同じでした。

  • パターン1: 5.5GB
  • パターン2: 5.5GB
  • パターン3: 5.5GB

copy指定しない場合は、numpy, pandasともに同じメモリ使用量でした。

私のツイートの数倍というのの再現は出来ませんでしたが、copy=Falseが良く効く場合だったのかもしれません。