日本一わかりやすい Akka の IOManager と SocketHandle(やServerHandle)、それに IO.Iteratee と IO.IterateeRef

ちょっとこのあたりが複雑に感じたので、一度整理しておく。まぁ全部「公式読めば書いてあるよ」で OK なんだけど、それではあまりにマッチョすぎるしわたしがあとから参照したいしみたいな感じ。

IOManager とはなにか

Akka でIOを扱うときには、 IOManager を通じて行うと良い。実際の IO の煩わしいことを隠蔽してくれる。IOManager は IOManagerActor というアクターへのリファレンスであり、実際にActしてくれるのは IOManagerActor のようだ。IOManager は手動で止めたりする必要はなく、管理する IO が無くなれば勝手に止まってくれるらしい。賢い。

IOManager を通じてソケットを作るには、IOManager#connect や IOManager#listen を呼び出す。それぞれ SocketHandle や ServerHandle を返す。また、このとき同時に「作られた socket に対する IO の結果をどのアクターにメッセージを送るか」を指定してやることができる。例としてはこんな感じ

val serverActor = context.sysetm.actorOf[Props[ServerActor]]
val serversocket:ServerHanlde = IOManager(context.system).listen("localhost", port)(serverActor)

こんな感じでサーバーソケットを作ると、serverActor に対して各種イベント(新しいクライアントが接続してきたよ、とか)がメッセージで送られてくるようになる。

こんな感じ。

class ServerActor extends Actor {
  def receive = {
    // 新しくクライアントが接続してきたらこのメッセージが受信される
    case IO.NewClient(serverSocket: ServerHandle) =>
      // ここでserverSocketに対してなんかする。典型的な処理はacceptだと思う
  }
}

かんたんにまとめると、IOManagerを通じてソケットを作ると、そのソケットに対する直接の操作は IOManagerActor がやってくれて、わたしたちは自分で作ったアクターでその結果を受信して種々の処理を行えばよい、ということになる。IOManagerActor からのメッセージには ServerHandle や SocketHandle 、あるいは「ソケットからデータが読み込まれたよ」というイベントの場合はそのデータの内容などが入っているので、それに対して accept やら write やらの処理を行えば良い。

詳しくは 公式のリファレンス を見ると良いと思う。

では ServerHanlde/SocketHandle とはなにか

普通のネットワークプログラミングに慣れていると、この ServerHandle/SocketHandle ってのはソケットそのものか、あるいはそれを wrap したものなのかな、と思ってしまうのだけれど、実はこれはそうではなくて、ほんどうにただの「ハンドル」である。たとえば、SocketHandle を通じてソケットに何かを書き込む例を見てみよう。

val byteString = ByteString("some string")
socketHandle.asWritable.write(byteString)

これでソケットに対して byteString の内容が書かれる。では socketHandle.asWritable.write() したときになにが起こっているのだろうか。じつは、このメソッドがやっていることは「ソケットにbyteStringを書き込む」という操作ではなくて、「IOManagerに対して、この socketHandle に紐づけられたチャンネルにbyteStringを書き込んでくれと頼む」ことである。つまり、socket への実際の書き込みは IOManager により行われることになるわけだ。

なんでこんなことになってるのかというと、おそらく、これでアプリケーションの開発者はソケットの管理(ソケットが閉じてるとか開いてるとか)を IOManager に任せることができるといのが一点。それと、ネットワーク越しにアクター同士で協調作業を行うときに、socket そのものはネットワーク越しに渡せないので、ネットワーク越しに「こいつにこれ書き込んでよ」というメッセージを送り、実際の IO は実際に socket を持っているマシンが行うということを実現するためにこのようになっている、というのが一点、なのではないかと思う(まだここはちゃんと調べてないので真偽不明、識者によりツッコミを期待)。

まとめると、ServerHandle や SocketHandle はソケットそのものではなく、そのソケットを管理しているアクターに対して「これ書き込んで」とか「acceptして」とかそういうメッセージを送ってくれるやつである。

同じく詳細は 公式のリファレンス を参照のこと。

じゃあ IO.Iteratee ってなんなの

IO.Iteratee が出てきていないけれど、それもそのはず、じつは IO.Iteratee は IOManager たちと一緒に使うと便利ではあるけれど、直接はなんの関係もない。IO.Itereatee というのは、「IOの結果に対してどういう操作をするかを先に定義しておいて、IO.Iterateeに対して実際にデータが流し込まれるまでその操作を遅延する仕組み」である。

REPLで実験してみよう。

scala> import akka.actor.IO
import akka.actor.IO

scala > import akka.util.ByteString


scala> val take4 = IO take(4)
res0: akka.actor.IO.Iteratee[akka.util.ByteString] = Next(<function1>)

scala> 

まずは import しておく。(とうぜんakkaがインストールされていることが前提ですよ)そして、そのあとに IO take(4) で簡単な Iteratee を作っている。この操作は「IOから4バイト読み込む」ではなく、「IOから4バイト読み込むという操作を定義したIO.Iterateeを作る」という操作であることに注意してほしい。実際にこの段階では読み込みはなされていない(実際、IO待ちでブロックせずに、すぐにプロンプトが返ってきている)。

では今度は、「この4バイト読み込むという操作のあと、それを文字列にしてプリントする」という操作を定義してみよう。

まずは byteString を受け取り、その内容を String にして println する関数を定義する。

val printBytes = {bytes:ByteString => println(bytes.decodeString("US-ASCII"))}
printBytes: akka.util.ByteString => Unit = <function1>

そしたら、take4 に対する map 操作を行う。IO.Iteratee#map(f: A => B) は、「"レシーバの IO.Iteratee の結果得られる値に対して f を呼び出す"という操作を定義したIO.Iteratee」を返す。ちょうどリストに対する map 操作が、「リストに格納されているに対して f を呼び出した結果をリストに格納して返す」のと同じような感じだ。

scala> val take4AndPrintThat = take4.map(printBytes)
take4AndPrintThat: akka.actor.IO.Iteratee[Unit] = Next(<function1>)

おなじことを下のように表現することもできる。

scala> val take4andPrintThat = for {
     | bytes <- IO.take(4)
     | string = bytes.decodeString("US-ASCII")
     | } yield println(string)
take4andPrintThat: akka.actor.IO.Iteratee[Unit] = Next(<function1>)

Scala の for はちょっと特殊なので慣れないと読みにくいかもしれないが、やってることは上でやった map と一緒である。

さて、これで、「4バイト読み込んで、その結果をprintする」という操作が定義されたIO.Iterateeが作り出せた。

では、このIO.Iterateeに対して、実際にデータを書き込んでいってみよう。書き込みは apply でできる。

scala> take4andPrintThat(IO.Chunk(ByteString("abcdefg")))
abcd
res0: (akka.actor.IO.Iteratee[Unit], akka.actor.IO.Input) = (Done(()),Chunk(ByteString(101, 102, 103)))

"abcdefg" を書き込んだ結果、4バイトが取り出されて表示された。IO.Iteratee は immutable なオブジェクトであるため、「残った3バイトを内部に保持する」みたいなことはしない。これだとちょっと使い勝手がわるい。ので、残ったバイトの管理などをやってくれるものでこの IO.Iteratee を一枚 wrap する。それが IO.IterateeRef だ。

IO.IterateeRef でもう何も煩わしいことはなくなる

ここでは IO.IterateeRefAsync というのを使うけれど、それの為には実行コンテキストが必要になるので、まずは下準備として、今回は import scala.concurrent.ExecutionContext.Implicits.global で暗黙の実行コンテキストを作っておく。Akkaと一緒に使う場合には明示的にakkaのsystem.contextを使うべきだろう。

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global

さて、これで準備が整ったので、まずはIterateeRefを取得する。

scala> val irefAsync = IO.IterateeRef.async(take4andPrintThat)
irefAsync: akka.actor.IO.IterateeRefAsync[Unit] = akka.actor.IO$IterateeRefAsync@f57f347

明示的に実行コンテキストを指定したいときには IO.IterateeRef.async(take4andPrintThat)(context)とすれば良い。

ではこのirefAsyncに対してデータを書き込んで行ってみよう。

scala> irefAsync(IO.Chunk(ByteString("abcde")))

scala> abcd

take4andPrint で定義された操作が非同期で実行されたため、先にプロンプトが表示されたのが見て取れるかと思う。さて、今、take4の残り1バイト(e)はどこへ行っただろうか。irefAsyncの中に残っているはずである。試してみよう

scala> irefAsync(IO.Chunk(ByteString("fgh")))

scala> irefAsync(IO.Chunk(ByteString("fghg")))

scala> irefAsync(IO.Chunk(ByteString("fgha")))

!?!?何も表示されない!? それもそのはず、irefAsyncは内部に take4andPrintThat を持っているが、この操作はすでに行われてしまった。「次にどうするか」が定義されていないので、そのあとにどんどんデータが書き込まれても、なにも起こらない。これでは使い物にならない。

このような事態を避けるために、今度は、「永遠に同じ操作を繰り返すという操作を定義したIO.Iteratee」を作り出してみよう。

scala> val loopIteratee = IO.repeat(take4andPrintThat)
loopIteratee: akka.actor.IO.Iteratee[Unit] = Next(<function1>)

そしてこれを参照した IO.IterateeRef を作る。

scala> val loopIrefAsync = IO.IterateeRef.async(loopIteratee)
loopIrefAsync: akka.actor.IO.IterateeRefAsync[Unit] = akka.actor.IO$IterateeRefAsync@4f758781

ではこれに対して書き込みを行ってみよう。

scala> loopIrefAsync(IO.Chunk(ByteString("abcdef")))

scala> abcd


scala> loopIrefAsync(IO.Chunk(ByteString("ghij")))

scala> efgh


scala> loopIrefAsync(IO.Chunk(ByteString("k")))

scala> loopIrefAsync(IO.Chunk(ByteString("l")))

scala> ijkl

おおーーーー便利!

というわけで、まとめると

  • IO.Iteratee は「IOに対してどういう操作を行うか」が定義されたもの
  • IO.Iteratee に対してデータを書き込むと、あらかじめ定義された通りにそのデータに対して操作を行う
  • IO.Iteratee は immutable な object であるため、「のこったデータ」とか「足りなかったデータ」を内部に保持してくれたりはしない。
  • それだと「細かいチャンクをどんどん書き込んでいくぜ」みたいなときにめっちょ不便なので、普通は IO.IterateeRef に包んで使う
  • IO.IterateeRef は書き込まれたデータを保持してくれて、そのデータに対して順次内部の IO.Iteratee を適用していく
  • IO.IterateeRef は「一度使ったIO.Iterateeを何度も再利用」みたいなことはしないので、そういうことがしたければ、「この操作を何度も繰り返すぜ」とあらかじめ定義しておいたIO.Iteratee を用意してあげる必要がある

そんなところでしょうか。

実際の例

手前味噌だが、猫型チャットサーバー や前回前々回のエントリーがこれらの技術を組み合わせたミニマルな例となっているので、実例としてはそれを見ると良いと思う。

追記

@mather314 さんがこのようなことを教えてくれました。ありがとうございます。

泣いてないよ!!!!! もうすぐこれdeprecatedになるそうなので、新しく学ぶひとは下のやつ見たほうがいいかも!!!!!! 泣いてない!!!!!!

http://doc.akka.io/docs/akka/2.2-M3/scala/io.html#io-scala