Vertex AI Pipelines で利用している Kubeflow Pipelines を v2 へ移行しました

こんにちは,コミューンで機械学習エンジニアとして働いている柏木(@asteriam_fp)です.

最近は,鬼滅の刃の柱稽古編が毎週日曜日に放送されていて,それを見て楽しんだ勢いで月曜日からの仕事も勤しんでいます!(無限城編も楽しみです)

はじめに

現在我々のチームでは,Google Cloud 上の Vertex AI Pipelines 上で機械学習パイプライン(バッチレコメンド等で利用中)を動かしています.その裏側では,Kubeflow Pipelines (KFP) を利用していますが,このライブラリ(フレームワーク)が2023年6月にバージョン2.0をリリースしました.

以前まで使用していたバージョン1.8は2024年12月にサポートが終了してしまいます.これに伴い我々のチームでもバージョン2.0への移行対応を実施したため,本ブログではその内容を紹介したいと思います.

https://cloud.google.com/vertex-ai/docs/supported-frameworks-list?hl=ja#pipelines

この辺りの話は,株式会社 Citadel AI の杉山さんが書かれたブログや第39回 MLOps 勉強会で登壇された際の資料がとても参考になります.

参考:


このタイミングで移行しようとした背景

移行対応の背景

改めて移行した背景について説明します.上記でも述べた通り2024年12月にはサポートが終了してしまうため,EOL 対応が必要になります.これも移行対応を進める背景の1つです.

また我々のチームでは,機械学習の新規機能の検証やサービス開発を現在進行形で進めており,その過程で Vertex AI Pipelines を利用したパイプライン開発をする可能性が高いです.新機能開発を優先して,パイプラインの数が増えていったタイミングで EOL 対応をするとなれば,そのタイミングでリソースに余裕がなかったり,時間に追われ,慌てて対応することでミスが発生すれば,精神的にも負担が大きくなります.さらに, 以前から見えていた課題も相まって,今回早めに移行を進めることにしました.

以前から見えていた課題

以前から見えていた課題として,機械学習パイプラインの実装負荷・学習コストの高さがありました.特に Vertex AI Pipelines の実装に慣れていないメンバーが開発しようとすると,例えばデータサイエンティストのメンバーの場合,本来注力したいロジック部分以外に注意を払う必要がある箇所が多く,Vertex AI Pipelines 上で動作の確認を取るまでのハードルが高いため実装時の負荷がそれなりにかかっている状態でした.

また,機械学習パイプラインにおけるコンポーネントの構成上の問題でもあるのですが,Dockerfile を1つのコンポーネントにつき1つ用意していたため,トータルでのビルド時間も長くなり動作確認を取るためのサイクルを回す時間がボトルネックになるといったケースもありました.

KFP v1 時のコンポーネント構成

機械学習パイプラインでは,複数のコンポーネントを繋ぎ合わせて構成されますが,そのコンポーネント1つ1つがコンテナとして動く仕組みを Vertex AI Pipelines では取っています.

我々のチームでは,元々以下のようなファイルをコンポーネント毎に用意していました.

.
├── component_1
│   ├── Dockerfile
│   ├── component.yaml
│   ├── main.py
│   ├── poetry.lock
│   └── pyproject.toml
├── component_2
└── component_3

このような構成を取っていたものの共通化できる部分があるのではないかと感じていました.

  • Dockerfile と poetry 関連のファイルの共通化
    • Dockerfile はコンポーネント毎で大部分が同一なため,分ける意味はほとんどなく,また個別にビルドすると時間がかかる状況でした
    • poetry 関連のファイルは各コンポーネントで必要最低限のライブラリをインストールできる点は良いのですが,コンポーネント間での一貫性を保つような形式を取っていなかったので,コンポーネントによっては同じライブラリでもバージョンが微妙に異なることが発生していました

これらを踏まえて KFP v2 へ移行するタイミングで共通化やビルド時間の長さを解決する方法を考え,それらも盛り込んで移行することにしました.

KFP v2 への移行

KFP v2 へ移行する上で公式ドキュメントとして提供されている Migrate from KFP SDK v1 を読み,それに従い進めて行きました.

我々の場合,上記ドキュメントで対応したもの以下になります.

Container Components を用いた記述

KFP v2 SDK ではコンポーネントの記述方法として以下の3つが用意されています.

個々の説明はここでは割愛するため,詳細は公式ドキュメントで確認して下さい.

我々は,今までの記述とあまり変わらない任意のコンテナを使用してコンポーネントを定義でき,柔軟性がある Container Components を採用しました.

v1 時代には component.yaml を記述し,それを kfp.components.load_component_from_file の関数を使ってコンポーネントを用意していました.以下が v1 時代に使用していた component.yaml とパイプラインをコンパイルするための python ファイルになります.

name: get-dataset

inputs:
  - name: tenant_id
    type: Integer

outputs:
  - name: dataset_output
    type: Dataset

implementation:
  container:
    image: asia-northeast1-docker.pkg.dev/{PROJECT_ID}/sample/get-dataset:{TAG}
    command: [
      poetry, run,
      python, main.py,
      --tenant_id, {inputValue: tenant_id},
      --dataset_output, {outputPath: dataset_output}
    ]
def load_component_from_file(yml_path: str) -> kfp.dsl.ContainerOp:
    """Load a component from a YAML file and customize its container image.
    Args:
        yml_path (str): The path to the YAML file containing the component definition.
    Returns:
        kfp.dsl.ContainerOp: The loaded component with the customized container image.
    """
    op = kfp.components.load_component_from_file(yml_path)
    template = op.component_spec.implementation.container.image
    op.component_spec.implementation.container.image = template.format(
        PROJECT_ID=PROJECT_ID,
        TAG=TAG,
    )

    return op

@dsl.pipeline(
    name="vertex-pipelines-sample",
    description="Vertex Piplines Sample"
)
def pipeline(tenant_id: int) -> None:
    """Run a pipeline to extract data and perform a task using the specified tenant_id.
    Args:
        tenant_id (int): The tenant_id used for extracting data.
    """
    # Get dataset from BigQuery
    get_dataset_op = load_component_from_file("./components/get_dataset/component.yaml")
    get_dataset_task = get_dataset_op(tenant_id=tenant_id)
    
    ... (以下略)

一方で,v2 ではコンポーネント化したい処理に対して @dsl.container_component のデコレータを付与することで簡単にコンポーネント化することができます.

コンポーネントの作り手は YAML ファイルを介さずにコンポーネントのイメージ・コマンド・引数を直接設定できるようになりました.

以下はサンプルとして,poetry を用いた仮想環境で python のプログラムを実行することを想定したものになります.(この処理の出力結果を後続のコンポーネントに渡す想定で,result を dsl.Output[dsl.Dataset] で Artifact として記述しています)

from kfp import dsl

@dsl.container_component
def get_dataset(tenant_id: int, result: dsl.Output[dsl.Dataset]) -> dsl.ContainerSpec:
    return dsl.ContainerSpec(
        image='asia-northeast1-docker.pkg.dev/{PROJECT_ID}/sample/ml-pipelines'
        command=["poetry", "run", "python", "get_dataset.py"],
        arguments=['--tenant_id', tenant_id, '--dataset_output', dataset_output.path])

これにより,先ほど紹介した実行方法や入出力に関する YAML ファイルである component.yaml を今まで別で管理していましたが,用意する必要が無くなり1,管理物も減らすことができました.

環境変数とマシンリソースの設定

@dsl.container_component のデコレータを使った Container Components は環境変数をサポートしていないため,.set_env_variable メソッドを使用して,パイプライン定義内のコンポーネントからインスタンス化されたタスクに1つずつ設定する必要があります.

@dsl.pipeline(name='my-pipeline')
def pipeline(tenant_id: int) -> None:
    get_data_task = get_data(tenant_id=tenant_id)
    get_data_task.set_env_variable("ENVIRONMENT", "dev")
  
    ...

また同様に,マシンリソースの設定においても適切なインスタンスを割り当てるために,メモリや CPU をコンポーネント毎に .set_cpu_limit, .set_memory_limit をする必要があり,インスタンスのスペック表と見比べながら設定するのはなかなか面倒な作業です.

v1 時代には,以下のように custom_job_spec から強引に設定していましたが,ContainerOp が v2 では削除された関係でこの方法は使えなくなりました.

get_dataset_op = load_component_from_file("./components/get_dataset/component.yaml")
get_dataset_task = get_dataset_op(tenant_id=tenant_id)
get_dataset_task.custom_job_spec = {
        "displayName":get_dataset_task.name,
        "jobSpec": {
            "workerPoolSpecs": [
                {
                    "containerSpec": {
                        "imageUri": get_dataset_task.container.image,
                        "command": get_dataset_task.command,
                        "args": get_dataset_task.arguments,
                        "env": [
                            {"name": "PROJECT_ID", "value": PROJECT_ID},
                            {"name": "ENVIRONMENT", "value": ENVIRONMENT}
                        ],
                    },
                    "machineSpec": {"machineType": "e2-highmem-2"},
                    "replicaCount": 1,
                }
            ],
        },
    }

上記の方法は v2 では使えないため,色々と調べていると,「Vertex AI Pipelines を使用して Google Cloud マシンリソースをリクエストする」の公式ドキュメントで google-cloud-pipeline-components というライブラリが紹介されており,これを使うと環境変数・マシンスペックの設定は簡単に設定できそうだとわかりました.

ただ,手元で利用している各種ライブラリのバージョンと相性が悪いのか,環境変数・マシンスペックの設定が正しくセットされませんでした.原因調査を一定行いましたが,すぐに解決することが難しかったため,今回この方法は諦めました.

タスク設定を行うクラスを自作して対応

import dataclasses
from kfp import dsl


@dataclasses.dataclass
class MachineResourceSpec:
    cpu: str
    memory: str


@dataclasses.dataclass
class ContainerTaskSpec:
    task: dsl.ContainerSpec
    machine_type: str


machine_catalog: dict[str, MachineResourceSpec] = {
    "e2-standard-2": MachineResourceSpec(cpu="2", memory="8G"),
    "e2-standard-4": MachineResourceSpec(cpu="4", memory="16G"),
    "e2-highmem-2": MachineResourceSpec(cpu="2", memory="16G"),
    "e2-highmem-4": MachineResourceSpec(cpu="4", memory="32G"),
}


class TaskConfigAssigner:
    def __init__(self, tasks: list[ContainerTaskSpec]):
        self.tasks = tasks

    def set_env_variables(self, envs: list[dict]):
        for task in self.tasks:
            for env in envs:
                for k, v in env.items():
                    task.task.set_env_variable(k, v)

    def set_machine_spec(self, machine_spec_table: dict):
        for task in self.tasks:
            if task.machine_type not in machine_spec_table:
                raise ValueError(f"Machine type {task.machine_type} not found in machine spec table.")
            machine_spec = machine_spec_table[task.machine_type]

            task.task.set_cpu_limit(machine_spec.cpu)
            task.task.set_memory_limit(machine_spec.memory)


envs = [{"PROJECT_ID": "my-project"}, {"ENVIRONMENT": "dev"}]

@dsl.pipeline(name='my-pipeline')
def pipeline(tenant_id: int) -> None:
    get_dataset_task = get_dataset(tenant_id=tenant_id)
  
    ...

    task_list = [
        ContainerTaskSpec(task=get_dataset_task, machine_type="e2-highmem-2"),
        ...
    ]

    tasks = TaskConfigAssigner(task_list)
    tasks.set_env_variables(envs=envs)
    tasks.set_machine_spec(machine_spec_table=machine_catalog)

ライブラリを使えるのがベストでしたが,困難だったため,今回は各々のタスク(コンポーネント)に対して,環境変数とタスクに適したマシンスペックを割り当てられる上記のようなクラスを自作することにしました(現時点ではよく使うものだけをメソッドとして用意しています).

Vertex AI Pipelines のマシン割り当てとして,set_cpu_limit, set_memory_limit で指定したマシンがそのまま設定されるわけではなく,指定したリソースに近いマシンインスタンスが割り当てられるため,料金表を基に適切なコストで決めたいという理由でマシンカタログを用意しています.

おわりに

最初に我々の課題として,機械学習パイプラインの実装負荷・学習コストの高さ・ビルド時間の長さを挙げましたが,今回の取り組みを通して徐々に改善されています.

  • 実装負荷・学習コストの高さ
    • KFP v2 への移行の恩恵として,今まではパイプラインの1コンポーネントあたり最低5つのファイルを用意する必要がありましたが,処理を行うファイル1つだけを用意すれば良くなったため,データサイエンティストが開発する際にはロジック部分だけに集中でき,負担が減るようになりました(実際にそういった声を貰いました).
  • ビルド時間の長さ
    • コンポーネントのディレクトリ構造を見直し,Dockerfile を共通利用にした & マルチステージビルドにしたことで,GitHub Actions 経由でのビルド時間は大体11分から5分まで50%以上短縮することができました!

移行の背景で説明したことに加えて,我々のチームでは,僕しか Vertex AI Pipelines やデプロイ周りのインフラなどをしっかり把握して扱える人が居ない状況でした.複数のプロジェクトが走っている中できめ細やかにサポートできないタイミングもあるため,僕がボトルネックとなって検証が進まない状況をなるべく避けたいという思いから,今回の移行を優先度高く対応しました.

小さなチームでも,我々のようにプロジェクト次第では機械学習モデルを作成する人とパイプライン周りをプロダクションに載せる人が分かれるケースもあるかと思います.この場合,完成させるためにどちらかに過度に依存した状態では(お互い忙しいため)停滞する可能性が高いです.そのため,接合点周りはお互いできるようにしておく,もしくは簡単にできる仕組みを用意することが大事だなと感じました.

今後は Python 3.9 からのバージョンアップ対応やローカルでのパイプラインテストができる仕組みなどを用意し,より開発者体験を向上すると共に,機械学習の価値をより素早くユーザーに提供できるように引き続きチャレンジしたいと考えています.

コミューンでは一緒に働く仲間を募集しています! 少しでもコミューンの開発組織や環境に興味がある方はカジュアル面談でお話ができますと幸いです!

docs.google.com


  1. KFP v2 では,下位互換性のために今まで YAML ファイルに書いていたコンポーネント定義や kfp.components.load_component_from_file 関数は引き続きサポートされるそうです(参考:https://www.kubeflow.org/docs/components/pipelines/v2/migration/#v1-component-yaml-support