AnyEvent + Coro での並行ダウンローダの習作 その 2

AnyEvent + Coro での並行ダウンローダの習作 - 昨日知ったことの続き。

前回のスクリプトでは、処理が進むにつれセマフォの数が増えていってしまうことが問題だった。セマフォのカウンタが元に戻ったら、そのセマフォを delete できれば一定数以上セマフォが増えないのだが、そのようなスクリプトは書けないし、書けたとしてもセマフォ生成のコストはかかってしまう。

AnyEvent::HTTP での同時接続数の制限

AnyEvent::HTTP で同一ホストに対する同時接続数を制限しているのだから、それを参考にすればいい、と早速ソースを覗いてみた。どうやら _slot_schedule という関数でその辺のことをやっているっぽい。

sub _slot_schedule;
sub _slot_schedule($) {
   my $host = shift;

   while ($CO_SLOT{$host}[0] < $MAX_PER_HOST) {
      if (my $cb = shift @{ $CO_SLOT{$host}[1] }) {
         # somebody wants that slot                                             
         ++$CO_SLOT{$host}[0];
         ++$ACTIVE;

         $cb->(AnyEvent::Util::guard {
            --$ACTIVE;
            --$CO_SLOT{$host}[0];
            _slot_schedule $host;
         });
      } else {
         # nobody wants the slot, maybe we can forget about it                  
         delete $CO_SLOT{$host} unless $CO_SLOT{$host}[0];
         last;
      }
   }
}

同時接続数の制限には、セマフォを用いるのではなく、単にカウンタを用いているようだ、というところまではわかったが、それ以上は追えていない。

producer による URI の分配

ここで一つ発想を変えてみた。もし、同じ IP アドレスの URI が必ず同じワーカースレッドに渡るならば、一つのワーカースレッドは同時に一つのリクエストしか投げられないので、自動的に同一 IP アドレスへの同時接続数はたかだか 1 になる。そうするためには、それぞれのワーカースレッドごとにチャネルを用意し、producer が IP アドレスを見て振り分けるようにすればよい。振り分け方が偏ると仕事が山積している (チャネルが詰まっている) スレッドと、仕事を待っている (チャネルが空の) スレッドができてしまい、効率がよくない。したがってできるだけ均等にばらけるよう、IP アドレスのハッシュ値で振り分けるのがよいだろう。

同一 IP アドレスへの同時接続数が 4 ならば、一つのチャネルに四つのワーカースレッドがはりついていいことになる。チャネルを 4 個にすれば 16 個のワーカースレッドができ (すなわち、同時接続数が 16)、チャネルを 5 個にすれば 20 個のワーカースレッドができる。
とりあえず、実装が簡単なチャネルが 4 個の場合のスクリプトを書いてみた。

use strict;
use warnings;
use AnyEvent;
use AnyEvent::DNS;
use AnyEvent::HTTP;
use Coro;
use Coro::Channel;
use URI;
use Digest::SHA1 qw/sha1_hex/;

my @uri_channels = map { new Coro::Channel 5 } 0..3;
my %index = map { sprintf("%x",$_) => int($_/4) } 0..15;

for my $index (0..3) {
    for my $sub_index (0..3) {
        my $worker_t = async {
	    $Coro::current->desc("worker $index-$sub_index");
            while (1) {
                my $uri = $uri_channels[$index]->get();

                print "getting $index-$sub_index $uri ...\n";
                http_get $uri, Coro::rouse_cb;
                my @results = Coro::rouse_wait;
                print "done $index-$sub_index $uri.\n";
            }
        };
    }
}

my $producer_t = async {
    $Coro::current->desc('producer');
    my $base_uri = q{http://www.example.com/};
    my $q = 0;
    while (1) {
        my $uri = "$base_uri?q=$q";
        my $host = URI->new($uri)->host;
        AnyEvent::DNS::a $host, Coro::rouse_cb;
        my $ip_addr = Coro::rouse_wait;
        my $index = $index{substr(sha1_hex($ip_addr), 0, 1)};
        $uri_channels[$index]->put($uri);
        $q++;
    }
};

$producer_t->join;

追記 2010-01-15

各スレッドに description をつけるようにした。