ä¹ ã ã«Nettyã§ãã以åã¯User Guideã®åçµã§ããããä»åº¦ã¯ããã¡ãã£ã¨è¸ã¿è¾¼ãã å 容ã«æãåºãã¦ãããã¨æãã¾ãã
ã¾ãã¯ãPipelineã¨ããã«çµã¿è¾¼ã¾ãã¦ããChannelHandlerãç解ãã¦ããã¨ããããã
ChannelHandlerã¤ã³ã¿ã¼ãã§ã¼ã¹ã¯ããã®ãµãã¯ã©ã¹ã¨ãã¦ChannelDownstreamHandlerã¤ã³ã¿ã¼ãã§ã¼ã¹ã¨ChannelUpstreamHandlerã¤ã³ã¿ã¼ãã§ã¼ã¹ãåå¨ãã¾ãããããããã©ã使ãããã®ãï¼
ãããç解ããã«ã¯ãChannelPipelineã¤ã³ã¿ã¼ãã§ã¼ã¹ã®APIããã¥ã¡ã³ãã«ããããå³ãå½¹ã«ç«ã¡ã¾ãã
http://netty.io/docs/stable/api/org/jboss/netty/channel/ChannelPipeline.html
ããã¨ã以ååèã«ããã¦ããã ãããµã¤ãã®å 容ãåãããã¨ã
- éä¿¡ã¯Downstream
- åä¿¡ã¯Upstream
ã¨ãããã¨ã«ãªãã¾ããã¤ã¾ãã
- ã¯ã©ã¤ã¢ã³ãã®å ´åã¯DownstreamâUpstream
- ãµã¼ãã®å ´åã¯UpstreamâDownstream
ã®é çªã§çºçããã¤ãã³ããä¼æãã¦ããã¾ããããã§è¨ã£ã¦ããã¤ãã³ãã¨ãåPipelineå ã«çµã¿è¾¼ã¾ããChannelHandlerãã©ãå©ç¨ããã¦ããã®ãã確èªããããã«ãä¾é¡ã®Telnetãµã¼ããã¡ãã£ã¨ã«ã¹ã¿ãã¤ãºãã¦ã¿ã¾ããã
ã¾ãã¯ãã¡ã¤ã³ã¯ã©ã¹ããµã³ãã«ãScalaã§åçµãããã®ã«ããã¾ããã
TelnetServer.scala
import java.net.InetSocketAddress import java.util.concurrent.Executors import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory object TelnetServer { def main(args: Array[String]): Unit = { val port = args size match { case 0 => 8080 case _ => args(0).toInt } new TelnetServer(port).run() } } class TelnetServer(port: Int) { def run(): Unit = { val bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool, Executors.newCachedThreadPool )) bootstrap.setPipelineFactory(new TelnetServerPipelineFactory) bootstrap.bind(new InetSocketAddress(port)) } }
ç¶ãã¦ãPipelineFactoryã§ããåºæ¬ã¯ãµã³ãã«ã¨åãã§ãããCodecãèªä½ã®ãã®ã«ç½®ãæãã¦ãã¾ãã
TelnetServerPipelineFactory.scala
import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels} import org.jboss.netty.handler.codec.frame.Delimiters class TelnetServerPipelineFactory extends ChannelPipelineFactory { @throws(classOf[Exception]) def getPipeline: ChannelPipeline = { val pipeline = Channels.pipeline pipeline.addLast("framer", new MyDelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter)) pipeline.addLast("decoder", new MyStringDecoder) pipeline.addLast("encoder", new MyStringEncoder) pipeline.addLast("handler", new TelnetServerHandler) pipeline } }
èªä½ã®Codecã¯ããªãªã¸ãã«ã®Codecãç¶æ¿ããä¸ã§ãã¡ãã£ã¨ãããã¬ã¤ããMix-inãã¦ããã ãã§ãã
// MyDelimiterBasedFrameDecoder.scala import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.channel.{Channel, ChannelHandlerContext} import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder class MyDelimiterBasedFrameDecoder(maxFrameLength: Int, delimiters: Array[ChannelBuffer]) extends DelimiterBasedFrameDecoder(maxFrameLength, delimiters: _*) with UpstreamHandlerLogger { @throws(classOf[Exception]) override protected def decode(ctx: ChannelHandlerContext, channel: Channel, channelBuffer: ChannelBuffer): AnyRef = { called("decode") super.decode(ctx, channel, channelBuffer) } } // MyStringDecoder.scala import java.nio.charset.Charset import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.channel.{Channel, ChannelHandlerContext} import org.jboss.netty.channel.ChannelHandler.Sharable import org.jboss.netty.handler.codec.string.StringDecoder @Sharable class MyStringDecoder(charset: Charset) extends StringDecoder(charset) with UpstreamHandlerLogger { def this() = this(Charset.defaultCharset) @throws(classOf[Exception]) override protected def decode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = { called("decode") super.decode(ctx, channel, msg) } } // MyStringEncoder.scala import java.nio.charset.Charset import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.channel.{Channel, ChannelHandlerContext} import org.jboss.netty.channel.ChannelHandler.Sharable import org.jboss.netty.handler.codec.string.StringEncoder @Sharable class MyStringEncoder(charset: Charset) extends StringEncoder(charset) with DownstreamHandlerLogger { def this() = this(Charset.defaultCharset) @throws(classOf[Exception]) override protected def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = { called("encode") super.encode(ctx, channel, msg) } }
ç¶ãã¦ãServerHandlerã
TelnetServerHandler.scala
import scala.collection.JavaConverters._ import java.net.InetAddress import java.util.Date import org.jboss.netty.channel.ChannelEvent import org.jboss.netty.channel.ChannelFuture import org.jboss.netty.channel.ChannelFutureListener import org.jboss.netty.channel.ChannelHandlerContext import org.jboss.netty.channel.ChannelStateEvent import org.jboss.netty.channel.ExceptionEvent import org.jboss.netty.channel.MessageEvent import org.jboss.netty.channel.SimpleChannelUpstreamHandler class TelnetServerHandler extends SimpleChannelUpstreamHandler with UpstreamHandlerLogger { @throws(classOf[Exception]) override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = { /* if (e.isInstanceOf[ChannelStateEvent]) { info(" ChannelStateEvent = " + e.toString) } */ super.handleUpstream(ctx, e) } override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { e.getChannel.write("Welcome to %s !\r\n".format(InetAddress.getLocalHost.getHostName)) e.getChannel.write("It is %s now.\r\n".format(new Date)) } override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = e getMessage match { case request: String => info(" received message[%s]".format(request)) val pipeline = ctx.getChannel.getPipeline info(pipeline.getNames.asScala.mkString(" current pipelines[", ", ", "]")) val (response, close) = request match { case "" => ("Please type something.\r\n", false) case _ if request.toLowerCase == "bye" => ("Have a good day!.\r\n", true) case _ => ("Did you say '%s'?\r\n".format(request), false) } val future = e.getChannel.write(response) if (close) { future.addListener(ChannelFutureListener.CLOSE) } } override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = { warnLog("Unexcepted exception from downstream.", e.getCause) e.getChannel.close() } }
ãã§ãåChannelHandlerï¼Codecå«ãï¼ãMix-inãã¦ãããã¬ã¤ãã
Logger.scala
import org.jboss.netty.channel.ChannelDownstreamHandler import org.jboss.netty.channel.ChannelHandlerContext import org.jboss.netty.channel.ChannelEvent import org.jboss.netty.channel.ChannelStateEvent import org.jboss.netty.channel.ChannelUpstreamHandler import org.jboss.netty.channel.ChildChannelStateEvent import org.jboss.netty.channel.DownstreamMessageEvent import org.jboss.netty.channel.ExceptionEvent import org.jboss.netty.channel.UpstreamMessageEvent import org.jboss.netty.channel.WriteCompletionEvent import org.jboss.netty.handler.timeout.IdleStateEvent trait Logger { info("ã¯ã©ã¹[%s]ã®ã¤ã³ã¹ã¿ã³ã¹ãä½æãã¾ãã".format(getClass.getName)) protected def info(msg: String): Unit = println(msg) protected def warnLog(msg: String, thrown: Throwable): Unit = println("WARN:%s %s".format(msg, thrown)) protected def called(methodName: String, params: AnyRef*): Unit = params size match { case 0 => info("%s#%s called.".format(getClass.getName, methodName)) case n => info("%s#%s, [%s] called".format(getClass.getName, methodName, params.mkString(","))) } } trait HandlerLoggerSupport { def eventName(e: ChannelEvent): String = e match { case event: ChannelStateEvent => event.getState.name case event: DownstreamMessageEvent => "RECEIVED" case event: ExceptionEvent => "EXCEPTION" case event: ChildChannelStateEvent => if (event.getChildChannel.isOpen) "CHILD_OPEN" else "CHILD_CLOSED" case event: IdleStateEvent => event.getState.name case event: UpstreamMessageEvent => "WRITE" case event: WriteCompletionEvent => "WRITTEN_AMOUNT" } } trait UpstreamHandlerLogger extends Logger with HandlerLoggerSupport with ChannelUpstreamHandler { abstract override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = { called("hanleUpstream", "eventName = " + eventName(e)) super.handleUpstream(ctx, e) } } trait DownstreamHandlerLogger extends Logger with HandlerLoggerSupport with ChannelDownstreamHandler { abstract override def handleDownstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = { called("handleDownstream", "eventName = " + eventName(e)) super.handleDownstream(ctx, e) } } trait HandlerLogger extends DownstreamHandlerLogger with UpstreamHandlerLogger
ãã¬ã¤ãã«ChannelUpstreamHandlerã¤ã³ã¿ã¼ãã§ã¼ã¹ã¾ãã¯ChannelDownstreamHandlerã¤ã³ã¿ã¼ãã§ã¼ã¹ãMix-inããä¸ã§ãåã¤ã³ã¿ã¼ãã§ã¼ã¹ã®æ½è±¡ã¡ã½ãããabstract defã§ãªã¼ãã¼ã©ã¤ããã¾ããChannelUpstreamHandlerã®å ´åã¯ããããªæãã
abstract override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = { called("hanleUpstream", "eventName = " + eventName(e)) super.handleUpstream(ctx, e) }
ã¡ã½ããeventNameã¯ãChannelEventã®å 容ã«å¿ãã¦ãã¤ãã³ãã表ãæååãè¿ãã¡ã½ããã§ããå®è£ ã¯ããã®é¨åã
def eventName(e: ChannelEvent): String = e match { case event: ChannelStateEvent => event.getState.name case event: DownstreamMessageEvent => "RECEIVED" case event: ExceptionEvent => "EXCEPTION" case event: ChildChannelStateEvent => if (event.getChildChannel.isOpen) "CHILD_OPEN" else "CHILD_CLOSED" case event: IdleStateEvent => event.getState.name case event: UpstreamMessageEvent => "WRITE" case event: WriteCompletionEvent => "WRITTEN_AMOUNT" }
ããã§ãhadleUpstreamã¡ã½ããã¨handleDownstreamã¡ã½ããã«ä¼æãã¦ããã¤ãã³ãããã¬ã¼ã¹ããããã¨ããç®è«è¦ã§ãã
ã§ã¯ã§ã¯ãTelnetãµã¼ããèµ·åãã¦ã¿ã¾ãã
$ sbt run [info] Set current project to netty-custom-telnet (in build file:/xxxxx/netty-custom-telnet/) [info] Running TelnetServer
ãã®ãµã¼ãã«ãtelnetã³ãã³ãã§ã¢ã¯ã»ã¹ãã¦ã¿ã¾ãã
$ telnet localhost 8080 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. Welcome to ubuntu ! It is Sun Mar 25 20:28:33 JST 2012 now.
ããã¨ããµã¼ãå´ã®ã³ã³ã½ã¼ã«ã«ã¯ããããªãã°ãåºã¾ãã
ã¯ã©ã¹[MyDelimiterBasedFrameDecoder]ã®ã¤ã³ã¹ã¿ã³ã¹ãä½æãã¾ãã ã¯ã©ã¹[MyStringDecoder]ã®ã¤ã³ã¹ã¿ã³ã¹ãä½æãã¾ãã ã¯ã©ã¹[MyStringEncoder]ã®ã¤ã³ã¹ã¿ã³ã¹ãä½æãã¾ãã ã¯ã©ã¹[TelnetServerHandler]ã®ã¤ã³ã¹ã¿ã³ã¹ãä½æãã¾ãã MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = OPEN] called MyStringDecoder#hanleUpstream, [eventName = OPEN] called TelnetServerHandler#hanleUpstream, [eventName = OPEN] called MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = BOUND] called MyStringDecoder#hanleUpstream, [eventName = BOUND] called TelnetServerHandler#hanleUpstream, [eventName = BOUND] called MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = CONNECTED] called MyStringDecoder#hanleUpstream, [eventName = CONNECTED] called TelnetServerHandler#hanleUpstream, [eventName = CONNECTED] called MyStringEncoder#handleDownstream, [eventName = RECEIVED] called MyStringEncoder#encode called. MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called MyStringEncoder#handleDownstream, [eventName = RECEIVED] called MyStringEncoder#encode called. MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
ã¤ã³ã¹ã¿ã³ã¹çæã®ãã°ãåºã¦ããã®ã¯ãHandlerãä½æãããã¿ã¤ãã³ã°ãè¦ããããã§ãããªããencodeã¡ã½ããã«ã¤ãã¦ã¯èªåã§ãã°åºåããã¦ãã¾ãâ¦ã
ã§ã¯ãã¯ã©ã¤ã¢ã³ãå´ããã¡ãã»ã¼ã¸ãéã£ã¦ã¿ã¾ãã
Hello World Did you say 'Hello World'?
ãã®æããµã¼ãå´ã«ã¯ãããªãã°ãåºã¾ãã
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called MyDelimiterBasedFrameDecoder#decode called. MyStringDecoder#hanleUpstream, [eventName = WRITE] called MyStringDecoder#decode called. TelnetServerHandler#hanleUpstream, [eventName = WRITE] called received message[Hello World] current pipelines[framer, decoder, encoder, handler] MyStringEncoder#handleDownstream, [eventName = RECEIVED] called MyStringEncoder#encode called. MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
ãªãã¨ãªããåãã¤ãã³ããPipelineå ã®Handlerã«ä¼æãã¦ãã£ã¦ããã®ããããã¾ããã
ç´°ããè¦ã¦ããã¨ãä¾ãã°CONNECTEDã¤ãã³ãã®æã¯
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = CONNECTED] called MyStringDecoder#hanleUpstream, [eventName = CONNECTED] called TelnetServerHandler#hanleUpstream, [eventName = CONNECTED] called
ã¨ãã£ãæãã§ãããRECEIVEDã¤ãã³ãã®æã¯
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
ã¨ChannelDownstreamHandlerãã表示ããã¦ãã¾ããããWRITEã¤ãã³ãã®æã¯
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called MyStringDecoder#hanleUpstream, [eventName = WRITE] called TelnetServerHandler#hanleUpstream, [eventName = WRITE] called
ChannelUpstreamHandlerãã表示ããã¦ãã¾ããã
Pipelineä¸ã®ãUpstreamç³»ã®ã¤ãã³ããªãDownstreamç³»ã®ã¤ãã³ãã«é¢ä¿ã®ããChannelHandlerããå¼ã°ããªããã¨ãããã¨ã§ãããããSimpleChannelHandlerã¯ã©ã¹ã¿ãããªChannelUpstreamHandlerã§ãChannelDownstreamHandlerã§ããããããªã¯ã©ã¹ã¯ãå ´åã«ãã£ã¦ã¯ä¸¡æ¹ã®ã¤ãã³ãã«é¡ãåºããã¨ãããã®ã§ãããã
åChannelHandlerã®å®è¡é ã§ãããUpstreamã®å ´åã¯Pipelineã®åããå®è¡ããã¦ããã¾ãããDownstreamã®å ´åã¯Pipelineã®å¾ãããå®è¡ããã¦ããããã§ãããã ãä»åã¯ãµã¼ãå´ã§ç¢ºèªããã ããªã®ã§ãå¾ã§ã¯ã©ã¤ã¢ã³ãå´ã®åä½ãè¦ã¦ãããã¨æãã¾ãã
âクライアント側も確認しました。
ãªãããã®ãµã¼ãã«ã¯ãbyeãã¨æã¤ã¨ãæ¥ç¶ãåããã¾ãã
bye Have a good day!. Connection closed by foreign host.
ã¨ããã§ãæ¥ç¶å¾ã®å種æä½ãè¡ã£ã¦ãããã¤ã³ã¹ã¿ã³ã¹ãçæãã¾ãããã¨ãããã°ã¯æ¥ç¶æã®1度ãã表示ããã¾ããã§ãããã¨ãããã¨ã¯ãHandlerã®ã¤ã³ã¹ã¿ã³ã¹ã¯1ã¤ããçæããã¦ããªãããã§ãããããã£ã¨1ã¤ãã¨ããã¨ããã§ããªãããã§ããããã¯ã次åã«ã