Hello, I'm Taichi Sato (@teach_kaiju), a server-side engineer.
In this article I introduce MedPipe, a gem we developed in-house that provides a framework for processing very large volumes of data.
What is MedPipe?
Suppose you have the following requirement: "fetch all Log data, format it, and upload it to S3 as a TSV file."
One way to implement this requirement might look like the following.
    require "csv"
    require "tempfile"

    upload_file_name = "hoge_logs.csv"

    # 1. Prepare a file to upload to S3
    Tempfile.create do |file|
      # 2. Fetch the Log data from the DB
      HogeLog.find_each do |log|
        # 3. Format the record
        formatted_data = format(log)
        # 4. Write it to the file
        line = CSV.generate_line(formatted_data, col_sep: "\t")
        file.puts(line)
      end
      # 5. Upload to S3
      upload_s3(file, upload_file_name)
    end

    def format(log)
      # processing
    end

    def upload_s3(file, upload_file_name)
      # processing
    end
With MedPipe, the same process can be written as follows.
    upload_file_name = "hoge_logs.csv"

    pipeline = MedPipe::Pipeline.new
    pipeline.apply(PipelineTask::HogeLogReader.new)                # 1. Fetch the Log data from the DB
            .apply(PipelineTask::HogeLogFormatter.new)             # 2. Format the records
            .apply(MedPipe::PipelineTask::TsvGenerater.new)        # 3. Write to a file
            .apply(PipelineTask::S3Uploader.new(upload_file_name)) # 4. Upload to S3
    pipeline.run
As this shows, MedPipe makes the flow of processing explicit and improves readability.
It also makes features like the following easy to implement:
- Parallel processing
- Custom data fetching that avoids in_batches, for query optimization
- Counting records
- Saving the size of the uploaded file
For Ruby engineers, the learning cost is lower than that of large-scale data processing tools such as Dataflow, so MedPipe is comparatively easy to adopt.
Concept
In MedPipe, you register PipelineTasks with a Pipeline, and the Pipeline runs them in order.
In most cases you need to implement each PipelineTask yourself.
The only method a PipelineTask has to implement is call, so the interface is very simple.
    def call(context, prev_result)
      yield "value passed as the second argument of the next Task"
    end
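The call/yield contract can be illustrated with a small stand-alone runner. This is only a sketch under assumptions: SketchPipeline, Source, and Doubler are hypothetical names introduced here, and MedPipe's actual Pipeline may be implemented differently. Each task yields its result, and the runner hands that value to the next task as its second argument.

```ruby
# Hypothetical stand-in for a pipeline runner; not MedPipe's actual code.
class SketchPipeline
  def initialize
    @tasks = []
  end

  # Register a task and return self so that calls can be chained.
  def apply(task)
    @tasks << task
    self
  end

  # Run the registered tasks in registration order.
  def run(context = {})
    run_from(0, context, nil)
  end

  private

  # Call the task at `index`; whatever it yields becomes the second
  # argument of the next task. The last yielded value is returned.
  def run_from(index, context, prev_result)
    return prev_result if index >= @tasks.size

    final = nil
    @tasks[index].call(context, prev_result) do |result|
      final = run_from(index + 1, context, result)
    end
    final
  end
end

# Hypothetical first task: ignores its input and yields some data.
class Source
  def call(_context, _prev_result)
    yield [1, 2, 3]
  end
end

# Hypothetical second task: transforms what the previous task yielded.
class Doubler
  def call(_context, numbers)
    yield numbers.map { |n| n * 2 }
  end
end

pipeline = SketchPipeline.new
pipeline.apply(Source.new).apply(Doubler.new)
result = pipeline.run # => [2, 4, 6]
```

Because a task hands over its result with yield rather than return, it can also wrap everything downstream in a block, for example to keep a file open while later tasks run.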
However, when handling large amounts of data, you cannot load all of it into memory and hand it to the next Task.
So, as a rule, a Task passes an Enumerator::Lazy to the Task that follows.
(Calling lazy converts an Enumerable into an Enumerator::Lazy.)
Example:
    def call(_context, _)
      yield HogeLog.find_each.lazy
    end
The subsequent Task receives the Enumerator::Lazy and, by applying its processing with map, keeps the result an Enumerator::Lazy.
    def call(_context, records)
      yield records.map { |record| format_line(record) }
    end
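The effect of staying lazy can be checked with plain Ruby, without any database. The following sketch uses an endless range as a stand-in for a very large table (an assumption for illustration only) and records which elements are actually processed.

```ruby
evaluated = []

# An endless source: mapping it is only possible because the enumerator is lazy.
source = (1..Float::INFINITY).lazy

formatted = source.map do |n|
  evaluated << n # record which elements were actually processed
  "line #{n}"
end

# map on a lazy enumerator returns another lazy enumerator and runs nothing yet.
evaluated_before = evaluated.dup # => []

# Elements are processed only when a consumer asks for them.
first_three = formatted.first(3) # => ["line 1", "line 2", "line 3"]
```

Only the three requested elements pass through the block, which is why a chain of Tasks built on map can handle data that would not fit in memory at once.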
Besides the PipelineTasks themselves, some setup is still required, such as registering the PipelineTasks with a Pipeline.
See Usage and the sample for reference.
Fetching data from the DB
When we used find_each in production, we ran into two problems:
- ActiveRecord uses a lot of memory
- The query is not optimized
The first can be solved by using in_batches + pluck, but the second cannot.
Reference: "Railsでin_batches使うととても遅い" ("Using in_batches in Rails is very slow")
To solve this, we developed a class called BatchReader for MedPipe.
Usage example:
    def call(_context, _)
      yield MedPipe::BatchReader.new(
        HogeLog,
        scope: HogeLog.where(created_at: @target_date.all_day),
        pluck_columns:, # Ruby 3.1+ shorthand for `pluck_columns: pluck_columns`
        batch_size: BATCH_SIZE
      ).each.lazy
    end
This lets you pass the data plucked with pluck_columns to the subsequent Task one record at a time, just as find_each would.
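The general idea of reading in batches while handing rows downstream one at a time can be sketched without Rails. Everything below is an assumption for illustration: SketchBatchReader is a hypothetical class, the in-memory table stands in for a database, and the block stands in for a query along the lines of `WHERE id > last_id ORDER BY id LIMIT batch_size` followed by pluck. MedPipe::BatchReader's real implementation may differ.

```ruby
# Hypothetical sketch; not MedPipe::BatchReader's actual implementation.
class SketchBatchReader
  include Enumerable

  # fetch_batch receives (last_id, batch_size) and must return an array of
  # rows ordered by id, where the first element of each row is the id.
  def initialize(batch_size:, &fetch_batch)
    @batch_size = batch_size
    @fetch_batch = fetch_batch
  end

  # Yields rows one at a time while fetching them batch by batch.
  def each
    return enum_for(:each) unless block_given?

    last_id = 0
    loop do
      rows = @fetch_batch.call(last_id, @batch_size)
      break if rows.empty?

      rows.each { |row| yield row }
      last_id = rows.last.first
      break if rows.size < @batch_size # a short batch means no more rows
    end
  end
end

# In-memory stand-in for a table of [id, body] rows.
table = (1..7).map { |id| [id, "log-#{id}"] }
batch_calls = 0

reader = SketchBatchReader.new(batch_size: 3) do |last_id, size|
  batch_calls += 1
  table.select { |id, _| id > last_id }.first(size)
end

# Downstream code sees single rows and can stay lazy.
rows = reader.each.lazy.map { |(id, body)| "#{id}\t#{body}" }.to_a
```

With seven rows and a batch size of three, the block is called three times, while the consumer only ever sees individual rows.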
How to profile
In production we used memory_profiler and profiled with code like the following. Note: parts of the code were modified for this article.
    module Profiler
      class << self
        ...
        def report(&block)
          start_time = Time.current
          result = MemoryProfiler.report(&block)
          elapsed_time = Time.current - start_time

          puts "\n\n===== Profiler Report ====="
          puts "Total allocated: #{bytes_to_mb(result.total_allocated_memsize)} MB (#{result.total_allocated} objects)"
          puts "Total retained: #{bytes_to_mb(result.total_retained_memsize)} MB (#{result.total_retained} objects)"
          puts "Elapsed time: #{elapsed_time.round(2)} sec"
        end
        ...
        private
        ...
        # Convert bytes to MB, rounded to two decimal places
        def bytes_to_mb(bytes)
          (bytes / 1024.0 / 1024.0).round(2)
        end
      end
    end

    class PipelineTask::Profiler
      def call(_context, input)
        Profiler.report do
          # If the input is lazy, force evaluation so that the work is measured
          input.force if input.is_a?(Enumerator::Lazy)
          yield(input)
        end
      end
    end

    pipeline.apply(PipelineTask::Profiler.new)
This way, profiling can be added without modifying the existing script.
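The same wrapper-task pattern can be tried without installing memory_profiler. The sketch below swaps in Ruby's built-in GC.stat and Process.clock_gettime, so it reports only a rough allocated-object count and elapsed time rather than a full memory profile. AllocationProbe is a hypothetical name, and unlike the code above it passes the already materialized array downstream so the upstream work is not evaluated twice.

```ruby
# Hypothetical wrapper task using only Ruby's standard runtime APIs.
# It measures the upstream work, then passes the data on.
class AllocationProbe
  attr_reader :allocated_objects, :elapsed

  def call(_context, input)
    before = GC.stat(:total_allocated_objects)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    # Force lazy input so the upstream work happens inside the measured span.
    materialized = input.is_a?(Enumerator::Lazy) ? input.force : input

    @elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    @allocated_objects = GC.stat(:total_allocated_objects) - before

    yield materialized
  end
end

probe = AllocationProbe.new
passed_on = nil
lazy_rows = (1..1_000).lazy.map { |n| "row #{n}" }

probe.call({}, lazy_rows) { |rows| passed_on = rows }
puts "allocated #{probe.allocated_objects} objects in #{probe.elapsed.round(4)} sec"
```

Materializing the input trades memory for a single evaluation, so a probe like this suits a reduced data set rather than a full production run.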
Conclusion
This article introduced MedPipe. It is our company's first open-source gem.
We rely on all kinds of OSS every day, so I am glad to be able to contribute to the industry as a provider.
My thanks go to the company for allowing this to be released as OSS, and to my colleague Kondo-san (@tetetratra), who developed it with me!
If the implementation was a useful reference, or you tried it and liked it, a star on the MedPipe GitHub repository would be a great encouragement.