こんにちは。アキバです。
ゴールデンウィークですね!
皆さんいかがお過ごしですか?
今年は間に平日が多めなので、大型連休!というよりは2回連休があるというイメージの方が強いかもしれません。
cero-tの奥さんは11連休だとか
さて、前回に続いて、ParallelStreamで動かしているラムダ内で、例外が発生した場合の挙動について調べていきます。
まずは、軽くおさらいします。
以下のようなコードを書きました。
try { List<String> strArray = Arrays.asList("abc", "def", "xxx", "ghi", "jkl", "xxx", "pqr", "stu"); strArray.parallelStream().forEach(s -> { System.out.println("ラムダ開始: id=" + Thread.currentThread().getId()); try { Thread.sleep(100L); if (s.equals("xxx")) throw new RuntimeException("ラムダ内で例外: id=" + Thread.currentThread().getId()); } catch (RuntimeException ex) { System.out.println("ラムダ内で例外発生: id=" + Thread.currentThread().getId()); throw ex; } catch (InterruptedException e) { e.printStackTrace(System.out); } System.out.println("ラムダ終了: id=" + Thread.currentThread().getId()); }); } catch (Exception th) { System.out.println("外側で例外をcatch"); th.printStackTrace(System.out); }
そうすると、こんな感じでいくつかのスレッドが終了しない時がありました。
ラムダ開始: id=1 ラムダ開始: id=14 ラムダ開始: id=15 ラムダ開始: id=13 ラムダ開始: id=16 ラムダ開始: id=17 ラムダ開始: id=12 ラムダ開始: id=18 ラムダ終了: id=16 ラムダ終了: id=15 ラムダ内で例外発生: id=13 ラムダ内で例外発生: id=1 ラムダ終了: id=14 ラムダ終了: id=17 ラムダ終了: id=12 外側で例外をcatch java.lang.RuntimeException: ラムダ内で例外: id=13
それから、なぜかラムダ内では2回例外が発生するようにしているのに、ラムダの外側では1つしかcatchできていません。
どうしてなんでしょうか?
というお話でした。
1. 例外のスタックトレースをのぞいてみる
さて、ラムダの中でcatchした例外(2つ)と、ラムダの外側でcatchできた例外(1つ)の違いを見てみましょう。
以下のようにコードを修正します。
try { List<String> strArray = Arrays.asList("abc", "def", "xxx", "ghi", "jkl", "xxx", "pqr", "stu"); strArray.parallelStream().forEach(s -> { System.out.println("ラムダ開始: id=" + Thread.currentThread().getId()); try { Thread.sleep(100L); if (s.equals("xxx")) throw new RuntimeException("ラムダ内で例外: id=" + Thread.currentThread().getId() + ", s=" + s); } catch (RuntimeException ex) { System.out.println("ラムダ内で例外発生: id=" + Thread.currentThread().getId() + ", s=" + s); // スタックトレースを出力させてみる(ラムダの中でcatchした例外) ex.printStackTrace(System.out); throw ex; } catch (InterruptedException e) { e.printStackTrace(System.out); } System.out.println("ラムダ終了: id=" + Thread.currentThread().getId() + ", s=" + s); }); } catch (Exception ex) { System.out.println("外側で例外をcatch"); // スタックトレースを出力させてみる(ラムダの外でcatchした例外) ex.printStackTrace(System.out); System.out.println("ラムダの外でcatchした例外:ここまで"); }
これを動かすと、ラムダの中で2つ、ラムダの外で1つの例外がcatchできます。
そして、ラムダの外でcatchした例外は、ラムダの中で発生した例外のどちらかになるわけです。
実行した結果は、こうなりました。
ラムダ開始: id=1 ラムダ開始: id=14 ラムダ開始: id=15 ラムダ開始: id=16 ラムダ開始: id=13 ラムダ開始: id=12 ラムダ開始: id=17 ラムダ開始: id=18 ラムダ終了: id=12, s=stu ラムダ終了: id=17, s=jkl ラムダ終了: id=14, s=pqr ラムダ終了: id=16, s=ghi ラムダ終了: id=15, s=def ラムダ終了: id=18, s=abc ラムダ内で例外発生: id=1, s=xxx ラムダ内で例外発生: id=13, s=xxx java.lang.RuntimeException: ラムダ内で例外: id=13, s=xxx at StreamSample.lambda$parallelStreamExceptionSample2$2(StreamSample.java:121) at StreamSample$$Lambda$1/1555009629.accept(Unknown Source) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ・・・略・・・ at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:902) at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1689) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1644) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) java.lang.RuntimeException: ラムダ内で例外: id=1, s=xxx at StreamSample.lambda$parallelStreamExceptionSample2$2(StreamSample.java:121) at StreamSample$$Lambda$1/1555009629.accept(Unknown Source) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ・・・略・・・ at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:400) at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:728) at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) at StreamSample.parallelStreamExceptionSample2(StreamSample.java:117) at StreamSample.main(StreamSample.java:58) 外側で例外をcatch java.lang.RuntimeException: ラムダ内で例外: id=13, s=xxx at StreamSample.lambda$parallelStreamExceptionSample2$2(StreamSample.java:121) at StreamSample$$Lambda$1/1555009629.accept(Unknown Source) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) ・・・略・・・ at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:902) at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1689) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1644) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ラムダの外でcatchした例外:ここまで finished.
上記の例では、Parallelで動かした処理の1つはアプリケーションのmainスレッドでした。
もう一つのスレッドは、ForkJoinTaskのワーカースレッドとして起動しています。
そうです。ParallelStreamは、Fork/Join Frameworkで動いているんです。
Fork/Join Framework、というかForkJoinTaskの実行は、1つのmainスレッドと複数のワーカースレッドで行われます。
この辺の仕様はForkJoinTaskクラスのJavadocにも書かれているので、なにやら小難しいですが読んでみると参考になるかもしれません。
上記の例ではmainスレッドで実行中のTaskで例外が発生しましたが、当然、ワーカースレッドでだけ例外が発生する場合もあります。
(むしろ、その方が普通かもしれませんね)
実際に、上記のコードで"xxx"の位置を変えてやると、例外が発生するスレッドが変わります。
つまり、例外が発生したスレッドがメインかワーカーかに依存してはいけないということになります。
Fork/Joinの実行は、mainスレッドが各Taskの終了状態を見ていて、いずれかのスレッドで例外が発生すると、外側に再スロー(rethrow)する仕組みになっています。
なので、最初に発生した例外だけがラムダの外側でcatchできるということになるのです。
2. 例外が発生したワーカースレッドは、いつ終了するのか?
ラムダ内で例外が発生して外側でcatchした後も、他のワーカースレッドは動き続けています。
例外が発生した場合、mainスレッドとしてはラムダの実行は終わっているのですが、それをワーカースレッドが知ることはできないわけですね。
この動作は、例外が発生するスレッドがmainかワーカーかによらず、同じになります。
3. "ラムダ終了"が出なかったワケ
ForkJoinTaskで動くワーカースレッドは、daemonスレッドです。
要は、daemonスレッド以外のスレッドが全て終了すると、プロセスが終了になるということです。
前回、"ラムダ終了"が出なかったスレッドがあると書いたのは、この辺にカラクリがあります。
ForkJoinで実行中のタスクの1つで例外が発生する
↓
mainスレッドが外側に例外を再スローする
↓
mainスレッドが先に終了する
↓
他のForkJoinスレッドはdaemonスレッドで動いているため、実行中でもJavaVMが終了
↓
メッセージ出ない(><
となるわけです。
4. まとめると
これまでの内容をまとめると、ラムダ内で例外が発生した場合の動作は以下の3点になるかと思います。
- 1. 最初に発生した例外だけ、ラムダの外側でcatchすることが出来ます
- 2. ラムダ内で例外をスローすると、全スレッドの終了を待つことはできません
- 3. ワーカーはdaemonスレッドなので、mainスレッドが終了すると、途中でもワーカーの処理は終了します
上の動作から注意すべきなのは、
他のスレッドはラムダの例外終了を検知できないので、異常時に全ロールバックみたいな処理をParallelStreamでやってはいけない
ということでしょうか。
Webアプリケーションなど、処理が終了してもプロセスが終了しないようなサーバーでは、
どちらかというと、ラムダ内で例外は発生させないか、発生しても外側に投げない仕組みにした方がよいでしょう。
いかがでしたか?
また面白そうなネタがあれば調べてみます。
それでは!
Acroquest Technologyでは、キャリア採用を行っています。
- 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
- 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
- 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
- OSSの開発に携わりたい。
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
キャリア採用ページ