Rubyã§ä¸¦åå¦çããã£ã¦ãã #AdventCalendar
mixiã°ã«ã¼ãã¢ããã³ãã«ã¬ã³ãã¼2016 1æ¥ç®ã§ãã
ä»åã¯ãèªåãä»ã¾ã§å©ç¨ããRubyã§ã®ä¸¦åå¦çãæ¸ãããã®gemã¨ãç¥è¦ãç´¹ä»ãã¾ãã
æ©é
å æ¥ã®RubyKaigi 2016ã§ãRuby3ã§ã¯Guildã¨ããæ°ãã並åå¦çã®ã¢ãã«*1ããå°å ¥ãããã¨ããã»ãã·ã§ã³ããã£ãããconcurrent-rubyã¨ããgemã®éçºãæµè¡ãåãã¦å± ããã¨ãRubyçéã§ãä½ã¨ãªã並åå¦çããã¼ã ãã¦ããããã«æãã¾ãã
ãã«ãããã»ã¹/ã¹ã¬ãã
ãããRubyã§ä¸¦åå¦çããã®ã¯è¨èªã®ä»æ§ã¨ãã¦ãããªãã«å¶éããããä»ã®è¨èªã®ããã«Threadããã³ãã³ç«ã¦ã¦ãã«ãã³ã¢ã§è¨ç®ï¼çéåï¼ï¼ã¿ãããªã®ã¯é£ããã§ãã ã¨ããã®ããRuby1.9ãããã¤ãã£ãã¹ã¬ããã¯å°å ¥ããããã®ã®å¤ãã®Cæ¡å¼µã使ã£ãgemã®ã¹ã¬ããã»ã¼ãæ§ãåé¡ã¨ãªããããGIL(Global interpreter lock)ã¨å¼ã°ããä»çµã¿ãåå¨ãã¦ãããRubyãCæ¡å¼µã®å¦çèªä½ã1ã¤ã®ããã»ã¹ä¸ã§åæã«å®è¡ããããã¨ãããã¾ãããï¼éã«ããã¨GILã®ãããã§ã¹ã¬ããã»ã¼ãæ§ã«ã¤ãã¦ä½ãèããã«Rubyãæ¸ãã¦ããï¼ï¼
ããã§ãã並åã«å¦çãå®è¡ãããã¨ãã¯æ®éã«Rubyã§ã¢ããªã±ã¼ã·ã§ã³ãéçºãã¦ããã¨è¨ªãã¦ãä½ã¨ããã£ã¦ããå¿ è¦ãããã¾ãã ãã®å ´åã以ä¸ã®ãããªå®ç¾æ¹æ³ãèãããã¾ãã
- forkãã¦ãã«ãããã»ã¹åï¼Perlã¨ãPHPã§ã¯ãã£ã¡ãå¤ãï¼
- ã¡ã¢ãªæ¶è²»å¤§ ï¼Copy on Writeã¨ããä»çµã¿ãããã®ã§ä¸å®ã¯å ±æãããï¼
- ã¹ã¬ããã»ã¼ãæ§èããªãã¦è¯ãã¦ã·ã³ãã«
- Rubyã§ã®è¨ç®å¦çã§ãCPUã®ã³ã¢æ°ä½¿ãããã
- ãã«ãã¹ã¬ããå + IOå¤éå
- ã¡ã¢ãªæ¶è²»å°
- ãã°ãªãåããããã¹ã¬ããã»ã¼ãæ§ãå¿ è¦
- Rubyã§ã®è¨ç®å¦çã§ã¯CPU使ããããªãã®ã§IOå¦çã¨çµã¿åããã
並åå¦çãæ¸ãã®ã¯é£ãã
ä¸è¿°ã®ãããªããªããã£ããªå®ç¾æ¹æ³ããã£ãã¨ãã¦ããèªåã§ä¸ããå ¨ã¦æ£ããæ¸ãã®ã¯ã¨ã¦ããã¼ãã«ãé«ããã¨ã§ãã ä»ã®è¨èªã§ãThreadãç´æ¥ä½¿ããããæ½è±¡åãããã¢ãã«ï¼Future, Actor, async/awaitãªã©ï¼ãå©ç¨ãããã¨ãå¤ãã§ãã Rubyã®å ´åã¯ç¨éã«ç¹åãã便å©ãªå®è£ ããã§ã«ããã®ã§ãã®è¾ºã使ãæããå§ããã¨è¯ãã¨å人çã«æãã®ã§ç´¹ä»ã¨ãããã¾ã§å¾ãç¥è¦ãå ±æãã¾ãã
- Parallel
- Sidekiq
æåã«è§¦ããconcurrent-rubyã使ãã¨ä»ã®è¨èªã§å©ç¨ã§å©ç¨ããã¦ãããããªéåæå¦çã®æ½è±¡åã¢ãã«ãå©ç¨ã§ãã¾ããä»åã¯çç¥ãã¾ãã
Parallel
ã«ã¼ãå¦çããã¡ããã¡ãç°¡åã«ä¸¦ååã§ããã©ã¤ãã©ãªã ããã»ã¹ã¢ãã«ã»ã¹ã¬ããã¢ãã«ä¸¡æ¹æ¡ç¨ã§ãã¾ãã
ãµã³ãã«ã³ã¼ããããã©ã§ããã·ã³ã®CPUæ°ã調ã¹ã¦ãã®æ°ã ããã«ãããã»ã¹ãèµ·åãã¦å¦çãã¦ããã¾ãã
è¨æ³ãç°¡åã§ãParallel.map
ã®å¼æ°ã«é
åã渡ãã ããådoãend
å
ã®å¦çãè¤æ°ã®ããã»ã¹ãã¹ã¬ããã§å¦çãè¡ãããããã«ãªãã¾ãã
map
ãå©ç¨ããã°Rubyã®Array#map
ã®ããã«å¦çããçµæãé
åã§åãåããã¨ãå¯è½ã§ãï¼ãã¡ããå¼æ°ã§æ¸¡ããé
åã®é çªãä¿æãã¦ããã¾ãï¼ã
# 2 CPUs -> work in 2 processes (a,b + c) results = Parallel.map(['a','b','c']) do |one_letter| expensive_calculation(one_letter) end
ããããã¨ã使ã£ã¦ã
- ãµã¼ãã¼1å°ã§å®è¡ããç¨åº¦ã®ã¹ã¯ãªããï¼éè¨ã¨ãï¼ã®å¦çãæ©ããããæ
- ä»æ§ã¯ã©ã¹ã¿ãããªãã®ãä½ã£ã¦ãã¼ãã«å ¨ä»¶èª¿ã¹ã¦å¯¾è±¡ãã¼ã¿ã®ã¿æ½åºã¨ã
- 1å°ã§å®è¡ããã°å¦çã®ã¢ã¦ããããå ã1ç®æã«ã¾ã¨ããã®ãç°¡å
- 並åã«ç»åãªã©ã®ãã¼ã¿ããã¦ã³ãã¼ãã»ã¢ãããã¼ããã
class FooTargetUserSpecification def satisfied_by?(user) # ... # éãã®å¤å®ãã¸ã㯠end def satisfied_users(&block) User.includes(:some_associations).find_in_batches do |gruop| Parallel.each(group, in_processes: 4) do |user| @reconnected ||= User.connection.reconnect! || true block(user) if satisfied_by?(user) end User.connection.reconnect! end end end file = File.open('target_users.csv', 'rw+') spec = FooTargetUserSpecification.new spec.satisfied_users do |user| file.puts(user.id) end file.close
ç¥è¦
- ãã«ãããã»ã¹ã§å®è¡ããã¨ã¡ã¢ãªé£ãã®ã§ãããããããã¹ã¯ãªãããCronã§åãå ´åã¯ã¡ããã¨å®å®ãã¦å®è¡ã§ãããã©ããæ¬çªãã¼ã¿ã¨åãè¦æ¨¡ã§æ¤è¨¼å¿ è¦
- å®è¡å 容ã«ãã£ã¦ã¹ã¬ãããã¼ã¹ã§ããã¨è¯ãã®ãããã»ã¹ãã¼ã¹ã§ããã¨è¯ãã®ãã¯èããï¼ã¡ã¢ãªå¹çã»ã¹ã¬ããã»ã¼ãæ§ãªã©ï¼
- ActiveRecordã¨çµã¿åããã¦ä½¿ãå ´åã«ã¯ãconnection_poolå¨ãã§åé¡ãèµ·ããã®ã§gemã®README.mdã«æ¸ãã¦ããåæ¥ç¶å¦ç使ãã¨è¯ã
Sidekiq
ããããéåæJobãã¥ã¼ã®å®è£ ã¨ãã¦æåãSidekiqã®ããã»ã¹ãèµ·åãã¦ããã¦ãRedisã«Jobãç©ãã¨ç©ãã ãã°ãã Sidekiqã®ããã»ã¹ãéæã¯ã¼ã«ã¼ãèµ·åãã¦Jobãæ¶åãã¦ãã£ã¦ããã¾ãã
ã¹ã¬ããã¢ãã«ã§ä¸¦ååããã¦ããã®ã§ãIOå¦çãªã©ã§ããããã³ã°ãããå ´åã«å¹æãåºãã®ã§ã ã¢ããªã®Pushéç¥ãéã£ãããã¡ã¼ã«ãéã£ãããDBã®ã¬ã³ã¼ããæ´æ°ãããã«åãã¦ã¾ãã åæ§ã®gemã«Resqueã¨ãããã®ããããããã¡ãã¯ããã»ã¹ã¢ãã«ã§å®ç¾ãã¦ããã®ã§ã¡ã¢ãªãä½åã«é£ã£ããããã Sidekiqã§ãããã»ã¹ã¢ãã«ã§å®è¡ããæ¹ãè¯ãéãè¨ç®å¦çãè¡ãããå ´åã¯ä¸¦åæ°1ã§è¤æ°ã®ããã»ã¹ãç«ã¦ãã°è¯ãã§ãã
ä½è ã®åªåã«ãã£ã¦ãã¼ã¸ã§ã³ãä¸ãããã¨ã«ã¹ã«ã¼ãããããã¡ãä¸ãã£ãããgemèªèº«ã®ä¾åé¢ä¿ãæ¸ã£ãããã¨ã³ã¿ã¼ãã©ã¤ãºåãã®æ©è½ãç¨æããã¦ãã¦å¾³ãé«ãã§ãã
ããããã¨ã使ã£ã¦ã
- HTTPãªã¯ã¨ã¹ãå
ã§ã¯å¦çãããããªãé
ãå¦çãéåæã«å®è¡ããJobãã¥ã¼
- Push/ã¡ã¼ã«éç¥
- é 延ããã¦Updateã¯ã¨ãªãçºè¡ããããæ
- åç»ã®ã¨ã³ã³ã¼ãå¦ç
- ç´°ãã大éã®å¦çãä¸æ°ã«ã¹ã±ã¼ã«ã¢ã¦ãããã¦å¦çãããæ
- æ°ããªãµã ãã¤ã«ãäºåä½æãããã£ãã·ã¥ã温ãããæ
- S3ããDBã«ç»é²ãããç»åããã¦ã³ãã¼ã -> 解æå¦ç -> DBã«æ°¸ç¶å
class UserFooWorker include Sidekiq::Worker sidekiq_options(queue: :default, retry: 3) def perform(user_id) user = User.find_by(id: user_id) return unless user # userãåå¾ã§ããªãã£ããçµäºãã¨ã user.do_something end end # å¼ã³åºãå´ # perform_asyncãå¼ã¶ã¨Redisã«Jobãç©ã¾ãã UserFooWorker.perform_async(user.id)
ç¥è¦
- Workerã®ã³ã¼ããæ¸ãã¨ãã¯ã¨ã«ããåªçå¶ï¼åªçå¶ï¼åªçå¶ï¼ã¨3åãããå±ãã¦ããã³ã¼ããæ¸ãã¦ããã¦èªã¿ç´ã
- 2å以ä¸åãå¦çãå®è¡ãã¦ãè¯ãå¦çãæ¸ã
- 1度å®è¡ããå¦çãã©ããããã§ãã¯ã§ããããã«ãã
- ä»æ§ã¨ãã¦2åå®è¡ããã¦ãã¾ãã®ãä¸é¨è¨±å®¹ãã
- Workerã®è¦ä»¶ã«ãã£ã¦é©åã«retryåæ°ã»æ¡ä»¶ã»ééãè¨å®ãã
- 1度失æãã¦å度å®è¡ãããæåããã®ãã©ããï¼
- éä¿¡ç³»ã¯ã¨ã©ã¼ã«ãªããããã®ã§ä½åããªãã©ã¤ããã
- åå¨ããªããã¼ã¿ã¸ã®å¦çã¯2度ã¨æåããªãå ´åãå¤ã
- exponential back-offã«ãã£ã¦1é±éå¾ã¨ãã«åå®è¡ããã¦å¬ããã®ï¼ï¼ï¼
- Workerå ã®ã³ã¼ãã¯ã§ããã ãã·ã³ãã«ã«ãã¦ã©ãã§ã¨ã©ã¼ãçºçãã¦ãã¤ãªãã©ã¤ãããã®ããåããããããªãããå¿ããã
- 1度失æãã¦å度å®è¡ãããæåããã®ãã©ããï¼
- ãªãªã¼ã¹ã失æããå ´åã®ãªã«ããªã¼æ段ãèãã¦ãã
- ä½ããçãã¦ã¬ãã½ãã°ã¨ãã§å¤§éã«ãªãã©ã¤Jobãåºãã¦ãã¾ã£ã¦retryãããæ¶åãã¦ãã¾ã£ãã¨ã
- ãã°ãä¿®æ£ããã®ã¡ã«å½±é¿ç¯å²åãå度ç©ã¿ç´ãã¨ã
- ä½ããçãã¦ã¬ãã½ãã°ã¨ãã§å¤§éã«ãªãã©ã¤Jobãåºãã¦ãã¾ã£ã¦retryãããæ¶åãã¦ãã¾ã£ãã¨ã
- 1 Workerã®ç²åº¦ãå°ãããã
- 1åã®è¦ªã®ãªã½ã¼ã¹ã«è¤æ°åã®åã®ãªã½ã¼ã¹ãçµã³ã¤ãå ´åã«ãåã®ãªã½ã¼ã¹ã®æ°ã ãJobãç©ãã¨Sidekiqã®ä¸¦åæ°ããããã°ãã®åå®äºã¾ã§éããªã
- Complex Job Workflows with Batches · mperham/sidekiq Wiki
- Queueãæèãã
- ãªã¯ã¨ã¹ãæã«éåæã§è¡ãJobãç©ãQueueã¨ããããå¦ççã«ä¸æ°ã«Jobãç©ãéã®Queueã¯åããï¼éåæå´ãè©°ã¾ãï¼
- CloudWatchãªã©ã«Queueã®ãµã¤ãºãã¡ããªã¯ã¹ã¨ãã¦Putãã¦ããï¼AutoScalingã«å©ç¨ã§ããï¼
- Redisãæèãã
- backtraceãªãã·ã§ã³ã¯ä¾¿å©ã ããæå¹ã«ããJobã大éã«ç©ãã§å ¨é¨ãããã¨ã»ã¨ãã©åæ§ã®backtraceãã¹ãã¬ã¼ã¸ã«æ¸ãè¾¼ã¾ãã¦ãã®ããã容éãé£ãã®ã§æ³¨æ
- XXX.perform_asyncã1ä¸åã«ã¼ããããããSidekiq::Client.bulk_pushã®APIãå©ç¨ãã¦ä¸æ¬ã§è©°ããã¨ã§è² è·ããããã«éãè©°ããâã®ããã«
# Jobãç©ãæã¯Redisã«åªããããããã«bluk_push使ãã¨ãä¸æ°ã«å¤§éã®Jobãç©ãã¦è¯ã User.select(:id).find_in_batches do |group| args = group.map {|user| [user.id] } Sidekiq::Client.bluk_push('class' => UserFooWorker, 'args' => args, 'backtrace' => false, 'queue' => 'user_foo_worker_queue') end
ã¾ã¨ã
Rubyã§ãç°¡åã«åãå ¥ãããã並åå¦çã®æ¸ãæ¹ã«ã¤ãã¦ç´¹ä»ãã¾ããã æ¢åå¦çã並ååãã¦é«éååºæ¥ãã¨æ°æã¡è¯ãã®ã§ããã²è©¦ãã¦è¦ã¦ãããããã§ãã æ¥å¹´ã¯concurrent-rubyãRxRubyã試ãã¦ã¿ããã