概要
Perlの非同期処理システムとしてTheSchwartz / Gearmanを非常に便利に使っているのですが、いい感じにワーカープロセスを管理したかったので、WorkerManagerなるものを作ってみました。作ったのは実は1年以上前なのですが、ようやくの公開です。
基本は、Apacheのpreforkモデルを踏襲しています。特徴は、
- 子プロセスを複数起動させて、ジョブを並列実行
- 一定回数のジョブを実行したら、子プロセスを再作成
- クラスを定義するのみでジョブを実行
- TheSchwartz / Gearmanの切り替えを最低限の変更で実現
- ワーカープロセスの安全な停止(プロセス停止の際に、実行中のジョブの終了後に終了)
- ジョブの実行時のタイムスタンプをジョブ投入からの遅延をログに保存
というあたりです。preforkモデルなので、あまりモダンな設計ではないですが、gdbでのデバッグの容易さなど枯れたところがいいですね。
一通り必要な機能は実装できており、はてなブックマークを始めとするはてなの非同期処理において(1年以上)安定運用されています。
ワーカーの実行
ワーカープロセスはworkermanager.plで実装されています。コマンドラインオプションでプロセス数とか、1プロセスあたりのジョブ数などを指定できます。
% ./workermanager.pl -h usage: workermanager.pl [-hdn] [-c concurrency] [-w works_per_child] -f conf_file -h : this (help) message -d : debug -n : prevent deamonize (non fork) -c : the number of concurrency (default 4). -w : the number of works per child process (default 100). -f : YAML-formated file of configuration
デーモン化せずにデバッグすることもできます。RedHat系OS向けにinitファイルも用意していますので、それを利用してもOKです。
% ./workermanager.pl -c 1 -d -n
設定ファイル
設定ファイルは以下のように記述します。見れば分かると思いますが、ワーカークラスやパス、PIDファイルやログの出力先を指定します。
workers: - Sandbox::Worker::A - Sandbox::Worker::B worker_options: prioritize: 1 inc_path: - examples/lib pidfile: examples/run/workermanager.pid logfile: examples/log/workermanager.log errorlogfile: examples/log/workermanager_error.log
ログ
ジョブの実行ごとにログが出力されます。processはジョブの処理時間、delayは投入されてから実行が開始されるまでの遅延時間です。サーバ台数やプロセス数は、このあたりを参照しながら調整します。それぞれのジョブの処理時間や遅延時間が記録されるのは、WorkerManagerの大きな利点の一つです。
2009-10-21T22:21:40 TheSchwartz job completed Sandbox::Worker::A process:2346 delay:155
サンプルコード
クライアントとワーカーは以下のように書きます。もともとTheSchwartzもシンプルに書けますので、ここではあまり差はありません。examplesにサンプルコードがあるので、それも参照してみてください。
クライアント
TheSchwartzの場合はinsertでジョブを投入します。run_afterやpriorityを指定することもできます。
use WorkerManager::Client::TheSchwartz; use Time::Piece; my $client = WorkerManager::Client::TheSchwartz->new(); $client->insert('Sandbox::Worker::A' => +{foo => localtime->epoch}); $client->insert('Sandbox::Worker::B' => +{foo => localtime->epoch}, {run_after => time + 10, priority => 1});
ワーカー
workメソッドにジョブの内容を書いておきます。
package Sandbox::Worker::A; use strict; use warnings; use base qw( TheSchwartz::Worker ); use TheSchwartz::Job; sub work { my $class = shift; my TheSchwartz::Job $job = shift; my $try = int(rand(10)); if($try > 5){ print "Processed 'A' ".$job->arg->{foo}."\n"; $job->completed(); } else { print "Failed 'A' ".$job->arg->{foo}."\n"; $job->failed("Failed 'A' ".$job->arg->{foo}); } } sub max_retries { 3 } sub retry_delay { 20 } 1;
まとめ
SoftwareDesignの11月号にもうすこし詳しく書いたので、よければ参照してみてください。