LIVESENSE Data Analytics Blog

リブセンスのデータ分析、機械学習、分析基盤に関する取り組みをご紹介するブログです。

Airflow を用いたデータフロー分散処理

こんにちは。テクノロジカルマーケティング部 データプラットフォームチームの村上です。

弊社各サービスのデータ分析基盤であるLivesense Analytics(以降LA)の開発、運用を行っています。

本日は弊社データプラットフォームでも活用しているフロー管理ツールAirflowについて、分散処理の概要や最近の更新についてご紹介します。

Airflowについて

Airflowとは

github.com Airflowはデータ処理フローを管理するツールです。元々はAirbnb社が開発して公開したソフトウェアです。その後Apacheソフトウェア財団のサポートを受けて現在はApache Airflowという正式名称になっています(本ブログでは以下Airflowと記述します)。ライセンスはApache License2.0です。本体コードはpythonで作られています。

2018年2月現在、正式リリースされているバージョンは1.9です。また、本ドキュメントのコードや説明はAirflow1.9をベースとします。

LA へのAirflow導入の背景

LAの主データベースにはRedshiftを採用しています。 LA 上のデータは弊社各メディアのフロントサイドのWebアクセスデータとサーバーサイドの顧客データ、そしてそれらをサマライズしたデータ等で構成されています。 サーバーサイドデータについては個人情報を匿名化しRedshiftに投入するだけで良いのですが、Webアクセスデータについては広告を制御したり詳細な顧客分析をする用途に用いられるので、生データのままではなく、広告チャンネルやセッションでの統合やサマライズなどを施した加工済みデータが求められます。 このような分析サイドの要求に答えるため、アクセスログデータの加工処理バッチを複数開発してきましたが、これらをcronで運用していたため管理が難しい状況になっていました。

このような背景から、データフロー処理ツールAirflowの導入を行いました。

Airflowは分散処理そのものを自前で実装せずに外部のフレームワークを利用しています。外部のフレームワークを利用することでシステムの柔軟な構成が可能となっている一方、Airflowを構成するサービスの構成、役割分担を理解していないと、効果的な設定を行うことが出来ません。

本稿ではAirflowを構成する各サービスの役割やDAGの書き方を中心に導入方法を解説します。

Airflow+CeleryExecutorで分散処理を行う

以下、弊社でも現在利用しているCeleryExecutorを使った分散処理のお話をしたいと思います。

インストールと設定の概略、常駐プロセスや分散処理用のパラーメータの設定等について触れます

そもそもCeleryとは

Celeryは複数のノードで分散して非同期でタスクキュー/ジョブキュー処理を行うためのフレームワークです。

Celeryはノード間のメッセージのやり取りにRabbitMQやRedis等のミドルウェアを使用します。Celeryではこれをbrokerと呼んでいます。一旦brokerを設置すれば、後は処理ノードを増やすだけで簡単にタスクの処理分散を実現できるのがCeleryの利点です。

CeleryExecutorはAirflowのクラスです。CeleryExecutorを使うと、Celeryを使ってAirflowタスクの分散処理を行えるようになります。

構成

f:id:livesense-analytics:20180201123428p:plain

CeleryExecutorを利用したAirflowサーバーをセットアップしてみます。

webserverとscheduler、workerを一つのサーバーにインストールします。 *1

systemdが動いているlinuxにインストールすることを想定した手順です。*2

また、python3, pip が使える前提です。

インストール

$ mkdir ~/airflow

$ export AIRFLOW_HOME=~/airflow

$ pip install apache-airflow

$ pip install "apache-airflow[mysql, celery]"

$ airflow initdb

pip install airflow と打ってしまうと、パッケージ名変更(Airflow1.8.1)以前のバージョンがインストールされますので注意して下さい。

基本設定

予め以下のデータベース類が用意されている前提です

  • airflow metadata database(DAGスケジュールやDAG Runの情報が入っているデータベース )
    • MySQLを使用
  • celery backend database (Celeryバックエンド用データベース MySQL, PostgreSQL, oracle が使える)
    • MySQLを使用
  • celery message broker (タスクキューイング用 redis, RabbitMQ等が使える)
    • redisを使用

airflow initdb を行うと、設定ファイルairflow.cfg のテンプレートが生成されますので、環境に応じて修正を加えてゆきます。

airflow.cfg

[core]

airflow_home = /path/to/airflow

executor = CeleryExecutor

sql_alchemy_conn = mysql://[metadata databaseのエンドポイント]

...

[celery]

celery_result_backend = db+mysql://[celery backend databaseのエンドポイント]

broker_url = redis://celery brokerのエンドポイント

...

設定したら、airflow initdbを行って、メタデータを初期化します。

$ airflow initdb

コンソールからwebserverを実行して、ブラウザからhttp://localhost:8080 にアクセスし、設定がうまくいっていることを確認します。

$ airflow webserver

f:id:livesense-analytics:20180131141028p:plain

Airflowの常駐サービス

Airflowには3種類の主要な常駐サービスがあります。

これらのサービスはデータベースやキューイングマネージャを介して完全に独立して動作するので、別々のサーバーで動作させることもできます。*3

特に分散処理を行う場合は、workerを動作させるworkerサーバーを複数並列に実行することになると思いますが、そのようなことも問題なく実現できます。*4

  • airflow webserver
    • AirflowのWebUIの処理、出力を行います
  • airflow scheduler
    • DAGの状態を監視して、スケジュールに応じてDAG Runを作成します
    • CeleryExecutorを使用する場合、Operatorのキューイングを行い、各ワーカーにOperatorの処理を分散します
    • (Sequential Executor を使用する場合、schedulerの子プロセスでDAG Runの処理が行われます)
  • airflow worker
    • schedulerがキューイングしたOperatorの処理を実行し、結果を格納します
    • (Sequential Executor を使用する場合には airflow worker を常駐させる必要はありません)

systemdで常駐化

以下のsystemdスクリプトを使用します scheduler, webserver, worker を常駐化します github.com セットアップの詳細は割愛します

ここまでの設定をすることでlinuxマシン上でwebserver, scheduler, workerが常駐して動き続けているはずです。

CeleryExecutorを利用する場合の並列処理周りのパラメーター

airflowには並列実行系のパラメーターが複数あり、理解しにくいので整理します

airflow.cfg

[core]

parallelism = x

dag_concurrency = x

max_active_runs_per_dag = x

...

[celery]

celeryd_concurrency = x

...

設定名 意味
parallelism 分散処理クラスタ全体で実行可能なプロセス数
dag_concurrency 一つのワーカで同時実行可能な最大プロセス数
max_active_runs_per_dag DAG内部で同時実行可能な最大タスク数
celeryd_concurrency(Airflow1.9.1以降ではworker_concurrency) 一つのCeleryワーカで同時実行可能な最大プロセス数

これらのパラメーターを適切に設定することで、分散処理のパフォーマンスをチューニングすることができると思います。

workerサーバーを複数台立ててタスクを分散処理する

f:id:livesense-analytics:20180201130947p:plain

同じ設定でworkerサービスのみを常駐させるサーバーを別途作成すれば、workerの処理を分散させることができます。

DAG

DAGとは

グラフ理論における有向非巡回グラフ のことです。有向非巡回グラフの場合、

  • 有向:辺には矢印のように方向がある
  • 非巡回:矢印に沿ってノードをたどっていった時にループが存在しない

といった特徴があります。

ループが存在しないので、DAGではノード間の依存順序を必ず解決できます。この特徴は依存関係があるワークフローを柔軟に途中実行したりする場合に有用なので、多くのワークフロー管理ツールが依存関係グラフとしてDAGを採用しています。AirflowもワークフローをDAGで記述します。

AirflowのDAGについて詳しくはこちらのスライドで説明されています。

DAGファイルを書く

Airflowチュートリアル

Airflowではデータフローをpythonのコードで記述します。このファイルを単にDAG又はDAGファイルと呼びます。

DAGファイルは airflow.cfgで設定したdags_folder (デフォルトでは $AIRFLOW_HOME/dags ) に設置します。

以下、DAGファイルの書き方について説明します。

1. dagオブジェクトを生成

DAGを定義するには、pythonのグローバルスコープにDAGオブジェクトを生成します。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

#dagを介してオペレーターに渡す引数を定義
operator_args = {
    'owner': 'airflow', 
    'depends_on_past': False,  #指定したタスクの上流タスクの実行が失敗した場合、タスクを実行するかどうかを設定します
                               #True:実行する False:実行しない
    'start_date': datetime(2018, 2, 1),  #DAG Runの生成期間の開始日時です
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

#dagの生成
dag = DAG(
    dag_id='dag_example1', #dagの識別子
    default_args=operator_args, #オペレーターに渡す引数を渡します
    catchup=False, 
    schedule_interval='0 0 * * *' #cronの時間形式で記述します
)
...

DAGの主な引数をまとめます

引数 意味
dag_id DAGに付ける識別名。
schedule_interval DAG実行間隔を指定します。cron形式の表記ができます(ex. '0 13 * * *')または '@daily' '@hourly' 等の表記もできます。詳細はドキュメントを参照して下さい
default_args operator生成時に指定する引数をdictionary形式で渡します。DAGファイルを記述する際には、一つのDAGに複数のオペレーターを紐付ける事が多いと思います。そのような場合、DAGにoperatorの引数を渡しておくとコード量が減って見やすくなります。
catchup(Airflow1.8以降) True/False で記述。 schedulerでDAG Runの遡り生成(Catchup)を行うかどうかを指定します。Falseを指定するとCatchupしなくなります。Catchupについては後述

2. オペレーターインスタンスの生成

task1 = BashOperator(
    task_id='task1', #taskの識別子
    bash_command='date', #bashで実行する内容。BashOperator特有
    dag=dag #オペレーターをdagと結びつけます
)

task2 = BashOperator(
    task_id='task2', 
    bash_command='sleep 5',  
    dag=dag 
)

DAGオブジェクトを生成したら、次はDAG内での処理単位となるオペレーターオブジェクトを生成します。 Airflowはオペレーターオブジェクト単位でデータフローを作り上げるという用途を想定して作られているので、各種データベースと接続するためのオペレーターが定義されています。

オペレーターは種類が多いですが、以下、主要なオペレーターの概要をまとめます

incubator-airflow/airflow/contrib/operators at master · apache/incubator-airflow · GitHub

incubator-airflow/concepts.rst at master · apache/incubator-airflow · GitHub

オペレーター 内容
SSHExecuteOperator(Airflow1.9で廃止) SSHで他のサーバーにログインしてコマンド実行するためのオペレーター
SSHOperator(Airlfow1.9から採用) 新しいSSH用のオペレーター
BashOperator workerローカルでbashコマンドを実行するためのオペレーター
PythonOperator workerローカルで任意のPython関数を実行するためのオペレーター
EmailOperator emailを送信するためのオペレーター
HTTPOperator HTTPリクエストを送信するためのオペレーター
MySqlOperator, SqliteOperator等 SQLクエリを実行するためのオペレーター
Sensor 指定時間、ファイル、データベース行、S3 key等を取得できるまで待つためのオペレーター

3. オペレーターインスタンス間の依存関係を記述

オペレーターのインスタンス同士を依存関係で結びます。

依存関係を記述するには、set_downstream()、 set_upstream() 関数を使いますが、Airflow1.8からはシフト演算子も使えるようになりました。

Bitshift Composition

f:id:livesense-analytics:20180201150723p:plain

#operator1を実行してからoperator2を実行する
#古い表記例
task1.set_downstream(task2)
#または
task2.set_upstream(task1)

#新しい表記例(Airflow1.8以降)
task1 >> task2
#または
task2 << task1

4. DAGファイルの読み込み

f:id:livesense-analytics:20180201151427p:plain

f:id:livesense-analytics:20180201151537p:plain

ここまでの設定を dags_folder 上にあるファイルに保存したらDAGの設定は完了です。

記述したグラフに矛盾や間違いがなければAirflowが自動で読み込んでくれるはずです。

読み込まれない場合はschedulerを再起動してみましょう。*5

Catchupについて

DAG Run

DAG Run はDAGの一回分の実行を表現するメタデータ上の概念です。 DAGの実行をする際には、スケジュール実行する場合も、一回限りの即時実行をする場合も、DAG Runがメタデータ上に作成されます。 DAG Runはsuccess, failed, running等の実行状態を保持しています。

Catchup動作

scheduler は DAGの schedule_interval で設定されている時間になるとDAG Runを生成してDAGの処理を始めます。

この動作は一見何でもないように見えますが、実は、 scheduler は前回のDAG Runが存在するかどうかチェックしています。

前回分のDAG Runがあれば最新のDAG Runを一つ生成するだけですが、 前回分のDAG Runがない場合、履歴をたどっていって最も近い過去に実行されたDAG Runから現在時刻までの間のDAG Runをまとめて生成します。 過去に一度もDAG Runが生成されていない場合、DAGに定義されている start_date から現在時刻までのDAG Runsを一括生成します。

この動作を Airflow では Catchup と呼んでいます。

例えば、あるDAGを1日一回実行していましたが何らかの事情で1月3日分まででで止めてしまったとします。その後、2月1日にDAGの実行を再開すると、1月4日から2月1日の分のDAG Runが一気に生成されて順次実行されます。

f:id:livesense-analytics:20180131195311p:plain

DAGからexecution_dateを受け取って指定日1日分のデータを処理するようなフローの場合はこれで良いのですが、最新のデータだけを処理すれば良いフローの場合、DAG Runがたくさん生成されても困ってしまいます。

このような場合にはCatchupを無効にすることができます。方法は2つあります。

  1. airflow.cfg を書きかえてCatchupを無効にする

    全てのDAGでCatchupが無効になります

    設定するには、airflow.cfgを書き換えます

    [scheduler]

    catchup_by_default = True

  2. DAGの引数に指定する

    指定したDAG内だけでCatchupが無効になります

    catchup = True / catchup = False

実際にLAにAirflowを導入してみて

Airflowを導入して良かったところ

  • Airflow側で実行ログを保存してくれる トラブルの際にタスク実行サーバーにログインしてログを確認する必要がなくなりました

  • タスク毎の実行履歴が見られる 実行履歴からログが見られるので状況の把握や問題の原因究明がやりやすくなりました

  • 依存関係があるデータフロー処理タスクをDAGでコード記述できる 依存関係の記述がやりやすくなりました。 また、データ処理フローをコードで記述できるので、バージョン管理しやすくなりました

  • データフローをグラフィカルに確認できる 複雑なデータフローを記述する際に、依存関係に間違いがないか一目で確認できるようになりました

微妙だった所

  • DAGを消してもWebUIからdag_id名が消えない

    リネームしたDAG等が残ってしまいます。そういう仕様だと思って割り切って使っていますが、リネームする項目が増えた時にどうしようかと思ってしまいます

  • DAG導入のタイミングによっては、ExecutionDateとStartDateがズレてしまう

    残念ながら、この挙動については原因が追いきれてません

まとめ

今回はAirflow分散処理の概要についてご説明させて頂きました。以下にAirflowの最近の更新履歴概要の抄訳を付録としまして置きましたので最近の状況についてご確認いただければと思います。

付録:Airflow 最近の更新状況

Apache Airflowはこの1年で2回の大きなアップデート(version 1.9, version 1.8)を行っています。 以前とは設定項目などが色々変わっていますので、以前利用されていた方も最新の状況を確認されるのが良いと思います。

incubator-airflow/UPDATING.md at master · apache/incubator-airflow · GitHub

以下、抄訳です。

次期バージョン(予定)

Celeryコンフィギュレーションパラメータ名が変更になります

  • celeryd_concurrency から worker_concurrency に変更
  • celery_result_backend から result_backend に変更

GCP Dataflow Operators

  • Dataflow job labeling が Dataflow {Java,Python} Operator でサポートされます

Airflow 1.9

リリース 2017/12

SSH HookがサブプロセスベースのSSHからParamikoライブラリを使用したものに切替え

  • SSHExecuteOperatorは廃止になりました。SSHOperatorを使用して下さい
  • 併せて、SFTPOperatorが追加されました

airflow.hooks.S3_hook.S3Hookがboto2からBoto3を使うようになりました

  • s3_conn_idは受け付けなくなりました。aws_conn_idを使って下さい
  • デフォルトコネクションがs3_defaultからaws_defaultに変更

ログシステムの作り変え

  • pythonのloggingモジュールを使うように変更されました

  • 既存のLoggingMixinクラスを拡張することでロギングをカスタマイズできるようになりました

Dask Executor の導入

  • Dask分散クラスタでタスクを実行できるようになりました

Airflow 1.8.1

リリース 2017/5

パッケージ名がairflowからapache-airflowに変更されました

Airflow 1.8

リリース 2017/3

metadataデータベースの構造が変更されたので、バージョンアップするにはmetadataのアップグレードが必要になりました

  • metadataデータベースをアップグレードするには、airflow upgradedb を実行する必要があります

poolの管理がより厳密になりました

  • バージョン1.7.1では、許可されている以上の数のプールを取得できる問題がありましたが、今回のバージョンからできなくなりました

スケジューラーのオプションが新しくなりました

  • child_process_log_directory

    • 堅牢性を向上するために、DAGSの処理はスケジューラーとは別のプロセスを立てて実行するようになりましたのでそれぞれがログを出力します
    • それぞれのDAGS処理プロセスが、Airflow.cfg上で設定した child_process_log_directory 以下にログファイルを出力します
  • コマンドラインオプション num_runs の意味が変わりました

    • スケジューラーの最大リトライ回数 から run_duration時間内の最大リトライ回数に変更されました
  • min_file_process_interval

    • 省略
  • dag_dir_list_interval

    • 省略
  • catchup_by_default

    • 本ドキュメントで触れました

DAG処理中のエラーがWebUIに表示されなくなりました

  • DAG処理エラーは airflow.cfgで設定されるchild_process_log_directory 以下に出力されます

新しく登録したDAGはデフォルトでポーズ状態になります

  • Airflow.cfg 内に dags_are_paused_at_creation = False と記述することで以前のふるまいに戻すことができます。

コンテキスト変数が Hive config に渡せるようになりました

Google Cloud Operator と Hook が整理されました

deprecated扱いになり、将来的に削除される機能があります

  • HookとOperatorはairflowオブジェクトから直接参照できなくなります。サブモジュールからimportして下さい。
  • Operator._init_()は任意の引数を許容しなくなります
  • secure_modeはデフォルトでTrueになります

*1:(実際に分散処理を行う場合にはwebserverとschedulerで一台、workerだけのマシン複数台で運用すると思いますが)

*2:具体的なディストリビューション等についてはここでは言及しません

*3:当然ですが metadata database としてローカルマシンのデータベースを使用している場合 webserver と worker を別々のサーバーで動かすことは出来ません

*4:Airflowは分散マシン間のDAGファイルの同期について一切面倒を見てくれません。workerを複数動かす場合、バージョン管理ツールや構成管理ツール等を用いて、各workerサーバーのDAGファイルの中身をユーザーが同期しておく必要があります

*5:スケジューラーはairflow.cfgのmin_file_process_intervalに設定されている間隔で更新をしていますが、この設定値には下限があって、デフォルト値の0設定でも3分間隔になるようです。また、新しいファイルの監視の設定はdag_dir_list_intervalでこちらはデフォルト5分間隔で管理しているようです。