TheSchwartzもはじめました

GearmanとTheSchwartz

Gearmanに投げた仕事は無事完了したかどうかはよくわかりません。
途中でサーバーダウンしていた場合にどの処理が完了していて、どの処理が未完了なのか、わかりかねます。
TheSchwartzは引き受けた仕事を逐一DBに記録します。なので完了した処理と未完了の処理を区別する事ができます。

DBセットアップ

vi db/schwartz.sql
DROP TABLE IF EXISTS funcmap;
CREATE TABLE funcmap (
        funcid         INTEGER PRIMARY KEY AUTOINCREMENT,
        funcname       VARCHAR(255) NOT NULL,
        UNIQUE(funcname)
);

DROP TABLE IF EXISTS job;
CREATE TABLE job (
        jobid           INTEGER PRIMARY KEY AUTOINCREMENT,
        funcid          INTEGER UNSIGNED NOT NULL,
        arg             MEDIUMBLOB,
        uniqkey         VARCHAR(255) NULL,
        insert_time     INTEGER UNSIGNED,
        run_after       INTEGER UNSIGNED NOT NULL,
        grabbed_until   INTEGER UNSIGNED NOT NULL,
        priority        SMALLINT UNSIGNED,
        coalesce        VARCHAR(255),
        UNIQUE(funcid,uniqkey)
);

DROP TABLE IF EXISTS error;
CREATE TABLE error (
        error_time      INTEGER UNSIGNED NOT NULL,
        jobid           INTEGER NOT NULL,
        message         VARCHAR(255) NOT NULL,
        funcid          INT UNSIGNED NOT NULL DEFAULT 0
);

DROP TABLE IF EXISTS exitstatus;
CREATE TABLE exitstatus (
        jobid           INTEGER PRIMARY KEY NOT NULL,
        funcid          INT UNSIGNED NOT NULL DEFAULT 0,
        status          SMALLINT UNSIGNED,
        completion_time INTEGER UNSIGNED,
        delete_after    INTEGER UNSIGNED
);
sqlite db/schwartz.db < db/schwartz.sql

Jobを投げる

vi eg/client.pl
#!/usr/bin/perl
package main;
use strict;
use warnings;
use utf8;
use DBI;
use TheSchwartz::Simple;

my $dbh = DBI->connect("dbi:SQLite:dbname=db/schwartz.db", '', '', {RaiseError => 1}) or die $DBI::err;

### clientを準備
my $client = TheSchwartz::Simple->new([$dbh]);
my $job_id = $client->insert('MyWorker', {hoge=>'fuga'});

my @jobs = $client->list_jobs({funcname => 'MyWorker'});

for my $job (@jobs) {
    print $job->jobid, "\n";
}

$dbh->disconnect;

実行

:!perl eg/schwartz.pl
1

DB確認。jobが入っています。

sqlite3 db/schwartz.db
SQLite version 3.4.2
Enter ".help" for instructions
sqlite> select * from job;
1|1|||1276694254|1276694254|0||
sqlite>

Wokerを用意

vi eg/worker.pl
#!/usr/bin/perl
package MyWorker;
use base qw( TheSchwartz::Worker );

sub work {
    my $class = shift;
    my $job   = shift;

    my $arg = $job->arg;
    warn $arg->{hoge};

    $job->completed;
}

1;

package main;
use TheSchwartz;

my $client = TheSchwartz->new(databases => [ { dsn => "dbi:SQLite:dbname=db/schwartz.db", user=> '', pass=>'' } ]);
$client->set_prioritize(1);
$client->can_do('MyWorker');
$client->work_once;

実行

:!perl eg/schwartz_worker.pl
fuga at eg/schwartz_worker.pl line 10.

DB確認。jobテーブルのレコードが減り、funcmapにレコードが増えている。

sqlite> select * from job;
sqlite>
sqlite> select * from funcmap;
1|MyWorker