Scala + akka で簡単なチャットサーバーを書いてみたので解説してみるよ

!!!CAUTION!!!

この記事で扱っている IO のインターフェイスは Akka 2.2 で すでに old-io 扱いとなり( http://doc.akka.io/docs/akka/2.2.0/scala/io-old.html )、Akka 2.3 からは削除されてしまっています。

今ならばこちらを参考にされたほうがいいでしょう。

I/O まわり: http://doc.akka.io/docs/akka/2.3.7/scala/io.html

TCPのハンドリング:http://doc.akka.io/docs/akka/2.3.7/scala/io-tcp.html

本文

akka というのは Scala で Actor モデルを実現するためのライブラリだと思えばいいっぽい。この記事を読むためには case class とパターンマッチとアクターモデルについての知識が最低限あるといいと思う。

いちど全ソースを貼る。

みっつのファイルからできている。まあ最初は Main から見て行くべきでしょう。

object Main {
  def main(args: Array[String]) {
    val actorSystem = ActorSystem.create
    actorSystem.actorOf(Props(new ChatServer(9876)))
  }
}

ActorSystem.create で、アクターが動き回るための空間を作り出している。で、その空間の中に、チャットサーバーの役割をするアクターを新しく作っている。ちなみにこのとき返されるのはActorRefというものであり、アクターそのものではない。アクターそのものへのポインタだと思えばほぼ間違いない。

では今作られた チャットサーバーアクターの実装を見る

class ChatServer(port: Int) extends Actor {
  val state = IO.IterateeRef.Map.async[IO.SocketHandle]()(context.dispatcher)
 
  override def preStart = IOManager(context.system).listen("localhost", port)
 
  def receive = {
    case ChatMessage(message) =>
      state.keys.foreach(_.asWritable.write(ByteString("message: " + message + "\r\n")))
 
    case IO.NewClient(serverSocket) =>
      val socket = serverSocket.accept()
      state(socket).flatMap(_ => ClientHandler.handleInput(self, socket))
 
    case IO.Read(socket: IO.SocketHandle, bytes) =>
      state(socket)(IO Chunk bytes)
 
    case IO.Closed(socket: IO.SocketHandle, cause) =>
      state(socket)(IO EOF)
      socket.close
      state -= socket
  }
}

preStart メソッドを override して、アクターを初期化している。その中で IOManager を使ってサーバーソケットを作って listen している。IOManager は低レイヤーの IO を隠蔽してくれてなんかすごい楽に IO 書けるやつ。アクターの中で IOManager(...).listen すると、このソケットに対してなにか書かれたり接続があったりしたときにこのアクターに対して IO.* というメッセージが送られてくるようになるようだ。ちなみに、IOManager(...).listen(...)(actorRef) という感じで、IO.* メッセージを受け取るアクターを明示的に指定することもできる。ではアクターはそのメッセージをどうやって受け取るか。

それを受け取るためのメソッドが receive メソッドである。実体はパターンマッチで、受け取ったメッセージの種類によってここで処理を分けている。今回は IO.* を受け取ったばあいの処理に、state という変数が登場していて、そこになにやら処理を行っている。

では state とはなにか。IO.IterateeRef.Map.async[IO.SocketHandle]()(context.dispatcher) である。これはIO.socketHandle をキーとしてUnitを値に持つ IO.IterateeRef.Map という型のコンテナである。これが結構くせ者で、値に Unit を持つの意味がまずわかりにくいのだが、とりあえずここでは「キーがIO.socketHandleの、なんかすごいコンテナである」ということだけ頭に入れておけばよい。

ちなみに、IO.IterateeRef.Map#apply(socket) は socketに紐づけられた IO.IteateeRef[Unit] を返す。新しいのが出てきた!IO.IterateeRef[Unit] ってなんだ!!!!!という感じだけど、まあ名前の通り IO.Iteratee[Unit] を参照するためのもののようだ。とはいえ、当然そうなると、じゃあ IO.Iteratee[Unit] ってなぁに? という話になってくる。

先にかんたんにイメージをしてもらうと、IO.Iteratee[T] というのは、「"なにか IO に対してデータが書き込まれたときに、そのデータをどうやって処理するのかを取り決めた式"を内部に持つコンテナ」だと思ってくれればだいたい間違いない。「データをどうやって扱うのかを決めた式」を内部に持っておいて、実際にデータを読み込んだりするのは遅延される、という意味ではLINQとかに似てると思う。IO.Iteratee[String]ならば、「"IOから送られてきたデータを文字列にして取得する式"を内部に持つコンテナ」だし、IO.Iteratee[List[String]]ならば、「"文字列のリストにして取得する式"を内部に持つコンテナ」、という具合だ。じゃあ IO.Iteratee[Unit] ってどういうことだよ……って感じだけど、それは後ほど見ることにする。とりあえず今はIO.Iteratee[T]というのは、「"IOをどういうふうに処理するのかを決めた式"が中に詰まったコンテナ」であるということだけ意識していてくれればよい。

では、実際の動作を見てみる。

まずは新しいクライアントが接続してきたときの部分。

    case IO.NewClient(serverSocket) =>
      val socket = serverSocket.accept()
      state(socket).flatMap(_ => ClientHandler.handleInput(self, socket))

新しいクライアントが接続してきたときに、それをacceptしてるところはまあいいと思う。問題はそのあとの行だ。

まず、state(socket) で、IO.IterateeRef[Unit] を取得している。しかし、このIO.IterateeRef[Unit] はまだできたてほやほやで、なにも参照していない。なので、 flatMap(f: Unit => IO.Iteratee[Unit]) を呼ぶことによって、この state(socket) が参照する IO.Iteratee[Unit] を設定してあげている。このメソッドがflatMapという名前なのがちょっと納得いかない!

今回ならば、ClientHandler.handleInput(self, socket) が返すIO.Itereaetee[Unit]が、state(socket) に紐づけられた形だ。

さて、ここで思い出してほしい。IO.Iteratee[T] は、"「IOがあったときにどういう風にデータをハンドリングするか」という式を内部に持っているコンテナ"であった。ということで、今state(socket)に対して流し込んだデータは、それに紐づけられた IO.Iteratee[Unit] によって処理されることになる。

今回は、その IO.Iteratee は ClientHandler.handleInput() により取得されているものであった。なのでそこを見てみれば、state(socket)に対してなにかデータを流し込んだときにそのデータがどのように処置されるのかがわかるはずだ。なのでそこを見る。

  def handleInput(server: ActorRef, socket: IO.SocketHandle): IO.Iteratee[Unit] = IO repeat {
    readCommand map {
      case ExitCommand() =>
        log.debug("got EXIT command")
        socket.close
 
      case ChatCommand(message) =>
        log.debug("got CHAT command")
        server ! ChatMessage(message)
 
      case UnknownCommand(command) =>
        log.debug("got unknown command: " + command)
        socket.asWritable.write(ByteString("unknown command:" + command + "\r\n"))
    }
  }

IO.repeat(iteratee: IO:Iteratee) は、「"引数に与えられたIO.Itereateeが中に持ってる式を永遠に繰り返すという式"を内部に持った IO.Iteratee[Unit]」 を返す。ではこの中身がどうなっているかを見る。

readCommand というメソッドは、IO.Iteratee[Command] を返している。これの意味は、「"IOの結果をCommand型で返す式" が中に入ってるコンテナ」くらいの意味であった。で、上記コードではreadCommand の結果(IO.Iteratee[Command])に対してmap を呼び出している。

IO.Iteratee[A]#map(f: A => B) の結果は IO.Iteratee[B]となる。日本語で言うなら、someIteratee.map(f)は、「"someIteratee の中に入っている式で取り出せる内容に対してf()を実行するという式"を内部に持つコンテナ」を返すということだ。

では今回の場合どうなっているだろうか。readCommand の結果は Iteratee[Command] なので、それに対するmapの中身はCommandにを引数に取る操作である。今回はパターンマッチで、Commandに応じて様々な処理を行っている。ソケットを閉じるだとか、チャットサーバーアクターにメッセージを送るだとか、定義されていないコマンドの場合は「そんなコマンドしらないよ」ってクライアントに送り返したりだとか。これらはただの「操作」で、返り値はない。つまり Unit である。これで、「"IOの結果をごにょごにょして最終的に Unit になる式"を中に持ったIO.Iteratee[Unit]」の出来上がりだ。

このようにして、IO.IterateeRefに紐づけた「IOの結果に対してどういう操作を行うのかの式」を内部に保持した IO.Iteratee を定義することで、IO.ItereateeRefに対して流し込まれたデータをどのように処理していくのかを定義していく。

では、「"IOの結果をCommandで返す式"を中にもったコンテナ」を返す部分、readCommand メソッドを見てみる。 IO:Iteratee[Command]を返すメソッドである。中を見よう。

  private def readCommand: IO.Iteratee[Command] = {
    for {
      line <- IO.takeUntil(ByteString("\r\n"))
      messages = line.decodeString("US-ASCII").split(" ")
      command = messages.head
      args = messages.tail
    } yield command match {
      case "CHAT" => ChatCommand(args.lift(0).getOrElse(""))
      case "EXIT" => ExitCommand()
      case _ => UnknownCommand(command)
    }
  }

for は慣れないとわかりにくいので、別の書き方にする。

  private def readCommand: IO.Iteratee[Command] = {
    val lineIteratee = IO.takeUntil(ByteString("\r\n"))
    val commandItereatee = lineIteratee.map { line =>
      val messages = line.decodeString("US-ASCII").split(" ")
      val command = messages.head
      val args = messages.tail
      
      command match {
        case "CHAT" => ChatCommand(args.lift(0).getOrElse(""))
        case "EXIT" => ExitCommand()
        case _ => UnknownCommand(command)
      }
    }

    commandItereatee
  }

まず、IO.takeUntil(ByteString) が IO.Iteratee[ByteString] を返す。これ結構重要ポイントで、IO.takeUntil ってまるで「IO から ByteString を取ってくる」みたいな感じがするけど、そうじゃなくて、「IOからByteStringを取ってくる式を中に持ったコンテナ」を返している。

そして、そのコンテナに対して map している。"「lineIterateeの中の式の結果得られるByteStringに対する操作を表した式」を中に持つコンテナ"を作っているわけだ。mapの中身はかんたんで、行をスペースで分けて、その最初の文字列によって様々なCommandを作成している。これで、"「IOからCommandを作るための式」を中にもったコンテナ" が完成。あとはこれを返してあげれば良い。

さて、これで、state(socket)に対してデータが流し込まれたときに、処理をする式が入った、「"IOをどうやって処理するのかの式"を中にもったコンテナ」を一通り作り上げることができた。

ではstate(socket)にデータが長し込まれる部分を見てみよう。

    case IO.Read(socket: IO.SocketHandle, bytes) =>
      state(socket)(IO Chunk bytes)

クライアントからデータが送られてくると、チャットサーバーアクターにIO.Readメッセージが送られてくる。そもメッセージからsocketとbytesを取り出して、state(socket) で、 IO.IterateeRef[Unit] を取得している。それに対して apply(IO Chunk bytes) することでクライアントから送られてきたデータを流し込んでいるのが見て取れると思う。

あとは、このデータをstate(socket)が参照しているIO.Iteratee[Unit]の中にはいった式がぐんぐんと処理してくれるのを待つばかりだ。

という感じでひととおりだろうか。以上をよんだ上で再度コードを全部よんでみると、意味がわかると思う。

はー長かった寝る!!!!!!!推敲とかしてないから明日また推敲するかも。