AnyEvent + Coro での並行ダウンローダの習作

AnyEvent と Coro を使って、WEB から非同期に並行ダウンロードするプログラムの習作を作ってみた。http://kaede.to/~canada/doc/wiser-clawer-sample-using-coroをだいぶ参考にさせてもらっている。紹介されている非同期クローラはとてもいいのだが、自分がやろうとしていることを考えると、次の点がちょっと気になった。

  • 対象 URI のリストは一千万オーダになる
    • あらかじめメモリ上に列挙しておいていいのだろうか
    • URI の数分のスレッドを起こしていいのだろうか
  • 名前解決すると同一 IP アドレスになるホストが少なくない
    • 同時リクエスト制限をかけるのはホスト名ではなく IP アドレスがいいのではないだろうか

ということで、次のような特徴を持つスクリプトを書いてみた。

  • 最大同時接続数の制御にはセマフォを使うのではなく、その数 (20 なら 20 個) 分のワーカースレッドを作る
    • ワーカースレッドは URI を受け取ってダウンロードしてくる、というのを延々繰り返す
  • ただひたすら無限に URI を生成する URI 生成スレッドを作る
    • 実際には DB やファイル、検索エンジンなど、他のソースから URI を持ってくることを想定
  • URI 生成スレッドとワーカーの間の URI のやりとりは Coro::Channel を利用
  • 同一 IP アドレスへの同時リクエスト数を制限する
    • AnyEvent::HTTP::http_get ではホスト名だけでしか同時リクエスト制限できない
    • AnyEvent::DNS を利用して非同期に名前解決する
    • 同一 IP アドレスへの同時リクエスト制限にはセマフォを用いる
use strict;
use warnings;
use AnyEvent;
use AnyEvent::DNS;
use AnyEvent::HTTP;
use Coro;
use Coro::Channel;
use Coro::Semaphore;
use URI;

my $uri_channel = new Coro::Channel 5;
my %semaphore_for = ();

for my $i (0..19) {
    my $worker_t = async {
        while (1) {
            my $uri = $uri_channel->get();
            my $host = URI->new($uri)->host;
            AnyEvent::DNS::a $host, Coro::rouse_cb;
            my $ip_addr = Coro::rouse_wait;
            if (! defined $semaphore_for{$ip_addr}) {
                $semaphore_for{$ip_addr} = new Coro::Semaphore 4;
            }
            my $guard = $semaphore_for{$ip_addr}->guard;
            print "getting $i $uri ...\n";
            http_get $uri, Coro::rouse_cb;
            my @results = Coro::rouse_wait;
            print "done $i $uri.\n";
        }
    };
}

my $producer_t = async {
    my $base_uri = q{http://www.example.com/};
    my $q = 0;
    while (1) {
        $uri_channel->put("$base_uri?q=$q");
        $q++;
    }
};

$producer_t->join;

URI 生成スレッド ($producer_t) は無限ループで、http://www.example.com/?q=0, http://www.example.com/?q=1, http://www.example.com/?q=2 ... という URI をチャネル $uri_channel に押し込んでいるだけ。$uri_channel はサイズ 5 にしているが、チャネルの producer が一つなのに対し、consumer が 20 個とかになるので、もっと大きい値にしたほうが効率がいいだろう。今は習作で単純に URI を生成しているが、実際には URI の生成にもっとコストがかかるだろう。その生成コストとの兼ね合いも考慮しなければならない。

ワーカースレッド ($worker_t) は 20 本作られる。$uri_channel から URI を一つもらってきては HTTP リクエストを投げる、という無限ループを走る。チャネルからの get() 時、DNS の名前解決と HTTP リクエストの 2 箇所の rouse_wait 実行時、guard オブジェクトの取得時、に他のスレッドに制御が移る可能性がある。

URI 生成スレッドだけを join で待ち受けているが、このスクリプトではどのスレッドも無限ループするので強制終了しないと終わらない。きれいに終わらせるには、while (1) で無限ループしている箇所を while ($keep) とかにして、どこかで $keep フラグを落とすようにすればいいかもしれない。

実行結果は次のようになった。

$ perl async.pl
getting 0 http://www.example.com/?q=0 ...
getting 1 http://www.example.com/?q=1 ...
getting 2 http://www.example.com/?q=2 ...
getting 3 http://www.example.com/?q=3 ...
done 0 http://www.example.com/?q=0.
getting 4 http://www.example.com/?q=4 ...
done 1 http://www.example.com/?q=1.
getting 5 http://www.example.com/?q=10 ...
done 2 http://www.example.com/?q=2.
getting 6 http://www.example.com/?q=11 ...
done 3 http://www.example.com/?q=3.
getting 7 http://www.example.com/?q=12 ...
done 5 http://www.example.com/?q=10.
...

追記

セマフォの数が接続したことのある IP アドレスの数と同じだけ必要なのがいただけない。他の部分はデータ量によらずメモリ消費は一定なのに、セマフォはデータ量に比例してメモリを食ってしまう。同時には、たかだか 20 個しか並列で動かないので、現在接続中の IP アドレスと比べて接続するか否かを決定するようにすればいいのかもしれない。