- タスクのキャンセル
- 単純なキャンセル可能タスク
- スレッドのインタラプト
- Future によるキャンセル
- Future による実行時間の制限
- インタラプトできないブロッキング
- Socket の I/O ブロックに割り込みする
- まとめ
タスクのキャンセル
Java にはスレッドを強制的に停止する安全な方法は存在しません。
Thread.stop
や Thread.suspend
などはこの用途に適したものではありますが、Java非推奨スレッド・プリミティブ にある通り、これらは非推奨であり@Deprecated(since="1.2")
とマークされているため使うべきではありません。
あるタスクをキャンセルするには協力的な方法を取るしかありあせん。
つまり、タスクやタスクを実行しているスレッドにキャンセルを依頼し、タスクやタスクを実行しているスレッドが安全に終了するのを待つことしかできません。
単純なキャンセル可能タスク
単純なタスクであれば、フラグを経由してキャンセルを行うことができます。
public class CancelableTask implements Runnable { private volatile boolean cancelled; @Override public void run() { while (!cancelled) { System.out.println(this.toString()); } } public void cancel() { cancelled = true; } }
注意点としては、フラグは volatile
宣言しなければならない点です。 volatile
宣言を忘れた場合には、メモリの可視性に起因して、タスクが永遠にキャンセルされない可能性があります。
タスクはスレッドのオーナー側から以下のようにキャンセルを依頼することができます。
CancelableTask task = new CancelableTask(); new Thread(task).start(); // ... task.cancel();
この例では、単純に println()
しているだけですが、通常はより大きな(時間のかかる)処理を行うため、キャンセルを依頼したとしても直ちにキャンセルされるとは限りません。
加えて、処理の中で Socket I/O や BlockingQueue への追加といった、ブロックメソッドのコールが行われていた場合、キャンセルフラグのチェックに永遠に到達せずにキャンセルが行われない可能性があります。
スレッドのインタラプト
Thread
にはスレッドへの割り込み(interrupt)を扱うステータスがあります。キャンセル処理には通常この割り込みを使うのが妥当です。
スレッドのインスタンスに対して interrupt()
を呼び出すことで、Thread
の割り込みステータスを変更することができます。
このメソッドはスレッドのステータスを変更するだけで、それに対してどのように振る舞うかは実装で定義する必要があります。何も行わない ということも出来ます。
インタラプトは以下のように実現できます。
public class CancelableTaskThread extends Thread { @Override public void run() { try { while (!Thread.currentThread().isInterrupted()) { System.out.println(this.toString()); Thread.sleep(3000); } } catch (InterruptedException e) { System.out.println(e); Thread.currentThread().interrupt(); } } public void cancel() { interrupt(); } }
処理の先頭で isInterrupted()
で割り込みのステータスをチェックしています。この時点で interrupt()
されていた場合は処理がキャンセルされたものとして早期脱出します。
Thread.sleep()
のようなブロックメソッドは概ね以下のようなチェックを行っています(Thread.sleep()
は native メソッドですが概念的には同じです)。
if (Thread.interrupted()) throw new InterruptedException();
Thread.interrupted()
は現在の割り込みステータスを返すとともに、割り込みステータスをクリアします。 InterruptedException
がブロックメソッド側で発生するためブロックメソッドの処理途中でキャンセルとして扱うことができます。
InterruptedException
を catch した場合、そのまま例外をスローするか、割り込みステータスを設定して呼び出し側でそれを適切に処理できるようにすることができます(前述の通りInterruptedException
がスローされた時点で割り込みステータスはクリアされるため)。
ここでの例では例外を catch して握りつぶす代わりに、割り込みステータスを復元しています。
Future によるキャンセル
Java 1.5 からは、タスクのキャンセルは Future
を使います。
Callable<Void> task = new Callable<>() { @Override public Void call() throws Exception { while (true) { if (Thread.interrupted()) throw new InterruptedException(); System.out.println(this.toString()); Thread.sleep(3000); } } }; ExecutorService executor = Executors.newSingleThreadExecutor(); Future<Void> future = executor.submit(task); // ... future.cancel(true);
ExecutorService
に submit
することで Future
が得られるため、タスクのキャンセルは Future.cancel()
で行います。
Future.cancel()
は引数にブーリアンの mayInterruptIfRunning
を受け取ります。これが true
の場合は、どこかのスレッドで実行中のタスクがインタラプトされます。false
とした場合は、タスクがスタートしていない場合に限りタスクの実行がキャンセルされます。
なお上記例では Callable
を使っていますが、InterruptedException
をスローする必要がなければ(catch
して割り込みステータスを復元するなどを行うなど) Runnable
を使うこともできます( Runnable
はチェック例外を投げることができない )。
Future による実行時間の制限
Future
を使えばタイムアウトが必要な処理も簡単に実現できます。
public class TimedRun { private static final ExecutorService executor = Executors.newCachedThreadPool(); public static void submit( Runnable r, long timeout, TimeUnit unit) throws InterruptedException { Future<?> task = executor.submit(r); try { task.get(timeout, unit); } catch (TimeoutException e) { // logger.warning("Timeout:" + e.getMessage()); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } finally { task.cancel(true); } } private static RuntimeException launderThrowable(Throwable t) { if (t instanceof RuntimeException) { return (RuntimeException) t; } else if (t instanceof Error) { throw (Error) t; } else { throw new IllegalStateException("Not unchecked", t); } } }
Future.get(long timeout, TimeUnit unit)
で結果取得のタイムアウトを指定できます。指定時間内に結果取得が出来ない場合には TimeoutException
がとなり finally
にて処理がキャンセルされます(処理が時間内に完了していたとしても cancel
の呼び出しは無害です )。
ExecutionException
は処理の過程で発生したアプリケーション例外となるため、getCause()
にて取り出した結果を再度スローしています。
インタラプトできないブロッキング
Java API のブロック処理は、たいていの場合インタラプトに応じて、InterruptedException
をスローしますが、ソケットの同期I/Oなど、インタラプトに応答しないものも存在します。
インタラプトに応答しないブロックをキャンセル処理するには、ブロックしている原因に応じた操作が必要になります。
ブロックするメソッドは概ね、 JavaDoc に割り込み方法が明記されています。
その一例を以下に示します。
操作 | インタラプト方法 |
---|---|
java.io のソケット同期I/O | ソケットをクローズすれば read/write でブロックしているスレットが SocketException をスローする |
java.nio の同期I/O | InterruptibleChannel インターフェースを実装したチャネル上でブロックされたスレッドがある場合にはブロックされたスレッドの interrupt を呼び出すことでチャネルがクローズし、ブロックされたスレッドが ClosedByInterruptException を受け取るこのチャネルの close メソッドを呼び出すことで、ブロックされたスレットが AsynchronousCloseException を受け取る |
Selector による非同期I/O | select() によってブロックされたスレッドは、セレクタの wakeup メソッドの呼び出しか、セレクタの close メソッドの呼び出し、またはブロックされたスレッドの interrupt の呼び出しにより他のスレッドからの割り込みを受け付ける。 |
synchronized による固有ロック | synchronized でブロックしている場合は、このスレッドをキャンセルすることはできない。明示的にロックを行う Lock クラスのlockInterruptibly を使えばロックを待機しながらインタラプトに応答することができる。 |
一例として、Socket
のI/O を扱う以下のタスクは割り込みに応答せず、ブロックから抜け出すことができません。
ServerSocket serverSocket = new ServerSocket(9090); Callable<Integer> task = () -> { Socket socket = serverSocket.accept(); return socket.getInputStream().read(); }; ExecutorService executor = Executors.newSingleThreadExecutor(); Future<?> future = executor.submit(task); // ... Socket client = new Socket("localhost", 9090); // ... future.cancel(true);
serverSocket
はクライアントからの接続があると、serverSocket.accept()
から Socket
を返します。
その後、socket.getInputStream().read()
でブロックすると、割り込みに応答することはありません。
以下で Socket
のI/Oを割り込みに応答させる例を示します。
Socket の I/O ブロックに割り込みする
前述したように、Socket
の同期I/Oはインタラプトに応答しません。I/O 待ちとなっているスレッドへインタラプトするには Socket
をクローズする必要があります。
AbstractExecutorService
には以下のような newTaskFor()
という protected なメソッドがあり、このメソッドはExecutorService.submit()
時に返却される Future
のファクトリメソッドになっています。
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
ExecutorService
を継承し、このメソッドをオーバーライドして独自の割り込み処理を定義した Future
を生成することができます。
特別なキャンセル処理可能なタスクを表すインターフェースを以下のように作成します。
public interface CancellableTask<T> extends Callable<T> { void cancel(); RunnableFuture<T> newTask(); }
作成したインターフェースの実装を以下のように作成します。
public abstract class SocketUsingTask<T> implements CancellableTask<T> { private Socket socket; protected synchronized void setSocket(Socket s) { socket = s; } public synchronized void cancel() { try { if (socket != null) socket.close(); } catch (IOException ignored) { } } public RunnableFuture<T> newTask() { return new FutureTask<>(this) { @Override public boolean cancel(boolean mayInterruptIfRunning) { try { SocketUsingTask.this.cancel(); } finally { return super.cancel(mayInterruptIfRunning); } } }; } }
キャンセルにてソケットをクローズするように cancel()
を実装しています。
そして newTask()
で新しい Furure
を生成する際に、Future.cancel()
の前処理として、ソケットをクローズする close()
を呼び出しています。その後、finally
にて通常の Future.cancel()
を処理します。
最後に Executor
の newTaskFor()
をオーバライドして上で定義した Furure
のファクトリを呼ぶようにします。
public class CancellingExecutor extends ThreadPoolExecutor { public CancellingExecutor() { super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); } // ... @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { if (callable instanceof CancellableTask) { return ((CancellableTask<T>) callable).newTask(); } else { return super.newTaskFor(callable); } } }
これで、Future.cancel()
の割り込みに応答することができます。
以下のようにすれば、future.cancel()
にてブロックしていたストリームからの読み込みが SocketException
で脱出し、割り込みに応じたキャンセルが実現できます。
ServerSocket serverSocket = new ServerSocket(9090); CancellableTask<Integer> task = new SocketUsingTask<>() { @Override public Integer call() throws Exception { try { Socket socket = serverSocket.accept(); setSocket(socket); return socket.getInputStream().read(); } catch (Exception e) { e.printStackTrace(); throw e; } } }; ExecutorService executor = new CancellingExecutor(); Future<?> future = executor.submit(task); // ... Socket client = new Socket("localhost", 9090); // ... future.cancel(true);
まとめ
- Java にはスレッドを安全に強制終了する仕組みがなく、割り込み要求による協力的な方法を取る必要がある
- タスクのキャンセルは
ExecutorService
を経由して得たFuture
で割り込みを使うのがベター - ブロックするメソッド全てが割り込みに応答するとは限らず、特殊なキャンセルが必要となるケースがある