かとじゅんの技術日誌

技術の話をするところ

Executorを使おう

Java5からスレッドプールが利用できるようになりました。

スレッド作成の欠点

これまでのスレッドは、ThreadクラスやRunnableクラスを組み合わせ使っていました。しかし、スレッドをただ単に使うだけだと以下のような欠点があります。

  • スレッドを生成から破棄までのライフサイクルで発生するコスト

一般的にはスレッド作成はOSやJVMに負荷が発生します。リクエストが頻繁で軽い場合は相当量のリソースを消費してしまいます。

  • リソースの消費

アクティブなスレッドはメモリを消費します。実行状態のスレッドが利用可能なCPUの数より多いとスレッドが停滞します。さらにアイドル状態のスレッドが多くなるとメモリ消費量が増えて、ガベージコレクタの負担が増大。多くのスレッドがCPUの時間を奪い合うレースコンディションになり性能も低下します。このようにCPUの数以上スレッドが存在しても意味がありません。

  • 安定性が考慮されていない

作成できるスレッドの上限の数はあらかじめ決められています。しかし、それを無視して作成した場合OutOfMemoryErrorが発生することになります。上限を超えないような仕組みが必要になります。

というわけで、これらの解として Executor フレームワークです

java.util.concurrentパッケージで提供されるExecutorフレームワークを使えばアプリケーションが過負荷でメモリ不足に陥らないようになります。
基本的にはスレッドプールを作成しその範囲内でスレッドを管理することで、最適なスループットと資源管理を実現します。

以下のコードをご覧ください。

final ExecutorService e = Executors.newFixedThreadPool(2);の部分でスレッドプールサイズ2で作成しています。この場合同時2個のタスクしか処理しません。タスクはe.executeで与えます。最初に10個登録していますが、プールが2個までですので最初の2個から順番に処理されます。プールサイズを10個にした場合はすべてのタスクが並行処理されます。
タスクのキャンセルですが、e.shutdownもしくはe.shutdownNowがあります。e.shutdownは、実行中のタスクを止めずに残りの実行されていないタスクをキャンセルします。この場合でも実行中のタスクを停止させたい場合は、e.isShutdownを参照するとよいでしょう。ブロッキングするAPIを使っている場合はe.shutdownNowを使わないとキャンセルできません。この場合はインタラプトを発生させるので、タスク側でInterruptedExceptionをハンドリングできるようにする必要があります。InterruptedExceptionが発生したらループを抜けるなどのコードを実装するだけでキャンセルに対応できるようになります。
TimeUnit.SECONDS.sleep(5);でブロックしていても、e.shutdownNowが呼ばれるとInterruptedExceptionが発生しタスクはキャンセルします。

public class Test {
	public static void main(String arg[]) {
		final ExecutorService e = Executors.newFixedThreadPool(2);

		for (int i = 0; i < 10; i++) {
			e.execute(new Runnable() {
				public void run() {
					handleRequest(e);
				}
			});
		}

		try {
			TimeUnit.SECONDS.sleep(1);
		} catch (InterruptedException e1) {
		}

		e.shutdownNow();
		try {
			e.awaitTermination(10, TimeUnit.SECONDS);
		} catch (InterruptedException e1) {

		}
	}

	private static void handleRequest(ExecutorService executorService) {
		int coutner = 0;
		try {
			while (!executorService.isShutdown()) {
				TimeUnit.SECONDS.sleep(5);
				System.out.println(Thread.currentThread().getId()
						+ " : counter = " + coutner++);
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

S2Chronosのジョブクラスでも、このインタラプションサポートしますのでこの仕組みをそのまま利用できます。
インタラプションが発生したかどうかのフラグを調べたい場合は、Thread.currentThread().isInterrupted()を呼び出して確認できます。

public class ExampleJob {
<snip>
	@Group("groupA")
	@Next("jobB")
	@Join(JoinType.NoWait)
	public void doJobA() {
		try {
			for (int i = 1; i < 5; i++) {
				TimeUnit.SECONDS.sleep(1);
				log.info("doJobA");
			}
		} catch (InterruptedException e) {
		}
	}
<snip>
}

参考書籍は

Java並行処理プログラミング ―その「基盤」と「最新API」を究める―

Java並行処理プログラミング ―その「基盤」と「最新API」を究める―