AnyEvent + Coro ã§ã®ä¸¦è¡ãã¦ã³ãã¼ãã®ç¿ä½ ãã® 2
AnyEvent + Coro での並行ダウンローダの習作 - 昨日知ったことã®ç¶ãã
ååã®ã¹ã¯ãªããã§ã¯ãå¦çãé²ãã«ã¤ãã»ããã©ã®æ°ãå¢ãã¦ãã£ã¦ãã¾ããã¨ãåé¡ã ã£ããã»ããã©ã®ã«ã¦ã³ã¿ãå ã«æ»ã£ããããã®ã»ããã©ã delete ã§ããã°ä¸å®æ°ä»¥ä¸ã»ããã©ãå¢ããªãã®ã ãããã®ãããªã¹ã¯ãªããã¯æ¸ããªãããæ¸ããã¨ãã¦ãã»ããã©çæã®ã³ã¹ãã¯ããã£ã¦ãã¾ãã
AnyEvent::HTTP ã§ã®åææ¥ç¶æ°ã®å¶é
AnyEvent::HTTP ã§åä¸ãã¹ãã«å¯¾ããåææ¥ç¶æ°ãå¶éãã¦ããã®ã ããããããåèã«ããã°ãããã¨æ©éã½ã¼ã¹ãè¦ãã¦ã¿ããã©ããã _slot_schedule ã¨ããé¢æ°ã§ãã®è¾ºã®ãã¨ããã£ã¦ããã£ã½ãã
sub _slot_schedule; sub _slot_schedule($) { my $host = shift; while ($CO_SLOT{$host}[0] < $MAX_PER_HOST) { if (my $cb = shift @{ $CO_SLOT{$host}[1] }) { # somebody wants that slot ++$CO_SLOT{$host}[0]; ++$ACTIVE; $cb->(AnyEvent::Util::guard { --$ACTIVE; --$CO_SLOT{$host}[0]; _slot_schedule $host; }); } else { # nobody wants the slot, maybe we can forget about it delete $CO_SLOT{$host} unless $CO_SLOT{$host}[0]; last; } } }
åææ¥ç¶æ°ã®å¶éã«ã¯ãã»ããã©ãç¨ããã®ã§ã¯ãªããåã«ã«ã¦ã³ã¿ãç¨ãã¦ããããã ãã¨ããã¨ããã¾ã§ã¯ããã£ããããã以ä¸ã¯è¿½ãã¦ããªãã
producer ã«ãã URI ã®åé
ããã§ä¸ã¤çºæ³ãå¤ãã¦ã¿ãããããåã IP ã¢ãã¬ã¹ã® URI ãå¿ ãåãã¯ã¼ã«ã¼ã¹ã¬ããã«æ¸¡ããªãã°ãä¸ã¤ã®ã¯ã¼ã«ã¼ã¹ã¬ããã¯åæã«ä¸ã¤ã®ãªã¯ã¨ã¹ãããæããããªãã®ã§ãèªåçã«åä¸ IP ã¢ãã¬ã¹ã¸ã®åææ¥ç¶æ°ã¯ããã ã 1 ã«ãªããããããããã«ã¯ãããããã®ã¯ã¼ã«ã¼ã¹ã¬ãããã¨ã«ãã£ãã«ãç¨æããproducer ã IP ã¢ãã¬ã¹ãè¦ã¦æ¯ãåããããã«ããã°ãããæ¯ãåãæ¹ãåãã¨ä»äºãå±±ç©ãã¦ãã (ãã£ãã«ãè©°ã¾ã£ã¦ãã) ã¹ã¬ããã¨ãä»äºãå¾ ã£ã¦ãã (ãã£ãã«ã空ã®) ã¹ã¬ãããã§ãã¦ãã¾ããå¹çããããªãããããã£ã¦ã§ããã ãåçã«ã°ããããããIP ã¢ãã¬ã¹ã®ããã·ã¥å¤ã§æ¯ãåããã®ãããã ããã
åä¸ IP ã¢ãã¬ã¹ã¸ã®åææ¥ç¶æ°ã 4 ãªãã°ãä¸ã¤ã®ãã£ãã«ã«åã¤ã®ã¯ã¼ã«ã¼ã¹ã¬ãããã¯ãã¤ãã¦ãããã¨ã«ãªãããã£ãã«ã 4 åã«ããã° 16 åã®ã¯ã¼ã«ã¼ã¹ã¬ãããã§ã (ããªãã¡ãåææ¥ç¶æ°ã 16)ããã£ãã«ã 5 åã«ããã° 20 åã®ã¯ã¼ã«ã¼ã¹ã¬ãããã§ããã
ã¨ãããããå®è£
ãç°¡åãªãã£ãã«ã 4 åã®å ´åã®ã¹ã¯ãªãããæ¸ãã¦ã¿ãã
use strict; use warnings; use AnyEvent; use AnyEvent::DNS; use AnyEvent::HTTP; use Coro; use Coro::Channel; use URI; use Digest::SHA1 qw/sha1_hex/; my @uri_channels = map { new Coro::Channel 5 } 0..3; my %index = map { sprintf("%x",$_) => int($_/4) } 0..15; for my $index (0..3) { for my $sub_index (0..3) { my $worker_t = async { $Coro::current->desc("worker $index-$sub_index"); while (1) { my $uri = $uri_channels[$index]->get(); print "getting $index-$sub_index $uri ...\n"; http_get $uri, Coro::rouse_cb; my @results = Coro::rouse_wait; print "done $index-$sub_index $uri.\n"; } }; } } my $producer_t = async { $Coro::current->desc('producer'); my $base_uri = q{http://www.example.com/}; my $q = 0; while (1) { my $uri = "$base_uri?q=$q"; my $host = URI->new($uri)->host; AnyEvent::DNS::a $host, Coro::rouse_cb; my $ip_addr = Coro::rouse_wait; my $index = $index{substr(sha1_hex($ip_addr), 0, 1)}; $uri_channels[$index]->put($uri); $q++; } }; $producer_t->join;
è¿½è¨ 2010-01-15
åã¹ã¬ããã« description ãã¤ããããã«ããã