多数のサーバを使用して、計算量の多い処理を行う必要が出てきたので、Net::Amazon::EC2やNet::Amazon::S3による分散処理を試してみました。
システム構成は次の通りです。map reduceのような分散処理は行わず、単純に複数のサーバを起動して、Parallel::ForkManagerによる並列処理を行います。
scriptの処理は次のとおりです。
Net::Amazon::EC2で、ec2を複数起動し、
Parallel::ForkManagerで分散処理を行う。
※ただし、起動したec2へのコマンド送信には、Net::SSH::Perl を使用。
※処理後のデータは、Net::Amazon::S3でs3へ保存しています。
処理後はec2を停止し、余計な料金が発生しないようにしています。
というものです。
#!/usr/local/bin/perl
#
# Distributed batch runner on Amazon EC2.
#
# 1. Launch a batch of EC2 instances from a prepared AMI (Net::Amazon::EC2)
#    and wait for them to reach the "running" state.
# 2. Read the work items from $ITEMCODE_LIST_DIR/*.txt, one item code per
#    line.
# 3. For each item, pick an idle instance and fork a child process
#    (Parallel::ForkManager) that drives that instance over SSH
#    (Net::SSH::Perl).
# 4. After every child has finished, terminate all instances so that no
#    further charges accrue.  die_script() also terminates them on fatal
#    errors.
use strict;
use warnings;
use FindBin;
use Log::Log4perl;
use Net::Amazon::EC2;
use Net::Amazon::S3;
use Net::SSH::Perl;
use Parallel::ForkManager;
use Data::Dumper;

my $EC2_OBJ;

# AWS credentials.  Taken from the environment when set, so the real keys
# need not be written into the script; otherwise the original placeholders
# are used.
my $EC2_OBJ_DEF = {
    AWSAccessKeyId  => $ENV{AWS_ACCESS_KEY_ID}     || 'ãªããã',
    SecretAccessKey => $ENV{AWS_SECRET_ACCESS_KEY} || 'ãªããã',
};

# instance_id => { state => undef | 'working', instance => $instance }
# A false state means "idle".  Only the PARENT process's copy of this hash
# drives scheduling: anything a forked child writes here is invisible to
# the parent.
my $EC2_INSTANCES = {};

# Launch parameters.  The AMI was prepared with the latest build of the
# programs under development already installed.
my $RUN_INSTANCES_DEF = {
    cmd_param => {
        ImageId      => 'ami-84db39ed',
        InstanceType => 'm1.small',
        MinCount     => 500,
        MaxCount     => 500,
    },
    sleep => 30,    # seconds between "running yet?" polls
    retry => 10,    # up to retry+1 polls in total
};

# Waiting for a free instance: up to retry+1 scans, `sleep` seconds apart.
# NOTE(review): ~15 minutes in total; raise it if single jobs run longer.
my $GET_IDLE_INSTANCE_DEF = { retry => 30, sleep => 30 };

# Maximum number of concurrent child processes.  Effective parallelism is
# also bounded by the number of instances that actually reached "running".
my $MAX_PROCESS = 500;

my $ITEMCODE_LIST_DIR = $FindBin::Bin . '/itemcode_list';

# Item-list files are the directory entries whose names end in ".txt".
my $ITEMCODE_LIST_REGEXP = qr/\.txt\z/;

my $SSH_DEF = { user => '', passwd => '' };

my $LOG_DEF = {
    'log4perl.rootLogger'                 => 'INFO, LOGFILE',
    'log4perl.appender.LOGFILE'           => 'Log::Log4perl::Appender::File',
    'log4perl.appender.LOGFILE.filename'  => $FindBin::Bin . "/log/tenkai.log",
    'log4perl.appender.LOGFILE.mode'      => 'append',
    'log4perl.appender.LOGFILE.layout'    => 'PatternLayout',
    'log4perl.appender.LOGFILE.layout.ConversionPattern' => '%d %F [%p] %m %n',
};
Log::Log4perl::init($LOG_DEF);
my $LOG = Log::Log4perl::get_logger("rootLogger");

main();

# Entry point: launch instances, fan the item codes out over them, wait for
# every job, then terminate the instances.
sub main {
    # init
    $EC2_OBJ = Net::Amazon::EC2->new(%$EC2_OBJ_DEF);

    $LOG->info('ec2_run_instances()');
    ec2_run_instances();    # launch several EC2 instances

    $LOG->info('get_itemcodes()');
    my $itemcodes = get_itemcodes();    # list of work items

    my $pm = Parallel::ForkManager->new($MAX_PROCESS);

    # Runs in the PARENT whenever a child is reaped.  The child's ident is
    # the id of the instance it was using (see $pm->start below), so that
    # instance goes back to the idle pool here.
    $pm->run_on_finish(
        sub {
            my ( $pid, $exit_code, $ins_id, $exit_signal ) = @_;
            return unless defined $ins_id;
            if ( $exit_code or $exit_signal ) {
                $LOG->warn( "child $pid on $ins_id exited abnormally "
                      . "(code $exit_code, signal $exit_signal)" );
            }
            $EC2_INSTANCES->{$ins_id}{state} = undef;
        }
    );

  ITEM:
    for my $itemcode_info (@$itemcodes) {
        my ( $itemcode, $tei_list, $no, $itemcode_size ) =
          split( "\t", $itemcode_info );

        # Pick an EC2 instance that is not processing anything.
        my $ec2_ins = get_idle_ec2_instance($pm);
        my $ins_id  = $ec2_ins->{instance_id};

        # Mark it busy in the parent BEFORE forking; doing this in the child
        # would never be seen here, and every job would land on one server.
        $EC2_INSTANCES->{$ins_id}{state} = 'working';

        $pm->start($ins_id) and next ITEM;    # parent: dispatch the next item

        # --- child process from here on ---
        $LOG->info(
            "do_parallel() $tei_list $itemcode $no/$itemcode_size start");
        my $ok = eval { do_parallel( $ec2_ins, $itemcode ); 1 };  # work on EC2
        if ($ok) {
            $LOG->info(
                "do_parallel() $tei_list $itemcode $no/$itemcode_size done");
            $pm->finish(0);    # exits the child
        }
        my $err = $@ || 'unknown error';
        chomp $err;
        $LOG->error( "do_parallel() $tei_list $itemcode "
              . "$no/$itemcode_size failed: $err" );
        $pm->finish(1);        # exits the child with a failure code
    }

    # finalize: wait for every job BEFORE shutting the instances down.
    $pm->wait_all_children;
    $LOG->info('ec2_terminate_instances()');
    ec2_terminate_instances();    # stop EC2
    return;
}

# Collect the work items from every *.txt file in $ITEMCODE_LIST_DIR.
#
# Returns an array ref of tab-joined strings
#   "$itemcode\t$list_file\t$line_no\t$items_in_file"
# where $line_no counts from 1 over the non-blank lines of $list_file.
# Files are read in name order.  Dies via die_script() on any I/O error.
sub get_itemcodes {
    my $dh;
    unless ( opendir( $dh, $ITEMCODE_LIST_DIR ) ) {
        $LOG->error("can't opendir $ITEMCODE_LIST_DIR: $!");
        die_script();
    }
    my @itemcode_lists =
      sort { $a cmp $b }
      grep { /$ITEMCODE_LIST_REGEXP/ and -f "$ITEMCODE_LIST_DIR/$_" }
      readdir($dh);
    unless ( closedir($dh) ) {
        $LOG->error("can't closedir $ITEMCODE_LIST_DIR: $!");
        die_script();
    }

    my @ret;
    for my $itemcode_list (@itemcode_lists) {
        my $itemcode_path = "$ITEMCODE_LIST_DIR/$itemcode_list";
        my $fh;
        unless ( open( $fh, '<', $itemcode_path ) ) {
            $LOG->error("can't open file $itemcode_path: $!");
            die_script();
        }
        my @itemcodes;
        while ( my $line = <$fh> ) {
            $line =~ s/\s+\z//;
            # Skip blank lines (e.g. a trailing empty line) rather than
            # dispatching a job with an empty item code.
            push @itemcodes, $line if length $line;
        }
        unless ( close($fh) ) {
            $LOG->error("can't close file $itemcode_path: $!");
            die_script();
        }

        my $itemcode_size = @itemcodes;
        my $i             = 0;
        for my $itemcode (@itemcodes) {
            push @ret,
              join( "\t", $itemcode, $itemcode_list, ++$i, $itemcode_size );
        }
    }
    return \@ret;
}

# Launch the instances described by $RUN_INSTANCES_DEF, poll until they are
# "running", and record each running one in $EC2_INSTANCES as idle.
# Instances still not running after the last poll are terminated.  Dies via
# die_script() if the launch request fails or no instance comes up.
sub ec2_run_instances {
    # Send the launch request, then wait.
    my $reservation_info =
      $EC2_OBJ->run_instances( %{ $RUN_INSTANCES_DEF->{cmd_param} } );
    my $launched =
      $reservation_info ? $reservation_info->{instances_set} : undef;
    unless ( ref $launched eq 'ARRAY' ) {
        # No instance list means the request failed; without this check the
        # dereference below would die without logging anything.
        $LOG->error("couldn't run EC2 instances: run_instances() failed");
        die_script();
    }
    my %not_run_instances = map { $_->{instance_id} => $_ } @$launched;

    # Collect the instances that have come up.
    my $retry = 0;
    while ( $retry <= $RUN_INSTANCES_DEF->{retry} and %not_run_instances ) {
        sleep( $RUN_INSTANCES_DEF->{sleep} );
        for my $ins_id ( keys %not_run_instances ) {
            my $rsv_info =
              $EC2_OBJ->describe_instances( InstanceId => $ins_id );
            # A failed call does not return an array ref; just try this
            # instance again on the next poll.
            next unless ref $rsv_info eq 'ARRAY' and @$rsv_info;
            my $set      = $rsv_info->[0]{instances_set};
            my $instance = ref $set eq 'ARRAY' ? $set->[0] : undef;
            next unless $instance;

            my $state = $instance->{instance_state};
            next unless $state and ( $state->{name} || '' ) eq 'running';

            $EC2_INSTANCES->{$ins_id} = { state => undef, instance => $instance };
            delete $not_run_instances{$ins_id};
        }
        $retry++;
    }

    # Terminate the instances we gave up waiting for.
    for my $ins_id ( keys %not_run_instances ) {
        $LOG->warn("couldn't wait for running EC2 instance, terminate:$ins_id");
        $EC2_OBJ->terminate_instances( InstanceId => $ins_id );
    }

    # Die if not a single instance could be started.
    if ( keys %$EC2_INSTANCES < 1 ) {
        $LOG->error("couldn't run EC2 instances");
        die_script();
    }
    return;
}

# Terminate every instance recorded in $EC2_INSTANCES.
sub ec2_terminate_instances {
    for my $instance_id ( keys %$EC2_INSTANCES ) {
        $EC2_OBJ->terminate_instances( InstanceId => $instance_id );
    }
    return;
}

# Run one item on one instance over SSH (executed in a forked child).
# Dies if the SSH connection or login fails; main() logs that and exits the
# child with a failure code.
sub do_parallel {
    my ( $ec2_ins, $itemcode ) = @_;
    my $ssh = Net::SSH::Perl->new( $ec2_ins->{dns_name} );
    $ssh->login( $SSH_DEF->{user}, $SSH_DEF->{passwd} );

    # e.g. $ssh->cmd("/home/endo/dev/tenkai $itemcode");
    # NOTE(review): $itemcode is interpolated into a remote shell command;
    # quote it if item codes can contain shell metacharacters.
    # 1) get item data (.t, .p) from Amazon S3 (Net::Amazon::S3)
    # 2) run tenkai
    # 3) commit the output data to Amazon S3 (Net::Amazon::S3)
    # 4) get the old output data (.o) from Amazon S3 (Net::Amazon::S3)
    # 5) diff the output data
    # 6) commit the diff to Amazon S3 (Net::Amazon::S3)

    $ssh->cmd("exit");
    return;
}

# Return the instance record of an idle EC2 instance, waiting for one to
# free up if necessary.  When the Parallel::ForkManager object is passed,
# finished children are reaped on every scan so that their run_on_finish
# callbacks can mark instances idle again.  Dies via die_script() after
# $GET_IDLE_INSTANCE_DEF->{retry} + 1 unsuccessful scans.
sub get_idle_ec2_instance {
    my ($pm) = @_;
    my $retry = 0;
    while ( $retry <= $GET_IDLE_INSTANCE_DEF->{retry} ) {
        _reap_finished_children($pm) if $pm;
        for my $ins_id ( sort keys %$EC2_INSTANCES ) {
            next if $EC2_INSTANCES->{$ins_id}{state};
            return $EC2_INSTANCES->{$ins_id}{instance};
        }
        $retry++;
        $LOG->info( "get_idle_ec2_instance():retry",
            " $retry/$GET_IDLE_INSTANCE_DEF->{retry}" );
        sleep( $GET_IDLE_INSTANCE_DEF->{sleep} );
    }
    $LOG->error("fail get_idle_instance()");
    die_script();
}

# Non-blocking reap of finished children, firing their run_on_finish
# callbacks.  Newer Parallel::ForkManager versions document this as
# reap_finished_children; older ones only provide wait_children.
sub _reap_finished_children {
    my ($pm) = @_;
    my $reaper = $pm->can('reap_finished_children')
      || $pm->can('wait_children');
    $pm->$reaper() if $reaper;
    return;
}

# Abort the script.  The EC2 instances are terminated first, because
# instances left running keep incurring charges.
sub die_script {
    ec2_terminate_instances();
    die "die_script(): aborted after terminating EC2 instances";
}
何となく動作したので、本気で導入を考えてみようか。