Scala で簡易 Web サーバのプロトタイプをリファクタしてみた(2)

前回(blog1.mammb.com)の続き。

エントリポイント Main.scala

package etc9.aokan
/** Program entry point: starts the acceptor loop on port 8900. */
object Main {
  def main(args: Array[String]): Unit = {
    // Declared as a def (not a val) because Acceptor takes its handler by name:
    // every evaluation must build a fresh IoHandler.
    def connectionHandler: Handler =
      new IoHandler[HttpRequest, HttpResponse]
        with HttpReadChannel with HttpProcess with HttpWriteChannel

    val listenPort = 8900
    new Acceptor(listenPort, connectionHandler)
  }
}

NIO系の処理を行う IoAcceptor.scala

package etc9.aokan
import java.nio.channels.{ServerSocketChannel, SocketChannel, SelectionKey, Selector}

/**
 * Non-blocking accept/dispatch loop.
 *
 * Constructing an instance opens a selector and a server socket bound to `port`
 * and then runs the select loop on the calling thread until the selector has no
 * registered keys left.
 *
 * @param port       TCP port to listen on
 * @param ioCallback by-name handler expression; it is evaluated once for every
 *                   accepted connection and attached to that connection's key
 */
class Acceptor(port: Int, ioCallback: => Handler) {

  using(Selector.open()) { selector =>
    using(ServerSocketChannel.open()) { channel =>
      channel.socket.setReuseAddress(true)
      channel.configureBlocking(false)
      channel.socket.bind(new java.net.InetSocketAddress(port))
      channel.register(selector, SelectionKey.OP_ACCEPT, handler)

      while (selector.keys.size > 0) {
        selector.select()
        val it = selector.selectedKeys.iterator
        while (it.hasNext) {
          val key = it.next()
          it.remove()
          dispatch(key)
        }
      }
    }
  }

  /** Runs `f` on `t` and always closes `t` afterwards, returning the result of `f`. */
  def using[T <: {def close(): Unit}, R](t: T)(f: T => R): R =
    try{ f(t) } finally{ t.close() }

  /** Creates the handler attached to the listening socket; it accepts new connections. */
  def handler = new Handler {
    def handle(key: SelectionKey): Unit = if (key.isValid && key.isAcceptable) {
      // In non-blocking mode accept() returns null when no connection is pending.
      Option(key.channel.asInstanceOf[ServerSocketChannel].accept()) foreach { sc =>
        sc.configureBlocking(false)
        sc.register(key.selector, SelectionKey.OP_READ, ioCallback)
      }
    }
  }

  /**
   * Invokes the handler attached to `key`. An I/O failure on a client connection
   * closes only that connection; failures on any other channel are rethrown.
   */
  private def dispatch(key: SelectionKey): Unit =
    try key.attachment.asInstanceOf[Handler].handle(key)
    catch {
      case e: java.io.IOException => key.channel match {
        case client: SocketChannel =>
          key.cancel()
          try client.close() catch { case _: java.io.IOException => () }
        case _ => throw e
      }
    }
}

/** Callback stored as a selection key's attachment; the acceptor loop calls `handle` for each selected key. */
abstract class Handler { def handle(key: SelectionKey): Unit }

/**
 * Handler that wires a reader, a processor and a writer together.
 * The three roles are required through the self type and supplied by mix-ins.
 */
class IoHandler[T <: Req, R <: Res] extends Handler {
  self: ReadChannel[T] with Process[T, R] with WriteChannel[R] =>

  def handle(key: SelectionKey): Unit = {
    if (key.isValid && key.isReadable) {
      // When the reader yields a request, process it; the reply callback
      // queues the response and starts writing it.
      for (request <- readChannel(key)) {
        process(request) { response =>
          reply(response)
          writeChannel(key)
        }
      }
    }
    // The key is re-checked because the branch above may have closed the channel.
    if (key.isValid && key.isWritable) {
      writeChannel(key)
    }
  }
}


/** Read role of a connection handler. */
trait ReadChannel[T <: Req] {
  // Reads from the channel behind `key` and returns a request if one is produced.
  // NOTE(review): None presumably means "no complete request yet" - confirm against implementations.
  def readChannel(key: SelectionKey): Option[T]
}
/** Processing role of a connection handler: maps a request to a response. */
trait Process[T <: Req, R <: Res] {
  // Handles `req` and hands the resulting response to the `reply` callback.
  def process(req: T)(reply: R => Unit): Unit
}
/**
 * Write role of a connection handler: keeps a FIFO queue of response writers
 * and drains it into the connection's socket channel.
 *
 * NOTE(review): the queue is a plain ListBuffer with no synchronisation, while
 * HttpProcess invokes the reply callback from a spawned thread - confirm whether
 * concurrent access needs guarding.
 */
trait WriteChannel[R <: Res] {
  import collection.mutable.ListBuffer
  private val writers = ListBuffer.empty[Writer]

  /**
   * Writes as much as possible of the writer at the head of the queue.
   *
   * While data remains, OP_WRITE interest is kept on the key. When the queue
   * becomes empty, OP_WRITE interest is cleared and the channel is closed.
   * When called with an empty queue, only OP_WRITE interest is cleared.
   *
   * @param key selection key whose channel is the client SocketChannel
   */
  def writeChannel(key: SelectionKey): Unit = {
    val sc: SocketChannel = key.channel.asInstanceOf[SocketChannel]
    writers.headOption match {
      case None =>
        // Nothing queued: previously this threw NoSuchElementException from `head`.
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE)
      case Some(current) =>
        if (current.write(sc).hasRemaining) {
          key.interestOps(key.interestOps() | SelectionKey.OP_WRITE)
        } else {
          writers.remove(0)
          if (writers.isEmpty) {
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE)
            sc.close()
          } else {
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE)
          }
        }
    }
  }

  /** Appends the writer of `res` to the queue of pending output. */
  def reply(res: R): Unit = writers += res.writer
}

/** Marker type for requests; upper bound of the request type parameters in this file. */
trait Req
/** Marker type for responses; every response exposes a writer through Writable. */
trait Res extends Writable

/** Something that can be sent to a socket channel through its `writer`. */
trait Writable { val writer: Writer }
/** Incremental writer used by WriteChannel to push output to a socket channel. */
trait Writer {
  // Writes to `sc` and returns a Writer; the caller queries `hasRemaining` on the result.
  def write(sc: java.nio.channels.SocketChannel): Writer
  // True while output is still left to be written.
  def hasRemaining: Boolean
}

HTTPによるチャネルとの IO を行う HttpIoProcess.scala

package etc9.aokan
import etc9.web.Web
import concurrent.ops._
import java.nio.channels.{SocketChannel, SelectionKey}

/**
 * Reads bytes from a client channel and assembles them into an HttpRequest.
 *
 * Bytes are accumulated in a per-instance buffer. A request is produced once the
 * header terminator (CR LF CR LF) has been seen and, for POST, once the buffer
 * length reaches Content-Length. After a request is produced the input side of
 * the connection is shut down.
 */
trait HttpReadChannel extends ReadChannel[HttpRequest] {
  private val buffer = IoBuffer(2048)

  // Header of a POST request whose body has not completely arrived yet. It has
  // already been removed from `buffer`, so it must be kept until the body is read.
  private var pendingHeader: Option[HttpHeader] = None

  /**
   * Reads from the channel of `key`.
   *
   * @return the request if it is complete, otherwise None. When the read reports
   *         end-of-stream (negative count) the channel is closed and None is returned.
   */
  def readChannel(key: SelectionKey): Option[HttpRequest] = {
    val channel = key.channel.asInstanceOf[SocketChannel]
    if ((buffer read channel) < 0) {
      // Peer closed before a full request arrived; without this the key stays
      // readable forever and the select loop spins.
      channel.close()
      None
    } else {
      val header: Option[HttpHeader] = pendingHeader orElse (takeHeader(buffer) match {
        case Some(bytes) => withReturn(HttpHeader(bytes)) { buffer.remove(bytes.size) }
        case _ => None
      })

      header flatMap { h => h.method match {
        case POST if buffer.length < h.contentLength =>
          // Body incomplete: remember the header and wait for more data
          // (previously this produced Some(null)).
          pendingHeader = Some(h)
          None
        case POST =>
          pendingHeader = None
          Some(withReturn(new HttpRequest(h, takeBody(buffer, h.contentLength))) {
            buffer.remove(h.contentLength)
            halfClose(key, channel)
          })
        case _ =>
          pendingHeader = None
          Some(withReturn(new HttpRequest(h, None)) { halfClose(key, channel) })
      }}
    }
  }

  /** Stops read notifications for `key` and shuts down the input side of the socket. */
  private def halfClose(key: SelectionKey, ch: SocketChannel): Unit = {
    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ)
    ch.socket.shutdownInput()
  }

  /**
   * Returns the header bytes including the terminating CR LF CR LF, or None when
   * the terminator is not in the buffer yet.
   */
  private def takeHeader(buf: IoBuffer): Option[Seq[Byte]] = {
    val CR: Int = 13 // carriage return
    val LF: Int = 10 // line feed
    ( 0 to buf.length - 4 ) find { i =>
      buf(i) == CR && buf(i+1) == LF && buf(i+2) == CR && buf(i+3) == LF
    } map { index => buf.take(index + 4) } // terminator occupies index .. index+3
  }

  /** Returns the first `len` bytes of `seq`, or None if `seq` is shorter than `len`. */
  private def takeBody(seq: Seq[Byte], len: Int): Option[Seq[Byte]] =
    if(seq.length >= len) Some(seq.take(len)) else None

  /** Evaluates `r`, then runs `f` for its side effect, and returns the value of `r`. */
  private def withReturn[R, F](r: => R)(f: => F): R = {val ret = r; f; ret}
}


/**
 * Request dispatcher: paths ending in ".do" are handed to the web layer on a
 * spawned thread; every other path is served as a static file from ./public.
 */
trait HttpProcess extends Process[HttpRequest, HttpResponse] {

  /**
   * Produces a response for `req` and passes it to `reply`.
   *
   * @param req   parsed request
   * @param reply callback receiving the response; for ".do" paths it is invoked
   *              from the spawned thread
   */
  def process(req: HttpRequest)(reply: HttpResponse => Unit): Unit =
    if (req.header.path.endsWith(".do")) spawn {
      reply{ new HttpResponse(OK_RESPONSE, Web.mkContent(req)) }
    } else {
      reply {
        publicFile(req.header.path) match {
          case Some(file) => new HttpResponse(OK_RESPONSE, Some(new FileContent(file)))
          case None => NotFound
        }
      }
    }

  /**
   * Resolves a request path to a regular file below ./public.
   *
   * The request path is untrusted: it is canonicalised and rejected unless the
   * result is still inside the public directory, so "../" segments cannot escape
   * it. Directories and unresolvable paths yield None.
   */
  private def publicFile(path: String): Option[java.io.File] =
    try {
      val root = new java.io.File("./public").getCanonicalFile
      val file = new java.io.File(root, path).getCanonicalFile
      val insideRoot = file.getPath.startsWith(root.getPath + java.io.File.separator)
      if (insideRoot && file.isFile) Some(file) else None
    } catch { case _: java.io.IOException => None }
}

/** WriteChannel specialised to HttpResponse; adds no members of its own. */
trait HttpWriteChannel extends WriteChannel[HttpResponse]

HttpRequest.scala

package etc9.aokan
import etc9.aokan.Util._

/**
 * A parsed HTTP request.
 *
 * @param header request line and header fields
 * @param body   raw body bytes, if any
 */
class HttpRequest(val header: HttpHeader, val body: Option[Seq[Byte]]) extends Req {

  /**
   * Form fields of a URL-encoded POST body.
   *
   * @return Some(fields) for a POST whose Content-Type is
   *         application/x-www-form-urlencoded and that has a body; None otherwise
   *         (multipart/form-data is not implemented)
   */
  def forms: Option[Map[String, String]] =
    if (header.method != POST) None
    else {
      // Header names are matched case-insensitively, and media-type parameters
      // such as "; charset=UTF-8" are ignored for the comparison.
      val mediaType = header.headers.collectFirst {
        case (name, value) if name.equalsIgnoreCase("Content-Type") =>
          value.takeWhile(_ != ';').trim.toLowerCase(java.util.Locale.ROOT)
      }
      mediaType match {
        case Some("application/x-www-form-urlencoded") =>
          body.map(bytes => parseQuery(new String(bytes.toArray, ISO_8859_1)))
        case _ => None
      }
    }
}

/**
 * Request line plus header fields of an HTTP request.
 *
 * @param method  request method
 * @param uri     request target as sent by the client (path and optional query)
 * @param version HTTP version digits from the request line
 * @param headers header fields by name
 * @param size    number of bytes the header was parsed from
 */
case class HttpHeader (
    val method: Method, val uri: String, val version: String,
    val headers: Map[String, String], val size: Int) {

  /**
   * Value of the Content-Length field (name matched case-insensitively), 0 when absent.
   * A non-numeric value throws NumberFormatException at construction.
   */
  val contentLength: Int = headers.collectFirst {
    case (name, value) if name.trim.equalsIgnoreCase("Content-Length") => value.trim.toInt
  }.getOrElse(0)

  /** Part of `uri` before the first '?'; the whole `uri` when there is no '?'. */
  def path: String = uri.takeWhile(_ != '?')

  /** Decoded query parameters from the part of `uri` after the first '?'. */
  def query: Map[String, String] = parseQuery(uri.drop(path.length + 1))
}

/** Parser for the header section of an HTTP request. */
object HttpHeader {

  /** Matches a request line: method, request target, HTTP version digits. */
  val REQUEST_LINE_RE = """^([A-Z]+) +([^ ]+) +HTTP/([0-9\.]+)$""".r

  /**
   * Matches a header field line. The name ends at the first ':' and whitespace
   * after the colon is optional.
   */
  val HEADER_RE = """^([^:]+):[ \t]*(.*)$""".r

  /**
   * Parses raw header bytes (decoded as US-ASCII).
   *
   * @param seq header bytes starting with the request line
   * @return the parsed header, or None when the request line is missing or
   *         malformed, the method is unsupported, or construction fails
   */
  def apply(seq: Seq[Byte]): Option[HttpHeader] = try {
    val lines = new String(seq.toArray, US_ASCII).split("\r?\n").iterator
    if (!lines.hasNext) None
    else lines.next() match {
      case REQUEST_LINE_RE(method, path, version) =>
        // Lines that do not look like header fields are skipped.
        val headers = lines.collect { case HEADER_RE(k, v) => (k.trim, v.trim) }
        Option(new HttpHeader(Method(method), path, version, headers.toMap, seq.length))
      case _ => None
    }
  } catch { case _: Exception => None } // Errors such as OutOfMemoryError propagate
}

/** Closed set of HTTP request methods recognised by the parser. */
sealed abstract class Method

case object GET extends Method
case object POST extends Method
case object HEAD extends Method

object Method {
  /**
   * Maps a method token to its case object.
   * Any token other than "GET", "POST" or "HEAD" raises a MatchError.
   */
  def apply(s: String) =
    if (s == "GET") GET
    else if (s == "POST") POST
    else if (s == "HEAD") HEAD
    else throw new MatchError(s)
}

HttpResponse.scala

package etc9.aokan
import etc9.aokan.Util._
import java.nio.ByteBuffer
import java.nio.channels.SocketChannel

/**
 * An HTTP response: status line, header fields and optional content.
 *
 * @param status  response status
 * @param content optional body; its length is reported as Content-Length
 * @param headers extra header fields; they override the defaults of the same name
 */
class HttpResponse(val status: HttpStatus, val content:Option[Content] = None,
    val headers: Map[HeaderField, String] = Map.empty) extends Res {

  /**
   * Status line and header section encoded as US-ASCII. Each field is emitted
   * on its own line as "Name: value" and the section ends with an empty line.
   */
  def headerBytes: Array[Byte] = {
    val default: Map[HeaderField, String] = Map(
      ContentLength -> content.map(_.length.toString).getOrElse("0"),
      Connection -> "close",
      Server -> "Aokan",
      Date -> GMT.dateString(System.currentTimeMillis))

    val fields = (default ++ headers).map { case (field, value) =>
      field.toString + ": " + value + "\r\n"
    }
    ("HTTP/1.1 " + status + "\r\n" + fields.mkString + "\r\n").getBytes(US_ASCII)
  }

  /**
   * Writer that sends the header first and then delegates to the content's writer.
   *
   * A response without content can be written again after it has completed
   * (the singleton responses are reused across requests): the header buffer is
   * rebuilt on the next `write`. It must not be written to two channels at once.
   */
  val writer: Writer = new Writer { self =>
    private var head = ByteBuffer.wrap(headerBytes)
    private var done = false

    def write(sc: SocketChannel): Writer = {
      if (done && content.isEmpty) {
        // Previous use finished: start over with a fresh header.
        head = ByteBuffer.wrap(headerBytes)
        done = false
      }
      if (head.hasRemaining) sc.write(head)
      // Content may only follow a completely written header.
      if (!head.hasRemaining) content foreach { c => c.writer.write(sc) }
      done = !hasRemaining
      self
    }

    def hasRemaining: Boolean = head.hasRemaining || content.exists(_.writer.hasRemaining)
  }
}
// Ready-made responses without content or extra headers. Each is a singleton,
// so the `writer` val inherited from HttpResponse is created once per object.
object BadRequest       extends HttpResponse(BAD_REQUEST)
object NotFound         extends HttpResponse(NOT_FOUND)
object MethodNotAllowed extends HttpResponse(METHOD_NOT_ALLOWED)
object MovedPermanently extends HttpResponse(MOVED_PERMANENTLY)
object MovedTemporarily extends HttpResponse(MOVED_TEMPORARILY)

/**
 * HTTP response status. `toString` yields the status-line form
 * "<code> <description>".
 */
sealed abstract class HttpStatus(code: Int, description: String) {
  override def toString = List(code, description).mkString(" ")
}

case object OK_RESPONSE extends HttpStatus(200, "OK")
case object BAD_REQUEST extends HttpStatus(400, "Bad Request")
case object NOT_FOUND extends HttpStatus(404, "Not Found")
case object METHOD_NOT_ALLOWED extends HttpStatus(405, "Method Not Allowed")
case object MOVED_PERMANENTLY extends HttpStatus(301, "Moved Permanently")
case object MOVED_TEMPORARILY extends HttpStatus(302, "Moved Temporarily")

IoBuffer.scala と Util.scala

package etc9.aokan

import java.nio.channels.ReadableByteChannel
import java.nio.ByteBuffer

/**
 * Growable byte buffer that can be filled from a channel and viewed as a Seq[Byte].
 *
 * The underlying ByteBuffer is kept in "fill" state: bytes 0 until `position`
 * are the stored data and `position` is where the next read appends. `length`
 * and `iterator` both describe exactly that stored data.
 *
 * @param buf backing buffer; replaced by a larger one when the buffer grows
 */
class IoBuffer(var buf: ByteBuffer) extends Seq[Byte] {

  /** Byte at absolute index `idx` of the backing buffer. */
  def apply(idx: Int): Byte = buf.get(idx)

  /** Overwrites the byte at absolute index `n`. */
  def update(n: Int, e: Byte) = buf.put(n, e)

  /** Number of stored bytes (the same bytes `iterator` yields). */
  def length: Int = buf.position()

  /**
   * Drops the first `idx` stored bytes and moves the rest to the front.
   * `idx` is clamped to the range 0 .. length.
   */
  def remove(idx: Int): Unit = {
    val end = buf.position()
    // compact() moves the bytes between position and limit, so the limit must
    // mark the end of the data, not the capacity.
    buf.limit(end)
    buf.position(math.min(math.max(idx, 0), end))
    buf.compact() // position = remaining byte count, limit = capacity
  }

  /** Iterates over the stored bytes through a read-only view; the buffer state is untouched. */
  def iterator: Iterator[Byte] = new Iterator[Byte] {
    val b = buf.asReadOnlyBuffer; b.flip()
    def next() = b.get
    def hasNext = b.hasRemaining
  }

  /**
   * Appends bytes read from `channel`, growing the buffer while it is (nearly) full.
   *
   * @return total number of bytes read, or the channel's negative end-of-stream
   *         value when nothing at all was read
   */
  def read[C <: ReadableByteChannel](channel: C): Int = {
    var count = channel.read(buf)
    while (buf.capacity < buf.position() + 2) {
      expand()
      val n = channel.read(buf)
      // Do not let an end-of-stream marker (-1) distort the byte count.
      if (n > 0) count = math.max(count, 0) + n
    }
    count
  }

  /** Replaces the backing buffer with a larger one holding the same stored bytes. */
  private def expand(): Unit = {
    buf.flip() // limit = end of data, position = 0
    // After put: position = end of data, limit = new capacity, so the added
    // space is immediately writable.
    buf = ByteBuffer.allocate(nextCapacity()).put(buf)
  }

  /** Next power-of-two capacity above `current`, capped at Int.MaxValue. */
  private def nextCapacity(current: Int = buf.capacity) = {
    val next = Integer.highestOneBit(current) << 1
    if (next < 0) Integer.MAX_VALUE
    else if (next == 0) 16 // zero-capacity buffer: start from a small default
    else next
  }
}

/** Factory methods for IoBuffer. */
object IoBuffer {
  /** New buffer backed by a freshly allocated ByteBuffer of `capacity` bytes. */
  def apply(capacity: Int): IoBuffer = {
    val backing = ByteBuffer.allocate(capacity)
    new IoBuffer(backing)
  }

  /** Buffer backed by `array`. */
  def wrap(array: Array[Byte]): IoBuffer = {
    val backing = ByteBuffer.wrap(array)
    new IoBuffer(backing)
  }

  /** Buffer that uses `buf` directly as its backing store. */
  def wrap(buf: ByteBuffer): IoBuffer = new IoBuffer(buf)

  /** Second IoBuffer over the same backing ByteBuffer as `ioBuf`. */
  def wrap(ioBuf: IoBuffer): IoBuffer = new IoBuffer(ioBuf.buf)
}
package etc9.aokan

object Util {

  /** Charset used for URL encoding and decoding. */
  val UTF_8: java.nio.charset.Charset =
    java.nio.charset.Charset.forName("UTF-8")

  /** Protocol charset (request line, header fields). */
  val US_ASCII: java.nio.charset.Charset =
    java.nio.charset.Charset.forName("US-ASCII")

  /** Charset applied to HTTP bodies. */
  val ISO_8859_1: java.nio.charset.Charset =
    java.nio.charset.Charset.forName("ISO-8859-1")

  /** Formats instants as HTTP dates, e.g. "Thu, 01 Jan 1970 00:00:00 GMT". */
  object GMT {
    // Kept for source compatibility; dateString no longer uses these two members.
    val fmt = new java.text.SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss", java.util.Locale.US)
    val offset = java.util.TimeZone.getDefault.getRawOffset

    // Formatter pinned to GMT. Subtracting the default zone's raw offset, as was
    // done before, ignores daylight saving time and is off by the DST amount
    // whenever DST is in effect.
    private val gmtFormat = {
      val f = new java.text.SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss", java.util.Locale.US)
      f.setTimeZone(java.util.TimeZone.getTimeZone("GMT"))
      f
    }

    /**
     * @param mills milliseconds since the epoch
     * @return the instant rendered in GMT with a trailing " GMT"
     */
    def dateString(mills : Long): String = gmtFormat.synchronized {
      // SimpleDateFormat is not thread-safe, hence the lock.
      gmtFormat.format(new java.util.Date(mills)) + " GMT"
    }
  }

  /**
   * Parses a URL-encoded query string ("a=1&b=2") into decoded key/value pairs.
   *
   * Each pair is split at its first '=' only, so '=' characters inside a value
   * are preserved. Pairs without '=' or with an empty key are skipped. When a
   * key occurs more than once the last occurrence wins.
   *
   * @param str raw query string without the leading '?'
   */
  def parseQuery(str: String): Map[String, String] =
    (for {
      pair <- str.split('&')
      eq = pair.indexOf('=')
      if eq > 0
    } yield (decode(pair.substring(0, eq)), decode(pair.substring(eq + 1)))).toMap

  import java.net.{URLEncoder, URLDecoder}
  /** URL-decodes `str` using the UTF_8 charset. */
  def decode(str: String) = {
    val charsetName = UTF_8.name
    URLDecoder.decode(str, charsetName)
  }

  /** URL-encodes `str` using the UTF_8 charset. */
  def encode(str: String) = {
    val charsetName = UTF_8.name
    URLEncoder.encode(str, charsetName)
  }

  // Typed HTTP header field name; toString yields the field name as written on the wire.
  abstract class HeaderField(val name: String) { override def toString = name }
  // Header field constants, one case object per field name.
  case object Accept            extends HeaderField("Accept")
  case object AcceptCharset     extends HeaderField("Accept-Charset")
  case object AcceptLanguage    extends HeaderField("Accept-Language")
  case object Age               extends HeaderField("Age")
  case object Allow             extends HeaderField("Allow")
  case object Authorization     extends HeaderField("Authorization")
  case object CacheControl      extends HeaderField("Cache-Control")
  case object Connection        extends HeaderField("Connection")
  case object ContentLength     extends HeaderField("Content-Length")
  case object ContentType       extends HeaderField("Content-Type")
  case object Date              extends HeaderField("Date")
  case object Expect            extends HeaderField("Expect")
  case object Expires           extends HeaderField("Expires")
  case object Host              extends HeaderField("Host")
  case object IfModifiedSince   extends HeaderField("If-Modified-Since")
  case object LastModified      extends HeaderField("Last-Modified")
  case object Location          extends HeaderField("Location")
  case object Pragma            extends HeaderField("Pragma")
  case object Referer           extends HeaderField("Referer")
  case object Server            extends HeaderField("Server")
  case object UserAgent         extends HeaderField("User-Agent")
  // ... (remaining header fields omitted)
}