🐷

Cloudflare Workflows

2024/12/17に公開

この記事はCloudflare Advent Calender 2024年12月25日の記事です。

2024年4月の Developer Week で Workflowsという機能がアナウンスされ、2024年後半をターゲットしてリリースが予告されていました。
https://blog.cloudflare.com/data-anywhere-events-pipelines-durable-execution-workflows/#durable-execution
その後10月に無事ベータ版がリリースされたことを受けこの記事ではその機能を試していきます。
https://blog.cloudflare.com/building-workflows-durable-execution-on-workers/

Workers と Workflows

Cloudflare Workers はステートレスなサーバレス基盤です。CPU時間が無償版では10msと制限されており(fetch等による待ち時間は含まないため意外と10msとはいえ色んな処理が可能です)処理が終了するとプロセスは解放されるため、処理結果をもとにした条件分岐を伴う連続的な処理に使うためには少し工夫が必要でした。

Workers にステートを持たせる場合KVやD1等のデータベースを使いcron型Workersでポーリングを行うか、間にQueueを挟み、WorkersAの処理結果をもとにWorkersBを起動させるといった処理をつなぐ、等の工夫が必要でした。

今回リリースされたWorkflows は Workers 専用ステートマシーンです。これにより複数の処理をつないだり条件分岐による処理等を雑な処理をある程度モノリシックに開発可能です。

さっそくやってみる

まずはその動きを理解するために、可能な限りシンプルなサンプルを作っていきます。まずは以下のコマンドでサンプルプロジェクトを起動します。

npm create cloudflare@latest workflows -- --template "cloudflare/workflows-starter"

--が2つ重なっているのは意図的です。通常のworkers プロジェクト起動と流れは同じですが、WorkflowsはTypeScript限定となっています。通常ブログ執筆の際はなるべく素の状態で手順をまとめるようにしているためJavaScriptを用いていますが、今回はTypeScriptで作業を進めます。

デフォルトで作成されるサンプルは少し冗長でわかりづらいのシンプルにした以下でindex.tsを置き換えます。

index.ts
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers';

interface Env {}
type Params = {};

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
	async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
		let state = step.do('my first step', async () => {
			return [1, 2, 3];
		});

		step.do('my second step', async () => {
			for (let data in state) {
				// Do something with your state
			}
		});
	}
}

export default {
	async fetch(req: Request, env: Env): Promise<Response> {
		let url = new URL(req.url);

		if (url.pathname.startsWith('/favicon')) {
			return Response.json({}, { status: 404 });
		}

		let id = url.searchParams.get('instanceId');
		if (id) {
			let instance = await env.MY_WORKFLOW.get(id);
			return Response.json({
				status: await instance.status(),
			});
		}

		let instance = await env.MY_WORKFLOW.create();
		return Response.json({
			id: instance.id,
			details: await instance.status(),
		});
	},
};

設定ファイルであるwrangler.tomlも同じように以下に置き換えます。

name = "workflows"
main = "src/index.ts"
compatibility_date = "2024-10-22"

[observability]
enabled = true
head_sampling_rate = 1 # optional. default = 1.

[[workflows]]
name = "workflows"
binding = "MY_WORKFLOW"
class_name = "MyWorkflow"

Workersからアクセス可能な他のCloudflareサービスと同様にbinding = "MY_WORKFLOW"によってenvという環境変数からworkflowの呼び出しが可能となります。
env.MY_WORKFLOW.create()によりworkflowのインスタンスが起動されclassとしてアクセスが可能となります。その際のclass名はwrangler.tomlで指定の通りclass_name = "MyWorkflow"になります。
index.tslet instance = await env.MY_WORKFLOW.create();の行が

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
	async run(event: WorkflowEvent<Params>, step: WorkflowStep) {

を起動しています。これがworkflowの実態です。

Deploy と実行

DeployはWorkersと同じようにwrangler deployです。マネージメントコンソールにworkflowが1個追加されました。

ブラウザでアクセスすると以下が表示されます。

{"id":"ebaa9e33-8a45-4880-9b0c-e5d71c0b640e","details":{"status":"queued","error":null,"output":null}}

idはインスタンスid呼ばれworkflowの実行毎に付与されるidです。
コンソールでは以下の通りworkflowが1回実行されたことがわかります。

詳細を見ると2つのステップが実行されたことがわかります。

1つ目の処理が[1,2,3]という出力を出しておりそれが2つめの処理に組み込まれています。

runstep.do

前述の通りlet instance = await env.MY_WORKFLOW.get(id);によりworkflowは起動されclassが宣言されます。
一つ目の処理は以下の箇所が担っています。

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
	async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
		let state = step.do('my first step', async () => {
			return [1, 2, 3];
		});

2つ目の処理は以下です。同じコード内で呼ばれているためstateはそのまま認識しています。1つ目の処理で3個の戻り値を出力しているため、以下の2つ目の処理は3回実行されます。

		step.do('my second step', async () => {
			for (let data in state) {
				// Do something with your state
			}
		});

コードを以下に置換して再度Deploy&実行してみます。

index.ts
import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers';

interface Env {}
type Params = {};

export class MyWorkflow extends WorkflowEntrypoint<Env, Params> {
	async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
		let state = step.do('my first step', async () => {
			// Example state data
			return [1, 2, 3];
		});

		step.do('my second step', async () => {
			// Iterate over the state array
			for (let data of await state) {
				// Send a POST request for each data item
				try {
					const response = await fetch('https://workflow.requestcatcher.com/', {
						method: 'POST',
						headers: {
							'Content-Type': 'application/json',
						},
						body: JSON.stringify({ data }),
					});

					// Handle the response if needed
					if (response.ok) {
						console.log('Request successful for data:', data);
					} else {
						console.log('Request failed for data:', data);
					}
				} catch (error) {
					console.error('Error while sending data:', data, error);
				}
			}
		});
	}
}

export default {
	async fetch(req: Request, env: Env): Promise<Response> {
		let url = new URL(req.url);

		// Handle favicon requests
		if (url.pathname.startsWith('/favicon')) {
			return Response.json({}, { status: 404 });
		}

		// Check for an instanceId and return its status
		let id = url.searchParams.get('instanceId');
		if (id) {
			let instance = await env.MY_WORKFLOW.get(id);
			return Response.json({
				status: await instance.status(),
			});
		}

		// Create a new workflow instance and return its status
		let instance = await env.MY_WORKFLOW.create();
		return Response.json({
			id: instance.id,
			details: await instance.status(),
		});
	},
};

https://requestcatcher.com/ で受け口を作っておくと、
body: JSON.stringify({ data }),が3回POSTされ出力していることがわかります。
コンソールからは以下のようにメトリクスが取得可能です。

CLI経由では以下のコマンドで起動が可能です。

wrangler workflows trigger workflows
Delegating to locally-installed [email protected] over global [email protected]...
Run `npx wrangler workflows trigger workflows` to use the local version directly.


 ⛅️ wrangler 3.95.0 (update available 3.96.0)
-------------------------------------------------------

▲ [WARNING] 🚧 `wrangler workflows trigger` is an open-beta command. Please report any issues to https://github.com/cloudflare/workers-sdk/issues/new/choose


🚀 Workflow instance "b735c04f-0f79-43fb-be01-1f6850f6a734" has been queued successfully

Discussion