始めに
先日、こちらのリリースが発表されました。
Athena can now write query results in Parquet, Avro, ORC and JSON formats
これにより、AthenaでもUNLOAD機能が使えるようになりました。
この記事では、このUNLOADを利用して、S3上に置いたCSVを、Parquet形式に変換しS3に出力するETLをAthenaを使って実装します。
実はCTASを使って同じようなことができます。2年前に以下の記事を書いていました。ただCTASは用途も違うためETL的には使いにくいものでした。 yomon.hatenablog.com
今回、PREPAREとUNLOADを一緒に使うことでパラメータ化した変換処理を実装すると、ある程度は柔軟な処理が実装できそうなのでやってみます。
CSVテーブル準備
利用するCSV形式のデータ
AWSから提供されているS3バケット上にある、Athena用のCSVサンプルデータを利用します。
s3://athena-examples-ap-northeast-1/flight/csv/
Athenaに読み込み
Glueからクローラなどを使うこともできますが、今回は全てAthenaから処理を実行します。
DB作成
Athenaから flight_sample_db
という名前のDBを作成します。既存のDBを使う場合は、この後の作業のDB名を読み替えて下さい。
CREATE DATABASE flight_sample_db
テーブル作成
flight_sample_db
にS3からCSVを読み込み、flight_delays_csv
テーブルを作成します。
CREATE EXTERNAL TABLE flight_sample_db.flight_delays_csv ( yr INT, quarter INT, month INT, dayofmonth INT, dayofweek INT, flightdate STRING, uniquecarrier STRING, airlineid INT, carrier STRING, tailnum STRING, flightnum STRING, originairportid INT, originairportseqid INT, origincitymarketid INT, origin STRING, origincityname STRING, originstate STRING, originstatefips STRING, originstatename STRING, originwac INT, destairportid INT, destairportseqid INT, destcitymarketid INT, dest STRING, destcityname STRING, deststate STRING, deststatefips STRING, deststatename STRING, destwac INT, crsdeptime STRING, deptime STRING, depdelay INT, depdelayminutes INT, depdel15 INT, departuredelaygroups INT, deptimeblk STRING, taxiout INT, wheelsoff STRING, wheelson STRING, taxiin INT, crsarrtime INT, arrtime STRING, arrdelay INT, arrdelayminutes INT, arrdel15 INT, arrivaldelaygroups INT, arrtimeblk STRING, cancelled INT, cancellationcode STRING, diverted INT, crselapsedtime INT, actualelapsedtime INT, airtime INT, flights INT, distance INT, distancegroup INT, carrierdelay INT, weatherdelay INT, nasdelay INT, securitydelay INT, lateaircraftdelay INT, firstdeptime STRING, totaladdgtime INT, longestaddgtime INT, divairportlandings INT, divreacheddest INT, divactualelapsedtime INT, divarrdelay INT, divdistance INT, div1airport STRING, div1airportid INT, div1airportseqid INT, div1wheelson STRING, div1totalgtime INT, div1longestgtime INT, div1wheelsoff STRING, div1tailnum STRING, div2airport STRING, div2airportid INT, div2airportseqid INT, div2wheelson STRING, div2totalgtime INT, div2longestgtime INT, div2wheelsoff STRING, div2tailnum STRING, div3airport STRING, div3airportid INT, div3airportseqid INT, div3wheelson STRING, div3totalgtime INT, div3longestgtime INT, div3wheelsoff STRING, div3tailnum STRING, div4airport STRING, div4airportid INT, div4airportseqid INT, div4wheelson STRING, div4totalgtime INT, div4longestgtime INT, div4wheelsoff STRING, div4tailnum STRING, div5airport STRING, div5airportid INT, div5airportseqid INT, div5wheelson STRING, div5totalgtime INT, div5longestgtime INT, div5wheelsoff STRING, div5tailnum STRING ) PARTITIONED BY (year STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n' LOCATION 's3://athena-examples-ap-northeast-1/flight/csv/';
Athenaの画面からもDBとテーブルが作成されたことが確認できます。
パーティション読み込み
aws s3 ls
でCSVデータのパスを確認するとわかるのですが、CSVは year
のカラムでパーティション分割されています。
$ aws s3 ls s3://athena-examples-ap-northeast-1/flight/csv/ PRE year=1987/ PRE year=1988/ PRE year=1989/ PRE year=1990/ PRE year=1991/ PRE year=1992/ PRE year=1993/ #省略
以下でAthenaから実行しパーティションを読み込みます。
MSCK REPAIR TABLE flight_sample_db.flight_delays_csv
パーティションが読み込まれました。
CSVをクエリしてみる
こちらのファイルをクエリしてみます。圧縮で14MB程度のファイルです。
$ aws s3 ls s3://athena-examples-ap-northeast-1/flight/csv/year=1987/ 2017-05-08 13:11:43 14510443 1987-cleaned.csv.bz2
year = '1987'
とするとパーティションが機能して、上記のファイルのみが取得できるはずです。
SELECT * FROM flight_sample_db.flight_delays_csv WHERE year = '1987'
結果は以下の通り。パーティションが効いてそうで、上記のAWS CLIで取得したファイルが読み込まれてそうです。
(実行時間: 3 分 5 秒, スキャンしたデータ: 13.84 MB)
PREPARE文による処理の準備
PREPARE文を利用することで、事前に用意したクエリにパラメータを引き渡し実行することが可能になります。 docs.aws.amazon.com
PREPAREによる処理作成
以下のように作成してみます。長いですが下に説明書きます。
PREPARE unload_flight_delays_csv_as_parquet FROM UNLOAD (SELECT yr, quarter, dayofweek, flightdate, uniquecarrier, airlineid, carrier, tailnum, flightnum, originairportid, originairportseqid, origincitymarketid, origin, origincityname, originstate, originstatefips, originstatename, originwac, destairportid, destairportseqid, destcitymarketid, dest, destcityname, deststate, deststatefips, deststatename, destwac, crsdeptime, deptime, depdelay, depdelayminutes, depdel15, departuredelaygroups, deptimeblk, taxiout, wheelsoff, wheelson, taxiin, crsarrtime, arrtime, arrdelay, arrdelayminutes, arrdel15, arrivaldelaygroups, arrtimeblk, cancelled, cancellationcode, diverted, crselapsedtime, actualelapsedtime, airtime, flights, distance, distancegroup, carrierdelay, weatherdelay, nasdelay, securitydelay, lateaircraftdelay, firstdeptime, totaladdgtime, longestaddgtime, divairportlandings, divreacheddest, divactualelapsedtime, divarrdelay, divdistance, div1airport, div1airportid, div1airportseqid, div1wheelson, div1totalgtime, div1longestgtime, div1wheelsoff, div1tailnum, div2airport, div2airportid, div2airportseqid, div2wheelson, div2totalgtime, div2longestgtime, div2wheelsoff, div2tailnum, div3airport, div3airportid, div3airportseqid, div3wheelson, div3totalgtime, div3longestgtime, div3wheelsoff, div3tailnum, div4airport, div4airportid, div4airportseqid, div4wheelson, div4totalgtime, div4longestgtime, div4wheelsoff, div4tailnum, div5airport, div5airportid, div5airportseqid, div5wheelson, div5totalgtime, div5longestgtime, div5wheelsoff, div5tailnum, year, month, dayofmonth FROM flight_sample_db.flight_delays_csv WHERE year = ? AND month = ? ) TO 's3://my-bucket/flight_data/' WITH (format='PARQUET',partitioned_by = ARRAY['year','month','dayofmonth'])
PREPARE文の説明
上記を重要部分だけに省略したのが以下です。
PREPARE unload_flight_delays_csv_as_parquet FROM UNLOAD (SELECT yr, ... div5tailnum, year, month, dayofmonth FROM flight_sample_db.flight_delays_csv WHERE year = ? AND month = ? ) TO 's3://my-bucket/flight_data/' WITH (format='PARQUET',partitioned_by = ARRAY['year','month','dayofmonth'])
少し説明入れます。
- 冒頭部分で
unload_flight_delays_csv_as_parquet
という名前でステートメント名を指定してます - 引数で渡された値が
WHERE year = ? AND month = ?
の?
に引き渡されて実行されます。 FROM UNLOAD (SELECT ...) TO 's3://'
の部分でUNLOAD先のS3バケットを指定しています- カラム名を
SELECT *
で無くしています。カラムを明示的に書き、且つ最後にpartitioned_by
に指定する配列と同じ項目を同じ順序で並べる必要があります。(ここだとyear,month,dayofmonth
)これをしないと以下のようなHIVE_COLUMN_ORDER_MISMATCH
というエラーが発生します。
HIVE_COLUMN_ORDER_MISMATCH: Partition keys must be the last columns in the table and in the same order as the table properties: [year, month, dayofweek]
EXECUTE文による処理実行
以下のように処理を実行してみます。引数として 1987年の 1987
を文字列、10月の 10
を整数にてUSING句の後にカンマ区切りで引数として渡しています。
EXECUTE unload_flight_delays_csv_as_parquet USING '1987',10
ちゃんとパーティショニングされてParquetが出力されています。
Parquet変換結果確認
その後他の月も実施して、Parquetに無事全て変換できました。Parquet側のメタデータ読み込みはGlueでやりました。
せっかく変換したので、いくつかのクエリでCSVとParquetで処理速度確認してみます。
- クエリ① 年指定
SELECT * FROM flight_data WHERE year = '1987'
- クエリ② 日付指定
SELECT * FROM flight_data WHERE year = '1987' AND month = 10 AND dayofmonth = 19
- クエリ③ 簡単なGroupBy
SELECT COUNT(*),carrier FROM flight_data WHERE year = '1987' GROUP BY carrier ORDER BY carrier
以下が比較と簡単な考察です。
クエリ | 実行時間(CSV) | スキャンデータ量(CSV) | 実行時間(Parquet) | スキャンデータ量(Parquet) | 考察 |
---|---|---|---|---|---|
クエリ① 年指定 | 185秒 | 13.84 MB | 29.24 秒 | 86.85 MB | 単純に全件見てもParquetの方が読み込み容量が少ないく、効率も良いことがわかります |
クエリ② 日付指定 | 18.27 秒 | 13.84 MB | 1.49 秒 | 976.66 KB | Paquetに変換した際に日付レベルでパーティション切っているのが効いてます |
クエリ③ 簡単なGroupBy | 13.35 秒 | 13.84 MB | 2.49 秒 | 414.77 KB | Paquetは列指向なので本当に一部のデータあのみしか使わないです |
最後に
事前にPREPAREでクエリを準備し、引数を設定して実行できることと、UNLOADを組み合わせることで柔軟な変換処理が実装できることがわかりました。
今回はCSVから読み込みましたが、当然JSON等からも読み込めます。UNLOADも今回はParquetとしましたが、ORC、AVRO、JSON、TEXTFILEも指定できます。圧縮も色々と指定可能です。この辺りはUNLOADの公式ドキュメントを参照してください。
AthenaはCLIやJDBCやODBCからも実行できるるので、CSVをParquetに変換する簡易なETLジョブなどはこれだけでも実装できそうです。