Java 7 NIO2 Asynchronous系の使い方

検索したけどあんまりちゃんとした情報を誰も載せてない気がしたので、
忘れないうちにまとめておきます。

まずはシンプルなServerの作り方。

System.out.println("Start");

try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();) {
	server.bind(new InetSocketAddress(12345));
	Future<AsynchronousSocketChannel> sock = server.accept();

	try (AsynchronousSocketChannel ch = sock.get();) {
		ByteBuffer buf = ByteBuffer.allocate(1024);
		Future<Integer> result = ch.read(buf);
		buf.flip();
		Integer i = result.get();
		System.out.println("Read :"+i);
		System.out.println(Charset.defaultCharset().decode(buf));
	}
} catch (IOException e) {
	e.printStackTrace();
} catch (ExecutionException e) {
	e.printStackTrace();
} catch (InterruptedException e) {
	e.printStackTrace();
}

普通のServerSocketと違うところはacceptの返り値がFuture型なところ。
acceptメソッドではブロックしないで、Future型を返す。
なので、Asynchronousの名前が付いているということ、でしょう。
このプログラムではとりあえずsock.get()を呼び出してそこでブロック。
getメソッドにはタイムアウトを指定できるのでスレッドを他の処理に回したい人はタイムアウトさせてください。
もっとも、そんな使い方するのならそもそもこのFuture型を使った実装は向いていない気がするので後述する方法を使った方がすっきりします。


で、get()の返り値としてAsynchronousSocketChannelが返って来るので、それを使ってクライアントと通信。
readメソッドを呼び出すと同様にFuture型が返って来るので同じようにgetメソッドの呼び出しでブロック。
後はsysoして終わり。
ブラウザ等でlocalhost:12345にアクセスするとHTTPのリクエストが見えるはずです。
この実装ではそれを表示して終わり。サーバー的に使うのであればWhileループさせればいいんだけど、
誰かのをReadしている間はAcceptできないので、Read部分はスレッドにするのが普通です。


ちなみに、SocketのOptionを指定するにはJava 7から下記のような方法がサポートされました。

server.setOption(StandardSocketOptions.SO_REUSEADDR, true);

setOptionメソッドではSocketOptionと、そのT型の引数を受け取ります。
普通のOptionはStandardSocketOptionsに入ってるので事足ります。
SO_REUSEADDRの場合はBooleanのGenericsが指定されているので、第2引数にBoolean以外を入れるとエラーになります。
プログラム中でいろんな引数を指定したいときはこの書き方が便利ですね。


このFutureを使った呼び出し方は、これまでの実装に配慮して、簡単に置換できるように用意されたんではないかなぁ、と。
Java 7でこれをやる意味はあんまり無いと思います。
ということで、次はCompletionHandlerを使ったサーバー。

public class AsyncServer {
	
	public class SimpleHandler implements CompletionHandler<AsynchronousSocketChannel, Integer> {
		@Override
		public void completed(AsynchronousSocketChannel result, Integer attachment) {
			server.accept(0, handler);
			ByteBuffer buf = ByteBuffer.allocate(1024);
			Future<Integer> f = result.read(buf);
			try {
				int i = f.get();
				buf.flip();
				System.out.println(Charset.defaultCharset().decode(buf));
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		
		@Override
		public void failed(Throwable exc, Integer attachment) {
			
		}
	}
	
	private final AsynchronousServerSocketChannel server;
	private final SimpleHandler handler;
	
	public AsyncServer() throws IOException {
		server = AsynchronousServerSocketChannel.open();
		handler = new SimpleHandler();
	}
	
	public void start(){
		try {
			server.bind(new InetSocketAddress(12345));
			server.accept(0, handler);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) throws IOException {
		AsyncServer server = new AsyncServer();
		server.start();
		
		while(true);
	}
}

簡単に言うと、acceptするときに成功した場合と失敗した場合の処理を登録しておくのがCompletionHandler型を使った方法です。
最初に宣言されているインナクラスはaccept時に使うCompletionHandlerを実装しています。
Genericsの最初はさっきのFutureでいうところの返り値、二つ目は添付するObjectの型です。(Selectorを使ったときのattachと同じ使い方)
completedメソッドが成功したときに実行されるメソッド、failedが失敗したときに実行されるメソッドです。
completedメソッドは次のhandlerを登録しておいて、Acceptしたときの処理に進みます。
この例ではFuture型を使って最初の例と同様に表示して終わり。(Closeしてないけど)
当然、このreadメソッドもCompletionHandlerを使って書けます。

実行するには、acceptするときにattachするオブジェクトとCompletionHandlerを指定すれば、
後は勝手に指定したポートでacceptしたときにHandlerが呼ばれてスレッドによる処理が行われます。
このようにCompletionHandlerを使えば、acceptからreadまでブロックすることなく割と理解しやすい形で書けるのがAsynchronous系の特徴かもしれません。
ここで注意なのが、あくまでacceptしたときの処理を記述しただけなので、他に何も動いていなければJavaはプログラムを終了します。
JavaのGUIなり他の監視スレッドなりが動いている必要があるので、このプログラムでは最後にwhile(true)とかしてます。


ちなみに、もっと直感的に書きたい人は匿名インナクラスとか使って下記のように書くのが普通かな、と。

	public static void main(String[] args) throws IOException {
		final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
		server.bind(new InetSocketAddress(12345));
		server.accept(0, new CompletionHandler<AsynchronousSocketChannel, Integer>() {
			@Override
			public void completed(AsynchronousSocketChannel result, Integer attachment) {
				// completed
				server.accept(0, this);
			}
			
			@Override
			public void failed(Throwable exc, Integer attachment) {
				// failed
			}
		});
		
		while(true);
	}

とてもシンプルですね!
匿名インナクラス内で使いたいメンバにはfinalを付けるか、ちゃんと指定可能なようにしましょう。


CompletionHandlerを使うときに一つ注意しないといけないのは
CompletionHandlerの中身はマルチスレッドによる処理がなされるところです。
つまり、メソッドはスレッドセーフに書かないといけません。
仮にacceptした数をカウントしようとしてこんなソースを書いたらダメです。

	public class SimpleHandler implements CompletionHandler<AsynchronousSocketChannel, Integer> {
		private int i = 0;

		@Override
		public void completed(AsynchronousSocketChannel result, Integer attachment) {
			server.accept(0, handler);
			i++;

i++の実行はスレッドセーフではないので、複数のスレッドが走る可能性がある部分に書いてはいけません。
ついでにiの宣言にfinalもvolatileも付いてないので全然ダメです。
カウントしたい人はAtomicIntegerを使いましょう。


最後に、プログラム中でExecutorServiceを既に使っていて、スレッドをちゃんと管理したいという人はChannelを開くときにGroupを指定できます。

		ExecutorService service = Executors.newCachedThreadPool();
		AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(service);
		AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);

AsynchronousChannelGroupの作り方は他にもいろいろあるのでAPIを参考にしてください。


以上でAsynchronous系の使い方はだいたい終わりです。
個人的にはSelectorを使う方法よりかなり使いやすくなったなぁ、と。
この例ではacceptとreadだけだけど、当然writeやconnectもAsynchronousに実行できます。
Selector使うとその辺りまで厳密にNonBlockingに書くのはめんどくさかったので、結構助かります。


ただ、問題は、「Asynchronous」ってのが長すぎる!
Eclipse前提だとしても長すぎる。上記のgroup指定するコードなんてぱっと見がカオス状態。
Asynchronous → Asyncにするだけでかなり違うのになぁ。
まぁ、Javaに今更そんなこと言っても仕方ないですね。