簡単な集約/変換処理を PySpark & pandas の DataFrame で行う
こちらの続き。
準備
サンプルデータは iris 。今回は HDFS に csv を置き、そこから読み取って DataFrame
を作成する。
# HDFS にディレクトリを作成しファイルを置く $ hadoop fs -mkdir /data/ $ hadoop fs -put iris.csv /data/ $ hadoop fs -ls / Found 1 items drwxr-xr-x - ec2-user supergroup 0 2015-04-28 20:01 /data # Spark のパスに移動 $ echo $SPARK_HOME /usr/local/spark $ cd $SPARK_HOME $ pwd /usr/local/spark $ bin/pyspark
補足 前回同様に pandas
から直接 PySpark
の DataFrame
を作成した場合、groupBy
時に java.lang.OutOfMemoryError: Java heap space
エラーが発生してシェルごと落ちる。
CSV ファイルの読み込み
pandas
では前回同様 read_csv
。
import numpy as np import pandas as pd # 表示する行数を設定 pd.options.display.max_rows=10 names = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'Species'] # pandas pdf = pd.read_csv('~/iris.csv', header=None, names=names) pdf # 略
PySpark
は標準では csv から直接 DataFrame
を作成できないため、一度 Row
のリストを作成して DataFrame
に変換する。
from pyspark.sql import Row lines = sc.textFile("hdfs://127.0.0.1:9000/data/iris.csv") cells = lines.map(lambda l: l.split(",")) rows = cells.map(lambda x: Row(SepalLength=float(x[0]), SepalWidth=float(x[1]), PetalLength=float(x[2]), PetalWidth=float(x[3]), Species=x[4])) sdf = sqlContext.createDataFrame(rows) sdf.show() # 略
グルーピング/集約
ある列の値ごとに集計
pandas
, PySpark
で多少 文法は異なる。
列の値でグループ分けし、一列の合計を取得する場合:
# pandas pdf.groupby('Species')['SepalLength'].sum() # Species # setosa 250.3 # versicolor 296.8 # virginica 329.4 # Name: SepalLength, dtype: float64 # PySpark sdf.groupBy('Species').sum('SepalLength').show() # Species SUM(SepalLength) # virginica 329.3999999999999 # versicolor 296.8 # setosa 250.29999999999998
指定した複数列の合計を取得する場合:
# pandas pdf.groupby('Species')[['PetalWidth', 'PetalLength']].sum() # PetalWidth PetalLength # Species # setosa 12.2 73.2 # versicolor 66.3 213.0 # virginica 101.3 277.6 # PySpark sdf.groupBy('Species').sum('PetalWidth', 'PetalLength').show() # Species SUM(PetalWidth) SUM(PetalLength) # virginica 101.29999999999998 277.59999999999997 # versicolor 66.30000000000001 213.0 # setosa 12.199999999999996 73.2
全列の合計を取得する場合:
# pandas pdf.groupby('Species').sum() # SepalLength SepalWidth PetalLength PetalWidth # Species # setosa 250.3 170.9 73.2 12.2 # versicolor 296.8 138.5 213.0 66.3 # virginica 329.4 148.7 277.6 101.3 # PySpark sdf.groupBy('Species').sum().show() # Species SUM(PetalLength) SUM(PetalWidth) SUM(SepalLength) SUM(SepalWidth) # virginica 277.59999999999997 101.29999999999998 329.3999999999999 148.7 # versicolor 213.0 66.30000000000001 296.8 138.5 # setosa 73.2 12.199999999999996 250.29999999999998 170.90000000000003
補足 pandas
では グループ化したデータも DataFrame
と同じようにスライシングできたりする。
一方、PySpark
の GroupedData
は集約系のAPI しか持っていない。
# pandas pdf.groupby('Species')['PetalWidth'] # <pandas.core.groupby.SeriesGroupBy object at 0x7f62f4218d50> # PySpark (NG!) sdf.groupBy('Species')[['Species']] # TypeError: 'GroupedData' object has no attribute '__getitem__' sdf.groupBy('Species').select('PetalWidth') # AttributeError: 'GroupedData' object has no attribute 'select'
また、pandas
では apply
で自作の集約関数 (UDAF) を利用することができるが、PySpark
1.3.1 時点 では非対応らしい。PySpark
の udf
を利用して定義した自作関数を集約時に使うと以下のエラーになる。
# pandas pdf.groupby('Species')[['PetalWidth', 'PetalLength']].apply(np.sum) # PetalWidth PetalLength # Species # setosa 12.2 73.2 # versicolor 66.3 213.0 # virginica 101.3 277.6 # PySpark (NG!) import pyspark.sql.functions np_sum = pyspark.sql.functions.udf(np.sum, pyspark.sql.types.FloatType()) sdf.groupBy('Species').agg(np_sum(sdf.PetalWidth)) # py4j.protocol.Py4JJavaError: An error occurred while calling o334.agg. # : org.apache.spark.sql.AnalysisException: expression 'pythonUDF' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.;
行持ち / 列持ち変換
複数列持ちの値を行持ちに展開 (unpivot / melt)
pandas
では pd.melt
。 DataFrame.melt
ではないので注意。
# pandas pmelted = pd.melt(pdf, id_vars=['Species'], var_name='variable', value_name='value') pmelted # Species variable value # 0 setosa SepalLength 5.1 # 1 setosa SepalLength 4.9 # 2 setosa SepalLength 4.7 # 3 setosa SepalLength 4.6 # 4 setosa SepalLength 5.0 # .. ... ... ... # 595 virginica PetalWidth 2.3 # 596 virginica PetalWidth 1.9 # 597 virginica PetalWidth 2.0 # 598 virginica PetalWidth 2.3 # 599 virginica PetalWidth 1.8 # # [600 rows x 3 columns]
同様の処理を PySpark
でやるには、DataFrame.flatMap
。1行の入力に対して複数行 (この例では4行) のデータを返すことができる。fratMap
の返り値は RDD
インスタンスになるため、必要なら再度 DataFrame
化する。
# PySpark def mapper(row): return [Row(Species=row[4], variable='PetalLength', value=row[0]), Row(Species=row[4], variable='PetalWidth', value=row[1]), Row(Species=row[4], variable='SepalLength', value=row[2]), Row(Species=row[4], variable='SepalWidth', value=row[3])] smelted = sqlContext.createDataFrame(sdf.flatMap(mapper)) smelted.show() # Species value variable # setosa 1.4 PetalLength # setosa 0.2 PetalWidth # setosa 5.1 SepalLength # setosa 3.5 SepalWidth # ... .. ... # setosa 1.4 PetalLength # setosa 0.2 PetalWidth # setosa 5.0 SepalLength # setosa 3.6 SepalWidth smelted.count() # 600L
複数行持ちの値を列持ちに変換 (pivot)
pandas
では DataFrame.pivot
。pivotするデータは列にする値 (以下では Species ) と行にする値 (以下では variable ) の組がユニークになっている必要がある。そのため、まず pivot 用データを作成 -> その後 pivot する。
# pandas # pivot 用データを作成 punpivot = pmelted.groupby(['Species', 'variable']).sum() punpivot = punpivot.reset_index() punpivot # Species variable value # 0 setosa PetalLength 73.2 # 1 setosa PetalWidth 12.2 # 2 setosa SepalLength 250.3 # 3 setosa SepalWidth 170.9 # 4 versicolor PetalLength 213.0 # .. ... ... ... # 7 versicolor SepalWidth 138.5 # 8 virginica PetalLength 277.6 # 9 virginica PetalWidth 101.3 # 10 virginica SepalLength 329.4 # 11 virginica SepalWidth 148.7 # # [12 rows x 3 columns] # pivot punpivot.pivot(index='variable', columns='Species', values='value') # Species setosa versicolor virginica # variable # PetalLength 73.2 213.0 277.6 # PetalWidth 12.2 66.3 101.3 # SepalLength 250.3 296.8 329.4 # SepalWidth 170.9 138.5 148.7
PySpark
の DataFrame
のままでは同じ処理はできないようなので、一度 RDD
に変換してから、 groupBy
-> map
# PySpark # pivot 用データを作成 sunpivot = smelted.groupBy('Species', 'variable').sum() sunpivot.show() # Species variable SUM(value) # versicolor SepalWidth 138.5 # versicolor SepalLength 296.8 # setosa PetalLength 73.2 # virginica PetalWidth 101.29999999999998 # versicolor PetalWidth 66.30000000000001 # setosa SepalWidth 170.90000000000003 # virginica PetalLength 277.59999999999997 # setosa SepalLength 250.29999999999998 # versicolor PetalLength 213.0 # setosa PetalWidth 12.199999999999996 # virginica SepalWidth 148.7 # virginica SepalLength 329.3999999999999 def reducer(obj): # variable : value の辞書を作成 result = {o[1]:o[2] for o in obj[1]} return Row(Species=obj[0], **result) # pivot spivot = sunpivot.rdd.groupBy(lambda x: x[0]).map(reducer) spivot.collect() # [Row(PetalLength=277.59999999999997, PetalWidth=101.29999999999998, SepalLength=329.3999999999999, SepalWidth=148.7, Species=u'virginica'), # Row(PetalLength=73.2, PetalWidth=12.199999999999996, SepalLength=250.29999999999998, SepalWidth=170.90000000000003, Species=u'setosa'), # Row(PetalLength=213.0, PetalWidth=66.30000000000001, SepalLength=296.8, SepalWidth=138.5, Species=u'versicolor')] sqlContext.createDataFrame(spivot).show() # PetalLength PetalWidth SepalLength SepalWidth Species # 277.59999999999997 101.29999999999998 329.3999999999999 148.7 virginica # 73.2 12.199999999999996 250.29999999999998 170.90000000000003 setosa # 213.0 66.30000000000001 296.8 138.5 versicolor
列の分割 / 結合
列の値を複数列に分割
ある列の値を適当に文字列処理して、新しい列を作成したい。pandas
には 文字列処理用のアクセサがあるため、 assign
と組み合わせて以下のように書ける。
# pandas psplitted = pmelted.assign(Parts=pmelted['variable'].str.slice(0, 5), Scale=pmelted['variable'].str.slice(5)) psplitted # Species variable value Parts Scale # 0 setosa SepalLength 5.1 Sepal Length # 1 setosa SepalLength 4.9 Sepal Length # 2 setosa SepalLength 4.7 Sepal Length # 3 setosa SepalLength 4.6 Sepal Length # 4 setosa SepalLength 5.0 Sepal Length # .. ... ... ... ... ... # 595 virginica PetalWidth 2.3 Petal Width # 596 virginica PetalWidth 1.9 Petal Width # 597 virginica PetalWidth 2.0 Petal Width # 598 virginica PetalWidth 2.3 Petal Width # 599 virginica PetalWidth 1.8 Petal Width # # [600 rows x 5 columns]
PySpark
には上記のようなメソッドはないので map
で処理する。
# PySpark def splitter(row): parts = row[2][:5] scale = row[2][5:] return Row(Species=row[0], value=row[1], Parts=parts, Scale=scale) ssplitted = sqlContext.createDataFrame(smelted.map(splitter)) ssplitted.show() # Parts Scale Species value # Petal Length setosa 1.4 # Petal Width setosa 0.2 # Sepal Length setosa 5.1 # Sepal Width setosa 3.5 # Petal Length setosa 1.4 # .. .. ... .. # Petal Length setosa 1.4 # Petal Width setosa 0.2 # Sepal Length setosa 5.0 # Sepal Width setosa 3.6
複数列の値を一列に結合
pandas
では普通に文字列結合すればよい。
# pandas psplitted['variable2'] = psplitted['Parts'] + psplitted['Scale'] psplitted # Species variable value Parts Scale variable2 # 0 setosa SepalLength 5.1 Sepal Length SepalLength # 1 setosa SepalLength 4.9 Sepal Length SepalLength # 2 setosa SepalLength 4.7 Sepal Length SepalLength # 3 setosa SepalLength 4.6 Sepal Length SepalLength # 4 setosa SepalLength 5.0 Sepal Length SepalLength # .. ... ... ... ... ... ... # 595 virginica PetalWidth 2.3 Petal Width PetalWidth # 596 virginica PetalWidth 1.9 Petal Width PetalWidth # 597 virginica PetalWidth 2.0 Petal Width PetalWidth # 598 virginica PetalWidth 2.3 Petal Width PetalWidth # 599 virginica PetalWidth 1.8 Petal Width PetalWidth # # [600 rows x 6 columns]
PySpark
では map
。
# PySpark def unite(row): return Row(Species=row[2], value=row[3], variable=row[0] + row[1]) sqlContext.createDataFrame(splitted.map(unite)).show() # Species value variable # setosa 1.4 PetalLength # setosa 0.2 PetalWidth # setosa 5.1 SepalLength # setosa 3.5 SepalWidth # .. .. .. # setosa 1.4 PetalLength # setosa 0.2 PetalWidth # setosa 5.0 SepalLength # setosa 3.6 SepalWidth
補足 withColumn
の場合、オペレータは 数値の演算として扱われてしまうようなのでここでは使えない。
# PySpark (NG!) ssplitted.withColumn('variable', splitted.Parts + splitted.Scale).show() # Parts Scale Species value variable # Petal Length setosa 1.4 null # Petal Width setosa 0.2 null # .. .. .. .. ..
まとめ
PySpark
と pandas
のデータ集約/変形処理を整理した。
データ分析用途で利用したい場合、(ごく当たり前だが) データ量が少なく手元でさっといろいろ試したい場合は pandas
、データ量が比較的多く 単純な処理を全体にかけたい場合は Spark
がよい。
Spark
は map 系の API が充実するとさらに使いやすくなりそうだ。が、小回りの効く文法/機能が充実していくことは考えにくいので 完全に Spark
だけでデータ分析をする、、という状態には将来もならないのではないかと思う。小さいデータは pandas
使いましょう。
Learning Spark: Lightning-Fast Big Data Analysis
- 作者: Holden Karau,Andy Konwinski,Patrick Wendell,Matei Zaharia
- 出版社/メーカー: O'Reilly Media
- 発売日: 2015/01/28
- メディア: Kindle版
- この商品を含むブログを見る