並行・並列、そしてAsync

この記事はANDPAD Advent Calendar 2024の17日目の記事になります。(投稿予定の12/17前後で風邪を引いてしまい投稿が遅れてしまいました。🙇‍♀️)

@youchanです。実は今年の8月にアンドパッドに入社していました。アンドパッドではインフラコストを削減するための施策を行なうチームに配属しています。 アンドパッドでは建設業のDXを実現するサービスを提供しています。建設の現場では多くの写真が取り扱われます。膨大な写真データはインフラコストに響くので削除できるものは削除したいところです。実際にサムネイル画像も保存されていたりして削減可能なものがたくさんあります。 私の最近のミッションはこの不要なファイルを削除するということなのですが、ファイル数が膨大なためいくつかの工夫が必要なのでそれについて記事にしたいと思います。

また本稿では、課題を通して並行・並列処理についての理解を深められるような内容を心掛けて記述しようと思います。

課題定義

バッチ処理でよくあるパターンとして、データベースやCSVファイルなどで対象のデータが与えられ、それらを処理した後にデータベースに書きこむ、ファイルに書きこむ、APIで他のサービスに結果を渡すなどなんらかの結果の出力処理を行なうということがあります。 このように、入力データ(IO) --> なんらかの処理 --> 出力データ(IO) という流れはバッチ処理の黄金パターンと言えます。 入力データが大きくなれば効率のよい処理を書かなければ、現実的な時間で実行できないでしょう。このパターンでは効率よく処理するために以下のようなことが必要になります。

  • 大きなデータを扱う上でメモリの消費を抑えるためのストリーム処理
  • スループットを上げるための並行処理

ここで並行処理と書きましたが並列処理でないことについては後述します。

ストリーム処理

ストリーム処理というと近頃ではApache Kafkaのようなものを想像する人も多いかと思いますが、もうすこし広義にとらえてください。 例えばJavaのIOストリームのようなものを想定しています。ファイルを読み込みながら小さな単位で処理を進めていくような処理のしかたです。

ここでは、CSVファイルからファイルパスを読み込むような処理を考えます。CSVファイルは以下のようなフォーマットになっているとします。

path ---| path/to/file ...

CSVファイルから一度にデータを読みこむにはRubyの場合、以下のようになります。

csv = CSV.read('input.csv')

このように一度に読み込めばファイルサイズが大きい場合、メモリの消費も大きくなります。Rubyの場合はオブジェクトのオーバーヘッドがあるためファイルサイズより大きなメモリを要求します。 このような大きなメモリ消費を抑えるためにはファイルを読みこみながら処理をするということが必要になります。例えば以下のような感じです。

CSV.foreach('input.csv', headers: true) do |row|
  path = row['path']
  # pathに対する処理
end

あるいはデータをいくつかの固まりとして処理するならば以下のようにミニバッチとして処理することも出来ます。1

CSV.foreach('input.csv').each_slice(1000) do |rows|
  paths = rows.map{|row| row['path'] }
  # pathsに対するミニバッチ
end

ここでmap処理がループのブロックの中に入っていることが浮いてみえるのでストリーム処理らしくメソッドチェインで書くとすっきりします。

CSV.foreach('input.csv').lazy.map{|row| row['path'] }.each_slice(1000) do |paths|
  # pathsに対するミニバッチ
end

ここでmapメソッドは通常は配列を返します。配列を返すということはせっかくのストリーム処理に全データを持つバッファを作ってしまいます。mapメソッドの前にlazyメソッドを呼ぶことで遅延処理版のmapメソッドになります。 ここまでがストリーム処理による読み込み処理になります。

並行処理

読み込みがストリーム処理になりました。出力側の処理はミニバッチとして処理します。この処理を並行に処理することでスループットを上げていきます。 その前になぜ並列処理ではなく並行処理としているのかについて述べます。

並行処理、並列処理の違いは何でしょう? 以前、私は次のような定義をしました。

https://qiita.com/youchan@github/items/2e5484517875d8cfd9aa

並行性

プログラム中の複数の処理ブロックが論理的に並行動作可能な状態。それぞれの処理ブロックは処理順序に関係なく実行することができ、共有するリソースの競合を回避することができる。

並列性

並行性をもつプログラムが時間的に同時に動作することが出来る複数のプロセッサによって並列に実行される状態

この定義には異議があるかもしれませんが、そもそも厳密な定義があるのかわからない用語なので解釈しだいだと思います。ポイントは並列処理というはマルチCPUやマルチコアによって複数の(CPU上の)処理が同時に実行されることを指します。

CPU上のという断りを入れたのはIOの処理は多重化されることがあっても並列処理とは言われることは少ないと思うからです。

このCPU上の処理とIOの処理という区別はプログラムのパフォーマンス(スループット)を考える上で重要です。処理全体の中でCPUで処理されるものとIOの処理とでどのくらいの割合になるかで考え方が変わります。CPUの処理が支配的な場合はCPUバウンドと呼び、IOの処理が支配的であればIOバウンドと呼びます。

CPUバウンドの処理はまさにCPUが働く時間が長いため、並列性を上げないと処理の多重度を上げることはできません。典型的な例としては機械学習のようなものでしょう。機械学習ではCPUどころかGPUという専用のハードウェアで多重度を極限まで増やしています。

本稿の主題である典型的なバッチ処理はIOバウンドになります。ファイルから入力データを読み込む、計算結果をデータベースに書き込む、あるいはAPIで他サービスと通信する、などIO処理に多くの時間を費やします。

IOが処理をしている間というのはCPUは働いていません。IOの間にCPUが別の処理を行なうことが出来るということです。これはCPU数(コア数)を増やさなくてもCPUに並行処理をさせることが出来るということを示しています。

並行・並列の実装

では実装上ではどうなるしょうか。Rubyでは(というより現代のOSに合わせて設計されたほとんどの言語では)以下の3つの選択肢があります。

  • マルチプロセス(fork)
  • マルチスレッド(Thread, Ractor)
  • ファイバーあるいはコルーチン(Fiber, Async)

括弧内はRubyで使うAPIの名前です。

マルチプロセス

マルチプロセスは古くからOSに搭載された機能です。OSはプロセス単位でタスクを管理しており、タイムシェアリングシステムによりプロセスを切り替えて動かします。 マルチプロセスは安定しており、プロセス間の境界の分離もしっかりしているので並列処理をするにはとても良いソリューションですが以下のデメリットがあります。 - プロセス間でのデータの共有のためのしくみを別途考える必要があり、データ共有のためのオーバーヘッドを伴う。 - コンテキストスイッチのオーバーヘッドが大きい。

マルチスレッド

マルチスレッドはマルチプロセスにおけるデメリットを補うために登場した技術です。スレッドは同一プロセス内に存在するのでデータ共有はプロセスのメモリ空間内で行なえます。また、プロセスに比べればコンテキストスイッチに掛かるオーバーヘッドも抑えることが出来ます。 ただし、複数のスレッドで同一のメモリにアクセスを行なうためデメリットもあります。 - メモリアクセスに競合が発生する可能性がある。競合を回避するために同期化が必要であるが、プログラム上複雑性が増し、正しく競合を回避するのに高度な知識を要求する。 - それでもコンテキストスイッチにオーバーヘッドがある。

ここでいうマルチスレッドはOSの仕組みの上でのマルチスレッドです。多くのプログラミング言語ではプログラミングモデルによってマルチスレッドプログラミングの難しさを解消したり、M:Nスレッドという軽量化の手法でコンテキストスイッチのオーバーヘッドをより少なくする試みがあります。 RubyでもRactorによってマルチスレッドの問題を解消しようとしています。

Ractorについては作者の笹田さん自身の記事が詳しいと思いますので以下を参考にしてください。 - Ruby 3.0 の Ractor を自慢したい - M:Nスレッドによる軽量な並行処理への挑戦

Ractorによってマルチスレッドの問題の大部分が解消すればマルチスレッドでよさそうに見えます。並列性によってスループットを上げたい場合はそう言えるでしょう。 並列性とはCPU上の処理を多重化するものであるということを思いだしてください。IOバウンドな処理では以下で説明するファイバーでも大きな成果を発揮します。

ファイバー

ファイバーは協調的マルチタスクによって実現されます。協調的マルチタスクについて説明すると長くなりますので割愛しますが、IOバウンドの話を思い出してもらえると良いかと思います。 つまり、あるタスク(ファイバー)のCPU上の処理がIOの処理に移ったときに別のタスクに実行が移されされることで並行処理を行なうという理解でよいでしょう。

ここで上げた3つの方式のうちファイバーだけが並行性をもつが並列性を持たないので、以下のようなデメリットがあります。 - 並列性を持たないのでCPUの処理を多重化できない。(つまり、適用範囲が限定されます)

ファイバーは並列性を持たない代わりに並列性を必要としない場面では強みを発揮します。

ファイバーはIOバウンドな処理のスループットを上げるには強力な方法です。同様に、nodejsのasync/awaitもIOバウンドな処理に対して効果的です。

Rubyではファイバーを利用するためのAPIとしてFiberがありますが、新しいAPIとしてAsyncがSamuel Williamsによって提唱されています。

Asyncによるバッチ処理の実装

ここで最初の課題に戻ってバッチ処理のスループットを上げる方法について考えます。 バッチ処理はIOバウンドな処理になるので、ファイバーを使うことが良い選択であることはここまでの説明から分かるでしょう。 ここではAsyncによる実装について述べます。

まず、入力処理はブロックのままでは並行実行できませんので入力処理をenumeratorとして得ます。

enum = CSV.foreach('input.csv').lazy.map{|row| row['path'] }.each_slice(1000)

enumeratornextメソッドを持っていて複数のタスクからも呼び出すことが出来ます。2

NUM_OF_TASKS定数にタスクの数を設定して以下のコードのようにして複数のタスクを実行します。

enum = CSV.foreach('input.csv').lazy.map{|row| row['path'] }.each_slice(1000)

Async do
  NUM_OF_TASKS.times do
    Async do
      loop do
        paths = enum.next
        # pathsに対するミニバッチ
      end
    end
  end
end

入力データの最後まで読むとenum.nextStopIteration例外を投げるのでループが終了します。

まとめ

本稿ではバッチ処理を題材として、並行・並列処理の考えかたについてまとめました。 三行でまとめると以下のようなことが述べられています。

  • 並行・並列処理について、CPUバウンドかIOバウンドかという観点でまとめました
  • バッチ処理の典型的なパターンではIOバウンドになることが多く、並行処理にはファイバー(Async)が有利であることを示しました
  • また、バッチ処理におけるストリーム処理についても触れました

  1. スループットを重視するならこのように小さなバッチで処理することは多いかと思います。例えばバルクインサートのようなものです。この記事を書く切っ掛けとなったS3からオブジェクトを削除するというタスクも1000件単位で削除を行なえるAPIが使えました。
  2. スレッドの場合はこうはいきません。もし興味があればスレッド版を書いて確かめてみてください。