CLOVER🍀

That was when it all began.

NettyのPipelineとChannelHandlerの関係を学ぶ

久々にNettyです。以前はUser Guideの写経でしたが、今度はもうちょっと踏み込んだ内容に手を出していこうと思います。

まずは、Pipelineとそれに組み込まれているChannelHandlerを理解していくところから。

ChannelHandlerインターフェースは、そのサブクラスとしてChannelDownstreamHandlerインターフェースとChannelUpstreamHandlerインターフェースが存在します。それぞれ、どう使われるのか?

それを理解するには、ChannelPipelineインターフェースのAPIドキュメントにかかれた図が役に立ちます。
http://netty.io/docs/stable/api/org/jboss/netty/channel/ChannelPipeline.html

これと、以前参考にさせていただいたサイトの内容を合わせると、

  • 送信はDownstream
  • 受信はUpstream

ということになります。つまり、

  • クライアントの場合はDownstream→Upstream
  • サーバの場合はUpstream→Downstream

の順番で発生したイベントが伝播していきます。ここで言っているイベントと、各Pipeline内に組み込まれたChannelHandlerがどう利用されているのかを確認するために、例題のTelnetサーバをちょっとカスタマイズしてみました。

まずは、メインクラス。サンプルをScalaで写経したものにすぎません。
TelnetServer.scala

import java.net.InetSocketAddress
import java.util.concurrent.Executors

import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory

object TelnetServer {
  def main(args: Array[String]): Unit = {
    val port = args size match {
      case 0 => 8080
      case _ => args(0).toInt
    }

    new TelnetServer(port).run()
  }
}

class TelnetServer(port: Int) {
  def run(): Unit = {
    val bootstrap = new ServerBootstrap(
                                        new NioServerSocketChannelFactory(
                                          Executors.newCachedThreadPool,
                                          Executors.newCachedThreadPool
                                        ))
    bootstrap.setPipelineFactory(new TelnetServerPipelineFactory)
    bootstrap.bind(new InetSocketAddress(port))
  }
}

続いて、PipelineFactoryです。基本はサンプルと同じですが、Codecを自作のものに置き換えています。
TelnetServerPipelineFactory.scala

import org.jboss.netty.channel.{ChannelPipeline, ChannelPipelineFactory, Channels}
import org.jboss.netty.handler.codec.frame.Delimiters

class TelnetServerPipelineFactory extends ChannelPipelineFactory {
  @throws(classOf[Exception])
  def getPipeline: ChannelPipeline = {
    val pipeline = Channels.pipeline

    pipeline.addLast("framer", new MyDelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter))
    pipeline.addLast("decoder", new MyStringDecoder)
    pipeline.addLast("encoder", new MyStringEncoder)

    pipeline.addLast("handler", new TelnetServerHandler)

    pipeline
  }
}

自作のCodecは、オリジナルのCodecを継承した上で、ちょっとしたトレイトをMix-inしているだけです。

// MyDelimiterBasedFrameDecoder.scala
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder

class MyDelimiterBasedFrameDecoder(maxFrameLength: Int, delimiters: Array[ChannelBuffer])
  extends DelimiterBasedFrameDecoder(maxFrameLength, delimiters: _*) with UpstreamHandlerLogger {

  @throws(classOf[Exception])
  override protected def decode(ctx: ChannelHandlerContext, channel: Channel, channelBuffer: ChannelBuffer): AnyRef = {
    called("decode")
    super.decode(ctx, channel, channelBuffer)
  }
}

// MyStringDecoder.scala
import java.nio.charset.Charset

import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.channel.ChannelHandler.Sharable
import org.jboss.netty.handler.codec.string.StringDecoder

@Sharable
class MyStringDecoder(charset: Charset) extends StringDecoder(charset) with UpstreamHandlerLogger {
  def this() = this(Charset.defaultCharset)

  @throws(classOf[Exception])
  override protected def decode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
    called("decode")
    super.decode(ctx, channel, msg)
  }
}

// MyStringEncoder.scala
import java.nio.charset.Charset

import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.channel.{Channel, ChannelHandlerContext}
import org.jboss.netty.channel.ChannelHandler.Sharable
import org.jboss.netty.handler.codec.string.StringEncoder

@Sharable
class MyStringEncoder(charset: Charset) extends StringEncoder(charset) with DownstreamHandlerLogger {
  def this() = this(Charset.defaultCharset)

  @throws(classOf[Exception])
  override protected def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
    called("encode")
    super.encode(ctx, channel, msg)
  }
}

続いて、ServerHandler。
TelnetServerHandler.scala

import scala.collection.JavaConverters._

import java.net.InetAddress
import java.util.Date

import org.jboss.netty.channel.ChannelEvent
import org.jboss.netty.channel.ChannelFuture
import org.jboss.netty.channel.ChannelFutureListener
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.MessageEvent
import org.jboss.netty.channel.SimpleChannelUpstreamHandler

class TelnetServerHandler extends SimpleChannelUpstreamHandler with UpstreamHandlerLogger {
  @throws(classOf[Exception])
  override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    /*
    if (e.isInstanceOf[ChannelStateEvent]) {
      info("     ChannelStateEvent = " + e.toString)
    }
    */

    super.handleUpstream(ctx, e)
  }

  override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
    e.getChannel.write("Welcome to %s !\r\n".format(InetAddress.getLocalHost.getHostName))
    e.getChannel.write("It is %s now.\r\n".format(new Date))
  }

  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = e getMessage match {
    case request: String =>
      info("     received message[%s]".format(request))

      val pipeline = ctx.getChannel.getPipeline
      info(pipeline.getNames.asScala.mkString("     current pipelines[", ", ", "]"))

      val (response, close) =
          request match {
            case "" => ("Please type something.\r\n", false)
            case _ if request.toLowerCase == "bye" => ("Have a good day!.\r\n", true)
            case _ => ("Did you say '%s'?\r\n".format(request), false)
          }

      val future = e.getChannel.write(response)

      if (close) {
        future.addListener(ChannelFutureListener.CLOSE)
      }
  }

  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
    warnLog("Unexcepted exception from downstream.", e.getCause)
    e.getChannel.close()
  }
}

んで、各ChannelHandler(Codec含む)がMix-inしているトレイト。
Logger.scala

import org.jboss.netty.channel.ChannelDownstreamHandler
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelEvent
import org.jboss.netty.channel.ChannelStateEvent
import org.jboss.netty.channel.ChannelUpstreamHandler
import org.jboss.netty.channel.ChildChannelStateEvent
import org.jboss.netty.channel.DownstreamMessageEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.UpstreamMessageEvent
import org.jboss.netty.channel.WriteCompletionEvent
import org.jboss.netty.handler.timeout.IdleStateEvent

trait Logger {
  info("クラス[%s]のインスタンスを作成しました".format(getClass.getName))

  protected def info(msg: String): Unit = println(msg)
  protected def warnLog(msg: String, thrown: Throwable): Unit =
    println("WARN:%s %s".format(msg, thrown))

  protected def called(methodName: String, params: AnyRef*): Unit = params size match {
    case 0 => info("%s#%s called.".format(getClass.getName, methodName))
    case n => info("%s#%s, [%s] called".format(getClass.getName, methodName, params.mkString(",")))
  }
}

trait HandlerLoggerSupport {
  def eventName(e: ChannelEvent): String = e match {
    case event: ChannelStateEvent => event.getState.name
    case event: DownstreamMessageEvent => "RECEIVED"
    case event: ExceptionEvent => "EXCEPTION"
    case event: ChildChannelStateEvent =>
      if (event.getChildChannel.isOpen) "CHILD_OPEN"
      else "CHILD_CLOSED"
    case event: IdleStateEvent => event.getState.name
    case event: UpstreamMessageEvent => "WRITE"
    case event: WriteCompletionEvent => "WRITTEN_AMOUNT"
  }
}

trait UpstreamHandlerLogger extends Logger with HandlerLoggerSupport with ChannelUpstreamHandler {
  abstract override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    called("hanleUpstream", "eventName = " + eventName(e))
    super.handleUpstream(ctx, e)
  }
}

trait DownstreamHandlerLogger extends Logger with HandlerLoggerSupport with ChannelDownstreamHandler {
  abstract override def handleDownstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    called("handleDownstream", "eventName = " + eventName(e))
    super.handleDownstream(ctx, e)
  }
}

trait HandlerLogger extends DownstreamHandlerLogger with UpstreamHandlerLogger

トレイトにChannelUpstreamHandlerインターフェースまたはChannelDownstreamHandlerインターフェースをMix-inした上で、各インターフェースの抽象メソッドをabstract defでオーバーライドします。ChannelUpstreamHandlerの場合は、こんな感じ。

  abstract override def handleUpstream(ctx: ChannelHandlerContext, e: ChannelEvent): Unit = {
    called("hanleUpstream", "eventName = " + eventName(e))
    super.handleUpstream(ctx, e)
  }

メソッドeventNameは、ChannelEventの内容に応じて、イベントを表す文字列を返すメソッドです。実装は、この部分。

  def eventName(e: ChannelEvent): String = e match {
    case event: ChannelStateEvent => event.getState.name
    case event: DownstreamMessageEvent => "RECEIVED"
    case event: ExceptionEvent => "EXCEPTION"
    case event: ChildChannelStateEvent =>
      if (event.getChildChannel.isOpen) "CHILD_OPEN"
      else "CHILD_CLOSED"
    case event: IdleStateEvent => event.getState.name
    case event: UpstreamMessageEvent => "WRITE"
    case event: WriteCompletionEvent => "WRITTEN_AMOUNT"
  }

これで、hadleUpstreamメソッドとhandleDownstreamメソッドに伝播しているイベントをトレースしよう、という目論見です。

ではでは、Telnetサーバを起動してみます。

$ sbt run
[info] Set current project to netty-custom-telnet (in build file:/xxxxx/netty-custom-telnet/)
[info] Running TelnetServer 

このサーバに、telnetコマンドでアクセスしてみます。

$ telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome to ubuntu !
It is Sun Mar 25 20:28:33 JST 2012 now.

すると、サーバ側のコンソールには、こんなログが出ます。

クラス[MyDelimiterBasedFrameDecoder]のインスタンスを作成しました
クラス[MyStringDecoder]のインスタンスを作成しました
クラス[MyStringEncoder]のインスタンスを作成しました
クラス[TelnetServerHandler]のインスタンスを作成しました
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = OPEN] called
MyStringDecoder#hanleUpstream, [eventName = OPEN] called
TelnetServerHandler#hanleUpstream, [eventName = OPEN] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = BOUND] called
MyStringDecoder#hanleUpstream, [eventName = BOUND] called
TelnetServerHandler#hanleUpstream, [eventName = BOUND] called
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = CONNECTED] called
MyStringDecoder#hanleUpstream, [eventName = CONNECTED] called
TelnetServerHandler#hanleUpstream, [eventName = CONNECTED] called
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

インスタンス生成のログが出ているのは、Handlerが作成されるタイミングを見たいためです。なお、encodeメソッドについては自分でログ出力をしています…。

では、クライアント側からメッセージを送ってみます。

Hello World
Did you say 'Hello World'?

この時、サーバ側にはこんなログが出ます。

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyDelimiterBasedFrameDecoder#decode called.
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#decode called.
TelnetServerHandler#hanleUpstream, [eventName = WRITE] called
     received message[Hello World]
     current pipelines[framer, decoder, encoder, handler]
MyStringEncoder#handleDownstream, [eventName = RECEIVED] called
MyStringEncoder#encode called.
MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
MyStringDecoder#hanleUpstream, [eventName = WRITTEN_AMOUNT] called
TelnetServerHandler#hanleUpstream, [eventName = WRITTEN_AMOUNT] called

なんとなく、同じイベントがPipeline内のHandlerに伝播していっているのがわかりますね。

細かく見ていくと、例えばCONNECTEDイベントの時は

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = CONNECTED] called
MyStringDecoder#hanleUpstream, [eventName = CONNECTED] called
TelnetServerHandler#hanleUpstream, [eventName = CONNECTED] called

といった感じですが、RECEIVEDイベントの時は

MyStringEncoder#handleDownstream, [eventName = RECEIVED] called

とChannelDownstreamHandlerしか表示されていませんし、WRITEイベントの時は

MyDelimiterBasedFrameDecoder#hanleUpstream, [eventName = WRITE] called
MyStringDecoder#hanleUpstream, [eventName = WRITE] called
TelnetServerHandler#hanleUpstream, [eventName = WRITE] called

ChannelUpstreamHandlerしか表示されていません。

Pipeline中の、Upstream系のイベントなりDownstream系のイベントに関係のあるChannelHandlerしか呼ばれない、ということでしょうね。SimpleChannelHandlerクラスみたいなChannelUpstreamHandlerでもChannelDownstreamHandlerでもあるようなクラスは、場合によっては両方のイベントに顔を出すこともあるのでしょう。

各ChannelHandlerの実行順ですが、Upstreamの場合はPipelineの前から実行されていきますが、Downstreamの場合はPipelineの後ろから実行されていくようです。ただ、今回はサーバ側で確認しただけなので、後でクライアント側の動作も見ておこうと思います。
→クライアント側も確認しました。

なお、このサーバには「bye」と打つと、接続を切られます。

bye
Have a good day!.
Connection closed by foreign host.

ところで、接続後の各種操作を行っても、「インスタンスを生成しました」というログは接続時の1度しか表示されませんでした。ということは、Handlerのインスタンスは1つしか生成されていないわけですが、ず〜っと1つかというとそうでもないようです。それは、次回に。