189
147

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

KaggleAdvent Calendar 2022

Day 6

1100万行・32GB超の巨大CSVファイルの基本統計量を4GBメモリマシンで算出する

Last updated at Posted at 2022-12-05

はじめに

この記事は,Kaggle Advent Calendar 2022第6日目の記事になります。

本記事では、 32GB超のCSVデータの基本統計量を、小規模マシンでも省メモリかつ高速に計算するテクニック について解説します。

Kaggleコンペに限らず、

  • マシンスペックが低いため、大きなデータセットを満足に処理できず困っている :cry:
  • 毎回行うファイル読み込みが遅いので、もっと高速化したい ⚡

といった悩みや課題を抱えている方の参考になれば幸いです。

モチベーション

データ分析業務やKaggle等のコンペティションで初めてのデータセットを扱う場合、いきなり機械学習アルゴリズムを行うことはまず無く、最初にデータ観察を行うのが一般的です。

テーブルデータであれば、各カラムの基本統計量(最小値、最大値、平均、分散、四分位数)などを計算・可視化し、データクレンジングの要否や特徴量設計の方針などを検討します。

ML ワークフローの各段階をサポートする Vertex AI の機能を示す図。

(GoogleCloudのVertex AI概要より引用。左上の「Data Readiness」がデータ観察に該当)

しかし、 データセットの規模がマシンスペック(特にメモリサイズ)を大きく上回る場合、データセットを読み込むだけでメモリエラーが発生し、次工程である特徴量設計や学習・チューニングに中々進めない ことが往々にしてあります。

クラウド環境が利用可能であればハイメモリタイプのインスタンスを使用することで解決しますが、例えば

  • データセットに個人情報が含まれているため、社内環境でしか分析できない(社外にデータを持ち出せない)
  • 参加したコンペがCode Competitionで、指定された環境でしか分析できない

といった場合、 利用可能な分析環境で何とかするしかない のが実情です。

クラウド環境が利用できる場合であっても、コンピューティングリソースは基本的に従量課金制のため、 同等の処理結果・処理時間をより小さいインスタンスタイプで実行できれば、費用の節約 にもつながります。

そこで、 低スペックマシンで大規模なデータセットの基本統計量を算出する方法 にニーズがあると考え、その方法の一つを記事化したものになります。

検証環境

データセット

今年2022年の秋に開催された、Kaggle AMEXコンペのテストデータ(test_data.csv)を使用します。

CSVファイルによるテーブルコンペで、 学習データは約16GB(553万行)、テストデータは約32GB(1,136万行) もあります。2021年に開催されたRiiidコンペはデータセットが1億行超もあることで有名でしたが、データサイズで比較すれば今回の方が6倍以上も大きいサイズになります。

$ du -ah *.csv 
32G     test_data.csv
16G     train_data.csv

マシンスペック

GoogleCloudのC2D-highcpu-2インスタンスを使用します。

マシンスペックは 2vCPU/4GBメモリであり、メモリサイズはKaggleのCPUカーネル(4vCPU/16GBメモリ)の1/4、データサイズと比べると約1/8しかありません。

image.png

https://cloud.google.com/compute/docs/compute-optimized-machines#c2d-high-cpu より抜粋)

使用パッケージ

本検証では、PandasDaskPyArrowを使用します。

全てpipでインストール可能です。

$ pip install pandas dask pyarrow

pythonと各パッケージのバージョンは以下の通りとなります。

$ python -V
Python 3.7.12

$ pip freeze | grep -E "pandas|dask|pyarrow"
dask==2022.2.0
pandas==1.3.5
pyarrow==10.0.0

実行時間とメモリ使用量の計測には、LinuxのGNU版timeパッケージを使用しました。

$ sudo apt install time

Pandasで実行(失敗)

まずはPandasで実行してみましょう。

pandasにはdescribe()という各カラムの基本統計量を算出できる便利なメソッドがありますので、これを使用します。

import pandas as pd

if __name__ == '__main__':
    data = pd.read_csv('train_data.csv')
    stat = data.describe()
    print(stat)

Pandasは全てのデータを一度に読み込んでメモリに乗せる仕様のため、このコードはメモリエラーで失敗してしまいます。

Pandasの公式ドキュメントでは、大規模なデータセットを扱う場合、

  • 必要な列のみを読み込む
  • 効率的なデータ型を使用する
  • データをチャンクで分割して処理する

ことが推奨されています。

しかし、 データを読み込む時点では、どれが必要な列なのか、どんなデータ型が適切なのかはまだ分かりません。チャンクで分割処理する方法も、 四分位数などの統計量は列データ全体を必要とする ため、これも単純には適用できません。

Daskで実行(成功!)

続いて、Daskを使用します。

DaskはPython用の並列・分散処理パッケージであり、大規模なデータに対してもスケーリング可能な機能を多数提供しています。Pandasの公式ドキュメントでも、エコシステムの1つとして紹介されています。

Daskでは以下のように実装します。
describe()の後にcompute()メソッドで遅延処理を実行している点を除けば、APIはPandasとほぼ同じです。

import dask.dataframe as dd

if __name__ == '__main__': 
    data = dd.read_csv('test_data.csv')
    stat = data.describe().compute()
    print(stat)

実行結果は以下となります。実行時間は約390秒、メモリ使用量は約1.3GBでした。

データサイズのわずか1/8程度しかメモリ搭載していないマシンでも、メモリエラーを起こさずに計算できました。素晴らしい!!!

                P_2          D_39           B_1           B_2           R_1           S_3          D_41  ...         D_139         D_140         D_141          D_142         D_143         D_144         D_145
count  5.485466e+06  5.531451e+06  5.531451e+06  5.529435e+06  5.531451e+06  4.510907e+06  5.529435e+06  ...  5.429903e+06  5.490819e+06  5.429903e+06  944408.000000  5.429903e+06  5.490724e+06  5.429903e+06
mean   6.563340e-01  1.531172e-01  1.240100e-01  6.214887e-01  7.880270e-02  2.258455e-01  5.978469e-02  ...  1.789305e-01  2.664348e-02  1.645212e-01       0.390799  1.788022e-01  5.238952e-02  6.233496e-02
std    2.446494e-01  2.700709e-01  2.119869e-01  4.014877e-01  2.263971e-01  1.933475e-01  2.025443e-01  ...  3.790614e-01  1.455480e-01  3.482771e-01       0.236182  3.789498e-01  1.825135e-01  1.934937e-01
min   -4.589548e-01  5.026190e-09 -7.588799e+00  9.192280e-09  1.534223e-09 -6.271320e-01  5.566545e-10  ...  3.767347e-10  3.725073e-09  1.650100e-10      -0.014539  5.549692e-09  2.500991e-09  1.226024e-09
25%    5.077288e-01  4.728840e-03  9.636755e-03  1.377500e-01  2.997485e-03  1.317796e-01  2.989860e-03  ...  3.169253e-03  2.648101e-03  3.170379e-03       0.247961  3.184138e-03  2.866789e-03  3.166187e-03
50%    7.148664e-01  9.370886e-03  3.701816e-02  8.151693e-01  5.950186e-03  1.676472e-01  5.934894e-03  ...  6.260630e-03  5.249856e-03  6.255896e-03       0.437398  6.260704e-03  5.654534e-03  6.291452e-03
75%    8.811929e-01  2.449055e-01  1.511139e-01  1.003019e+00  8.839325e-03  2.803336e-01  8.842138e-03  ...  9.344230e-03  7.773080e-03  9.360699e-03       0.602584  9.352387e-03  8.455284e-03  9.344943e-03
max    1.010000e+00  5.389619e+00  1.324060e+00  1.010000e+00  3.256284e+00  5.482888e+00  8.988807e+00  ...  1.010000e+00  1.010000e+00  1.339910e+00       2.229368  1.010000e+00  1.343331e+00  4.827630e+00

[8 rows x 186 columns]

なぜDaskでは省メモリで計算できたのでしょうか?
Daskのread_csv()にはblocksizeという引数があり、入力データをblocksize毎にパーティション分割して読み込みます(指定が無い場合、blocksizeはCPUコア数やメモリサイズから自動計算されます)。
各統計量はパーティション毎に並列で計算され、最後にデータ全体の統計量として再計算されるため、ピークメモリを抑えることに成功しています。

高速化する

先程のプログラムは成功しましたが、実行に6分半近く要しています。

これを更に高速化していきたいと思います。

Parquet形式に事前変換する

フラットファイルであるCSV形式は読み込みが遅いため、カラムナフォーマットであるParquet形式に事前変換します。

変換処理が1回だけ余計に発生しますが、2回目以降はカラムナフォーマットの方が読み込みが高速なため(後述)、 コンペのようにファイル読み込みを何度も行うユースケースでは事前変換がおススメ です。

変換処理にもDaskを使用します。

import dask.dataframe as dd

if __name__ == '__main__': 
    data = dd.read_csv('test_data.csv')
    data.to_parquet('test_parquet')

処理時間は約420秒、メモリ使用量は約1GBでした。

出力先のtest_parquetディレクトリには、元データが530個のParquetファイルに分割保存され、総サイズは約16GB(元データの半分)となりました。

分割数は、read_csv()で設定されたblocksizeに依存します。

デフォルトの最大値はblocksize=64MBであり、より大きな値を設定すれば分割数を減らすことができます(その分、パーティションあたりのデータサイズが増えるため、メモリ使用量は増加します)。

Parquet形式から読み込む

以降は、事前変換したParquetファイルから読み込みます。

Daskのread_parquet()にディレクトリを指定するだけで、出力したParquetファイル全量を一括で読み込むことが可能です。

import dask.dataframe as dd

if __name__ == '__main__': 
    data = dd.read_parquet('test_parquet')
    stat = data.describe().compute()
    print(stat)

処理時間は約210秒、メモリ使用量は約1.3GBでした。

元のCSVファイルから読み込むのと比較して、同等のメモリ使用量を維持しつつ、処理時間を4割以上削減できました!

(おまけ)describeメソッドの使い方

私は基本統計量の結果を、適切なデータ型の確認によく利用しています。

bit数の少ないデータ型に変換することで、 メモリ上のデータサイズが劇的に減るだけでなく、特徴量設計などの計算時間短縮 も期待できます。

一例として、以下のような変換があります。

  • 整数で最大値が255未満 ⇒ np.int8
  • 01しかない ⇒ bool
  • ユニークな数が少ない(カーディナリティが低い) ⇒ CategoricalDtype
  • 欠損値(NaN)以外は整数っぽい ⇒ 欠値補間 + 整数型

データに極端な外れ値が含まれている場合、最小値や最大値だけでは適切なデータ型は特定できないため、パーセンタイルを使用します。

describe()のパーセンタイルはデフォルトで[0.25, 0.75]ですが、quantiles引数で変更可能です。

percentiles=[0.01, 0.99]にすると、上下1%ずつを外れ値として除外できるため、残り98%のデータから比較的妥当なデータ範囲を絞り込めるようになります。

import dask.dataframe as dd

if __name__ == '__main__': 
    data = dd.read_parquet('test_parquet')
    stat = data.describe(percentiles=[0.01, 0.99]).compute()
    print(stat)
                P_2          D_39           B_1           B_2           R_1           S_3          D_41  ...         D_139         D_140         D_141         D_142         D_143         D_144         D_145
count  1.130388e+07  1.136376e+07  1.136376e+07  1.136084e+07  1.136376e+07  9.622797e+06  1.136084e+07  ...  1.126606e+07  1.134001e+07  1.126606e+07  1.966795e+06  1.126606e+07  1.133992e+07  1.126606e+07
mean   6.576889e-01  1.562754e-01  1.279497e-01  6.167632e-01  7.489555e-02  2.274024e-01  5.915285e-02  ...  1.796090e-01  2.769862e-02  1.655753e-01  4.068860e-01  1.794860e-01  5.198750e-02  6.377648e-02
std    2.468916e-01  2.657579e-01  2.126895e-01  4.059172e-01  2.160619e-01  1.988265e-01  2.006950e-01  ...  3.796441e-01  1.489702e-01  3.498050e-01  2.484312e-01  3.795397e-01  1.821541e-01  1.973693e-01
min   -4.658552e-01  6.204270e-09 -7.057417e+00  2.125048e-10  6.911055e-10 -6.855620e-01  4.726168e-10  ...  1.119644e-09  1.485373e-09  8.903923e-11 -1.471698e-02  8.253177e-11  1.051330e-09  1.150208e-09
1%     3.394018e-02  2.264711e-04  6.388284e-04  4.502037e-03  1.364020e-04  1.524112e-02  1.400369e-04  ...  1.558728e-04  1.245405e-04  1.465424e-04  2.643156e-02  1.466159e-04  1.349791e-04  1.542352e-04
50%    7.233169e-01  9.784972e-03  3.896192e-02  8.153240e-01  5.881068e-03  1.675709e-01  5.889806e-03  ...  6.291292e-03  5.217998e-03  6.297293e-03  4.524809e-01  6.313159e-03  5.708498e-03  6.338714e-03
99%    1.007214e+00  1.142791e+00  1.089292e+00  1.009767e+00  1.009128e+00  1.176328e+00  1.242532e+00  ...  1.009574e+00  1.007079e+00  1.030071e+00  1.286142e+00  1.009572e+00  1.147772e+00  1.369364e+00
max    1.010000e+00  9.330448e+00  1.324060e+00  1.010000e+00  3.258507e+00  4.341215e+00  1.211719e+01  ...  1.010000e+00  1.010000e+00  1.320065e+00  2.280940e+00  1.010000e+00  1.343333e+00  5.462223e+00

[8 rows x 186 columns]

まとめ

本記事では、32GB超のCSVデータの基本統計量を小規模マシンでも省メモリかつ高速に計算するテクニック について解説しました。

今回はDaskを使用しましたが、PythonであればPySparkVaexなど他の分散パッケージでも類似の効果が期待できます。

本記事が、私を含め、大きなデータセットの扱いに困っている方々の参考になれば幸いです。

189
147
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
189
147

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?