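I'm @siroken3 from the SRE team. Lately my work has centered on building data-integration environments with partner companies, and much of it involves building the infrastructure that connects our production MySQL environment with the analytics platform.
This article describes how we went about syncing every item ever listed on Mercari to BigQuery.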
Background
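We have long used BigQuery for analytics and other purposes, syncing data from production MySQL to BigQuery and using it for analysis. The table that represents items is especially important.
However, because of the challenges described below, we could not upload that table to BigQuery, so we had no choice but to use a MySQL replica (slave) for analytics alongside BigQuery. That was inconvenient, and there had long been requests to analyze the items table in BigQuery alone.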
Challenges
Mercari does not physically delete sold items, so the table holds all data since the service launched. The items table in the production MySQL uses InnoDB's Barracuda format, is on the order of several hundred GB, and contains several hundred million records.
Note that BigQuery cannot enforce primary-key or unique-key constraints.
In addition, the items table is updated by events such as an item's description being edited, its price being changed, or the item being shipped. A sync has to reflect these changes in the BigQuery records as well.
However, BigQuery restricts updates *1: a table can be updated only up to 1,000 times per day.
So a naive sync to BigQuery would mean syncing all records in one go. Even assuming an ETL tool such as embulk *2, that takes days and cannot finish in a practical amount of time, so some ingenuity was needed.
The approach we adopted
We found Shiozaki-san's idea for differential sync to BigQuery *3 and were able to build our solution with it as a hint.
The basic flow is as follows.
- Periodically query the records for the period to be synced
- Overwrite a working table in BigQuery with the result
- Merge the working table with the upload destination table, using BigQuery analytic functions *4 for the merge
Concretely, for the MySQL-to-BigQuery ETL we adopted embulk + embulk-input-mysql plugin *5 + embulk-output-bigquery plugin *6, and for scheduled execution and retries on error we adopted the workflow engine digdag *7.
Syncing the existing items table
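Before uploading differences periodically, all data currently in the items table first has to be uploaded to BigQuery once.
The data is several hundred GB, so an upload that runs for days only to fail would make trial and error far too painful. We therefore decided not to upload everything at once but to split the upload into multiple runs.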
embulk has an incremental: option that makes it easy to split the input and upload it in multiple runs, and we first tried to use it. However, the items table we want in BigQuery is not a single table but the result of JOINing several tables. A JOIN requires query: in the embulk configuration, and unfortunately query: turned out to be mutually exclusive with incremental: *8, so we gave up on that.
Instead, we implemented the splitting with the for_each> operator provided by digdag. We cut updated, the column holding each item's last-update time, into suitable periods and passed the start and end of each period to embulk as parameters. Here is an excerpt of the embulk configuration used for the sync.
# items-bulk.yml.liquid
in:
  type: mysql
  host: {{ env.MYSQL_HOST }}
  port: 3306
  user: {{ env.MYSQL_USER }}
  password: {{ env.MYSQL_PASS }}
  database: {{ env.MYSQL_DB }}
  query: |
    SELECT
      id, d.description, (omitted)
    FROM items i JOIN item_description d ON i.id = d.item_id
    WHERE '{{ env.S }}' <= updated AND updated < '{{ env.E }}'
out:
  type: bigquery
  mode: replace
  table: tmp_items
  (rest omitted)
The digdag configuration is shown below. In for_each> we use an object named periods. Periods where we expected little data, such as the early days of the service, are long, and recent dates use shorter periods. Even so, each period is large, so we split further so that one loop finishes in a few hours and embulk does not run out of memory (OOM).
Specifying every split as a date would hurt readability, so we additionally used the slice option of for_range> to divide each period into 50 and adjust the amount synced at a time. If you try this for your own use case, note that digdag has a default upper limit (1000) on the number of splits *9.
timezone: Asia/Tokyo

+sync_all:
  for_each>:
    periods:
      # Early days of the service: longer bulk-upload periods (half a year)
      - from: '2013-06-24 00:00:00'
        to: '2014-01-01 00:00:00'
      - from: '2014-01-01 00:00:00'
        to: '2014-06-01 00:00:00'
      # (omitted)
      # Recent data: shorter periods
      - from: '2018-03-01 00:00:00'
        to: '2018-05-15 00:00:00'
  _do:
    +sync:
      for_range>:
        from: ${parseInt(moment(periods.from).format('X'))}
        to: ${parseInt(moment(periods.to).format('X'))}
        slice: 50 # split each period into 50 and process each part with embulk
      _do:
        +sync:
          _export:
            s: ${moment.unix(range.from).format('YYYY-MM-DD HH:mm:ss')}
            e: ${moment.unix(range.to).format('YYYY-MM-DD HH:mm:ss')}
          _retry: 5
          sh>: S="${s}" E="${e}" embulk run items-bulk.yml.liquid -r /var/tmp/items-bulk-resume.yml
        +merge:
          _retry: 3
          # batch mode, so as not to disturb other bq jobs
          sh>: bq query --batch --max_rows 1 --replace --allow_large_results --use_legacy_sql=false --destination_table OUR_BQ_DATASET.items "$(cat ./merge-items.sql)"
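To make the splitting concrete, here is a minimal Python sketch of how one period can be cut into equal half-open windows, the same shape of range that the embulk query's WHERE clause expects ('S' <= updated AND updated < 'E'). The helper name split_period and its integer rounding are assumptions for illustration only; digdag's for_range> may place slice boundaries differently.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"


def split_period(start: str, end: str, slices: int) -> list[tuple[str, str]]:
    """Split [start, end) into `slices` contiguous half-open windows.

    Hypothetical helper: it mirrors the idea of dividing a period into 50
    parts, not digdag's exact boundary arithmetic.
    """
    if slices <= 0:
        raise ValueError("slices must be positive")
    begin = datetime.strptime(start, FMT)
    finish = datetime.strptime(end, FMT)
    if finish <= begin:
        raise ValueError("end must be later than start")
    total = int((finish - begin).total_seconds())
    windows = []
    for i in range(slices):
        # Integer arithmetic keeps windows contiguous: no gaps, no overlaps.
        lo = begin + timedelta(seconds=total * i // slices)
        hi = begin + timedelta(seconds=total * (i + 1) // slices)
        windows.append((lo.strftime(FMT), hi.strftime(FMT)))
    return windows


# The most recent period from the workflow above, divided into 50 windows.
windows = split_period("2018-03-01 00:00:00", "2018-05-15 00:00:00", 50)
```

Because each window's end equals the next window's start and the query uses a half-open range, a row whose updated value falls exactly on a boundary is loaded by exactly one window.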
The last line of the digdag workflow definition above is the task that runs the SQL merging the temporary sync table uploaded to BigQuery with the target table, and overwrites the target BigQuery table items with the result.
sh>: bq query --batch --max_rows 1 --replace --allow_large_results --use_legacy_sql=false --destination_table OUR_BQ_DATASET.items "$(cat ./merge-items.sql)"
The result of running the merge SQL, merge-items.sql, is written over the items table via bq --destination_table.
SELECT * EXCEPT(rn)
FROM (
  SELECT *, row_number() over (PARTITION BY id ORDER BY updated DESC) AS rn
  FROM (
    SELECT * FROM OUR_BQ_DATASET.tmp_items
    UNION ALL
    SELECT * FROM OUR_BQ_DATASET.items
  )
)
WHERE rn = 1
The query uses BigQuery's analytic function row_number() together with a PARTITION BY clause. The following explains, with a conceptual diagram, how the merge is carried out using these analytic functions.
- (1) BigQuery holds the target table items and the temporary table tmp_items
- (2) UNION ALL builds the union of all rows from both tables
- (3) row_number() over (PARTITION BY id ORDER BY updated DESC) groups the rows by item id and numbers them within each group in descending order of update time (the enlarged part of the diagram)
- (4) Only the rows whose row_number (rn) is 1 are SELECTed; the working column rn is removed with EXCEPT(rn)
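Steps (1) to (4) can be emulated on small in-memory tables. The sketch below is only an illustration of the merge semantics in Python, not the BigQuery query itself; the function name merge_latest and the price column are hypothetical, while id and updated come from the SQL above.

```python
def merge_latest(items, tmp_items):
    """Keep, for each id, the row with the newest `updated` value.

    Emulates UNION ALL followed by row_number() OVER (PARTITION BY id
    ORDER BY updated DESC) and the filter rn = 1. On ties this sketch keeps
    the first row seen; BigQuery makes no such ordering promise for ties.
    """
    latest = {}
    for row in tmp_items + items:  # (2) union of all rows
        current = latest.get(row["id"])  # (3) one group per id
        if current is None or row["updated"] > current["updated"]:
            latest[row["id"]] = row  # (4) only the newest row survives
    return sorted(latest.values(), key=lambda r: r["id"])


# (1) the target table and the temporary table (made-up sample rows)
items = [
    {"id": 1, "price": 500, "updated": "2018-05-01 10:00:00"},
    {"id": 2, "price": 1200, "updated": "2018-05-01 11:00:00"},
]
tmp_items = [
    {"id": 2, "price": 1000, "updated": "2018-05-02 09:00:00"},
    {"id": 3, "price": 300, "updated": "2018-05-02 09:30:00"},
]

merged = merge_latest(items, tmp_items)
# Merging the same temporary table a second time changes nothing.
merged_again = merge_latest(merged, tmp_items)
```

Timestamps in the 'YYYY-MM-DD HH:MM:SS' form compare correctly as strings, which is why the sketch does not parse them. The second call shows the property the workflow relies on: repeating a sync for the same period leaves the result unchanged.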
This approach let us avoid registering duplicate records in BigQuery. Even if a sync task covering the same period is run twice, no duplicates are created and only the intended records are synced, which makes operations easier in many ways.
Syncing incremental data
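The mechanism described so far already gives us differential sync; what remains is a different MySQL query condition and some operational arrangements.
The query condition becomes "everything since the previous task run of the periodically executed digdag workflow".
However, the MySQL server described in this article is operated as a replica (slave) of the production database, so replication lag is possible. We therefore allow a one-hour buffer, pushing the start of the differential range back into the past so that consecutive ranges overlap.
Operating this way is possible only because the analytic functions let us merge just the latest version of each record.
digdag also makes it easy to plug in plugins, so it can notify Slack *10 when a task still ends in error after the specified number of retries.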
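The effect of the one-hour buffer can be sketched in a few lines of Python. The helper sync_window_start and the sample timestamps are assumptions for illustration, and the sketch assumes the incremental query selects rows whose updated value is greater than or equal to the lower bound S.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"


def sync_window_start(last_executed, lookback=timedelta(hours=1)):
    """Return the lower bound S: the previous run time minus the buffer."""
    return (last_executed - lookback).strftime(FMT)


# Hypothetical scenario: the previous run was at 12:00, and a row updated at
# 11:50 reached the replica only after that run because of replication lag.
previous_run = datetime(2018, 5, 15, 12, 0, 0)
start = sync_window_start(previous_run)

late_row_updated = "2018-05-15 11:50:00"
without_buffer = late_row_updated >= previous_run.strftime(FMT)  # row missed
with_buffer = late_row_updated >= start  # row picked up by the next run
```

Rows inside the overlap are loaded more than once, which is harmless here because the merge keeps only the newest row per id.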
timezone: Asia/Tokyo

schedule:
  minutes_interval>: 30 # run every 30 minutes
  skip_on_overtime: true # SKIP when the previous workflow run is still running past 30 minutes

_export:
  plugin: # notification plugin
    repositories:
      - https://jitpack.io
    dependencies:
      - com.github.szyn:digdag-slack:0.1.3
  workflow_name: upload-to-bq-items
  !include : credentials/slack_webhook.dig

+sync:
  +load:
    _export:
      # Go back one hour from the last (non-skipped) execution time and sync from there
      S: ${moment(last_executed_session_time).add(-1, 'hours').format('YYYY-MM-DD HH:mm:ss')}
    _retry: 5
    sh>: embulk run ./items.yml.liquid -r /var/tmp/items-resume.yml
  +merge:
    _retry: 3
    sh>: bq query --batch --max_rows 1 --replace --allow_large_results --use_legacy_sql=false --destination_table OUR_BQ_DATASET.items "$(cat ./merge-items.sql)"

_error:
  slack>: error.yml
At present the items table is synced within roughly 30 minutes. That said, syncs to BigQuery sometimes fail, and as the number of listings grows a run may no longer fit in 30 minutes, so some tuning will probably be needed.
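As a rough sanity check, the 30-minute schedule can be compared with the limit of 1,000 updates per table per day mentioned earlier. This is a back-of-the-envelope sketch that assumes each scheduled run overwrites the items table exactly once and ignores retries; the names are illustrative.

```python
MINUTES_PER_DAY = 24 * 60
DAILY_UPDATE_LIMIT = 1000  # per-table limit quoted in this article


def runs_per_day(interval_minutes: int) -> int:
    """Number of scheduled runs that start within one day."""
    return MINUTES_PER_DAY // interval_minutes


merges_per_day = runs_per_day(30)  # one merge into items per run (assumed)
headroom = DAILY_UPDATE_LIMIT - merges_per_day
```

Under these assumptions the schedule stays far below the limit, so the interval is constrained by how long each run takes rather than by the update quota.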
Going forward
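This time we built the BigQuery sync with a combination of embulk and digdag. There is a move within the company to put a data platform in place, so we may end up choosing a different ETL tool.
Even so, merging with analytic functions as described here is effective for uploading large volumes of data to BigQuery, and it looks set to be an indispensable technique whenever we build a sync to BigQuery.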
Summary / Impressions
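- We can now upload a huge table to BigQuery and use it there
- Thanks to analytic functions, the sync can run automatically with more peace of mind even when a run fails
- digdag takes some getting used to, but its plugin mechanism is convenient, and I think the retry mechanism in scheduled mode is excellent too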
Finally, Mercari is hiring people to develop our data analytics platform. If you think you could build this a better way, we look forward to your application.
*1: BigQuery data manipulation language (DML) statements
*3: "How switching the MySQL-to-BigQuery sync to differential updates made it 4x faster" (Takehiro Shiozaki), speakerdeck.com
*4:https://cloud.google.com/bigquery/docs/reference/standard-sql/functions-and-operators#analytic-functions
*5:https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-mysql
*6:https://github.com/embulk/embulk-output-bigquery
*8: Confirmed with Ver. 0.8.33
*9: Incidentally, there is also a workaround for this limit: https://github.com/treasure-data/digdag/issues/508