[aws] Trying Out Dynamic Partitioning in Firehose
Dynamic Partitioning has finally been implemented in Amazon Kinesis Data Firehose!
Introducing Dynamic Partitioning in Amazon Kinesis Data Firehose
So let's try it out right away.
Problems with S3 delivery in Firehose
Before getting into the newly implemented Dynamic Partitioning, here is a quick recap of the Firehose behavior that used to cause trouble.
When Firehose delivered data accumulated in a stream to S3, it automatically built a directory structure from the time at which Firehose processed the data and wrote the files under it.
With this behavior, when you analyzed the data stored in S3 with Athena or similar tools, the actual data (for example, the timestamp inside each record) and the partitions, that is, the directory structure on S3, could disagree. You either had to write queries that accounted for the difference or run a separate process to rearrange the data within the partitions.
Another problem was that you could not partition by anything other than time, such as some kind of ID.
I suspect this behavior was one reason some teams chose architectures that did not use Firehose for their streams.
With the introduction of Dynamic Partitioning, any data in the records flowing into a Firehose stream can now be used as a partition key.
As a result, the rearrangement work that previously had to be done separately for partitioning can now be completed within Firehose alone.
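To make the mismatch concrete, here is a minimal sketch in JavaScript. It assumes the default Firehose layout of a UTC yyyy/MM/dd/HH/ prefix based on delivery time. The helper names and the delivery time are made up for illustration; only the record's event_timestamp comes from the sample data used later in this post.

```javascript
// Hypothetical helpers for illustration; not part of any AWS SDK.
const pad2 = (n) => String(n).padStart(2, '0');

// Prefix derived from a point in time, in UTC: yyyy/MM/dd/HH/
function hourlyPrefix(date) {
  return [
    date.getUTCFullYear(),
    pad2(date.getUTCMonth() + 1), // getUTCMonth returns 0-11
    pad2(date.getUTCDate()),
    pad2(date.getUTCHours()),
  ].join('/') + '/';
}

// Prefix based on when Firehose processed the batch.
function deliveryTimePrefix(deliveredAt) {
  return hourlyPrefix(deliveredAt);
}

// Prefix based on the timestamp carried inside the record (epoch seconds).
function recordTimePrefix(record) {
  return hourlyPrefix(new Date(record.event_timestamp * 1000));
}

const record = { customer_id: '1234567890', event_timestamp: 1565382027 };
const deliveredAt = new Date('2021-09-01T13:25:10Z'); // assumed delivery time

console.log(deliveryTimePrefix(deliveredAt)); // 2021/09/01/13/
console.log(recordTimePrefix(record));        // 2019/08/09/20/
```

The two prefixes differ by more than two years for the same record, which is the gap that Athena queries previously had to work around.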
Dynamic Partitioning
How to set partition keys
At the moment, Dynamic Partitioning appears to offer two ways to specify partition keys.
1 inline parse
Extracts partition keys from records using jq
2 AWS Lambda
Reuses the existing data transformation Lambda mechanism
Simple requirements can probably be covered by inline parse.
If the partition keys have complex requirements, if you already use a data transformation Lambda, or if the charges for inline parse processing (details below) add up, setting the partition keys with Lambda looks like the better choice.
Let's try each method in turn.
Procedure
This walkthrough uses sample data with the structure below (the one shown in the official Dynamic Partitioning documentation and on the Firehose creation screen).
Because I usually handle date-time data in ISO 8601 format, this version also appends such a field at the end.
{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, "region": "pdx", "createdAt": "2021-09-01T13:24:10Z" }
(The key names are inconsistent; please bear with me.)
The setup uses IoT Core, as shown below.
Creating the S3 bucket
First, prepare a bucket with any name.
In the steps below, anything not explicitly mentioned is left at its default value.
Name: firehose-dynamic-partition
Creating the Firehose
Next, create the Firehose.
Open the Kinesis console > Create delivery stream
- Source: Direct PUT
- Destination: S3
- Delivery stream name: (any name)
- S3 bucket: firehose-dynamic-partition (the S3 bucket created earlier)
- Dynamic partitioning: Enabled
- New line delimiter: Enabled
- Inline parsing for JSON: Enabled
- Dynamic partitioning keys: set them as in the screenshot below. The part of the createdAt expression that is cut off there is .createdAt | fromdateiso8601 | strftime("%Y-%m-%d %H:%M:%S")
- After entering the keys, press the Add dynamic partitioning key button; the configured partition keys are written into the S3 bucket prefix field.
- S3 bucket prefix: !{partitionKeyFromQuery:customer_id}/!{partitionKeyFromQuery:device}/!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:createdAt}/
- Buffer interval: 60
Note: In the official jq example on the creation screen, the strftime argument is written with single quotation marks, but that caused a jq parse error at run time, so double quotation marks are used here.
Note: Partitioning by createdAt has no particular meaning; it is only there to illustrate handling of the ISO 8601 format.
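Because the key expressions live in a screenshot, here is a rough JavaScript equivalent of what the inline parse step computes for the sample record. Only the createdAt expression is quoted from this post. The jq expressions noted for the other five keys are assumptions modeled on the sample in the AWS documentation, and inlineParseKeys is a hypothetical name.

```javascript
// JavaScript stand-in for the jq expressions. Every jq string in the comments
// except the createdAt one is an assumption, not taken from this post.
const pad2 = (n) => String(n).padStart(2, '0');

function inlineParseKeys(rec) {
  const ts = new Date(rec.event_timestamp * 1000); // epoch seconds -> ms
  const created = new Date(rec.createdAt);         // ISO 8601 string
  return {
    customer_id: rec.customer_id,       // .customer_id
    device: rec.type.device,            // .type.device
    year: String(ts.getUTCFullYear()),  // .event_timestamp | strftime("%Y")
    month: pad2(ts.getUTCMonth() + 1),  // .event_timestamp | strftime("%m")
    day: pad2(ts.getUTCDate()),         // .event_timestamp | strftime("%d")
    // .createdAt | fromdateiso8601 | strftime("%Y-%m-%d %H:%M:%S")
    createdAt: created.toISOString().slice(0, 19).replace('T', ' '),
  };
}

const sample = {
  type: { device: 'mobile', event: 'user_clicked_submit_button' },
  customer_id: '1234567890',
  event_timestamp: 1565382027,
  region: 'pdx',
  createdAt: '2021-09-01T13:24:10Z',
};

const keys = inlineParseKeys(sample);
console.log(keys.year, keys.month, keys.day); // 2019 08 09
console.log(keys.createdAt);                  // 2021-09-01 13:24:10
```

Note that year, month, and day come from event_timestamp while createdAt comes from a different field, so the two can point at different dates, as they do in this sample.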
Creating the IoT Core rule
Open the IoT Core console > ACT > Rules > Create
- Name: firehose_dynamic_partition (any name)
- Rule query statement: SELECT * FROM 'iot/firehose'
- Add action
  - Send a message to an Amazon Kinesis Firehose stream
  - Stream name: (the Firehose stream created earlier)
  - Separator: No separator
  - Create role: give it any name
- Create rule
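For readers who prefer the API to the console, the same rule can be described as a request body for the AWS IoT CreateTopicRule operation. This is only a sketch of the payload shape: the stream name and role ARN are placeholders, not values from this post, and no SDK call is made here.

```javascript
// Sketch of a CreateTopicRule request body mirroring the console steps.
// deliveryStreamName and roleArn are placeholders for illustration.
const createTopicRuleInput = {
  ruleName: 'firehose_dynamic_partition',
  topicRulePayload: {
    sql: "SELECT * FROM 'iot/firehose'",
    actions: [
      {
        firehose: {
          deliveryStreamName: 'YOUR_DELIVERY_STREAM_NAME', // placeholder
          roleArn: 'arn:aws:iam::123456789012:role/YOUR_ROLE_NAME', // placeholder
          // separator is omitted, matching "No separator" in the console
        },
      },
    ],
  },
};

console.log(JSON.stringify(createTopicRuleInput, null, 2));
```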
Verification
That completes the inline parse setup, so let's verify that it works.
Go to IoT Core > Test > MQTT test client, open the test client page, and select the "Publish to a topic" tab.
Topic name: iot/firehose
Enter the message payload below and press the Publish button.
{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, "region": "pdx", "createdAt": "2021-09-01T13:24:10Z" }
Publish it a few times as a test.
Because the Firehose buffer interval was set to 60 seconds, after about one minute you can confirm that files have been written to S3 under the specified partitions (directory structure).
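To see which directory the sample record should land in, the S3 bucket prefix can be expanded by hand. The sketch below imitates the placeholder substitution for illustration only; Firehose performs the real substitution itself, and expandPrefix is a hypothetical helper. The key values are those derived from the sample payload.

```javascript
// Expand !{namespace:key} placeholders in an S3 bucket prefix.
// Imitation for illustration; not how Firehose is implemented.
function expandPrefix(template, keys) {
  const pattern = /!\{(partitionKeyFromQuery|partitionKeyFromLambda):([^}]+)\}/g;
  return template.replace(pattern, (match, namespace, name) => {
    if (!(name in keys)) throw new Error(`missing partition key: ${name}`);
    return String(keys[name]);
  });
}

const template =
  '!{partitionKeyFromQuery:customer_id}/!{partitionKeyFromQuery:device}/' +
  '!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/' +
  '!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:createdAt}/';

const partitionKeys = {
  customer_id: '1234567890',
  device: 'mobile',
  year: '2019',
  month: '08',
  day: '09',
  createdAt: '2021-09-01 13:24:10',
};

const expanded = expandPrefix(template, partitionKeys);
console.log(expanded);
// 1234567890/mobile/2019/08/09/2021-09-01 13:24:10/
```

The same helper accepts the partitionKeyFromLambda namespace, so it can be reused to check the Lambda-based prefix.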
Dynamic partitioning with Lambda
Next, let's try setting the partition keys with Lambda.
Edit the Firehose created earlier.
Firehose > the created stream > Configuration > Transform and convert records > Edit
- Data transformation: Enabled
- Create function
  - General Kinesis Data Firehose Processing
  - Function name: firehose-dynamic-partition-lambda (any name)
  - Select "Create a new role with basic Lambda permissions" and create the function
Once the Lambda function has been created, paste in the code below and deploy it.
console.log('Loading function');

exports.handler = async (event, context) => {
  /* Attach partitionKeys metadata to each record */
  const records = event.records.map((record) => {
    const payload = JSON.parse(
      Buffer.from(record.data, 'base64').toString('utf-8')
    );
    console.log('Decoded payload:', payload);

    // event_timestamp is epoch time in seconds; Date expects milliseconds
    const d = new Date(payload.event_timestamp * 1000);
    const partitionKeys = {
      customer_id: payload.customer_id,
      device: payload.type.device,
      year: d.getUTCFullYear(),
      month: ('00' + (d.getUTCMonth() + 1)).slice(-2), // getUTCMonth returns 0-11
      day: ('00' + d.getUTCDate()).slice(-2),
      // 2021-09-01T13:24:10Z -> 2021-09-01 13:24:10
      createdAt: payload.createdAt.replace('T', ' ').replace('Z', ''),
    };

    return {
      recordId: record.recordId,
      data: record.data,
      result: 'Ok',
      metadata: { partitionKeys },
    };
  });
  console.log(`Processing completed. Successful records ${records.length}.`);
  return { records };
};
Some extra processing is implemented so that the partition keys match those of the inline parse setup, but the basic idea is simple: for the records passed in as the stream event, build records with the partitionKeys values added as metadata and return them. That is all it takes to set partition keys with Lambda.
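The record shape going in and coming out can be checked locally without deploying anything. The sketch below is a trimmed-down stand-in for the handler, with only two keys to keep it short, run against a hand-built event. The event contains just the fields the code reads; real Firehose events carry more, and the recordId value is made up.

```javascript
// Minimal stand-in for the transformation: each record is returned unchanged
// apart from the added metadata.partitionKeys.
function transform(event) {
  const records = event.records.map((record) => {
    const payload = JSON.parse(
      Buffer.from(record.data, 'base64').toString('utf-8')
    );
    return {
      recordId: record.recordId, // must echo the incoming recordId
      result: 'Ok',
      data: record.data,         // still base64; the payload is not modified
      metadata: {
        partitionKeys: {
          customer_id: payload.customer_id,
          device: payload.type.device,
        },
      },
    };
  });
  return { records };
}

// Hand-built event for a local check.
const samplePayload = { type: { device: 'mobile' }, customer_id: '1234567890' };
const event = {
  records: [
    {
      recordId: 'record-1', // made-up ID
      data: Buffer.from(JSON.stringify(samplePayload)).toString('base64'),
    },
  ],
};

const out = transform(event);
console.log(JSON.stringify(out.records[0].metadata));
// {"partitionKeys":{"customer_id":"1234567890","device":"mobile"}}
```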
Return to the Firehose edit screen and select the Lambda created earlier from Browse.
Leave the other settings at their defaults and press Save changes.
Because the Lambda transformation now sets the partition keys, turn off the jq-based inline parse setting.
Firehose > the created stream > Configuration > Destination settings > Edit
Inline parsing for JSON: Disabled
Note: When Dynamic Partitioning is enabled, inline parse cannot be disabled unless a data transformation Lambda is configured.
Update the S3 bucket prefix as follows so that it uses the partition keys set by Lambda.
!{partitionKeyFromLambda:customer_id}/!{partitionKeyFromLambda:device}/!{partitionKeyFromLambda:year}/!{partitionKeyFromLambda:month}/!{partitionKeyFromLambda:day}/!{partitionKeyFromLambda:createdAt}/
After making the change, press Save changes.
As in the earlier verification steps, send the message payload below from the IoT Core test client and wait until the processing has run.
(Only the createdAt value is changed, to one hour after the first data.)
{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, "region": "pdx", "createdAt": "2021-09-01T14:24:10Z" }
The file was written to the specified partition without any problems!
Pricing
Note that using Dynamic Partitioning incurs additional charges.
For details, see the official pricing page listed at the end; the charge is mainly the sum of three items.
Judging from the official pricing example, the amount is probably not large, but the price depends on how you use the service, so it is worth estimating for your own use case.
Compared with what you were paying separately just to set up partition keys, I don't think it will often turn out more expensive.
The architecture gets simpler too, so it's two birds with one stone.
Points to note
Below are the caveats that caught my eye.
See the official documentation for details.
- Dynamic partitioning can be enabled only on a new Firehose stream
- If the stream was created with dynamic partitioning, the partition keys can be changed later
- Dynamic partitioning cannot be disabled on a Firehose stream that was created with it
- At most 500 partitions can be processed at the same time
- Data for partition keys beyond that limit goes to the error output
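The 500-partition limit is easy to hit with a fine-grained key such as createdAt at one-second resolution. The sketch below counts the distinct key combinations in a synthetic batch as a rough upper bound. How many partitions are actually active at once depends on buffering, so treat the result as an estimate; countPartitions and the generated records are hypothetical.

```javascript
const ACTIVE_PARTITION_LIMIT = 500; // limit quoted in the list above

// Count distinct combinations of the chosen partition keys.
function countPartitions(records, keyNames) {
  const seen = new Set();
  for (const r of records) {
    seen.add(keyNames.map((k) => r[k]).join('/'));
  }
  return seen.size;
}

// 600 synthetic records, one per second, all for the same customer and day.
const records = Array.from({ length: 600 }, (_, i) => ({
  customer_id: '1234567890',
  day: '2021-09-01',
  createdAt: new Date(Date.UTC(2021, 8, 1, 13, 0, i)).toISOString(),
}));

const byDay = countPartitions(records, ['customer_id', 'day']);
const bySecond = countPartitions(records, ['customer_id', 'createdAt']);

console.log(byDay, byDay <= ACTIVE_PARTITION_LIMIT);       // 1 true
console.log(bySecond, bySecond <= ACTIVE_PARTITION_LIMIT); // 600 false
```

Partitioning by day keeps this batch in a single partition, while partitioning by second would exceed the limit if all of those partitions were active together.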
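Thoughts
IoT Analytics could already do something similar to the Dynamic Partitioning now added to Firehose, so I had been tearing my hair out wondering why Firehose couldn't. I'm very glad the feature has been released.
I had been struggling with a Firehose + S3 + Athena setup for the past two weeks or so, so when I saw the release page I nearly fainted from excitement. Thinking about it, though, part of me feels it should have been there from the start...
I suppose it's the theory that a delinquent doing a good deed looks especially good.
In any case, setups using Firehose will now be simpler and easier to use, so this is great news!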