データプラットフォームグループの野本です。主に機械学習基盤の構築やそれにまつわるアプリケーションの開発をしています。 以前までの記事で現在 Kubernetes を利用して機械学習基盤の構築を進めているという紹介をしましたが、機械学習システムに付きものだと思われるワークフローのジョブ管理に Argo という Kubernetes 上で動作するワークフローエンジンを導入し使いはじめまてみました。まだ色々試している段階でもあるのですが現状でどんな感じで使っているのか紹介してみようと思います。
ワークフローエンジンの選定に関して
現在機械学習基盤では先に紹介した以前の記事 や マルチコンテナ構成による機械学習アルゴリズムとアプリケーションの疎結合化 のような形で機械学習システムの構築を進めています。特に後者の具体例のように各アプリケーションを疎結合にうまく動かせるように出来るのが理想です。 これらを踏まえ以下のような点を重視してワークフローエンジンを検討しました。
コンテナによる実行が手軽に出来る
- 各ジョブで利用する言語をジョブに適したものだったり担当者が得意なものに柔軟に変えることが出来る
- 複数のシステムでコアとなるコンテナをライブラリ的に利用したい
ワークフローを宣言的に記述出来る
- ワークフローの定義をファイルとしてバージョン管理したい
- UI での設定よりも再現性がある
- GitOps のような自動化を導入しやすい
出来るだけ手軽に導入 / 運用出来る
- 依存するミドルウェアなどが極力少ない
- 学習コストが低い
人員的なリソースや基盤構築のフェーズにもよると思いますが、現状は以上のようなことを念頭にワークフローエンジンの導入を考えていたところ、Argo - The Workflow Engine for Kubernetes というプロダクトと遭遇しました。
Argo とは
Argo とは Kubernetes 上で動作するコンテナネイティブなワークフローエンジンです。Kubernetes の CRD (Custome Resource Definition) として実装されており、標準の Deployment などの Kubernetes リソースと同等に yaml ファイルなどでワークフローの定義を管理出来るようになっています。
コンテナネイティブなワークフローエンジン
- 各ジョブは Pod により実行される
- ジョブ内 sidecar コンテナを利用可能
- Kubernetes の他のリソースと同様に ConfigMap や Volume を参照可能
- ダッシュボード用の argo-ui という Deployment の存在
- 現状は実行済みワークフローの参照のみ
Kubernetes の CRD として実装されている
- インストールされるのは workflow-controller と argo-ui の Deployment と argo-ui 用の Service、ConfigMap / Secrets で他のミドルウェアなどは必要としない
- kubectl コマンドで他のリソースと同等に扱える
yaml によるワークフローの定義
- 各ジョブはKubernetes 標準の Pod の Spec とほぼ同等の記述で設定出来る
- 並列処理などかなり柔軟なワークフローの設定が可能
選定基準のところと対になるような記述となりましたが、Argo はちょうど今求めているようなワークフローエンジンであり、より手軽にコンテナベースのワークフローを構築出来そうだと判断し導入を進めることとしました。
ちなみにデータ分析基盤チームの方では AirFlow を本格的に利用しています
導入してみた
現在は主に機械学習アプリケーションのワークフローとして利用しています。一例として以下のようなケースで利用しています。
- Python によりデータ分析基盤からデータを取得し後続のアルゴリズムによる計算のタスクに渡す形に整形
- R によるアルゴリズムで計算し結果を出力
- Python により出力結果を実データに変換しデータベースに登録
これら各タスクは全て独立したコンテナで動作しているため、共通の Volume をマウントすることにより各タスク間のデータの受け渡しを行っています。
また、このワークフローを定期実行するために Kubernetes の CronJob を利用しています。現状の Argo では単独で定期実行などのイベントによるトリガーが実装されていません。なので、CronJob による Job で argo submit
を実行することにより定期実行を行っています。失敗時などの再実行は Kubernetes v1.10 より実装された kubectl create job <Job Name> --from=cronjob/<CronJob Name>
にて手軽に実行出来ます。
どのように運用しているか
実際にどう設定してどのように動かしているのか、簡単なサンプルをベースに説明してみます。
ワークフローの設定
以下のようなフローを構築してみます。
- Python 可変長の値を生成 (コンテナイメージ:
task-python:1.0
) - R により 1 で生成した数値を処理して出力 (コンテナイメージ:
task-r:1.0
) - Python により 2 で出力した数値を処理する (コンテナイメージ:
task-python:1.0
)
また、2, 3 は 1 で生成した数値の個数分並列に実行させてみます。 Argo の設定ファイルは以下のようになります。
apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: generateName: workflow-sample- spec: entrypoint: workflow-sample-steps arguments: parameters: - name: max value: 5 templates: - name: workflow-sample-steps steps: - - name: task1 template: task-python-pre arguments: parameters: - name: max value: "{{workflow.parameters.max}}" - - name: task2-3 template: nested-steps arguments: parameters: - name: result1 value: "{{item}}" withParam: "{{steps.task1.outputs.parameters.results}}" - name: nested-steps inputs: parameters: - name: result1 steps: - - name: task2 template: task-r arguments: parameters: - name: param1 value: "{{inputs.parameters.result1}}" - - name: task3 template: task-python-post arguments: parameters: - name: param1 value: "{{steps.task2.outputs.parameters.result}}" - name: task-python-pre inputs: parameters: - name: max container: image: task-python:1.0 command: [sh, -c] args: ["python pre-task.py {{inputs.parameters.max}} > /tmp/results"] outputs: parameters: - name: results valueFrom: path: /tmp/results - name: task-r inputs: parameters: - name: param1 container: image: task-r:1.0 command: [sh, -c] args: ["Rscript task.R {{inputs.parameters.param1}} > /tmp/result"] outputs: parameters: - name: result valueFrom: path: /tmp/result - name: task-python-post inputs: parameters: - name: param1 container: image: task-python:1.0 command: [sh, -c] args: ["python post-task.py {{inputs.parameters.param1}}"]
設定ファイルの解説
- ワークフロー / コンテナを
templates
として定義する- ワークフローテンプレート (
step
)name: workflow-sample-steps
name: nested-steps
- 実行コンテナテンプレート (
container
)name: task-python-pre
name: task-r
name: task-python-post
- ワークフローテンプレート (
step
は入れ子で定義出来る- ここでは
task1
で複数の数値を生成、以降のタスクを並列に実行させるため入れ子のstep
を定義している
- ここでは
- 各タスクの入出力はファイル経由で行っている
task1
の出力は[1,2,3,4]
のような JSON の配列になっていて、withParam
により配列をパースし以降の処理に 1 つずつ渡すことで並列処理を行うようになっているspec
直下のarguments
に設定しているグローバルなパラメータ(ここではmax
)はargo submit workflow.yml -p max=30
のように実行時に値を上書き出来る
各ジョブの実際の内容は DB へのアクセスやデータ分析基盤である RedShift へのアクセス、GCS へのバックアップなどの処理がありますが、ワークフローとしては先程上げた導入例とほぼ同じような流れとなっています。
上記ワークフローを実際に実行した際のダッシュボードは以下のようになっています。
各ジョブがどのように実行されどのくらい時間がかかったか、その際のログ出力などが確認出来ます。
CronJob の設定
CronJob でワークフローを定期実行するにあたり、以下のような運用を行っています。
Argo クライアント
Kubernetes クラスタ内から argo
コマンドを実行するために Argo クライアント専用のコンテナを作成しています。複数のアプリケーションで Argo を利用することになりますが、Argo 本体とのバージョンの整合性を保ちやすくするなどの理由で共通のコンテナとしています。
ワークフロー定義ファイル
先程設定したワークフロー定義の yaml ファイルはジョブのコンテナに含める運用にしています。Kubernetes の ConfigMap に設定したり GCS に置いておくなどの方法もありそうですがジョブのコンテナ (= ジョブと同一のリポジトリ) に配置しておくことでジョブのロジックと一緒に管理出来るの方が今のところ扱いやすいためこのような方法を取っています。
今回はワークフローの定義(workflow.yml
)は Python のコンテナに同梱するような構成で進めてみます。
これらを踏まえ CronJob の設定は以下のようになります。
apiVersion: batch/v1beta1 kind: CronJob metadata: name: argo-cronjob-sample spec: schedule: "0 * * * *" jobTemplate: spec: backoffLimit: 0 template: spec: restartPolicy: Never volumes: - name: workdir emptyDir: {} initContainers: - name: init-workflow image: task-python:1.0 command: ["sh", "-c"] args: ["cp /workflow.yml /mnt/work/workflow.yml"] volumeMounts: - name: workdir mountPath: /mnt/work securityContext: runAsUser: 0 containers: - name: exec-workflow image: argo-cli:1.0 command: ["sh", "-c"] args: ["argo submit /mnt/work/workflow.yml"] volumeMounts: - name: workdir mountPath: /mnt/work
CronJob 実行時の処理の流れは
initContainers
でtask-python
コンテナにあるworkflow.yml
を取り出しマウントした volume に配置containers
によるメイン処理にて 1 で展開した定義ファイルを引数にargo submit
となっています。改めて workflow.yml
をどこで扱うかは悩ましいのですがよりアプリケーションに近いところで管理しておくことでワークフローと処理内容を一緒に参照出来る形にしています。
まとめ
このように Argo は当初から欲しかった
- コンテナによる実行が手軽に出来る
- Kubernetes ベースの実装のため Kubernetes のリソースとの親和性も高い
- ワークフローを宣言的に記述出来る
- 出来るだけ手軽に導入 / 運用出来る
という部分の要求は十分に満たしています。また現時点で使っている機能はほんの一部で、 argo リポジトリの example にあるようにかなり多様で柔軟なワークフローを構築出来る点も魅力です。
反対に、使ってみて現時点での課題と思うところは以下です。
- 柔軟で複雑な処理が書ける反面 yaml も複雑になりがち
- 今回は使っていないが
dag
での実行も実装されているのでこれを利用すればより簡潔 / 柔軟に記述出来そう
- 今回は使っていないが
- ダッシュボード (Argo UI)
- 現状実行済みワークフローの参照程度。定義済みのワークフローの表示や再実行など出来るようになると良さそう
- 実行済みワークフローが沢山あると描画がめちゃくちゃ重い
- 実行済みワークフローの削除が自動で行われず自前で管理しなければならない
- CronJob の 実行済み Job のように古いものは自動で消えて欲しい気もする
- やはりスケジュール実行 / イベントトリガーが欲しい
スケジュール実行やイベントトリガーは Argo Events というプロジェクトが開発中で Calendar(Cron) や Webhook の機能が見受けられます。 また、Argo CD / Argo CI というプロジェクトも進行中で、 Argo による手軽なワークフローを CI / CD に利用出来るのはかなり便利になるのではと思っています。