conditions precedent:
Points:
package OreOre::Worker::Base;
use strict;
use warnings;
use parent qw/Class::Accessor::Fast/;
use Try::Tiny;
use Parallel::Prefork;
use Log::Minimal;
use Carp ();
__PACKAGE__->mk_accessors(qw/max_workers job_count max_job_count connect_info timeout/);
sub new {
my $class = shift;
my %args = @_ == 1 ? %{$_[0]} : @_;
my $self = bless {
max_workers => 2,
job_count => 0,
max_job_count => 20,
timeout => 60,
%args
}, $class;
Carp::croak("missing mandatory args: connect_info") unless $self->{connect_info};
return $self;
}
sub table { die "abstract base method. please implement this method in child class" }
# this method must return boolean value.
# true if suceeded, false otherwise.
sub process { die "abstract base method. please implement this method in child class" }
sub run {
my $self = shift;
# time is not required, because I use multilog
# print pid for debugging
local $Log::Minimal::PRINT = sub {
my ( $time, $type, $message, $trace ) = @_;
print STDERR "[$$] [$type] $message at $trace\n";
};
infof("start: $$");
my $pm = Parallel::Prefork->new(
{
max_workers => $self->max_workers,
trap_signals => {
TERM => 'TERM',
HUP => 'TERM',
INT => 'TERM',
USR1 => undef,
}
}
);
while ($pm->signal_received !~ /^(?:TERM|INT)$/) {
$pm->start and next;
debugf($self->job_count.'/'.$self->max_job_count);
my $dbh = DBI->connect(@{$self->{connect_info}}) or die "cannot connect to q4m";
my $term = 0;
LOOP: while (!$term && $self->max_job_count >= $self->{job_count}++) {
infof("waiting...");
my $got_job = $dbh->selectrow_array('SELECT queue_wait(?, ?);', {}, $self->table, $self->timeout);
unless ($got_job) {
debugf("no job...");
next LOOP;
};
infof("got job");
try {
local $SIG{TERM} = sub { infof("TERM RECEIVED : "); $term++ };
my $retval = $self->process($dbh);
if ($retval) {
infof("finished");
$dbh->do(q{SELECT queue_end();});
} else {
infof("failed");
$dbh->do(q{SELECT queue_abort();});
}
} catch {
critf("error occured: $_");
$dbh->do(q{SELECT queue_abort();});
};
}
infof("ready to die: $self->{job_count}, $self->{max_job_count}, $term");
$pm->finish;
}
infof("parent is ready for exit");
$pm->wait_all_children();
infof("ok, I'll die!");
}
1;
__END__
package MyWorker;
use base qw/OreOre::Worker::Base/;
sub table { 'foo' }
sub process {
my ($self, $dbh) = @_;
my ($col1, $col2) = $dbh->selectrow_array('SELECT * FROM foo');
...
}
package main;
use DBI;
my $worker = MyWorker->new(
max_workers => 3,
connect_info => ['dbi:mysql:database=queue', 'root', ''],
timeout => 30,
);
$worker->run();