AthenaのUNLOADとPREPAREでパラメータ化したクエリを組み合わせCSVをParquetに変換してみる

始めに

先日、こちらのリリースが発表されました。

Athena can now write query results in Parquet, Avro, ORC and JSON formats

これにより、AthenaでもUNLOAD機能が使えるようになりました。

この記事では、このUNLOADを利用して、S3上に置いたCSVを、Parquet形式に変換しS3に出力するETLをAthenaを使って実装します。

f:id:yomon8:20210806200420p:plain

実はCTASを使って同じようなことができます。2年前に以下の記事を書いていました。ただCTASは用途も違うためETL的には使いにくいものでした。 yomon.hatenablog.com

今回、PREPAREとUNLOADを一緒に使うことでパラメータ化した変換処理を実装すると、ある程度は柔軟な処理が実装できそうなのでやってみます。

CSVテーブル準備

利用するCSV形式のデータ

AWSから提供されているS3バケット上にある、Athena用のCSVサンプルデータを利用します。

 s3://athena-examples-ap-northeast-1/flight/csv/

Athenaに読み込み

Glueからクローラなどを使うこともできますが、今回は全てAthenaから処理を実行します。

DB作成

Athenaから flight_sample_db という名前のDBを作成します。既存のDBを使う場合は、この後の作業のDB名を読み替えて下さい。

CREATE DATABASE flight_sample_db
テーブル作成

flight_sample_db にS3からCSVを読み込み、flight_delays_csv テーブルを作成します。

CREATE EXTERNAL TABLE flight_sample_db.flight_delays_csv (
 yr INT,
 quarter INT,
 month INT,
 dayofmonth INT,
 dayofweek INT,
 flightdate STRING,
 uniquecarrier STRING,
 airlineid INT,
 carrier STRING,
 tailnum STRING,
 flightnum STRING,
 originairportid INT,
 originairportseqid INT,
 origincitymarketid INT,
 origin STRING,
 origincityname STRING,
 originstate STRING,
 originstatefips STRING,
 originstatename STRING,
 originwac INT,
 destairportid INT,
 destairportseqid INT,
 destcitymarketid INT,
 dest STRING,
 destcityname STRING,
 deststate STRING,
 deststatefips STRING,
 deststatename STRING,
 destwac INT,
 crsdeptime STRING,
 deptime STRING,
 depdelay INT,
 depdelayminutes INT,
 depdel15 INT,
 departuredelaygroups INT,
 deptimeblk STRING,
 taxiout INT,
 wheelsoff STRING,
 wheelson STRING,
 taxiin INT,
 crsarrtime INT,
 arrtime STRING,
 arrdelay INT,
 arrdelayminutes INT,
 arrdel15 INT,
 arrivaldelaygroups INT,
 arrtimeblk STRING,
 cancelled INT,
 cancellationcode STRING,
 diverted INT,
 crselapsedtime INT,
 actualelapsedtime INT,
 airtime INT,
 flights INT,
 distance INT,
 distancegroup INT,
 carrierdelay INT,
 weatherdelay INT,
 nasdelay INT,
 securitydelay INT,
 lateaircraftdelay INT,
 firstdeptime STRING,
 totaladdgtime INT,
 longestaddgtime INT,
 divairportlandings INT,
 divreacheddest INT,
 divactualelapsedtime INT,
 divarrdelay INT,
 divdistance INT,
 div1airport STRING,
 div1airportid INT,
 div1airportseqid INT,
 div1wheelson STRING,
 div1totalgtime INT,
 div1longestgtime INT,
 div1wheelsoff STRING,
 div1tailnum STRING,
 div2airport STRING,
 div2airportid INT,
 div2airportseqid INT,
 div2wheelson STRING,
 div2totalgtime INT,
 div2longestgtime INT,
 div2wheelsoff STRING,
 div2tailnum STRING,
 div3airport STRING,
 div3airportid INT,
 div3airportseqid INT,
 div3wheelson STRING,
 div3totalgtime INT,
 div3longestgtime INT,
 div3wheelsoff STRING,
 div3tailnum STRING,
 div4airport STRING,
 div4airportid INT,
 div4airportseqid INT,
 div4wheelson STRING,
 div4totalgtime INT,
 div4longestgtime INT,
 div4wheelsoff STRING,
 div4tailnum STRING,
 div5airport STRING,
 div5airportid INT,
 div5airportseqid INT,
 div5wheelson STRING,
 div5totalgtime INT,
 div5longestgtime INT,
 div5wheelsoff STRING,
 div5tailnum STRING
)
 PARTITIONED BY (year STRING)
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY ','
 ESCAPED BY '\\'
 LINES TERMINATED BY '\n'
 LOCATION 's3://athena-examples-ap-northeast-1/flight/csv/';

Athenaの画面からもDBとテーブルが作成されたことが確認できます。

f:id:yomon8:20210806202553p:plain:w400

パーティション読み込み

aws s3 ls でCSVデータのパスを確認するとわかるのですが、CSVは year のカラムでパーティション分割されています。

$ aws s3 ls s3://athena-examples-ap-northeast-1/flight/csv/
                           PRE year=1987/
                           PRE year=1988/
                           PRE year=1989/
                           PRE year=1990/
                           PRE year=1991/
                           PRE year=1992/
                           PRE year=1993/
#省略

以下でAthenaから実行しパーティションを読み込みます。

MSCK REPAIR TABLE flight_sample_db.flight_delays_csv

パーティションが読み込まれました。

f:id:yomon8:20210806202948p:plain

CSVをクエリしてみる

こちらのファイルをクエリしてみます。圧縮で14MB程度のファイルです。

$ aws s3 ls  s3://athena-examples-ap-northeast-1/flight/csv/year=1987/
2017-05-08 13:11:43   14510443 1987-cleaned.csv.bz2

year = '1987' とするとパーティションが機能して、上記のファイルのみが取得できるはずです。

SELECT *
FROM flight_sample_db.flight_delays_csv
WHERE year = '1987'

結果は以下の通り。パーティションが効いてそうで、上記のAWS CLIで取得したファイルが読み込まれてそうです。

(実行時間: 3 分 5 秒, スキャンしたデータ: 13.84 MB)

f:id:yomon8:20210806203912p:plain

PREPARE文による処理の準備

PREPARE文を利用することで、事前に用意したクエリにパラメータを引き渡し実行することが可能になります。 docs.aws.amazon.com

PREPAREによる処理作成

以下のように作成してみます。長いですが下に説明書きます。

PREPARE unload_flight_delays_csv_as_parquet
FROM UNLOAD 
(SELECT
  yr,
  quarter,
  dayofweek,
  flightdate,
  uniquecarrier,
  airlineid,
  carrier,
  tailnum,
  flightnum,
  originairportid,
  originairportseqid,
  origincitymarketid,
  origin,
  origincityname,
  originstate,
  originstatefips,
  originstatename,
  originwac,
  destairportid,
  destairportseqid,
  destcitymarketid,
  dest,
  destcityname,
  deststate,
  deststatefips,
  deststatename,
  destwac,
  crsdeptime,
  deptime,
  depdelay,
  depdelayminutes,
  depdel15,
  departuredelaygroups,
  deptimeblk,
  taxiout,
  wheelsoff,
  wheelson,
  taxiin,
  crsarrtime,
  arrtime,
  arrdelay,
  arrdelayminutes,
  arrdel15,
  arrivaldelaygroups,
  arrtimeblk,
  cancelled,
  cancellationcode,
  diverted,
  crselapsedtime,
  actualelapsedtime,
  airtime,
  flights,
  distance,
  distancegroup,
  carrierdelay,
  weatherdelay,
  nasdelay,
  securitydelay,
  lateaircraftdelay,
  firstdeptime,
  totaladdgtime,
  longestaddgtime,
  divairportlandings,
  divreacheddest,
  divactualelapsedtime,
  divarrdelay,
  divdistance,
  div1airport,
  div1airportid,
  div1airportseqid,
  div1wheelson,
  div1totalgtime,
  div1longestgtime,
  div1wheelsoff,
  div1tailnum,
  div2airport,
  div2airportid,
  div2airportseqid,
  div2wheelson,
  div2totalgtime,
  div2longestgtime,
  div2wheelsoff,
  div2tailnum,
  div3airport,
  div3airportid,
  div3airportseqid,
  div3wheelson,
  div3totalgtime,
  div3longestgtime,
  div3wheelsoff,
  div3tailnum,
  div4airport,
  div4airportid,
  div4airportseqid,
  div4wheelson,
  div4totalgtime,
  div4longestgtime,
  div4wheelsoff,
  div4tailnum,
  div5airport,
  div5airportid,
  div5airportseqid,
  div5wheelson,
  div5totalgtime,
  div5longestgtime,
  div5wheelsoff,
  div5tailnum,
  year,
  month,
  dayofmonth
    FROM flight_sample_db.flight_delays_csv
    WHERE year = ? AND month = ?
) TO 's3://my-bucket/flight_data/'
WITH (format='PARQUET',partitioned_by = ARRAY['year','month','dayofmonth'])

PREPARE文の説明

上記を重要部分だけに省略したのが以下です。

PREPARE unload_flight_delays_csv_as_parquet
FROM UNLOAD 
(SELECT
  yr,
...
  div5tailnum,
  year,
  month,
  dayofmonth
  FROM flight_sample_db.flight_delays_csv
  WHERE year = ? AND month = ?
) TO 's3://my-bucket/flight_data/'
WITH (format='PARQUET',partitioned_by = ARRAY['year','month','dayofmonth'])

少し説明入れます。

  • 冒頭部分で unload_flight_delays_csv_as_parquet という名前でステートメント名を指定してます
  • 引数で渡された値が WHERE year = ? AND month = ?? に引き渡されて実行されます。
  • FROM UNLOAD (SELECT ...) TO 's3://'の部分でUNLOAD先のS3バケットを指定しています
  • カラム名を SELECT * で無くしています。カラムを明示的に書き、且つ最後に partitioned_by に指定する配列と同じ項目を同じ順序で並べる必要があります。(ここだと year,month,dayofmonth )これをしないと以下のような HIVE_COLUMN_ORDER_MISMATCH というエラーが発生します。
HIVE_COLUMN_ORDER_MISMATCH: Partition keys must be the last columns in the table and in the same order as the table properties: [year, month, dayofweek]

EXECUTE文による処理実行

以下のように処理を実行してみます。引数として 1987年の 1987 を文字列、10月の 10 を整数にてUSING句の後にカンマ区切りで引数として渡しています。

EXECUTE unload_flight_delays_csv_as_parquet USING '1987',10

f:id:yomon8:20210806212454p:plain:w400

ちゃんとパーティショニングされてParquetが出力されています。

f:id:yomon8:20210806212611p:plain:w400

Parquet変換結果確認

その後他の月も実施して、Parquetに無事全て変換できました。Parquet側のメタデータ読み込みはGlueでやりました。

せっかく変換したので、いくつかのクエリでCSVとParquetで処理速度確認してみます。

  • クエリ① 年指定
SELECT *
FROM flight_data
WHERE year = '1987'
  • クエリ② 日付指定
SELECT *
FROM flight_data
WHERE year = '1987'
AND month = 10
AND dayofmonth = 19
  • クエリ③ 簡単なGroupBy
SELECT COUNT(*),carrier
FROM flight_data
WHERE year = '1987'
GROUP BY carrier
ORDER BY carrier

以下が比較と簡単な考察です。

クエリ 実行時間(CSV) スキャンデータ量(CSV) 実行時間(Parquet) スキャンデータ量(Parquet) 考察
クエリ① 年指定 185秒 13.84 MB 29.24 秒 86.85 MB 単純に全件見てもParquetの方が読み込み容量が少ないく、効率も良いことがわかります
クエリ② 日付指定 18.27 秒 13.84 MB 1.49 秒 976.66 KB Paquetに変換した際に日付レベルでパーティション切っているのが効いてます
クエリ③ 簡単なGroupBy 13.35 秒 13.84 MB 2.49 秒 414.77 KB Paquetは列指向なので本当に一部のデータあのみしか使わないです

最後に

事前にPREPAREでクエリを準備し、引数を設定して実行できることと、UNLOADを組み合わせることで柔軟な変換処理が実装できることがわかりました。

今回はCSVから読み込みましたが、当然JSON等からも読み込めます。UNLOADも今回はParquetとしましたが、ORC、AVRO、JSON、TEXTFILEも指定できます。圧縮も色々と指定可能です。この辺りはUNLOADの公式ドキュメントを参照してください。

AthenaはCLIやJDBCやODBCからも実行できるるので、CSVをParquetに変換する簡易なETLジョブなどはこれだけでも実装できそうです。