こんにちは。テクノロジカルマーケティング部 データプラットフォームチームの村上です。
弊社各サービスのデータ分析基盤であるLivesense Analytics(以降LA)の開発、運用を行っています。
本日は弊社データプラットフォームでも活用しているフロー管理ツールAirflowについて、分散処理の概要や最近の更新についてご紹介します。
Airflowについて
Airflowとは
github.com Airflowはデータ処理フローを管理するツールです。元々はAirbnb社が開発して公開したソフトウェアです。その後Apacheソフトウェア財団のサポートを受けて現在はApache Airflowという正式名称になっています(本ブログでは以下Airflowと記述します)。ライセンスはApache License2.0です。本体コードはpythonで作られています。
2018年2月現在、正式リリースされているバージョンは1.9です。また、本ドキュメントのコードや説明はAirflow1.9をベースとします。
LA へのAirflow導入の背景
LAの主データベースにはRedshiftを採用しています。 LA 上のデータは弊社各メディアのフロントサイドのWebアクセスデータとサーバーサイドの顧客データ、そしてそれらをサマライズしたデータ等で構成されています。 サーバーサイドデータについては個人情報を匿名化しRedshiftに投入するだけで良いのですが、Webアクセスデータについては広告を制御したり詳細な顧客分析をする用途に用いられるので、生データのままではなく、広告チャンネルやセッションでの統合やサマライズなどを施した加工済みデータが求められます。 このような分析サイドの要求に答えるため、アクセスログデータの加工処理バッチを複数開発してきましたが、これらをcronで運用していたため管理が難しい状況になっていました。
このような背景から、データフロー処理ツールAirflowの導入を行いました。
Airflowは分散処理そのものを自前で実装せずに外部のフレームワークを利用しています。外部のフレームワークを利用することでシステムの柔軟な構成が可能となっている一方、Airflowを構成するサービスの構成、役割分担を理解していないと、効果的な設定を行うことが出来ません。
本稿ではAirflowを構成する各サービスの役割やDAGの書き方を中心に導入方法を解説します。
Airflow+CeleryExecutorで分散処理を行う
以下、弊社でも現在利用しているCeleryExecutorを使った分散処理のお話をしたいと思います。
インストールと設定の概略、常駐プロセスや分散処理用のパラーメータの設定等について触れます
そもそもCeleryとは
Celeryは複数のノードで分散して非同期でタスクキュー/ジョブキュー処理を行うためのフレームワークです。
Celeryはノード間のメッセージのやり取りにRabbitMQやRedis等のミドルウェアを使用します。Celeryではこれをbrokerと呼んでいます。一旦brokerを設置すれば、後は処理ノードを増やすだけで簡単にタスクの処理分散を実現できるのがCeleryの利点です。
CeleryExecutorはAirflowのクラスです。CeleryExecutorを使うと、Celeryを使ってAirflowタスクの分散処理を行えるようになります。
構成
CeleryExecutorを利用したAirflowサーバーをセットアップしてみます。
webserverとscheduler、workerを一つのサーバーにインストールします。 *1
systemdが動いているlinuxにインストールすることを想定した手順です。*2
また、python3, pip が使える前提です。
インストール
$ mkdir ~/airflow
$ export AIRFLOW_HOME=~/airflow
$ pip install apache-airflow
$ pip install "apache-airflow[mysql, celery]"
$ airflow initdb
pip install airflow
と打ってしまうと、パッケージ名変更(Airflow1.8.1)以前のバージョンがインストールされますので注意して下さい。
基本設定
予め以下のデータベース類が用意されている前提です
- airflow metadata database(DAGスケジュールやDAG Runの情報が入っているデータベース )
- MySQLを使用
- celery backend database (Celeryバックエンド用データベース MySQL, PostgreSQL, oracle が使える)
- MySQLを使用
- celery message broker (タスクキューイング用 redis, RabbitMQ等が使える)
- redisを使用
airflow initdb
を行うと、設定ファイルairflow.cfg
のテンプレートが生成されますので、環境に応じて修正を加えてゆきます。
airflow.cfg
[core]
airflow_home = /path/to/airflow
executor = CeleryExecutor
sql_alchemy_conn = mysql://[metadata databaseのエンドポイント]
...
[celery]
celery_result_backend = db+mysql://[celery backend databaseのエンドポイント]
broker_url = redis://celery brokerのエンドポイント
...
設定したら、airflow initdb
を行って、メタデータを初期化します。
$ airflow initdb
コンソールからwebserverを実行して、ブラウザからhttp://localhost:8080 にアクセスし、設定がうまくいっていることを確認します。
$ airflow webserver
Airflowの常駐サービス
Airflowには3種類の主要な常駐サービスがあります。
これらのサービスはデータベースやキューイングマネージャを介して完全に独立して動作するので、別々のサーバーで動作させることもできます。*3
特に分散処理を行う場合は、workerを動作させるworkerサーバーを複数並列に実行することになると思いますが、そのようなことも問題なく実現できます。*4
- airflow webserver
- AirflowのWebUIの処理、出力を行います
- airflow scheduler
- DAGの状態を監視して、スケジュールに応じてDAG Runを作成します
- CeleryExecutorを使用する場合、Operatorのキューイングを行い、各ワーカーにOperatorの処理を分散します
- (Sequential Executor を使用する場合、schedulerの子プロセスでDAG Runの処理が行われます)
- airflow worker
- schedulerがキューイングしたOperatorの処理を実行し、結果を格納します
- (Sequential Executor を使用する場合には airflow worker を常駐させる必要はありません)
systemdで常駐化
以下のsystemdスクリプトを使用します scheduler, webserver, worker を常駐化します github.com セットアップの詳細は割愛します
ここまでの設定をすることでlinuxマシン上でwebserver, scheduler, workerが常駐して動き続けているはずです。
CeleryExecutorを利用する場合の並列処理周りのパラメーター
airflowには並列実行系のパラメーターが複数あり、理解しにくいので整理します
airflow.cfg
[core]
parallelism = x
dag_concurrency = x
max_active_runs_per_dag = x
...
[celery]
celeryd_concurrency = x
...
設定名 | 意味 |
---|---|
parallelism | 分散処理クラスタ全体で実行可能なプロセス数 |
dag_concurrency | 一つのワーカで同時実行可能な最大プロセス数 |
max_active_runs_per_dag | DAG内部で同時実行可能な最大タスク数 |
celeryd_concurrency(Airflow1.9.1以降ではworker_concurrency) | 一つのCeleryワーカで同時実行可能な最大プロセス数 |
これらのパラメーターを適切に設定することで、分散処理のパフォーマンスをチューニングすることができると思います。
workerサーバーを複数台立ててタスクを分散処理する
同じ設定でworkerサービスのみを常駐させるサーバーを別途作成すれば、workerの処理を分散させることができます。
DAG
DAGとは
グラフ理論における有向非巡回グラフ のことです。有向非巡回グラフの場合、
- 有向:辺には矢印のように方向がある
- 非巡回:矢印に沿ってノードをたどっていった時にループが存在しない
といった特徴があります。
ループが存在しないので、DAGではノード間の依存順序を必ず解決できます。この特徴は依存関係があるワークフローを柔軟に途中実行したりする場合に有用なので、多くのワークフロー管理ツールが依存関係グラフとしてDAGを採用しています。AirflowもワークフローをDAGで記述します。
AirflowのDAGについて詳しくはこちらのスライドで説明されています。
DAGファイルを書く
Airflowではデータフローをpythonのコードで記述します。このファイルを単にDAG又はDAGファイルと呼びます。
DAGファイルは airflow.cfg
で設定したdags_folder
(デフォルトでは $AIRFLOW_HOME/dags ) に設置します。
以下、DAGファイルの書き方について説明します。
1. dagオブジェクトを生成
DAGを定義するには、pythonのグローバルスコープにDAGオブジェクトを生成します。
from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta #dagを介してオペレーターに渡す引数を定義 operator_args = { 'owner': 'airflow', 'depends_on_past': False, #指定したタスクの上流タスクの実行が失敗した場合、タスクを実行するかどうかを設定します #True:実行する False:実行しない 'start_date': datetime(2018, 2, 1), #DAG Runの生成期間の開始日時です 'retries': 1, 'retry_delay': timedelta(minutes=5), } #dagの生成 dag = DAG( dag_id='dag_example1', #dagの識別子 default_args=operator_args, #オペレーターに渡す引数を渡します catchup=False, schedule_interval='0 0 * * *' #cronの時間形式で記述します ) ...
DAGの主な引数をまとめます
引数 | 意味 |
---|---|
dag_id | DAGに付ける識別名。 |
schedule_interval | DAG実行間隔を指定します。cron形式の表記ができます(ex. '0 13 * * *')または '@daily' '@hourly' 等の表記もできます。詳細はドキュメントを参照して下さい |
default_args | operator生成時に指定する引数をdictionary形式で渡します。DAGファイルを記述する際には、一つのDAGに複数のオペレーターを紐付ける事が多いと思います。そのような場合、DAGにoperatorの引数を渡しておくとコード量が減って見やすくなります。 |
catchup(Airflow1.8以降) | True/False で記述。 schedulerでDAG Runの遡り生成(Catchup)を行うかどうかを指定します。Falseを指定するとCatchupしなくなります。Catchupについては後述 |
2. オペレーターインスタンスの生成
task1 = BashOperator( task_id='task1', #taskの識別子 bash_command='date', #bashで実行する内容。BashOperator特有 dag=dag #オペレーターをdagと結びつけます ) task2 = BashOperator( task_id='task2', bash_command='sleep 5', dag=dag )
DAGオブジェクトを生成したら、次はDAG内での処理単位となるオペレーターオブジェクトを生成します。 Airflowはオペレーターオブジェクト単位でデータフローを作り上げるという用途を想定して作られているので、各種データベースと接続するためのオペレーターが定義されています。
オペレーターは種類が多いですが、以下、主要なオペレーターの概要をまとめます
incubator-airflow/airflow/contrib/operators at master · apache/incubator-airflow · GitHub
incubator-airflow/concepts.rst at master · apache/incubator-airflow · GitHub
オペレーター | 内容 |
---|---|
SSHExecuteOperator(Airflow1.9で廃止) | SSHで他のサーバーにログインしてコマンド実行するためのオペレーター |
SSHOperator(Airlfow1.9から採用) | 新しいSSH用のオペレーター |
BashOperator | workerローカルでbashコマンドを実行するためのオペレーター |
PythonOperator | workerローカルで任意のPython関数を実行するためのオペレーター |
EmailOperator | emailを送信するためのオペレーター |
HTTPOperator | HTTPリクエストを送信するためのオペレーター |
MySqlOperator, SqliteOperator等 | SQLクエリを実行するためのオペレーター |
Sensor | 指定時間、ファイル、データベース行、S3 key等を取得できるまで待つためのオペレーター |
3. オペレーターインスタンス間の依存関係を記述
オペレーターのインスタンス同士を依存関係で結びます。
依存関係を記述するには、set_downstream()、 set_upstream() 関数を使いますが、Airflow1.8からはシフト演算子も使えるようになりました。
#operator1を実行してからoperator2を実行する #古い表記例 task1.set_downstream(task2) #または task2.set_upstream(task1) #新しい表記例(Airflow1.8以降) task1 >> task2 #または task2 << task1
4. DAGファイルの読み込み
ここまでの設定を dags_folder
上にあるファイルに保存したらDAGの設定は完了です。
記述したグラフに矛盾や間違いがなければAirflowが自動で読み込んでくれるはずです。
読み込まれない場合はschedulerを再起動してみましょう。*5
Catchupについて
DAG Run
DAG Run はDAGの一回分の実行を表現するメタデータ上の概念です。 DAGの実行をする際には、スケジュール実行する場合も、一回限りの即時実行をする場合も、DAG Runがメタデータ上に作成されます。 DAG Runはsuccess, failed, running等の実行状態を保持しています。
Catchup動作
scheduler は DAGの schedule_interval
で設定されている時間になるとDAG Runを生成してDAGの処理を始めます。
この動作は一見何でもないように見えますが、実は、 scheduler は前回のDAG Runが存在するかどうかチェックしています。
前回分のDAG Runがあれば最新のDAG Runを一つ生成するだけですが、
前回分のDAG Runがない場合、履歴をたどっていって最も近い過去に実行されたDAG Runから現在時刻までの間のDAG Runをまとめて生成します。
過去に一度もDAG Runが生成されていない場合、DAGに定義されている start_date
から現在時刻までのDAG Runsを一括生成します。
この動作を Airflow では Catchup と呼んでいます。
例えば、あるDAGを1日一回実行していましたが何らかの事情で1月3日分まででで止めてしまったとします。その後、2月1日にDAGの実行を再開すると、1月4日から2月1日の分のDAG Runが一気に生成されて順次実行されます。
DAGからexecution_dateを受け取って指定日1日分のデータを処理するようなフローの場合はこれで良いのですが、最新のデータだけを処理すれば良いフローの場合、DAG Runがたくさん生成されても困ってしまいます。
このような場合にはCatchupを無効にすることができます。方法は2つあります。
airflow.cfg を書きかえてCatchupを無効にする
全てのDAGでCatchupが無効になります
設定するには、
airflow.cfg
を書き換えます[scheduler]
catchup_by_default = True
DAGの引数に指定する
指定したDAG内だけでCatchupが無効になります
catchup = True / catchup = False
実際にLAにAirflowを導入してみて
Airflowを導入して良かったところ
Airflow側で実行ログを保存してくれる トラブルの際にタスク実行サーバーにログインしてログを確認する必要がなくなりました
タスク毎の実行履歴が見られる 実行履歴からログが見られるので状況の把握や問題の原因究明がやりやすくなりました
依存関係があるデータフロー処理タスクをDAGでコード記述できる 依存関係の記述がやりやすくなりました。 また、データ処理フローをコードで記述できるので、バージョン管理しやすくなりました
データフローをグラフィカルに確認できる 複雑なデータフローを記述する際に、依存関係に間違いがないか一目で確認できるようになりました
微妙だった所
DAGを消してもWebUIからdag_id名が消えない
リネームしたDAG等が残ってしまいます。そういう仕様だと思って割り切って使っていますが、リネームする項目が増えた時にどうしようかと思ってしまいます
DAG導入のタイミングによっては、ExecutionDateとStartDateがズレてしまう
残念ながら、この挙動については原因が追いきれてません
まとめ
今回はAirflow分散処理の概要についてご説明させて頂きました。以下にAirflowの最近の更新履歴概要の抄訳を付録としまして置きましたので最近の状況についてご確認いただければと思います。
付録:Airflow 最近の更新状況
Apache Airflowはこの1年で2回の大きなアップデート(version 1.9, version 1.8)を行っています。 以前とは設定項目などが色々変わっていますので、以前利用されていた方も最新の状況を確認されるのが良いと思います。
incubator-airflow/UPDATING.md at master · apache/incubator-airflow · GitHub
以下、抄訳です。
次期バージョン(予定)
Celeryコンフィギュレーションパラメータ名が変更になります
celeryd_concurrency
からworker_concurrency
に変更celery_result_backend
からresult_backend
に変更
GCP Dataflow Operators
- Dataflow job labeling が Dataflow {Java,Python} Operator でサポートされます
Airflow 1.9
リリース 2017/12
SSH HookがサブプロセスベースのSSHからParamikoライブラリを使用したものに切替え
SSHExecuteOperator
は廃止になりました。SSHOperator
を使用して下さい- 併せて、
SFTPOperator
が追加されました
airflow.hooks.S3_hook.S3Hookがboto2からBoto3を使うようになりました
s3_conn_id
は受け付けなくなりました。aws_conn_id
を使って下さい- デフォルトコネクションが
s3_default
からaws_default
に変更
ログシステムの作り変え
pythonのloggingモジュールを使うように変更されました
既存のLoggingMixinクラスを拡張することでロギングをカスタマイズできるようになりました
Dask Executor の導入
- Dask分散クラスタでタスクを実行できるようになりました
Airflow 1.8.1
リリース 2017/5
パッケージ名がairflowからapache-airflowに変更されました
Airflow 1.8
リリース 2017/3
metadataデータベースの構造が変更されたので、バージョンアップするにはmetadataのアップグレードが必要になりました
- metadataデータベースをアップグレードするには、
airflow upgradedb
を実行する必要があります
poolの管理がより厳密になりました
- バージョン1.7.1では、許可されている以上の数のプールを取得できる問題がありましたが、今回のバージョンからできなくなりました
スケジューラーのオプションが新しくなりました
child_process_log_directory
- 堅牢性を向上するために、DAGSの処理はスケジューラーとは別のプロセスを立てて実行するようになりましたのでそれぞれがログを出力します
- それぞれのDAGS処理プロセスが、Airflow.cfg上で設定した
child_process_log_directory
以下にログファイルを出力します
コマンドラインオプション num_runs の意味が変わりました
- スケジューラーの最大リトライ回数 から run_duration時間内の最大リトライ回数に変更されました
min_file_process_interval
- 省略
dag_dir_list_interval
- 省略
catchup_by_default
- 本ドキュメントで触れました
DAG処理中のエラーがWebUIに表示されなくなりました
- DAG処理エラーは
airflow.cfg
で設定されるchild_process_log_directory
以下に出力されます
新しく登録したDAGはデフォルトでポーズ状態になります
Airflow.cfg
内にdags_are_paused_at_creation = False
と記述することで以前のふるまいに戻すことができます。
コンテキスト変数が Hive config に渡せるようになりました
Google Cloud Operator と Hook が整理されました
deprecated扱いになり、将来的に削除される機能があります
- HookとOperatorはairflowオブジェクトから直接参照できなくなります。サブモジュールからimportして下さい。
- Operator._init_()は任意の引数を許容しなくなります
- secure_modeはデフォルトでTrueになります
*1:(実際に分散処理を行う場合にはwebserverとschedulerで一台、workerだけのマシン複数台で運用すると思いますが)
*2:具体的なディストリビューション等についてはここでは言及しません
*3:当然ですが metadata database としてローカルマシンのデータベースを使用している場合 webserver と worker を別々のサーバーで動かすことは出来ません
*4:Airflowは分散マシン間のDAGファイルの同期について一切面倒を見てくれません。workerを複数動かす場合、バージョン管理ツールや構成管理ツール等を用いて、各workerサーバーのDAGファイルの中身をユーザーが同期しておく必要があります
*5:スケジューラーはairflow.cfgのmin_file_process_intervalに設定されている間隔で更新をしていますが、この設定値には下限があって、デフォルト値の0設定でも3分間隔になるようです。また、新しいファイルの監視の設定はdag_dir_list_intervalでこちらはデフォルト5分間隔で管理しているようです。