I'm @shinyorke (しんよーく), a senior engineer at JX通信社, working on all things data utilization and data science for our product teams.
My recent morning routine is working up a sweat with Ring Fit Adventure*1 before breakfast. I've kept it going for 35 days straight.
The story goes back to the end of July this year: to introduce JX通信社's data platform and answer the question "how should we do ETL, and which framework/library should we use? 🤔", I published the following entry.
That entry got a great response from many readers, and I'm glad I wrote it. Thank you to everyone who read it!
If you haven't read it yet, it might be worth skimming it before moving on to this one.
At the end of that entry, I closed with the line:

See you next time in the prefect edition.

so this entry is, as promised, the prefect edition.
This time the theme is building and running a simple batch system with prefect, centered on implementation and some observations.
Getting started with prefect
What is prefect?
Simply put, it's a framework for batch applications developed in Python, which bills itself as:

The easiest way to automate your data.

(Loosely: "the easiest way to automate your data and make it nice!")
According to the explanation in the official repository's README.md,
Prefect is a new workflow management system, designed for modern infrastructure and powered by the open-source Prefect Core workflow engine. Users organize Tasks into Flows, and Prefect takes care of the rest.
(Loosely: prefect is a framework designed for modern infrastructure; developers just write Tasks and Flows, and Prefect Core takes care of running the rest as a workflow!)

That's the model.
By the way, Hello world looks like this:
```python
from prefect import task, Flow, Parameter

@task(log_stdout=True)
def say_hello(name):
    print("Hello, {}!".format(name))

with Flow("My First Flow") as flow:
    name = Parameter('name')
    say_hello(name)

flow.run(name='world')  # "Hello, world!"
flow.run(name='Marvin')  # "Hello, Marvin!"
```
A function with the `@task` decorator (`say_hello` in the example above) is a function that actually performs some work. The developer implements the `with Flow("My First Flow") as flow` block, which just takes the required parameters and calls those functions, and prefect takes care of the rest.
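To get a feel for this "declare tasks now, run them later" model, here is a toy sketch of my own. To be clear: this is NOT prefect's actual implementation, and all of the names below (`ToyFlow`, `toy_task`) are invented purely for illustration.

```python
# Toy illustration of the "declare tasks, run them later" model.
# NOTE: this is NOT prefect's real API -- names here are invented for illustration.

class ToyFlow:
    current = None  # the flow currently being built, if any

    def __init__(self, name):
        self.name = name
        self.calls = []  # deferred (func, args, kwargs) triples

    def __enter__(self):
        ToyFlow.current = self
        return self

    def __exit__(self, *exc):
        ToyFlow.current = None

    def run(self):
        # "The framework takes care of the rest": execute the registered calls
        return [f(*a, **kw) for f, a, kw in self.calls]


def toy_task(func):
    # Inside a flow context, register the call instead of executing it
    def wrapper(*args, **kwargs):
        if ToyFlow.current is not None:
            ToyFlow.current.calls.append((func, args, kwargs))
            return None
        return func(*args, **kwargs)
    return wrapper


@toy_task
def say_hello(name):
    return "Hello, {}!".format(name)


with ToyFlow("My First Flow") as flow:
    say_hello("world")  # recorded, not executed yet

print(flow.run())  # ['Hello, world!']
```

The point is only that the decorator defers execution while the `with` block collects the call graph; the real prefect additionally resolves Parameters, tracks dependencies between tasks, and handles retries and scheduling.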
Hello, prefect
So, let's get started with prefect right away.
I think the easiest way to learn it is to clone the official repository and run the tutorial locally.
※ That's exactly how I did it.
```shell
$ git clone git@github.com:PrefectHQ/prefect.git
$ cd prefect
$ pip install prefect SQLAlchemy
```
SQLAlchemy is in there because the tutorial quietly depends on it (whispers)*2.
Incidentally, it also worked on Python 3.9 🎉
Once you've gotten this far, try running the tutorial code:
```shell
$ cd examples/tutorial
$ python 01_etl.py
```
As of writing this post (2020/12/18), everything ran without a hitch except 06_parallel_execution.py.
For now, I think the workflow is: run things like this, rewrite them a bit, and run them again.
Building a lightweight batch job
Once you've worked through the examples, you should be able to build a smallish app of your own.
...That said, having no sample at all felt lacking, so I prepared one.
The sample does a super-simple ETL using baseballdatabank, an open dataset for Major League Baseball ⚾️.*3
- Read player profiles (People.csv)
- Read batting stats (Batting.csv) and compute missing metrics such as batting average
- JOIN the player profiles with the batting stats, then output as CSV
That's the ETL workflow, and it can be done nicely with just this much code:
```python
import logging
from datetime import datetime

import pandas as pd
import click
from pythonjsonlogger import jsonlogger
from prefect import task, Flow, Parameter

logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)


@task
def read_csv(path: str, filename: str) -> pd.DataFrame:
    """
    Read CSV file
    :param path: dir path
    :param filename: csv filename
    :return: dataset
    :rtype: pd.DataFrame
    """
    # In ETL terms, this is Extract
    logger.debug(f'read_csv: {path}/{filename}')
    df = pd.read_csv(f"{path}/{filename}")
    return df


@task
def calc_batting_stats(df: pd.DataFrame) -> pd.DataFrame:
    """
    Calculate and add batting average, on-base percentage, and slugging percentage
    :param df: Batting Stats
    :return: dataset
    :rtype: pd.DataFrame
    """
    # In ETL terms, this is Transform
    logger.debug('calc_batting_stats')
    _df = df
    _df['BA'] = round(df['H'] / df['AB'], 3)
    _df['OBP'] = round((df['H'] + df['BB'] + df['HBP']) / (df['AB'] + df['BB'] + df['HBP'] + df['SF']), 3)
    _df['TB'] = (df['H'] - df['2B'] - df['3B'] - df['HR']) + (df['2B'] * 2) + (df['3B'] * 3) + (df['HR'] * 4)
    _df['SLG'] = round(_df['TB'] / _df['AB'], 3)
    _df['OPS'] = round(_df['OBP'] + _df['SLG'], 3)
    return df


@task
def join_stats(df_player: pd.DataFrame, df_bats: pd.DataFrame) -> pd.DataFrame:
    """
    join dataframe
    :param df_player: player data
    :param df_bats: batting stats
    :return: merged data
    :rtype: pd.DataFrame
    """
    # In ETL terms, this is Transform
    logger.debug('join_stats')
    _df = pd.merge(df_bats, df_player, on='playerID')
    return _df


@task
def to_csv(df: pd.DataFrame, run_datetime: datetime, index=False):
    """
    export csv
    :param df: dataframe
    :param run_datetime: datetime
    :param index: include dataframe index(default: False)
    """
    # In ETL terms, this is Load
    logger.debug('to_csv')
    df.to_csv(f"{run_datetime.strftime('%Y%m%d')}_stats.csv", index=index)


@click.command()
@click.option("--directory", type=str, required=True, help="Baseball Dataset Path")
@click.option("--run-date", type=click.DateTime(), required=True, help="run datetime(iso format)")
def etl(directory, run_date):
    with Flow("etl") as flow:
        run_datetime = Parameter('run_datetime')
        path = Parameter('path')

        # Extract Player Data
        df_player = read_csv(path=path, filename='People.csv')

        # Extract Batting Stats
        df_batting = read_csv(path=path, filename='Batting.csv')

        # Transform Calc Batting Stats
        df_batting = calc_batting_stats(df=df_batting)

        # Transform JOIN
        df = join_stats(df_player=df_player, df_bats=df_batting)

        # Load to Data
        to_csv(df=df, run_datetime=run_datetime)

    flow.run(run_datetime=run_date, path=directory)


if __name__ == "__main__":
    etl()
```
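As a side note, the sabermetric formulas that calc_batting_stats vectorizes with pandas can be spelled out in plain Python. This stdlib-only sketch mirrors them one-to-one (the function names are mine; the column names H, AB, BB, HBP, SF, 2B, 3B, HR follow baseballdatabank's Batting.csv):

```python
# Plain-Python versions of the metrics calc_batting_stats adds with pandas.

def batting_average(h, ab):
    # BA = hits / at-bats
    return round(h / ab, 3)

def on_base_pct(h, bb, hbp, ab, sf):
    # OBP = (H + BB + HBP) / (AB + BB + HBP + SF)
    return round((h + bb + hbp) / (ab + bb + hbp + sf), 3)

def total_bases(h, doubles, triples, hr):
    # TB = singles + 2 * 2B + 3 * 3B + 4 * HR
    return (h - doubles - triples - hr) + doubles * 2 + triples * 3 + hr * 4

def slugging(h, doubles, triples, hr, ab):
    # SLG = TB / AB
    return round(total_bases(h, doubles, triples, hr) / ab, 3)

def ops(obp, slg):
    # OPS = OBP + SLG
    return round(obp + slg, 3)

# e.g. a 150-for-500 season with 30 doubles, 5 triples and 20 home runs:
print(batting_average(150, 500))      # 0.3
print(slugging(150, 30, 5, 20, 500))  # 0.5
```

Doing this per-row in a loop is exactly what the pandas version avoids: each metric becomes one vectorized column expression.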
Thanks to pandas*4 and by following prefect's conventions, you can see that a relatively readable workflow comes together.
This time CSV files are the final input/output, but you can of course also do things like:

- Process JSON sitting in storage and import it into BigQuery
- Read data from Athena and BigQuery, transform it, and save it to your service's RDBMS

(as long as you handle it nicely in the parts that become tasks).
I like that even people not used to building data platforms or ETL can assemble this intuitively, as long as they can read and write Python.
Other things it can do & drawbacks
This time I focused on the motivation of "let's just build and run something ETL-ish with prefect", but prefect is actually quite feature-rich:
- Task progress can be shown in a GUI (a screen similar to Airflow's or Luigi's)
- Out of the box it runs nicely on Docker and k8s, as well as on major cloud services such as GCP, AWS, and Azure

and so on; it can do quite a lot.
On the other hand, my negative impressions from using it were:
- It can do a lot, but there is accordingly a fair amount to learn in order to do all of it.
- It can do a lot, but it therefore depends on quite a few libraries, which makes me a little wary about maintenance overhead when self-hosting it.
- I haven't debugged this properly, so take it with a grain of salt, but I'm not fully confident the parallel-execution mechanism is really running in parallel 🤔

So there were also a few points of concern.
As I wrote in the previous entry,

Every ETL framework has its quirks in the end, so rather than simply going with the biggest one, ask whether it fits your motivation!

In the end I think it comes down to that*5.
Closing - where is this space headed? 🤔
So that was a fairly laid-back tale of development with prefect.
I think the frameworks and libraries used for data platform and machine learning workflows are truly in a warring-states period: Airflow DAGs can now be written simply (in almost the same style as prefect*6), and for ETL that just massages BigQuery data, a future where nearly everything is done in SQL seems plausible; this space is genuinely lively.
The content of this entry will surely be outdated in half a year, but I intend to keep up the challenges and self-study so as not to miss the trend!
Incidentally, JX通信社 is looking for student interns who want to self-study together in this spirit and do server-side Python and Go nicely.
This is probably my last blog post here this year...
Wishing you all a happy New Year, and see you next year with fresh material!
*1: As of writing, my level is 58 and the exercise load is at the max of 30. My muscles are delighted 💪
*2: I tried it again for this entry after a long while and got tripped up by this (ugh).
*3: Why baseball ⚾ open data? Because it's a hobby of mine, and it's open data that's easy to get and easy to work with.
*4: At this scale of processing, pandas' excellence tends to stand out more than prefect's, but the way the decorators split things nicely into flow and task suggests prefect's design philosophy is sound after all.
*5: This isn't limited to ETL, but I think it means that once you've chosen something, you commit to maintaining it properly and using it to the fullest.
*6: As an aside, the author of prefect is apparently an Airflow contributor? author?? of some sort.