This article is the 13th entry in the datatech-jp Advent Calendar 2024.
Introduction
Writing data to an Iceberg table registered in the Glue Catalog from an AWS Glue job is a common pattern.
Normally, if a Glue job tries to write data containing columns that are not registered in the Glue Catalog schema, the write fails with an error.
Before writing data that has new columns, you therefore have to change the Iceberg table schema by running alter table add columns in Athena or a similar tool.
ALTER TABLE ADD COLUMNS - Amazon Athena
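For reference, this manual step can also be scripted rather than run in the console. Below is a minimal sketch that issues the same DDL through boto3's Athena client; the table and column names mirror the example used later in this post, and the workgroup is only illustrative.

import boto3

# Run the manual schema change through the Athena API instead of the console.
# Table, column, and workgroup names here are illustrative.
athena = boto3.client("athena")
athena.start_query_execution(
    QueryString="ALTER TABLE test_database.test_table ADD COLUMNS (col3 string)",
    WorkGroup="primary",
)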
Doing this by hand every time the Glue job adds a new column, or the source data gains a new column, is operationally costly and error-prone.
This article summarizes how to add columns to an Iceberg table dynamically, based on the columns of the data being written.
Preparing the AWS resources to run the Glue job
I wrote Terraform code that creates the S3 bucket, IAM role, Glue database, and Glue job needed to run the AWS Glue job.
$ cd terraform
$ terraform init
$ terraform apply
To enable operations on Iceberg tables from an AWS Glue job, the job's Spark configuration must be set appropriately.
Using the Iceberg framework in AWS Glue - AWS Glue
The part of the Terraform code that creates the Glue job resource is excerpted below:
resource "aws_glue_job" "update_iceberg_table_schema" { name = "update_iceberg_table_schema" role_arn = aws_iam_role.glue_job_role.arn glue_version = "4.0" worker_type = "G.1X" number_of_workers = 2 max_retries = 0 execution_property { max_concurrent_runs = 10 } command { script_location = "s3://${aws_s3_bucket.glue_job.bucket}/scripts/update_iceberg_table_schema.py" } default_arguments = { "--enable-glue-datacatalog" = "true" "--TempDir" = "s3://${aws_s3_bucket.glue_job.bucket}/temporary/" "--spark-event-logs-path" = "s3://${aws_s3_bucket.glue_job.bucket}/sparkHistoryLogs/" "--enable-job-insights" = "false" "--enable-continuous-cloudwatch-log" = "true" "--datalake-formats" = "iceberg" # conf to enable iceberg format. ref: https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-programming-etl-format-iceberg.html "--conf" = "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.warehouse=file:///tmp/spark-warehouse" }
I prepared two sample data files for the Glue job to ingest and write to the Iceberg table.
https://github.com/nsakki55/code-for-blogpost/tree/main/glue_job_iceberg_schema_change/data
test_data.csv
| col1 | col2 |
| --- | --- |
| aaa | 1 |
| bbb | 2 |
| ccc | 3 |
test_data_new_column.csv
| col2 | col1 | col3 |
| --- | --- | --- |
| 444 | ddd | XXX |
| 555 | eee | YYY |
| 666 | fff | ZZZ |
The differences between the two test files are:
- the order of the col1 and col2 columns differs
- a new column, col3, has been added
For this post, the two sample files are placed in an S3 bucket and used as the source data read by the Glue job.
Change the destination S3 bucket name to the one generated by terraform apply.
$ aws s3 cp ./data/test_data.csv s3://schema-change-data-20241208085330834400000002/input/
$ aws s3 cp ./data/test_data_new_column.csv s3://schema-change-data-20241208085330834400000002/input/
Creating a Glue job that adds data to the Iceberg table
I created a Glue job that reads the CSV data from S3 and then does the following:
- if the table does not exist
  - create the table
- if the table exists
  - append the data to the table
import sys
from typing import Dict

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

S3_BUCKET = "schema-change-data-20241208085330834400000002"
TABLE_NAME = "test_table"
DATABASE_NAME = "test_database"
CATALOG_NAME = "glue_catalog"


def get_dynamic_frame_from_s3(glue_context: GlueContext, source_s3_path: str) -> DynamicFrame:
    print(f"Start get dynamic frame from S3. {source_s3_path=}")
    dyf = glue_context.create_dynamic_frame.from_options(
        format_options={
            "withHeader": True,
            "separator": ",",
        },
        connection_type="s3",
        format="csv",
        connection_options={
            "paths": [source_s3_path],
        },
    )
    print(f"Finished get dynamic frame from S3. {dyf.count()=}")
    return dyf


def check_table_in_database(glue_context: GlueContext, database_name: str, table_name: str) -> bool:
    print(f"Start check table in database. {database_name=}, {table_name=}")
    tables = glue_context.spark_session.catalog.listTables(database_name)
    is_exist = table_name in [table.name for table in tables]
    print(f"Finished check table in database. {is_exist=}")
    return is_exist


def main(args: Dict[str, str]) -> None:
    sc = SparkContext()
    glue_context = GlueContext(sc)
    job = Job(glue_context)
    job.init(args["JOB_NAME"], args)
    print(f"Start update iceberg table schema. {args=}")

    # Read the CSV from S3 and convert it to a Spark DataFrame.
    dyf = get_dynamic_frame_from_s3(
        glue_context=glue_context,
        source_s3_path=f"s3://{S3_BUCKET}/input/{args['file_name']}",
    )
    df = dyf.toDF()
    df.printSchema()

    # Append to the Iceberg table if it already exists, otherwise create it.
    is_exist = check_table_in_database(glue_context=glue_context, database_name=DATABASE_NAME, table_name=TABLE_NAME)
    table_path = f"{CATALOG_NAME}.{DATABASE_NAME}.{TABLE_NAME}"
    if is_exist:
        df.writeTo(table_path).append()
    else:
        df.writeTo(table_path).tableProperty("format-version", "2").tableProperty("location", f"s3://{S3_BUCKET}/output").create()

    print(f"Finished update iceberg table schema. {args=}")
    job.commit()


if __name__ == "__main__":
    args = getResolvedOptions(sys.argv, ["JOB_NAME", "file_name"])
    main(args)
Upload the Glue job script to S3. Change the S3 bucket name to the one created by terraform apply.
$ aws s3 cp ./src/update_iceberg_table_schema.py s3://glue-job-20241208085330834300000001/scripts/
Run the Glue job against the first sample file, test_data.csv.
$ aws glue start-job-run \
    --job-name update_iceberg_table_schema \
    --arguments '{"--file_name": "test_data.csv"}'
Looking at the schema in the Glue Catalog, col1 and col2 have been added.
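If you don't want to open the console, the registered schema can also be checked through the Glue API. A minimal sketch with boto3, using the database and table names from the job script above:

import boto3

# Print the column names currently registered for the table in the Glue Catalog.
glue = boto3.client("glue")
response = glue.get_table(DatabaseName="test_database", Name="test_table")
print([column["Name"] for column in response["Table"]["StorageDescriptor"]["Columns"]])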
Changing the Iceberg table schema
Iceberg provides an option called mergeSchema that allows the schema to evolve dynamically.
Writes - Apache Iceberg™.
When the mergeSchema option is used, the behavior is as follows.
A new column exists in the data source but not in the target table
→ the new column is added to the target table, and existing records get null in the new column.
A column is missing from the data source but exists in the target table
→ the column is set to null when new records are appended or existing records are updated.
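In code, a mergeSchema write goes through the same DataFrameWriterV2 API used in the job script. A minimal sketch (the actual change to this post's Glue job follows below):

from pyspark.sql import DataFrame

def append_with_merge_schema(df: DataFrame, table_path: str) -> None:
    # Append a DataFrame whose schema may contain new columns.
    # Requires the write.spark.accept-any-schema table property described next.
    df.writeTo(table_path).option("mergeSchema", "true").append()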
To use the mergeSchema option, the property 'write.spark.accept-any-schema'='true' must be added to the target table's table properties.
ALTER TABLE test_database.test_table SET TBLPROPERTIES (
  'write.spark.accept-any-schema' = 'true'
)
Running the property-change query above through Athena fails with an unsupported-property error:
Unsupported table property key: write.spark.accept-any-schema
Athena restricts which Iceberg table properties can be changed.
Create Iceberg tables - Amazon Athena
This property change therefore cannot be made via Athena.
In an AWS blog post about Iceberg tables, you can see the 'write.spark.accept-any-schema'='true' property being set via Spark.
Modernize your legacy databases with AWS data lakes, Part 2: Build a data lake using AWS DMS data on Apache Iceberg | AWS Big Data Blog
This article follows that approach and sets the required property via Spark.
I changed the implementation that appends data to an existing table as follows:
sql = f"ALTER TABLE {table_path} SET TBLPROPERTIES ('write.spark.accept-any-schema' = 'true')" glue_context.spark_session.sql(sql) df.writeTo(table_path).option("mergeSchema","true").append()
Run the Glue job that reads the CSV file with the new column:
$ aws glue start-job-run \
    --job-name update_iceberg_table_schema \
    --arguments '{"--file_name": "test_data_new_column.csv"}'
However, the Glue job fails with this configuration.
The cause is that the order of the col1 and col2 columns differs between the table schema and the source data.
2024-12-09 01:21:18,419 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(77)): Error from Python:Traceback (most recent call last):
  File "/tmp/update_iceberg_table_schema.py", line 74, in <module>
    main(args)
  File "/tmp/update_iceberg_table_schema.py", line 63, in main
    df.writeTo(table_path).option("mergeSchema","true").append()
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1460, in append
    self._jwriter.append()
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
    raise converted from None
pyspark.sql.utils.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
  1: col1: optional string
  2: col2: optional string
  3: col3: optional string
}
Provided schema:
table {
  2: col2: optional string
  1: col1: optional string
  3: col3: optional string
}
Problems:
* col2 is out of order, before col1
Iceberg's Spark options include a setting called check-ordering.
It controls whether the input schema is checked against the table schema for equality, and it defaults to true.
As discussed in an issue about mergeSchema in the Iceberg GitHub repository, check-ordering has to be set to false for this schema change to succeed.
Adding new columns (mergeSchema) · Issue #8908 · apache/iceberg · GitHub
Add the following config to the Terraform that creates the Glue job resource:
--conf spark.sql.iceberg.check-ordering=false
Apply the change to the Glue job:
$ terraform apply
Run the Glue job that reads the CSV with the new column again. This time the job succeeds.
$ aws glue start-job-run \
    --job-name update_iceberg_table_schema \
    --arguments '{"--file_name": "test_data_new_column.csv"}'
Checking the schema in the Glue Catalog confirms that the schema has been changed and col3 has been added.
Looking at the contents of test_table, the existing records have null in the newly added col3 column.
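One way to check this is a quick Spark SQL query from a session configured like the job above (a minimal sketch; a SELECT in Athena works just as well):

from pyspark.sql import SparkSession

def show_test_table(spark: SparkSession) -> None:
    # Existing rows should show null in the newly added col3 column.
    spark.sql("SELECT col1, col2, col3 FROM glue_catalog.test_database.test_table").show()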
We can now change the Iceberg table schema dynamically from a Glue job.