紙箱

覚えたことをため込んでいく

Clojureのいろんな並行処理の使い分け

この記事はもともとTumblrに書いていた自分のブログ記事を転載したものです。投稿日時も当時の投稿日時を再現してあります。

Clojureには標準でもagent系のsend, send-offに加え、future関数というスレッド起動系関数があります。
core.asyncの登場で、ここにgoマクロとthreadマクロが加わりました。

これらはすべて、背後ではJavaのExecutorsを使ってスレッドプールを作り、一度生成したスレッドの再利用を行いますが、それぞれ使っているスレッドプールが異なります。さらに関数自体の機能も異なるため、どれをつかったらいいのか迷ってしまうことがあります。

自分用に整理したので、メモとしておいておきます。

IOバウンドとCPUバウンド

まず、Clojureのスレッド関連関数の用途は、大きく2種類にわけられます。それが、IOバウンドとCPUバウンドです。

IOバウンドな処理は、実行中の処理がCPUよりもIO処理に強く依存します。DBアクセスとかリモート通信とかですね。別スレッドでこの処理を実行した場合、スレッドは大部分を、IO処理待ち状態で過ごします。
CPUバウンドな処理は、途中にIO待ちのような「待機」が発生せず、CPUをぶん回し続けるような処理です。全データがメモリに載っていて、CPUがフル稼働でそれらを処理するようなケースです。

IOバウンドな処理は大半をIO待ちで過ごすため、CPUを占有しません。一方CPUバウンドな処理は、その名の通り、動いている間中、CPUを使い続けます。

CPUを使い続けるような処理は、CPU(コア)数以上のスレッドを起動してもあまり意味がありません。たくさんのCPU依存処理を起動する場合、全スレッドがタスク処理でCPUを占有しているのがもっとも効率の良い状態で、それ以上起動しても、単にスレッド切り替えコストが無駄になるだけだからです。 CPUバウンドな処理は、スレッド数をコア数に近い数にとどめ、ひとつひとつのタスクは小さくして、たくさんのタスクをどんどんコアで分散して処理していくのが効率がよいことになります。CPUバウンドな処理はCPUを使うしかないのだから効率良く使いたいわけです。

一方、IOバウンドな処理は、その大半は「IO待ち」だったりします。リモートAPIを呼ぶ処理は、大半を「レスポンスが返ってくるのを待つ」ことに費やしています。

ここでたくさんのIOバウンドな処理を、コア数分の固定数スレッドで実行したことを想像してください。スレッドが4つだとして、4つのIO処理を起動すると…すべてのスレッドが使われ、それ以降の処理は待つしかありません。
CPUバウンドな処理であれば、スレッドはCPUをフルに使って一所懸命にタスクを実行していることでしょう。だから待つしかありません。しかしIOバウンドな処理では、4つのスレッドは、おそらく、ただIO待ちをしているだけです。

だから、IOバウンドな処理でスレッド数をコア数近くに限定するのは、あまり意味がないということになります。スレッドがIO待ちをしている間に、ほかの処理が動けるかもしれないのですから。だから、コア数以上のスレッドを起動して、どんどんIO待ちさせ、IOが終わったスレッドから処理を行えばよいのです。

固定数スレッドプールとキャッシュ化スレッドプール

Clojureの関数は、その用途がCPUバウンドかIOバウンドかによって、使用するスレッドプールが異なっています。

agent実行関数sendが使うスレッドプールは固定数であり、JavaのExecutors.newFixedThreadPoolメソッドで作られます。
一方、send-offが使うスレッドプールはキャッシュ化された非制限プールで、Executors.newCachedThreadPoolで作られます。非制限といっても、キャッシュ化スレッドプールは、使われなくなったスレッドを60秒で破棄するので、たくさんのスレッドがゴミとして残ることはありません。

多くのClojure関係の本で、sendはCPUに依存する処理に、send-offはIOに依存する処理に使う、と書かれているのは、このように、背後で使っているスレッドプールがことなるからです。

背後で使われているスレッドプールの種類がわかれば、その関数が、CPUバウンドな処理を想定しているのか、IOバウンドな処理を想定しているのかがわかります。以下は、Clojureのマルチスレッド関数がどのスレッドプールを使っているのかの一覧です。

 poolの定義場所プールの種類スレッドプール生成方法スレッド数
sendclojure.lang.Agent/pooledExecutor固定数Executors.newFixedThreadPool2+コア数
send-offclojure.lang.Agent/soloExecutorキャッシュ化Executors.newCachedThreadPool制限なし
future / future-call / pmap / pcallsclojure.lang.Agent/soloExecutorキャッシュExecutors.newCachedThreadPool制限なし
goclojure.core.async.impl.exec.threadpool/the-executor固定数Executors.newFixedThreadPoolコア数 * 2 + 42
thread / thread-callclojure.core.async/thread-macro-executorキャッシュExecutors/newCachedThreadPool制限なし
reducersclojure.core.reducers/poolForkJoinPoolnew java.util.concurrent.ForkJoinPool自動制御

futureのところにはpmapとpcallsも書いていますが、pcallsはpmapを、pmapはfutureを呼び出すので、すべてfutureと同じ扱いです。

まとめてみると、core.asyncの解説で必ず取り上げられるgoマクロは、固定数のスレッドプールを使っていることがわかります。つまり、goマクロはCPUバウンドな処理を前提としているわけです。

goマクロが「コア数 * 2 + 42」というよくわからないスレッド数を使っていることについて、特に42という謎の数値を指定していることについてははっきりしないのですが、+42は後から付け加えられたらしく、メーリングリストのポストなどを追跡すると、前述した、IOバウンドな処理に固定数スレッドプールを使った場合のような、IO待ちで全スレッドが停止して並行処理がスタックしてしまうことをある程度抑止したい、というのが意図のようです。goマクロはあくまでCPUバウンドな処理を扱うものであることは変わらないそうです。

42という数値については「すべての答え」から取ったのでは、という説もありますが、いまだ謎です。「すべての答え」ネタを知らない人はググってください。

goがCPUバウンドであるかわりにthreadマクロが用意されています。
threadマクロはgoマクロとほぼ同じ使い勝手で使えますが、キャッシュ化スレッドプールを使うため、IOバウンドな処理に向いています。goマクロと異なるところは一点だけで、チャネルの操作に <! と >! は使えず、ブロック型のチャネル操作関数 <!! と >!! を使う、ということです。<!!, >!! では呼び出した段階でスレッドがブロックしますが、そもそもthreadを使った場合はネイティブスレッドに処理が割り当てられていて、そのスレッドがブロックするだけなので、メインスレッドは止まらず、問題ありません。

goマクロで起動した並行処理は、単純にひとつのスレッドに丸ごと渡されるわけではなく、コンパイル段階で全処理が式単位に分解され、ステートマシンに変換されます。S式ならではです。そして<!, >!でチャネルへのアクセスごとにスレッドが切り替わる、といった動きをするようです。<!!, >!! をgoブロックで使うと、このスレッド切り替えがうまく動かなくなるので、&gt! か !< を使います。
彼らはこれをIoC Threadと読んでいますが、いやいやそれはIoCというよりも、昔の協調型マルチタスクと似たものだから「協調型スレッド」と呼ぶべきだという意見もあります。私も強調型だって意見に賛成ですが、たぶんIoCのほうがかっこいいってことなんだと思います)

reducersだけは特殊で、reducersは内部では並行処理をJava 7以降のFork/Join APIに処理を丸投げしています(JVMがJava7未満の場合は互換ライブラリを使っているようです)。Fork/JoinはJavaではとても使いにくいAPIで、Java 8でラムダ式とパラレルストリームが導入されてやっと本気出せるようになったのですが、ClojureではJava 8よりももっと前に、早々に対応していたわけです。よって性質としてはFork/Joinと同等でして、Fork/Joinのドキュメントによると、CPUバウンドな処理を前提にスレッド数を自動制御し、IOバウンドな処理が混ざるとうまく自動制御できないようです。

スレッドプールも、Java 7でFork/Joinとともに導入された、ForkJoinPoolを使っています。このプールは、初期値はCPUコアと同数のスレッドを用意し、ダイナミックにワーカースレッドを追加したり停止したりします。
つまり、reducersはFork/Joinにすべておまかせ、ということです。

そもそもFork/Joinは、要素数がとても多いデータ(10万とか100万とか)を高速並列処理するためのAPIなので、並列化が目的なら、数個程度の並列化ではreducersではなく別の機構をつかったほうがいいです。reducersの機能は並列化だけではないので、そっち目当てならよいですが。

プールの違い

表をよく見るとわかりますが、同じスレッド化プールを使っている関数でも、threadマクロだけは、プールが異なります。send-offとfutureは、ともにClojure標準関数なだけはあって、両方が同じスレッドプールを使っています。これはつまり、send-offで生成されたスレッドは、futureでも再利用できることを意味します。

core.async/thread は、そもそもcore.async自体がClojureの「外部ライブラリ」な位置づけですから、独自に定義したスレッドプールを使っています。よって、futureとthreadとは、互いに生成済みスレッドを再利用できません。ちょっとした差ではありますが、効率的ではないことは知っておいて損はないでしょう。

core.asyncを使う人は、おおむね、スレッド処理はcore.asyncばかり使う傾向があるので、今後はfutureの代わりにthreadを使うことにすれば落着、と行きそうですが、両者は機能にも違いがあるのでなかなかそうは行きません。

機能の違い

  • send, send-off (agent系)
  • future
  • go, thread (core.async系)

この3種類は用途および使い方が違います。
sendとsend-off、goとthreadは、用途は同じですがCPUバウンドかIOバウンドかが異なります。
futureはどちらにも属しません。

sendとsend-offはどちらも、agent操作関数であり、目的はあくまでagentの実行と更新です。そもそもagentは、汎用的な並行処理起動のためにあるものではなく、かなり特殊な用途でつかうものなので、「ただスレッドを起動したい」だけでは使わないほうがいいです。

agentの特徴は、同じagentで起動した処理は「逐次実行される」点です。同じagentに何回もsend, send-offしても、それらが平行で処理されるわけではありません。sendやsend-offはagentのアクション実行キューにアクションを積むだけです(もちろん、複数のagentが存在すれば、それらは平行に動きます)。そもそもagentは「値」を持っていて、sendやsend-offで積んだアクションによって、agentの結果値が順番に変わっていく、というものだからです。

goとthreadはagentに比べてより汎用的な並行処理機構で、goやthreadブロックの処理は、スレッドプールの違いはあれ、すぐにスレッドに割り当てられて平行に動きます。いずれのマクロも、処理完了時の結果値が取り出せるチャネルを返します。とこれだけ書くと、threadはfutureと似ているように思えます。ともにIOバウンドな処理用で、結果値を取得できるオブジェクトを返します。futureを卒業して、core.asyncに「移行」すべきでしょうか?

futureは、処理をキャッシュ化スレッドプールに渡してくれる点でthreadと同じですが、futureはdelayオブジェクトでもある点が大きく異なります。

(let [result1 (future (my-remote-func1 ...))
       result2 (future (my-remote-func2 ...))]
  (my-long-processing-fn)
  {:age (-> (:base @result1) (+ 20))
   :address (str (:address @result1) " " (:address @result2))
   :name (:name @result2)})

futureはderef(の省略記号アットマーク)によって非同期処理の実行結果を取得できますが、derefは何回でも使えます。最初のderef時にまだ処理が終わってない場合は処理完了を待機しますが、以降は、キャッシュした結果値を返し続けます。

上記例では、my-remote-func1とmy-remote-func2というリモート呼び出しを平行化するためにfutureを使い、さらにmy-long-processing-fnという長い処理を行う関数を呼びました。my-long-processing-fn実行中も、別スレッドでリモートコールは実行されています。
最後にマップを作るときに、futureの結果値を参照していますが、result1もresult2も、2回参照している点に注目してください。

threadマクロはdelayオブジェクトではなく、チャネルを返します。(<!! ch) によって結果値を取り出せますが、derefと違って、<!!を繰り返し読んでも同じ結果が返ってくるわけではありません。チャネルはキューの一種で、チャネルへの <!! は呼ぶたびに新しい値を返し、値がなくなるとnilを返すので、チャネルを、delayのように繰り返し参照すべきではありません。

(let [ch1 (thread (my-remote-func1 ...))
       ch2 (thread (my-remote-func2 ...))]
  (my-long-processing-fn)
  (let [result1 (<!! ch1)
        result2 (<!! ch2)]
      {:age (-> (:base result1) (+ 20))
       :address (str (:address result1) " " (:address result2))
       :name (:name result2)}))

チャネルベースのthreadを、futureの代用として使う場合は、letを使ってチャネルからいったん値を取り出さなければいけない点で、使い勝手が異なってきます。

もちろん、ごく僅かな差ですし、go/threadには、複数のgo/threadブロックが共通の(しかもたくさんの)チャネルを介して値をやり取りしつつ並行処理を実行するという本来の目的がありますから、価値はいささかも減じません。ここで言いたいのは、threadはgoのIOバウンド版であって、全並行処理をcore.async化しようとして、futureのかわりにthreadを使おうというのは、アリではありますが、若干短絡的です。
core.asyncのパワーは、goあるいはthreadブロックが複数個起動していて、互いに(チャネルを介して)通信しあう時に発揮されます。もちろん、常に結果チャネルを返す点で汎用的なスレッド起動の仕組みとして使うことも配慮されていますが、上記のような違いを意識しておいたほうがよいでしょう。この例のように、複数のfutureでいくつもの並列処理が起動して、あとでその結果値を使う場合、threadの場合は、長いlet式でいったんチャネルをリードする必要があるかもしれません。

一方で、futureとthreadは使用するスレッドプールが異なるので、併用すると、互いにスレッドを共有してくれません。future同士はスレッドを共有しますし、thread同士も共有しますが、futureとthreadは共有しません。ここに若干のロスが存在します。

よって、用途に合わせてfutureとthreadと使い分けるか、あるいはスレッドプールの効率性を考えて片方に寄せるか(パワーを考えるとthreadの方が強力なので、ふつうはthreadに寄せるでしょう)は、正直、好みの次第です。実を言うと、私はfutureを使うシーンでもthreadを使うことがほとんどです。好みの問題です。

まとめ

  • agentは単なる並列処理起動用の機能ではないので、ちょっと考えて使え
  • reducersはすごい量のデータを処理でもしない限り、並列化機構だと思うな。
  • いまやりたい処理がCPUバウンドかIOバウンドかはちゃんと考えろ
  • futureにはちゃんとfutureに向いた処理がある。けどあえてthreadで代用も出来る。その場合、他の並列処理もなるべくcore.asyncを使うようにすれば、スレッドプールのキャッシュ効率は若干良い。