やりたいこと
以下のようにパーティションを切っているS3のバケットから、S3のバケットにGlueのETLジョブにて変換処理をかけています。
一部のパーティションのデータが間違っていた場合に、パーティションごとデータを部分更新したい場合がありました。
準備
テストデータ生成
英語の小文字、大文字とASCIIコードをデータとして持つJSONを生成してみます。
import json from collections import namedtuple def output_letters(name,letters): output_file = './data_{}.json'.format(name) with open(output_file,'w') as f: for l in letters: json.dump(l._asdict(), f) f.write('\n') Letter = namedtuple('letter',('letter','code')) capitals= [Letter(chr(s),s) for s in range(ord('A'),ord('A')+26)] output_letters("capital",capitals) smalls = [Letter(chr(s),s) for s in range(ord('a'),ord('a')+26)] output_letters("small",smalls) symbols = [Letter(chr(s),s) for s in range(91,96)] output_letters("symbol",symbols)
このような3つのJSONL形式のデータができます。
data_capital.json
{"letter": "A", "code": 65} {"letter": "B", "code": 66} ---省略--- {"letter": "Y", "code": 89} {"letter": "Z", "code": 90}
data_small.json
{"letter": "a", "code": 97} {"letter": "b", "code": 98} ---省略--- {"letter": "y", "code": 121} {"letter": "z", "code": 122}
data_symbol.json
{"letter": "[", "code": 91} {"letter": "\\", "code": 92} {"letter": "]", "code": 93} {"letter": "^", "code": 94} {"letter": "_", "code": 95}
S3にアップロード
以下のようにアップロードします。
part=capital/ - data_capital.json - data_symbol.json part=small/ - data_small.json
Glueでクローリングすると以下のようなスキーマになります。
ETLジョブ
Glueのウィザードを流せば以下のようなParquetに変換するETLジョブが簡単に生成できます。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "letter", table_name = "src", transformation_ctx = "datasource0") applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("letter", "string", "letter", "string"), ("code", "int", "code", "int"), ("part", "string", "part", "string")], transformation_ctx = "applymapping1") resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2") dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3") datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://mybucket/path/to/target","partitionKeys":["part"]}, format = "parquet", transformation_ctx = "datasink4") job.commit()
部分更新のケースを試してみる
例えば、特殊文字が入っている data_special.json
を part=capital
から抜くことにしました。つまりは以下の図のように part=capital
パーティションだけ部分更新したいです。
まずソース側のS3から記号の入った data_symbol.json
を削除します。
$ aws s3 rm s3://mybucket/path/src/part=capital/data_symbol.json delete: s3://mybucket/path/src/part=capital/data_symbol.json
part=capital/ - data_capital.json // 大文字 ☓- data_symbol.json // 削除
そこでもう一度ETLジョブを実行すれば part=capital
のパーティションが更新できるかというと、そう上手くはいきません。
課題がいくつかあります。
課題① ジョブブックマーク有効化時の再実行
Glueのジョブブックマークの機能は便利なのですが、挙動を知らないとうまく動かせません。
今回の場合、ジョブブックマーク有効な場合、ただ data_symbol.json
削除後にETLジョブを実行してもデータは何も更新されません。
これは残っているデータ data_capital.json
が既にETLジョブ実行済みとしてジョブブックマークにマーキング
されているからです。
ジョブのブックマークを使用した処理済みデータの追跡 - AWS Glue
課題① の対策
S3のオブジェクトのタイムスタンプを更新すると再度ジョブが実行されます。
Linuxのtouchのようなことをするのですが、S3の場合は同じファイルを同じ場所にコピーすることで似たようなことが可能です。
以下の同一ファイルにコピーするとタイムスタンプが更新されます。
$ aws s3 cp s3://mybucket/path/src/part=capital/data_capital.json s3://mybucket/path/src/part=capital/data_capital.json copy: s3://mybucket/path/src/part=capital/data_capital.json to s3://mybucket/path/src/part=capital/data_capital.json
タイムスタンプが更新された上でジョブを実行すると、再度処理対象として処理されます。が、ここで課題②が出てきます。
課題 ② DynamicFrameはAppendしかできない
ジョブブックマークの問題を超えて、ETLジョブを実行してみたところ、data_capital.json
がTarget側で2重に取り込まれてしまっています。
そして、記号もTarget側では削除されていません。
部分更新ではなく、単純に追記されたことになります。
SELECT letter, count(*) AS cnt FROM "target" WHERE part = 'capital' GROUP BY letter ORDER BY cnt;
上記SQLの結果です。
サポートにも問合せたのですが、GlueのDynamic Frameの書き出し部分、上記のスクリプトで言えば、以下の1行は2019/04/09時点ではAppendのみでOverwriteモードはサポートしていません。
課題②への対策
課題②への対策1 直接パーティションだけを操作する
上に書いたETLスクリプトを以下のように書き換えます。
#-- 省略 # 特定パーティションを設定無しで直接パーティションのロケーションに書き込み datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "letter", table_name = "src", push_down_predicate = "part='capital'", transformation_ctx = "datasource0") #-- 省略 # パーティショニング設定無しで直接パーティションのロケーションに書き込み datasink4 =dropnullfields3.toDF().write.mode("overwrite").format("parquet").save("s3://mybucket/path/src/target/part=capital") job.commit()
ポイントは2つです。
① push_down_predicate
を指定して部分更新したいパーティションのみSparkに読み込む
② DataFrameに変換してOverwriteモードで書き込む。この際にパスとしてPartition(例: part=capital
)を直接指定する。
課題②への対策2 Spark SQLでゴリゴリ書く
書き込みのところはSpark SQLでやってもできます。
temp_view = "tempview" dropnullfields3.toDF().createOrReplaceTempView(temp_view) spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict") query = """ INSERT OVERWRITE TABLE {tgt_table} PARTITION ({partition_key}) SELECT * FROM {src_view} WHERE {partition_key} = '{partition_val}' """.format( src_view=temp_view, tgt_table="letter.target", partition_key="part", partition_val="capital") datasink4 = spark.sql(query)
他にも色々方法あって、対象のデータ種類が少ないなら書けば何とでもできそうです。
ただ、3桁、4桁以上のデータ種別を扱う場合は、個別対応厳しいので、汎用化させるとなると少しめんどくさいです。
単純に別のディレクトリに書き込んで、後でコピーするという力技だけど一番良いのではと思う方法書かれています。
課題②への対策3 対応を待つ
開発しているという話あったりするし、Dynamic Frameもいつか対応してくれるかもしれません。
amazon web services - Overwrite parquet files from dynamic frame in AWS Glue - Stack Overflow
または、GlueのSparkバージョンが2.3.0になれば(現状は2.2.1)、この方法も使えるようになるので、少しシンプルに書けるようになります。
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") data.write.mode("overwrite").insertInto("partitioned_table")
参考
amazon web services - Overwrite parquet files from dynamic frame in AWS Glue - Stack Overflow
metadata - Is there a way to touch() file in Amazon S3? - Stack Overflow