Hello from the Consumer Team in the M3 Engineering Group.
In this post I use the Google BigQuery Connector for AWS Glue to write data that lives on AWS into BigQuery, managed entirely with Terraform. Searching the web turns up a fair number of articles about ingesting data from BigQuery into AWS, but almost none about the reverse direction, and the official documentation is insufficient, so I'm writing this up partly as a note to my future self.
This article is day 8 of the M3 Advent Calendar 2022. The previous day's post was by @no_clock, about six months of running Renovate hands-off.
The Connector and Glue versions used here are as follows.
- Google BigQuery Connector for AWS Glue 0.24.2 *1
- AWS Glue 3.0 - Supports Spark 3.1, Scala 2, Python 3
The key points are these four:
- For reads from BigQuery you can use Secrets Manager for the credentials, but for writes you cannot
- The way you override SparkConf in the Glue Context differs from the usual spark-bigquery-connector approach
- Using Secrets Manager for the database connection credentials fails with an error for some reason
- When running inside a VPC, the ECR API times out unless outbound traffic is allowed
Note that this article covers only the AWS side of the implementation; the Google service account and GCS bucket used with BigQuery are assumed to already exist.
- Assumed scenario
- Steps
- Create the AWS Glue Connection for Aurora
- Create the AWS Glue Connection for BigQuery
- Create the IAM role for the AWS Glue Job
- Create the AWS Glue Job
- Running the job
- Errors encountered along the way
- Closing
Assumed scenario
This time I'll implement a job that reads data from a database built on Amazon Aurora through the JDBC driver and loads it into BigQuery.
The textbook approach would be to define an AWS Glue Crawler, collect metadata about the Aurora database schema (table structures and so on), and then implement the ETL job on top of that, but that would make for a long preamble. So here I implement a job that reads the DB table data directly and loads it into BigQuery without any transformation.
Steps
- Create the AWS Glue Connection for Aurora
- Create the AWS Glue Connection for BigQuery
- Create the IAM role for the AWS Glue Job
- Create the AWS Glue Job
Create the AWS Glue Connection for Aurora
In this scenario the data source is Aurora, so we create a Connection to Aurora. If your data source is in S3 or DynamoDB, no Connection is needed.*2
A Connection, by the way, is one of AWS Glue's AWS resources and represents the connection information for a data source. Confusingly, there is also something called a Connector; in this post, "Connection" always refers to the Connection resource.
Creating and modifying the security groups
When the data source is an Aurora instance inside a VPC, the AWS Glue JDBC Connection must be given a VPC subnet and a security group that can reach the database. In addition, because temporary files and Spark logs are written to S3, outbound traffic to S3 must be allowed.
The official documentation says to add self-referencing inbound/outbound rules plus an outbound rule to the S3 endpoint, but here I create a new security group and put the outbound rules there instead. Also, if the Google BigQuery Connector for AWS Glue cannot reach ECR, it fails to download the connector's container image and times out. Ideally you would provision connectivity to the ECR registry in us-east-1 via a VPC endpoint, but that is a hassle, so this time I simply allow all outbound traffic.
First, the self-referencing ingress/egress rules. If you already have them, this step is unnecessary.
variable "database_connection_security_group_id" {} data "aws_security_group" "database" { id = var.database_connection_security_group_id } resource "aws_security_group_rule" "ingress_self" { security_group_id = data.aws_security_group.database.id type = "ingress" from_port = 0 to_port = 65535 protocol = "tcp" self = true } resource "aws_security_group_rule" "egress_self" { security_group_id = data.aws_security_group.database.id type = "egress" from_port = 0 to_port = 65535 protocol = "tcp" self = true }
Next, create a new security group and add the outbound rules to it.
variable "glue_security_group_name" {} resource "aws_security_group" "glue" { name = var.glue_security_group_name vpc_id = data.aws_security_group.database.vpc_id tags = { Name = var.glue_security_group_name } } # å ¨ã¢ã¦ããã¦ã³ããã©ãã£ãã¯ã許å¯ï¼s3 㨠ecr ã®ã¿ã«çµã£ãæ¹ããã¿ã¼ï¼ resource "aws_security_group_rule" "egress_all" { security_group_id = aws_security_group.glue.id type = "egress" from_port = 0 to_port = 0 protocol = "-1" cidr_blocks = ["0.0.0.0/0"] ipv6_cidr_blocks = ["::/0"] }
If you instead add an S3 endpoint and the like, it would look something like this.
data "aws_vpc_endpoint" "s3" { vpc_id = aws_security_group.glue.vpc_id service_name = "com.amazonaws.${data.aws_region.current.name}.s3" } resource "aws_security_group_rule" "egress_s3" { security_group_id = aws_security_group.glue.id type = "egress" from_port = 443 to_port = 443 protocol = "tcp" prefix_list_ids = [data.aws_vpc_endpoint.s3.prefix_list_id] }
Creating the AWS Glue JDBC Connection
Once the credentials and security groups are ready, create the Connection.
variable "database_cluster_identifier" {} variable "database_connection_subnet_id" {} variable "database_name" {} variable "database_force_ssl" { default = false } variable "database_credentials" { type = object({ username = string, password = string }) sensitive = true } data "aws_rds_cluster" "database" { cluster_identifier = var.database_cluster_identifier } data "aws_subnet" "database" { id = var.database_connection_subnet_id } locals { database_engine = endswith(data.aws_rds_cluster.database.engine, "postgresql") ? "postgresql" : "mysql" database_jdbc_url = "jdbc:${local.database_engine}://${data.aws_rds_cluster.database.endpoint}:${data.aws_rds_cluster.database.port}/${var.database_name}" } resource "aws_glue_connection" "database" { name = "database-connection" description = "Aurora Database Connection" connection_type = "JDBC" connection_properties = { JDBC_CONNECTION_URL = local.database_jdbc_url JDBC_ENFORCE_SSL = var.database_force_ssl USERNAME = var.database_credentials.username PASSWORD = var.database_credentials.password # Management Console ã§ä½æããã¨ãªãããã®ããããã£ãåå¨ãã KAFKA_SSL_ENABLED = false # SECRET_ID ãæå®ãã㨠Glue ã¸ã§ããå¿ ãã¨ã©ã¼ã«ãªãã¾ãï¼ãã¹ãæ¥ç¶ã¯æåããã®ã«ï¼ } # ä½æããã»ãã¥ãªãã£ã°ã«ã¼ããç´ä»ãã physical_connection_requirements { subnet_id = data.aws_subnet.database.id availability_zone = data.aws_subnet.database.availability_zone security_group_id_list = [ aws_security_group.glue.id, # S3,ECR ç¨ data.aws_security_group.database.id, # DB æ¥ç¶ç¨ ] } }
Incidentally, attaching the security group to the BigQuery Connection described later did not establish connectivity to ECR and still resulted in an error. That is why the security group is attached to the database Connection, which has nothing to do with ECR directly. My guess is that the database Connection is established locally on the job node while the BigQuery Connection is established inside the container, but the network behavior around Connections is far too complicated; I wish it were abstracted away a bit more.*3
Create the AWS Glue Connection for BigQuery
Creating the AWS Glue Connection for BigQuery follows the steps in the official blog post, up to a point.
- Store the credential JSON content in AWS Secrets Manager (not used for writes)
- Subscribe to the Google BigQuery Connector for AWS Glue
- Create a Glue Connection using the Google BigQuery Connector for AWS Glue
Up to this point I follow the blog's procedure as written.
Subscribing and activating the connector in the AWS Marketplace automatically adds the custom Connector. The blog then goes on to create the Connection as well, but since that can be done with Terraform, I stop before that step.
Registering the credentials for the BigQuery connection
Register the service account credential JSON needed for the BigQuery connection in Secrets Manager.
variable "bigquery_credentials_json" { sensitive = true } resource "aws_secretsmanager_secret" "bigquery_credentials" { name = "bigquery_credentials" } resource "aws_secretsmanager_secret_version" "bigquery_credentials" { secret_id = aws_secretsmanager_secret.bigquery_credentials.id secret_string = jsonencode({ credentials = base64encode(var.bigquery_credentials_json) }) }
As described later, the credentials above are used only when reading from BigQuery; for writes, a JSON file with the same contents must be placed in S3 separately. So they are not actually used in this scenario, but the Connection cannot be created without them, so I create them anyway.
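For reference, on the read side the secret-backed Connection is enough by itself. A minimal sketch of reading a BigQuery table through the marketplace connector might look like the following; it assumes the usual Glue job boilerplate (glueContext) and uses placeholder project, dataset, and connection names.

# Hypothetical read-side sketch: credentials are resolved from the secret
# attached to the Glue Connection, so no credentials.json in S3 is needed.
bigquery_source = glueContext.create_dynamic_frame.from_options(
    connection_type="marketplace.spark",
    connection_options={
        "parentProject": "bigquery-project-name",  # placeholder
        "table": "dataset.table_name",             # placeholder
        "connectionName": "bigquery-connection",
    },
    transformation_ctx="bigquery_source",
)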
Creating the AWS Glue BigQuery Connection
Once the credentials are registered, create the Connection. CONNECTOR_URL is https://709825985650.dkr.ecr.us-east-1.amazonaws.com/amazon-web-services/glue/bigquery:0.24.2-glue3.0. It differs per version, so if you are unsure what to set for this ECR URL, check the details of the Connector (the Connector, not the Connection) in the Management Console. For match_criteria I simply use the values that get set when the Connection is created from the Management Console. It may be safer to create it once in the Management Console and then import it.
resource "aws_glue_connection" "bigquery" { name = "bigquery-connection" description = "Google BigQuery Connection for AWS Glue 3.0" connection_type = "MARKETPLACE" match_criteria = ["Connection", "Google BigQuery Connector 0.24.2 for AWS Glue 3.0"] connection_properties = { CONNECTOR_TYPE = "Spark" CONNECTOR_CLASS_NAME = "com.google.cloud.spark.bigquery" CONNECTOR_URL = "https://709825985650.dkr.ecr.us-east-1.amazonaws.com/amazon-web-services/glue/bigquery:0.24.2-glue3.0" SECRET_ID = aws_secretsmanager_secret.bigquery_credentials.name } }
As mentioned above, this Connection does not need any VPC settings.
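If you did create the Connection once in the Management Console, it can be brought under Terraform management with terraform import; the import ID is the catalog ID (normally the AWS account ID) plus the connection name, for example (account ID is a placeholder):

terraform import aws_glue_connection.bigquery 123456789012:bigquery-connection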
Create the IAM role for the AWS Glue Job
In this scenario the AWS Glue Job needs the following two permissions.
- Permission to read the BigQuery credentials
- Permission to pull the image from ECR
Google BigQuery Connector 㯠ECR ã®ã³ã³ããã¤ã¡ã¼ã¸ã¨ãã¦æä¾ããã¦ãããããECR ã«å¯¾ãã権éãå¿ è¦ã§ãããã¡ãã¯ããã¼ã¸ãããªã·ã¼ãå©ç¨ãã¾ãããªããAWS Glue Job ã§å¿ è¦ã¨ãããåºæ¬çãªæ¨©éï¼assetsãã¡ã¤ã«ã®ä¿åã»èªã¿è¾¼ã¿ããã°åºåãªã©ï¼ã«ã¤ãã¦ãããã¼ã¸ãããªã·ã¼ãç¨æããã¦ããããããã¡ããå©ç¨ãã¾ãã
variable "job_role_name" {} data "aws_iam_policy_document" "glue_assume_role_policy" { statement { actions = ["sts:AssumeRole"] principals { type = "Service" identifiers = ["glue.amazonaws.com"] } } } # BigQuery ã®ã¯ã¬ãã³ã·ã£ã«ãèªã¿åãããã®æ¨©é data "aws_iam_policy_document" "get_describe_secret" { statement { sid = "GetDescribeSecret" actions = [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds", ] resources = [aws_secretsmanager_secret.bigquery_credentials.arn] } } resource "aws_iam_role" "glue_job" { name = var.job_role_name description = "Allows Glue to call AWS services on your behalf." assume_role_policy = data.aws_iam_policy_document.glue_assume_role_policy.json } resource "aws_iam_role_policy" "get_describe_secret" { name = "get_describe_secret" role = aws_iam_role.glue_job.id policy = data.aws_iam_policy_document.get_describe_secret.json } # ECR ããã³ã³ããã¤ã¡ã¼ã¸ã pull ããããã®æ¨©é resource "aws_iam_role_policy_attachment" "read-ecr" { role = aws_iam_role.glue_job.name policy_arn = "arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly" } # Glue Job ã®åºæ¬çãªæ¨©éï¼assetsãã¡ã¤ã«ã®ä¿åã»èªã¿è¾¼ã¿ããã°åºåãªã©ï¼ resource "aws_iam_role_policy_attachment" "glue-service" { role = aws_iam_role.glue_job.name policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole" }
If the data source is an AWS service such as S3 or DynamoDB, permissions to read from it are also required. Here the data source is Aurora, so that is unnecessary.
Create the AWS Glue Job
With everything above in place, it is finally time to create the job. As of December 2022, the Usage section of the Google BigQuery Connector for AWS Glue says the following about writing to BigQuery.
You need to upload credentials.json to your S3 bucket, and set the file path in Referenced files path.
As mentioned earlier, writes to BigQuery, unlike reads, do not go through Secrets Manager. You have to place a JSON file in S3 separately, which is a puzzling design.
It also says:
Private key
* spark.hadoop.fs.gs.auth.service.account.email=[your-email-extracted-from-service_account_json_file]
* spark.hadoop.fs.gs.auth.service.account.private.key.id=[your-private-key-id-extracted-from-service_account_json_file]
* spark.hadoop.fs.gs.auth.service.account.private.key=[your-private-key-body-extracted-from-service_account_json_file]
You can set these Spark configurations in one of following ways.
* The param --conf of Glue job parameters
* The job script using SparkConf
However, job parameters are key-value pairs, so --conf cannot be specified more than once.*4 That inevitably leaves the latter option, "The job script using SparkConf".
This means editing the Python script directly, but rather than implementing it from scratch, my approach is to edit the script of a job created with the Glue Studio Visual Editor and then bring it into Terraform.
Creating the job in the Visual Editor
In the Glue Studio [Jobs] menu, choose "Visual with a source and target", select the database as the [Source]*5 and "Google BigQuery Connector 0.24.2 for AWS Glue 3.0" as the [Target], then click the [Create] button.
Since this scenario performs no schema transformation, delete the ApplyMapping action node.
Select the Source node and, on the [Data source properties - JDBC] tab, choose the JDBC Connection created earlier and specify the table name in the form schema_name.table_name. The table name given here will later be overwritten by Terraform, so a dummy name is fine in the UI.
Likewise, select the Target node and, on the [Data target properties] tab, set the properties that are required for writes*6: parentProject, temporaryGcsBucket, and table. These will also be rewritten later, so dummy values are fine.
Finally, on the [Job details] tab, give the job a suitable name, select the job role created earlier as the IAM role, and click the [Save] button. The job name becomes the resource ID, so it cannot be changed later.*7
With everything configured as above, the generated script looks like this.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame


def directJDBCSource(
    glueContext,
    connectionName,
    connectionType,
    database,
    table,
    redshiftTmpDir,
    transformation_ctx,
) -> DynamicFrame:
    connection_options = {
        "useConnectionProperties": "true",
        "dbtable": table,
        "connectionName": connectionName,
    }
    if redshiftTmpDir:
        connection_options["redshiftTmpDir"] = redshiftTmpDir
    return glueContext.create_dynamic_frame.from_options(
        connection_type=connectionType,
        connection_options=connection_options,
        transformation_ctx=transformation_ctx,
    )


args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node PostgreSQL table
PostgreSQLtable_node1 = directJDBCSource(  # <= (note) this node name differs every time
    glueContext,
    connectionName="database-connection",
    connectionType="postgresql",
    database="database_name",
    table="schema.table_name",
    redshiftTmpDir="",
    transformation_ctx="PostgreSQLtable_node1",
)

# Script generated for node Google BigQuery Connector 0.24.2 for AWS Glue 3.0
GoogleBigQueryConnector0242forAWSGlue30_node3 = (  # <= (note) this node name differs every time
    glueContext.write_dynamic_frame.from_options(
        frame=PostgreSQLtable_node1,
        connection_type="marketplace.spark",
        connection_options={
            "parentProject": "bigquery-project-name",
            "temporaryGcsBucket": "temporary-gcs-bucket-name",
            "table": "project.dataset.table_name",
            "connectionName": "bigquery-connection",
        },
        transformation_ctx="GoogleBigQueryConnector0242forAWSGlue30_node3",
    )
)

job.commit()
Editing the Python script
Copy this script and save it as a local file in the Terraform directory. As-is, however, it fails because the credentials for connecting to BigQuery cannot be found, so apply the following fix.
--- script.orig.py	2022-12-06 23:39:44.000000000 +0900
+++ script.writable.py	2022-12-06 23:42:28.000000000 +0900
@@ -2,6 +2,7 @@
 from awsglue.transforms import *
 from awsglue.utils import getResolvedOptions
 from pyspark.context import SparkContext
+from pyspark.conf import SparkConf
 from awsglue.context import GlueContext
 from awsglue.job import Job
 from awsglue.dynamicframe import DynamicFrame
@@ -32,9 +33,13 @@
         transformation_ctx=transformation_ctx,
     )
 
+conf = SparkConf()
+conf.set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
+conf.set("spark.hadoop.fs.gs.auth.service.account.enable", "true")
+conf.set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "credentials.json")
 
 args = getResolvedOptions(sys.argv, ["JOB_NAME"])
-sc = SparkContext()
+sc = SparkContext.getOrCreate(conf=conf)
 glueContext = GlueContext(sc)
 spark = glueContext.spark_session
 job = Job(glueContext)
The credentials.json set in spark.hadoop.google.cloud.auth.service.account.json.keyfile is the basename of the credentials file uploaded to S3, described later. By passing that file's S3 path to AWS Glue's --extra-files option, it is automatically deployed to the job node's working directory.
In addition, let's replace the table names and similar values with placeholders so they can be substituted with Terraform's templatefile function. This placeholder substitution is not strictly required for this scenario, but in real-world operation the implementation would likely end up looking something like this.
--- script.writable.py	2022-12-06 23:42:28.000000000 +0900
+++ script.template.py	2022-12-06 23:41:45.000000000 +0900
@@ -36,7 +36,7 @@
 conf = SparkConf()
 conf.set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
 conf.set("spark.hadoop.fs.gs.auth.service.account.enable", "true")
-conf.set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "credentials.json")
+conf.set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "${credentials_filename}")
 
 args = getResolvedOptions(sys.argv, ["JOB_NAME"])
 sc = SparkContext.getOrCreate(conf=conf)
@@ -48,10 +48,10 @@
 # Script generated for node PostgreSQL table
 PostgreSQLtable_node1 = directJDBCSource(
     glueContext,
-    connectionName="database-connection",
-    connectionType="postgresql",
-    database="database_name",
-    table="schema.table_name",
+    connectionName="${database_connection_name}",
+    connectionType="${database_connection_type}",
+    database="${database_name}",
+    table="${database_table_name}",
     redshiftTmpDir="",
     transformation_ctx="PostgreSQLtable_node1",
 )
@@ -62,10 +62,10 @@
         frame=PostgreSQLtable_node1,
         connection_type="marketplace.spark",
         connection_options={
-            "parentProject": "bigquery-project-name",
-            "temporaryGcsBucket": "temporary-gcs-bucket-name",
-            "table": "project.dataset.table_name",
-            "connectionName": "bigquery-connection",
+            "parentProject": "${bigquery_project_id}",
+            "temporaryGcsBucket": "${temporary_gcs_bucket_name}",
+            "table": "${bigquery_table_name}",
+            "connectionName": "${bigquery_connection_name}",
         },
         transformation_ctx="GoogleBigQueryConnector0242forAWSGlue30_node3",
     )
æçµç㪠Python ã³ã¼ããã³ãã¬ã¼ãã¯ãã¡ã
- script.template.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame


def directJDBCSource(
    glueContext,
    connectionName,
    connectionType,
    database,
    table,
    redshiftTmpDir,
    transformation_ctx,
) -> DynamicFrame:
    connection_options = {
        "useConnectionProperties": "true",
        "dbtable": table,
        "connectionName": connectionName,
    }
    if redshiftTmpDir:
        connection_options["redshiftTmpDir"] = redshiftTmpDir
    return glueContext.create_dynamic_frame.from_options(
        connection_type=connectionType,
        connection_options=connection_options,
        transformation_ctx=transformation_ctx,
    )


conf = SparkConf()
conf.set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("spark.hadoop.fs.gs.auth.service.account.enable", "true")
conf.set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "${credentials_filename}")

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext.getOrCreate(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node PostgreSQL table
PostgreSQLtable_node1 = directJDBCSource(
    glueContext,
    connectionName="${database_connection_name}",
    connectionType="${database_connection_type}",
    database="${database_name}",
    table="${database_table_name}",
    redshiftTmpDir="",
    transformation_ctx="PostgreSQLtable_node1",
)

# Script generated for node Google BigQuery Connector 0.24.2 for AWS Glue 3.0
GoogleBigQueryConnector0242forAWSGlue30_node3 = (
    glueContext.write_dynamic_frame.from_options(
        frame=PostgreSQLtable_node1,
        connection_type="marketplace.spark",
        connection_options={
            "parentProject": "${bigquery_project_id}",
            "temporaryGcsBucket": "${temporary_gcs_bucket_name}",
            "table": "${bigquery_table_name}",
            "connectionName": "${bigquery_connection_name}",
        },
        transformation_ctx="GoogleBigQueryConnector0242forAWSGlue30_node3",
    )
)

job.commit()
Saving the Python script to S3
Fill this template file with values and place it in an S3 bucket. It must be stored somewhere the IAM role assigned to the job can read; here I use the aws-glue-assets-<account>-<region> S3 bucket that is created automatically when a job is created in Glue Studio.
Objects in that bucket are readable through the AWSGlueServiceRole managed policy attached to the role, so no additional policy is needed. It is also the default script location for jobs created in Glue Studio.
variable "job_name" {} variable "source_table_name" {} variable "bigquery_project_id" {} variable "temporary_gcs_bucket_name" {} variable "destination_table_name" {} variable "credentials_filename" { default = "credentials.json" } data "aws_region" "current" {} data "aws_caller_identity" "current" {} locals { glue_assets_s3_bucket = "aws-glue-assets-${data.aws_caller_identity.current.account_id}-${data.aws_region.current.name}" } data "aws_s3_bucket" "assets" { bucket = local.glue_assets_s3_bucket } resource "aws_s3_object" "script" { bucket = data.aws_s3_bucket.assets.bucket key = "scripts/${var.job_name}.py" # ãã³ãã¬ã¼ããã¡ã¤ã«ã¯ `script.template.py` ã¨ãããã¡ã¤ã«åã§ãã¼ã«ã«ä¿åããã¦ãããã®ã¨ãã content = templatefile("${path.module}/script.template.py", { database_connection_name = aws_glue_connection.database.name database_connection_type = local.database_engine database_name = var.database_name database_table_name = var.source_table_name bigquery_connection_name = aws_glue_connection.bigquery.name bigquery_project_id = var.bigquery_project_id temporary_gcs_bucket_name = var.temporary_gcs_bucket_name bigquery_table_name = var.destination_table_name credentials_filename = var.credentials_filename }) }
Note that the placeholder values are fixed at terraform apply time; AWS Glue itself has no feature for substituting variables into the deployed script.
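As an aside, if you want values that can change per run rather than per apply, one alternative (not used in this article) is to pass them as Glue job parameters and read them with getResolvedOptions. A minimal sketch with hypothetical parameter names:

# Hypothetical alternative: values supplied as job parameters, e.g.
#   "--bigquery_table_name" = "project.dataset.table_name" in default_arguments
# (or overridden per run), instead of baking them in with templatefile.
args = getResolvedOptions(
    sys.argv,
    ["JOB_NAME", "bigquery_table_name", "temporary_gcs_bucket_name"],
)
bigquery_table_name = args["bigquery_table_name"]
temporary_gcs_bucket_name = args["temporary_gcs_bucket_name"]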
Storing the BigQuery credentials in S3
Glue Job 㧠BigQuery ã«æ¸ãè¾¼ã¿ããããã®ã¯ã¬ãã³ã·ã£ã«ãã¡ã¤ã«ã S3 ãªãã¸ã§ã¯ãã¨ãã¦ç¨æãã¾ããåè¿°ã®ã¨ãããAWS Glue ã® --extra-files
ãªãã·ã§ã³ã§ãã®ãã¡ã¤ã«ã® S3 ãã¹ãæå®ããã¸ã§ããã¼ãã®ã¯ã¼ãã³ã°ãã£ã¬ã¯ããªã«èªåçã«å±éãããããã«ãã¾ãã
ãã¡ãã¯ã»ãã¥ã¢ã§ãªããã°ãããªããããå¥é S3 ãã±ãããä½ææ¸ã¿ã§ããã¨ããã³ã³ãã³ãã®ä¸èº«ã¯ Secrets Manager ä½ææã«å©ç¨ããå¤æ°ããã®ã¾ã¾ä½¿ãã¾ãã
variable "credentials_s3_bucket_name" {} variable "credentials_s3_bucket_prefix" { default = "" } data "aws_s3_bucket" "credentials" { bucket = var.credentials_s3_bucket_name } resource "aws_s3_object" "bigquery_credentials" { bucket = data.aws_s3_bucket.credentials.bucket key = "${var.credentials_s3_bucket_prefix}${var.credentials_filename}" content = var.bigquery_credentials_json } data "aws_iam_policy_document" "read_credentials_json" { statement { sid = "ReadCredentialsJson" actions = ["s3:GetObject"] resources = [ "${data.aws_s3_bucket.credentials.arn}/${aws_s3_object.bigquery_credentials.id}" ] } } # ä¿åãããã¡ã¤ã«ã®èªã¿åã権éã IAM ãã¼ã«ã«ä»ä¸ resource "aws_iam_role_policy" "read_credentials_json" { name = "read_credentials_json" role = aws_iam_role.glue_job.id policy = data.aws_iam_policy_document.read_credentials_json.json }
Creating the Glue Job
Finally, implement the Terraform for the Glue Job itself. Since this job has already been created in Glue Studio, bring it into the state with terraform import and then terraform apply the remaining diff.
resource "aws_glue_job" "main" { name = var.job_name role_arn = aws_iam_role.glue_job.arn glue_version = "3.0" # å¿ é ï¼ãªã㨠0.9 ã«ãªãï¼ connections = [ aws_glue_connection.database.name, aws_glue_connection.bigquery.name, ] execution_class = "STANDARD" worker_type = "G.1X" max_retries = 0 # ããã©ã«ã㯠3 number_of_workers = 2 # ããã©ã«ã㯠10 command { name = "glueetl" script_location = "s3://${aws_s3_object.script.bucket}/${aws_s3_object.script.id}" } default_arguments = { "--extra-files" = "s3://${data.aws_s3_bucket.credentials.bucket}/${aws_s3_object.bigquery_credentials.id}" # 以éã®ãã©ã¡ã¼ã¿ã¯ Visual Editor ã§ä½æããéã®ããã©ã«ãå¤ "--TempDir" = "s3://${data.aws_s3_bucket.assets.bucket}/temporary/" "--enable-continuous-cloudwatch-log" = true "--enable-glue-datacatalog" = true "--enable-job-insights" = true "--enable-metrics" = true "--enable-spark-ui" = true "--job-bookmark-option" = "job-bookmark-enable" "--job-language" = "python" "--spark-event-logs-path" = "s3://${data.aws_s3_bucket.assets.bucket}/sparkHistoryLogs/" } }
ãã¤ã³ã㯠--extra-files
ã§ã¯ã¬ãã³ã·ã£ã«ãã¡ã¤ã«ãæå®ãã¦ãããã¨ã§ãããã®ä»ã®ãã©ã¡ã¼ã¿ã¯ Visual Editor ã§ä½æããã¨ãã®ã¾ã¾ã«ãã¦ãã¾ãã
Running the job
Clicking [Run] on the created Glue Job in the Management Console starts the job. Startup can take as long as about a minute. If the job status reaches Succeeded and the table appears in the BigQuery console, the run was successful.
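If you prefer the CLI over the console, starting the job and checking the latest run should be possible with something like the following (job name is a placeholder):

aws glue start-job-run --job-name my-glue-job-name
aws glue get-job-runs --job-name my-glue-job-name --max-results 1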
Due to lack of time, I was not able to verify the following.
- Whether the BigQuery data is fully replaced or appended, and whether merging by key is possible
- How to create wildcard tables. Can it be done with datetime.strftime?
- What the T part of ETL would look like
- Things like viewing the Spark UI
Incidentally, an ETL job that used a Data Catalog created by a Crawler as its data source succeeded without issues.
Errors encountered along the way
BigQuery Connection provisioning error
ECR's GetAuthorizationToken timed out. This was because outbound traffic to ECR was not allowed; adding a 0.0.0.0/0 outbound rule to the security group resolved it.
2022-12-06 03:10:54,541 - __main__ - INFO - Glue ETL Marketplace - Requesting ECR authorization token for registryIds=709825985650 and region_name=us-east-1.
Traceback (most recent call last):
  File "/home/spark/.local/lib/python3.7/site-packages/urllib3/connection.py", line 160, in _new_conn
    (self._dns_host, self.port), self.timeout, **extra_kw
  File "/home/spark/.local/lib/python3.7/site-packages/urllib3/util/connection.py", line 84, in create_connection
    raise err
  File "/home/spark/.local/lib/python3.7/site-packages/urllib3/util/connection.py", line 74, in create_connection
    sock.connect(sa)
socket.timeout: timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spark/.local/lib/python3.7/site-packages/botocore/httpsession.py", line 353, in send
Googling the error turned up the thread below, and I lost several hours going down the wrong track.
https://stackoverflow.com/questions/69879490/etl-connector-not-loading-in-aws
Another error while provisioning the Database Connection
The error below itself went away after switching the Database Connection's credentials from Secrets Manager to a username/password, but the root cause remains unknown. It is related to the Secrets Manager API, but the network behavior around Connections is so complex that isolating the conditions under which it occurs was too much effort, so I haven't pursued it further.
22/12/06 06:51:28 ERROR ProcessLauncher: Error from Python:Traceback (most recent call last):
  File "/tmp/WriteToBigQuery.py", line 56, in <module>
    transformation_ctx="PostgreSQLtable_node",
  File "/tmp/WriteToBigQuery.py", line 33, in directJDBCSource
    transformation_ctx=transformation_ctx,
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 770, in from_options
    format_options, transformation_ctx, push_down_predicate, **kwargs)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 232, in create_dynamic_frame_from_options
    source = self.getSource(connection_type, format, transformation_ctx, push_down_predicate, **connection_options)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 105, in getSource
    makeOptions(self._sc, options), transformation_ctx, push_down_predicate)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o97.getSource.
: java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:349)
	at scala.None$.get(Option.scala:347)
	at com.amazonaws.services.glue.util.DataCatalogWrapper.$anonfun$getJDBCConf$1(DataCatalogWrapper.scala:218)
	at scala.util.Try$.apply(Try.scala:209)
	at com.amazonaws.services.glue.util.DataCatalogWrapper.getJDBCConf(DataCatalogWrapper.scala:209)
	at com.amazonaws.services.glue.GlueContext.applyConnectionProperties(GlueContext.scala:1000)
	at com.amazonaws.services.glue.GlueContext.getSourceInternal(GlueContext.scala:913)
	at com.amazonaws.services.glue.GlueContext.getSource(GlueContext.scala:776)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Closing
We are hiring
M3 is looking for people who want to contribute to healthcare in Japan through data! We also welcome casual chats at any time, so please see the links below for details!
*1: spark-bigquery-connector 0.24.2
*2: In that case, however, additional IAM policies are required.
*3: This alone ate up about 20 hours.
*4: I tried specifying it multiple times and got an error.
*5: PostgreSQL in the screenshots.
*6: https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.24.2#properties
*7: With Terraform, it means re-creating the resource.