!!! CAUTION !!!
The IO interface covered in this article was already relegated to "old IO" in Akka 2.2 ( http://doc.akka.io/docs/akka/2.2.0/scala/io-old.html ) and was removed in Akka 2.3.
These days you are better off referring to the following instead.
I/O in general: http://doc.akka.io/docs/akka/2.3.7/scala/io.html
TCP handling: http://doc.akka.io/docs/akka/2.3.7/scala/io-tcp.html
Main text
As far as I can tell, Akka is a library for implementing the Actor model in Scala. To follow this article you should know at least the basics of case classes, pattern matching, and the actor model.
The full source is posted here.
It consists of three files; the natural place to start reading is Main.
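import akka.actor.{ActorSystem, Props}

object Main {
  def main(args: Array[String]): Unit = {
    // Create the actor system, then start the chat server actor on port 9876.
    val actorSystem = ActorSystem.create
    actorSystem.actorOf(Props(new ChatServer(9876)))
  }
}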
ActorSystem.create creates the space in which actors run around. Inside that space we then create a new actor that plays the role of the chat server. Incidentally, what comes back at this point is an ActorRef: not the actor itself, but something you can safely think of as a pointer to the actor.
Now let's look at the implementation of the chat server actor we just created.
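import akka.actor.{Actor, IO, IOManager}
import akka.util.ByteString

class ChatServer(port: Int) extends Actor {
  // One IterateeRef per connected socket, keyed by the socket handle.
  val state = IO.IterateeRef.Map.async[IO.SocketHandle]()(context.dispatcher)

  // Start listening as soon as the actor starts.
  override def preStart(): Unit = IOManager(context.system).listen("localhost", port)

  def receive = {
    // Broadcast a chat message to every connected client.
    case ChatMessage(message) =>
      state.keys.foreach(_.asWritable.write(ByteString("message: " + message + "\r\n")))

    // Accept the new client and attach the iteratee that handles its input.
    case IO.NewClient(serverSocket) =>
      val socket = serverSocket.accept()
      state(socket).flatMap(_ => ClientHandler.handleInput(self, socket))

    // Feed the received bytes into the iteratee bound to this socket.
    case IO.Read(socket: IO.SocketHandle, bytes) =>
      state(socket)(IO Chunk bytes)

    // Signal end of input, then release the socket and its entry.
    case IO.Closed(socket: IO.SocketHandle, cause) =>
      state(socket)(IO EOF)
      socket.close()
      state -= socket
  }
}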
The actor is initialized by overriding the preStart method. Inside it, IOManager is used to create a server socket and listen on it. IOManager hides the low-level IO and lets you write IO code fairly painlessly. When you call IOManager(...).listen inside an actor, that actor starts receiving IO.* messages whenever something happens on that socket, such as an incoming connection or incoming data. Incidentally, you can also name the actor that should receive the IO.* messages explicitly, as in IOManager(...).listen(...)(actorRef). So how does the actor receive those messages?
The method that receives them is receive. It is really just a pattern match that dispatches on the kind of message received. Here, the branches that handle the IO.* messages involve a variable called state and do something or other with it.
So what is state? It is IO.IterateeRef.Map.async[IO.SocketHandle]()(context.dispatcher). This is a container of type IO.IterateeRef.Map that is keyed by IO.SocketHandle and has Unit as its value type. It is a fairly tricky beast, and what "having Unit as the value" means is hard to grasp at first, but for now it is enough to remember that it is some kind of container keyed by IO.SocketHandle.
Incidentally, IO.IterateeRef.Map#apply(socket) returns the IO.IterateeRef[Unit] associated with socket. A new one just showed up! What on earth is IO.IterateeRef[Unit]!? Well, as the name suggests, it appears to be something that refers to an IO.Iteratee[Unit]. Which, of course, only leads to: "I see. And what is an IO.Iteratee[Unit]?"
To give a rough picture first: you will not go far wrong if you think of IO.Iteratee[T] as "a container that internally holds 'an expression that lays down how data should be processed when it is written to some IO'". It holds "an expression that decides how to handle the data", while the actual reading of data is deferred; in that sense I think it resembles LINQ. An IO.Iteratee[String] is "a container that holds 'an expression that obtains the data sent over IO as a string'", an IO.Iteratee[List[String]] is "a container that holds 'an expression that obtains it as a list of strings'", and so on. So then what is IO.Iteratee[Unit] supposed to mean? We will get to that later. For now, just keep in mind that IO.Iteratee[T] is "a container packed with 'an expression that decides how IO is processed'".
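To make the "container holding an expression" picture concrete, here is a minimal sketch in plain Scala that does not use Akka at all. The names RecipeSketch, Recipe, asString, asWords, and evaluations are invented for this illustration. The real IO.Iteratee also copes with input that arrives in pieces, which this toy ignores.

```scala
object RecipeSketch {
  // Hypothetical stand-in for IO.Iteratee[T]: it only stores *how* to turn
  // input into a T. Nothing is processed until run is called with real data.
  final case class Recipe[T](run: String => T)

  // Counts how many times a stored expression has actually been evaluated.
  var evaluations = 0

  // Like an Iteratee[String]: "take the data as a string".
  val asString: Recipe[String] = Recipe { in => evaluations += 1; in }

  // Like an Iteratee[List[String]]: "take the data as a list of strings".
  val asWords: Recipe[List[String]] =
    Recipe { in => evaluations += 1; in.split(" ").toList }
}
```

Defining asString and asWords leaves evaluations at 0; the counter only moves when run is called, for example RecipeSketch.asWords.run("CHAT hello"). That is the deferred part of the analogy.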
Now let's look at how it actually behaves.
First, the part that runs when a new client connects.
case IO.NewClient(serverSocket) =>
  val socket = serverSocket.accept()
  state(socket).flatMap(_ => ClientHandler.handleInput(self, socket))
Accepting the newly connected client with accept needs little explanation. The line after it is the interesting one.
First, state(socket) obtains an IO.IterateeRef[Unit]. This IO.IterateeRef[Unit] is brand new, though, and does not refer to anything yet. So flatMap(f: Unit => IO.Iteratee[Unit]) is called to set the IO.Iteratee[Unit] that this state(socket) refers to. (I am not entirely convinced that flatMap is the right name for this method.)
In this case, the IO.Iteratee[Unit] returned by ClientHandler.handleInput(self, socket) ends up associated with state(socket).
Now recall that IO.Iteratee[T] was "a container that internally holds an expression saying 'how to handle the data when IO occurs'". So any data that flows into state(socket) from now on will be processed by the IO.Iteratee[Unit] associated with it.
Here that IO.Iteratee is the one obtained from ClientHandler.handleInput(). Looking at it should therefore tell us what happens to data that flows into state(socket). So let's look at it.
def handleInput(server: ActorRef, socket: IO.SocketHandle): IO.Iteratee[Unit] =
  IO repeat {
    readCommand map {
      // EXIT: close this client's socket.
      case ExitCommand() =>
        log.debug("got EXIT command")
        socket.close()
      // CHAT: hand the message to the chat server actor for broadcasting.
      case ChatCommand(message) =>
        log.debug("got CHAT command")
        server ! ChatMessage(message)
      // Anything else: tell the client the command is unknown.
      case UnknownCommand(command) =>
        log.debug("got unknown command: " + command)
        socket.asWritable.write(ByteString("unknown command:" + command + "\r\n"))
    }
  }
IO.repeat(iteratee: IO.Iteratee) returns an IO.Iteratee[Unit] that internally holds "an expression that repeats forever the expression held by the IO.Iteratee given as the argument". Let's see what is inside.
The readCommand method returns an IO.Iteratee[Command]. As before, that means "a container holding 'an expression that returns the result of IO as a Command'". The code above then calls map on the result of readCommand (an IO.Iteratee[Command]).
The result of IO.Iteratee[A]#map(f: A => B) is an IO.Iteratee[B]. In plain words, someIteratee.map(f) returns "a container that internally holds 'an expression that runs f() on whatever the expression inside someIteratee extracts'".
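The effect of map on the result type can be sketched without Akka. This is a toy model, not Akka's implementation: MapSketch, Recipe, line, and lineLength are hypothetical names, and the toy receives its whole input at once instead of chunk by chunk.

```scala
object MapSketch {
  // Hypothetical stand-in for IO.Iteratee[A]; not Akka's implementation.
  final case class Recipe[A](run: String => A) {
    // Build a new recipe: "run this one, then apply f to what it produced".
    def map[B](f: A => B): Recipe[B] = Recipe(in => f(run(in)))
  }

  // "Take the input as one line", with the line terminator stripped.
  val line: Recipe[String] = Recipe(_.stripSuffix("\r\n"))

  // Mapping changes the result type: a Recipe[String] becomes a Recipe[Int].
  val lineLength: Recipe[Int] = line.map(_.length)
}
```

Calling MapSketch.lineLength.run("EXIT\r\n") first runs the expression inside line and then applies the mapped function to its result. If the mapped function only performs side effects, the result type becomes Unit in the same way.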
So what happens in our case? The result of readCommand is an Iteratee[Command], so the function passed to map is an operation that takes a Command as its argument. Here it pattern matches and does different things depending on the Command: closing the socket, sending a message to the chat server actor, or, for an undefined command, replying to the client with "no such command". These are plain operations with no return value; in other words, Unit. And with that we have an IO.Iteratee[Unit] holding "an expression that fiddles with the result of IO and finally evaluates to Unit".
In this way, by defining an IO.Iteratee that internally holds "the expression for what to do with the result of IO" and associating it with an IO.IterateeRef, we define how data fed into that IO.IterateeRef is processed.
Next, let's look at readCommand, the part that returns "a container holding 'an expression that returns the result of IO as a Command'". It is a method that returns IO.Iteratee[Command]. Let's look inside.
private def readCommand: IO.Iteratee[Command] = {
  for {
    line <- IO.takeUntil(ByteString("\r\n"))
    messages = line.decodeString("US-ASCII").split(" ")
    command = messages.head
    args = messages.tail
  } yield command match {
    case "CHAT" => ChatCommand(args.lift(0).getOrElse(""))
    case "EXIT" => ExitCommand()
    case _      => UnknownCommand(command)
  }
}
A for expression is hard to read until you get used to it, so here it is written another way.
private def readCommand: IO.Iteratee[Command] = {
  // An iteratee that yields everything up to the next "\r\n".
  val lineIteratee = IO.takeUntil(ByteString("\r\n"))
  // Turn each line into a Command based on its first word.
  val commandIteratee = lineIteratee.map { line =>
    val messages = line.decodeString("US-ASCII").split(" ")
    val command = messages.head
    val args = messages.tail
    command match {
      case "CHAT" => ChatCommand(args.lift(0).getOrElse(""))
      case "EXIT" => ExitCommand()
      case _      => UnknownCommand(command)
    }
  }
  commandIteratee
}
First, IO.takeUntil(ByteString) returns an IO.Iteratee[ByteString]. This is a fairly important point: IO.takeUntil sounds as if it "fetches a ByteString from IO", but it does not. It returns "a container holding an expression that fetches a ByteString from IO".
Then map is called on that container to build "a container holding 'an expression describing the operation on the ByteString produced by the expression inside lineIteratee'". The body of map is simple: it splits the line on spaces and creates a different Command depending on the first word. That completes "a container holding 'an expression that makes a Command from IO'". All that remains is to return it.
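The decision logic inside map can be tried on its own as a pure function on strings. In the sketch below, the Command case classes are assumed shapes, since their definitions are not shown in this article, and CommandSketch and parse are hypothetical names. It takes a String that is already decoded and has no trailing "\r\n".

```scala
object CommandSketch {
  // Assumed shapes for the command types; their definitions are not shown here.
  sealed trait Command
  final case class ChatCommand(message: String) extends Command
  final case class ExitCommand() extends Command
  final case class UnknownCommand(command: String) extends Command

  // Same decision logic as the body of map in readCommand, applied to a String.
  def parse(line: String): Command = {
    val messages = line.split(" ")
    val command = messages.head // first word selects the command
    val args = messages.tail    // remaining words are its arguments
    command match {
      case "CHAT" => ChatCommand(args.lift(0).getOrElse(""))
      case "EXIT" => ExitCommand()
      case _      => UnknownCommand(command)
    }
  }
}
```

Note that only the first argument is used, so parse("CHAT hello world") produces ChatCommand("hello"), and a bare "CHAT" produces ChatCommand("").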
With that, we have built from end to end the "container holding 'the expression for how IO is processed'", that is, the expression that runs when data is fed into state(socket).
Now let's look at the part where data is fed into state(socket).
case IO.Read(socket: IO.SocketHandle, bytes) =>
  state(socket)(IO Chunk bytes)
When a client sends data, an IO.Read message is sent to the chat server actor. The code extracts socket and bytes from that message and obtains the IO.IterateeRef[Unit] with state(socket). You can see that calling apply(IO Chunk bytes) on it feeds in the data sent by the client.
After that, all we do is wait while the expression inside the IO.Iteratee[Unit] that state(socket) refers to chews through the data.
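To see why feeding chunks into a reference is enough, here is a rough sketch of the same flow in plain Scala, with no Akka involved. FeedSketch, LineRef, feed, and demo are hypothetical names; the class imitates only the combination of "take until \r\n" and "repeat forever" and is not how Akka implements IterateeRef.

```scala
object FeedSketch {
  import scala.collection.mutable.ListBuffer

  // Hypothetical stand-in for one IterateeRef: it buffers incoming chunks and
  // passes every complete "\r\n"-terminated line to handler, over and over.
  final class LineRef(handler: String => Unit) {
    private val buffer = new java.lang.StringBuilder

    // Plays the role of state(socket)(IO Chunk bytes).
    def feed(chunk: String): Unit = {
      buffer.append(chunk)
      var end = buffer.indexOf("\r\n")
      while (end >= 0) {
        val line = buffer.substring(0, end)
        buffer.delete(0, end + 2) // drop the line and its terminator
        handler(line)
        end = buffer.indexOf("\r\n")
      }
    }
  }

  // Feed one logical stream in three arbitrary pieces and record the lines seen.
  def demo(): List[String] = {
    val seen = ListBuffer.empty[String]
    val ref = new LineRef(line => { seen += line; () })
    ref.feed("CHAT he")
    ref.feed("llo\r\nEX")
    ref.feed("IT\r\n")
    seen.toList
  }
}
```

The caller never needs to know where line boundaries fall. It just pushes whatever arrived, and the handler fires once per complete line, which is the role the actor plays when it forwards IO.Read payloads.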
That covers everything. If you read through the whole code again with the above in mind, I think it will make sense.
Phew, that was long. Off to bed! I have not proofread any of this; I will have to polish it tomorrow.