Hello.
I'm Yamamoto, a lover of mountains.
It's autumn-foliage season, isn't it?
I have been evaluating Amazon MWAA (Amazon Managed Workflows for Apache Airflow), a managed service for the job-management software Airflow.
Environment creation and usage fees are explained in the following article.
Please refer to it.
I have also written a blog post on how to change the time zone to Tokyo.
Changing the time zone of Amazon MWAA (Amazon Managed Workflows for Apache Airflow) to Tokyo - Serverworks Engineer Blog
In this article, using the environment created above, I verified that a job that launches an ECS task can be created and run.
Running a job in an Amazon MWAA (Amazon Managed Workflows for Apache Airflow) environment
Since I had not yet run a single job, I first tried executing the bash command (date), which prints the current date and time to standard output.
The DAG file came out as follows.
It uses BashOperator to run the bash date command.
```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum

local_tz = pendulum.timezone("Asia/Tokyo")

default_args = {
    "owner": "tetsuya_yamamoto",          # owner
    "depends_on_past": False,             # whether to depend on the previous run's result
    "email": ["[email protected]"],    # email address
    "email_on_failure": False,            # send an email notification on failure
    "email_on_retry": False,              # send an email notification on retry
    "retries": 1,                         # number of retries
    "retry_delay": timedelta(minutes=5),  # retry interval
}

#dag = DAG("tutorial", default_args=default_args, schedule_interval=timedelta(1))
dag = DAG(
    'sample_dag37_46',                       # DAG name
    default_args=default_args,               # default arguments
    description='test_dag',                  # description
    start_date=datetime(2022, 9, 14, hour=14, minute=4, second=0, microsecond=0, tzinfo=local_tz),  # start datetime
    end_date=datetime(2022, 9, 14, hour=14, minute=4, second=0, microsecond=0, tzinfo=local_tz),    # end datetime
    schedule_interval=timedelta(minutes=3),  # execution interval
    tags=['example']
)

t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)  # run the date command
```
The spec of "start_date", "end_date", and "schedule_interval" (confusing points)
The behavior of "start_date", "end_date", and "schedule_interval" is hard to grasp, so here is a short explanation.
Confusing points:
- The first execution time of a job is not "start_date" but "start_date" plus "schedule_interval".
- The last execution time of a job is not "end_date" but "end_date" plus "schedule_interval".
In the DAG above, "start_date" is in fact set to 2022/09/14 14:04, and "schedule_interval" is set to 3 minutes.
Looking at "LAST RUN" on the DAGs screen shows 14:04.
However, opening the details from "Recent Tasks", the "Start Date" is 14:07.
Checking the log, the output of the executed date command was also 14:07.
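This offset can be verified with plain datetime arithmetic. A minimal sketch, using the values from the DAG above (standard-library Python only, no Airflow required):

```python
from datetime import datetime, timedelta

# Values taken from the DAG above (times in JST)
start_date = datetime(2022, 9, 14, 14, 4)
schedule_interval = timedelta(minutes=3)

# Airflow triggers a run only after the scheduled interval has closed,
# so the first actual run fires at start_date + schedule_interval.
first_run = start_date + schedule_interval
print(first_run.strftime("%H:%M"))  # 14:07, matching the log output above
```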
Therefore, to create a job that runs only once, at 15:00 on September 14, 2022 for example, configure it as follows.
- "start_date": September 14, 2022 14:57
- "end_date": September 14, 2022 14:57
- "schedule_interval": 3 minutes
or
- "start_date": September 14, 2022 14:59
- "end_date": September 14, 2022 14:59
- "schedule_interval": 1 minute
For example, to create a job that starts at 15:00 on September 14, 2022 and then runs at the same time every day:
- "start_date": September 13, 2022 15:00
- "end_date": no need to specify (if there is an end date, specify the day before it)
- "schedule_interval": every day, specified as schedule_interval="@daily"
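The same arithmetic explains why "start_date" goes one interval before the first desired run. A minimal sketch (standard-library Python; it models the interval as timedelta(days=1) rather than the "@daily" preset):

```python
from datetime import datetime, timedelta

# We want the first run at 2022-09-14 15:00, repeating daily.
desired_first_run = datetime(2022, 9, 14, 15, 0)
interval = timedelta(days=1)

# Since the first run fires at start_date + interval,
# start_date must be set one interval earlier.
start_date = desired_first_run - interval
print(start_date)  # 2022-09-13 15:00:00
```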
Incidentally, be careful: if you set "start_date" to a date far in the past, all the past runs are backfilled and executed, one "schedule_interval" at a time.
I initially set it to January 1, 1970, and a huge number of jobs were executed. I ended up replaying computer history.
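The scale of that backfill is easy to estimate with standard-library arithmetic (a rough sketch; the exact count depends on the current time). As an aside, the DAG constructor also accepts a catchup=False argument, which tells the scheduler to skip these past intervals.

```python
from datetime import datetime, timedelta

# A start_date of 1970-01-01 with a 3-minute interval means the
# scheduler tries to backfill one run per elapsed interval.
start_date = datetime(1970, 1, 1)
now = datetime(2022, 9, 14)   # the date of this experiment
interval = timedelta(minutes=3)

missed_runs = (now - start_date) // interval
print(missed_runs)  # over nine million backfilled runs
```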
When a DAG has a syntax error
If there is a syntax error when you upload a DAG, the details can be displayed in the management console.
Prerequisites for launching ECS tasks
According to the official documentation, Amazon MWAA has the following architecture.
The workers (Airflow Worker(s)) that run jobs launching ECS tasks need the following.
Access to the ECS service endpoint:
- The "Customer VPC" has a VPC endpoint for ECS (com.amazonaws.ap-northeast-1.ecs).
- Alternatively, placing a NAT gateway in the "Customer VPC" and reaching ECS over the internet is also possible.
- For details on the "Customer VPC", see the earlier article.
The IAM role of the Amazon MWAA environment has IAM permissions to launch ECS tasks:
- In this test, the following policies were used to launch ECS tasks and write logs to CloudWatch Logs:
- AmazonECS_FullAccess
- CloudWatchLogsFullAccess
The conceptual diagram is as follows.
Error log of a job run while the prerequisites were not met (for reference):
Launching an ECS task
A library called apache-airflow-providers-amazon is available from the start.
At the time of writing, it was Release: 2.4.0.
You can also confirm from the management console that the library is already installed.
It is the package at the very bottom.
A sample that launches an ECS task is provided below. Using it as a reference, let's launch an ECS task.
Here is the sample code. ECSOperator is imported at the top of the file, and the block defining t2 is the code that launches the ECS task.
Adjust the following to your environment: the cluster name, the task definition name, the container name inside the task definition, the security group ID, and the subnet ID.
```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum
from airflow.providers.amazon.aws.operators.ecs import ECSOperator

local_tz = pendulum.timezone("Asia/Tokyo")

default_args = {
    "owner": "tetsuya_yamamoto",          # owner
    "depends_on_past": False,             # whether to depend on the previous run's result
    "email": ["[email protected]"],    # email address
    "email_on_failure": False,            # send an email notification on failure
    "email_on_retry": False,              # send an email notification on retry
    "retries": 1,                         # number of retries
    "retry_delay": timedelta(minutes=1),  # retry interval
}

dag = DAG(
    'sample_dag37_46',                       # DAG name
    default_args=default_args,               # default arguments
    description='test_dag',                  # description
    start_date=datetime(2022, 9, 16, hour=9, minute=54, second=0, microsecond=0, tzinfo=local_tz),  # start
    end_date=datetime(2022, 9, 16, hour=9, minute=54, second=0, microsecond=0, tzinfo=local_tz),    # end
    schedule_interval=timedelta(minutes=1),  # execution interval
    tags=['example']
)

t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)

t2 = ECSOperator(
    task_id="run_task",
    dag=dag,
    cluster="cluster-name",
    task_definition="task-definition-name",
    launch_type="FARGATE",
    overrides={
        "containerOverrides": [
            {
                "name": "container-name-in-task-definition",
                "command": ["echo", "hello-world"],
            },
        ],
    },
    network_configuration={
        "awsvpcConfiguration": {
            "securityGroups": ["security-group-id"],
            "subnets": ["subnet-id"],
        },
    },
    awslogs_group="/ecs/hello-world",
    awslogs_stream_prefix="test-airflow",  # prefix with container name
)
```
The job finished successfully.
The box has turned green.
print_date is the job created first, the bash command (date) that prints the date and time to standard output.
The job that launches the ECS task is run_task above.
In the Airflow log, it finished with exit code 0.
The task log also appears on the ECS side.
Making the job fail on purpose by changing the command
Let's change the command and make it fail on purpose.
The job that launches the ECS task ended abnormally.
The box of the job that launches the ECS task has turned red.
In the Airflow log, it finished with exit code 1.
The details appear in the task log on the ECS side.
Running a downstream job only when an Airflow job fails
A downstream job can be made to run only when a job fails.
Create a downstream job and add trigger_rule="one_failed" or trigger_rule="all_failed" to its definition.
one_failed fires when there are multiple upstream jobs and any one of them has failed.
all_failed fires when all upstream jobs have failed.
Similarly, one_success and all_success are also available.
Reference: DAGs - Airflow Documentation
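The semantics of these two trigger rules can be illustrated with a toy evaluation (plain Python, not Airflow's actual implementation):

```python
# Toy versions of the trigger rules described above (not Airflow internals).
def one_failed(upstream_states):
    """Fire if at least one upstream task failed."""
    return any(s == "failed" for s in upstream_states)

def all_failed(upstream_states):
    """Fire only if every upstream task failed."""
    return all(s == "failed" for s in upstream_states)

states = ["success", "failed"]
print(one_failed(states))  # True: one upstream failed
print(all_failed(states))  # False: not all upstreams failed
```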
An example where the job failed and the downstream job ran:
An example where the job succeeded and the downstream job did not run:
Here is the sample code.
trigger_rule="one_failed" was added to the task (t1) that runs the bash command (date) printing the date and time.
The task (t1) was placed downstream of the job (t2) that launches the ECS task.
```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum
from airflow.providers.amazon.aws.operators.ecs import ECSOperator

local_tz = pendulum.timezone("Asia/Tokyo")

default_args = {
    "owner": "tetsuya_yamamoto",          # owner
    "depends_on_past": False,             # whether to depend on the previous run's result
    "email": ["[email protected]"],    # email address
    "email_on_failure": False,            # send an email notification on failure
    "email_on_retry": False,              # send an email notification on retry
    "retries": 1,                         # number of retries
    "retry_delay": timedelta(minutes=1),  # retry interval
}

dag = DAG(
    'sample_dag37_46',                       # DAG name
    default_args=default_args,               # default arguments
    description='test_dag',                  # description
    start_date=datetime(2022, 9, 16, hour=9, minute=54, second=0, microsecond=0, tzinfo=local_tz),  # start
    end_date=datetime(2022, 9, 16, hour=9, minute=54, second=0, microsecond=0, tzinfo=local_tz),    # end
    schedule_interval=timedelta(minutes=1),  # execution interval
    tags=['example']
)

t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
    dag=dag,
    trigger_rule="one_failed"  # run when t2 fails
)

t2 = ECSOperator(
    task_id="run_task",
    dag=dag,
    cluster="cluster-name",
    task_definition="task-definition-name",
    launch_type="FARGATE",
    overrides={
        "containerOverrides": [
            {
                "name": "container-name-in-task-definition",
                "command": ["echo", "hello-world"],
            },
        ],
    },
    network_configuration={
        "awsvpcConfiguration": {
            "securityGroups": ["security-group-id"],
            "subnets": ["subnet-id"],
        },
    },
    awslogs_group="/ecs/hello-world",
    awslogs_stream_prefix="test-airflow",  # prefix with container name
)

t1.set_upstream(t2)  # place t1 downstream of t2
```
Summary
- Pay attention to the actual start time of jobs.
- The "Customer VPC" needs a VPC endpoint for ECS or a NAT gateway.
- Because the IAM role of the Amazon MWAA environment can be used, no setup of access keys or the like is required.
- Because the library for operating ECS is installed from the start, ECS tasks can be launched easily.
- Jobs can be ordered with dependencies, and the downstream job can be switched depending on success or failure.
Once you get used to it, it feels like a job-management tool that is quite pleasant to use.
Tetsuya Yamamoto (article list)
An engineer in the Customer Success Division. I was honored to be selected as a 2024 Japan AWS Top Engineer.
This year's goals are to obtain the Advanced Networking - Specialty and Machine Learning - Specialty certifications.
My hobby is mountain running. This year's goals are to finish a 100 km and a 100 mile race. I finished the 100 km at Gran Trail Minakami. I fell apart going for 100 miles at OSJ Koumi 100, but I want to complete 100 miles somewhere.
Basically, I have an easygoing personality. My motto is "いつか…"