前回
blog1.mammb.com
の続き。
エントリポイント Main.scala
package etc9.aokan object Main { def main(args: Array[String]):Unit = new Acceptor(8900, new IoHandler[HttpRequest, HttpResponse] with HttpReadChannel with HttpProcess with HttpWriteChannel) }
NIO系の処理を行う IoAcceptor.scala
package etc9.aokan import java.nio.channels.{ServerSocketChannel, SocketChannel, SelectionKey, Selector} class Acceptor(port: Int, ioCallback: => Handler) { using(Selector.open) { selector => using(ServerSocketChannel.open) { channel => channel.socket.setReuseAddress(true) channel.configureBlocking(false) channel.socket.bind(new java.net.InetSocketAddress(port)) channel.register(selector, SelectionKey.OP_ACCEPT, handler) while (selector.keys.size > 0) { selector.select val it = selector.selectedKeys.iterator while(it.hasNext) { val key = it.next it.remove key.attachment.asInstanceOf[Handler].handle(key) } } } } def using[T <: {def close(): Unit}, R](t: T)(f: T => R): R = try{ f(t) } finally{ t.close } def handler = new Handler { def handle(key: SelectionKey): Unit = if(key.isValid) if(key.isAcceptable) { val sc: SocketChannel = key.channel.asInstanceOf[ServerSocketChannel].accept sc.configureBlocking(false) sc.register(key.selector, SelectionKey.OP_READ, ioCallback) } } } abstract class Handler { def handle(key: SelectionKey): Unit } class IoHandler[T <: Req, R <: Res] extends Handler { self: ReadChannel[T] with Process[T, R] with WriteChannel[R] => def handle(key: SelectionKey): Unit = { if (key.isValid && key.isReadable) readChannel(key) map { request => process(request) { response => reply(response) writeChannel(key) } } if (key.isValid && key.isWritable) writeChannel(key) } } trait ReadChannel[T <: Req] { def readChannel(key: SelectionKey): Option[T] } trait Process[T <: Req, R <: Res] { def process(req: T)(reply: R => Unit): Unit } trait WriteChannel[R <: Res] { import collection.mutable.ListBuffer private val writers = ListBuffer.empty[Writer] def writeChannel(key: SelectionKey): Unit = { val sc: SocketChannel = key.channel.asInstanceOf[SocketChannel] if (writers.head.write(sc).hasRemaining) { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE) } else { writers.remove(0) if (writers.isEmpty) { key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); sc.close } else { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE) } } } def reply(res: R): Unit = writers += res.writer } trait Req trait Res extends Writable trait Writable { val writer: Writer } trait Writer { def write(sc: java.nio.channels.SocketChannel): Writer def hasRemaining: Boolean }
HTTPによるチャネルとの IO を行う HttpIoProcess.scala
package etc9.aokan import etc9.web.Web import concurrent.ops._ import java.nio.channels.{SocketChannel, SelectionKey} trait HttpReadChannel extends ReadChannel[HttpRequest] { private val buffer = IoBuffer(2048) def readChannel(key: SelectionKey): Option[HttpRequest] = { val channel = key.channel.asInstanceOf[SocketChannel] buffer read channel val header: Option[HttpHeader] = takeHeader(buffer) match { case Some(bytes) => withReturn(HttpHeader(bytes)) { buffer.remove(bytes.size) } case _ => None } header map ( h => h.method match { case POST if(buffer.length < h.contentLength) => null case POST => withReturn(new HttpRequest(h, takeBody(buffer, h.contentLength))) { buffer.remove(h.contentLength) halfClose(key, channel) } case _ => withReturn(new HttpRequest(h, None)) { halfClose(key, channel) } }) } private def halfClose(key: SelectionKey, ch: SocketChannel) { key.interestOps(key.interestOps() & ~SelectionKey.OP_READ) ch.socket.shutdownInput } private def takeHeader(buf: IoBuffer): Option[Seq[Byte]] = { val CR: Int = 13 // carriage return val LF: Int = 10 // line feed ( 0 to buf.length - 4 ) find { i => buf(i) == CR && buf(i+1) == LF && buf(i+2) == CR && buf(i+3) == LF } map { index => buf.take(index + 3) } } private def takeBody(seq: Seq[Byte], len: Int): Option[Seq[Byte]] = if(seq.length >= len) Some(seq.take(len)) else None private def withReturn[R, F](r: => R)(f: => F): R = {val ret = r; f; ret} } trait HttpProcess extends Process[HttpRequest, HttpResponse] { def process(req: HttpRequest)(reply: HttpResponse => Unit): Unit = if (req.header.path.endsWith(".do")) spawn { reply{ new HttpResponse(OK_RESPONSE, Web.mkContent(req)) } } else { val file = new java.io.File("./public" + req.header.path) reply { if (file.exists) new HttpResponse(OK_RESPONSE, Some(new FileContent(file))) else NotFound } } } trait HttpWriteChannel extends WriteChannel[HttpResponse]
HttpRequest.scala
package etc9.aokan import etc9.aokan.Util._ class HttpRequest(val header: HttpHeader, val body: Option[Seq[Byte]]) extends Req { def forms = if(header.method == POST) header.headers.get("Content-Type") match { case Some("application/x-www-form-urlencoded") => Some(parseQuery(new String(body.get.toArray, ISO_8859_1))) case Some("multipart/form-data") => None // not implemented case _ => None } } case class HttpHeader ( val method: Method, val uri: String, val version: String, val headers: Map[String, String], val size: Int) { val contentLength = headers.getOrElse("Content-Length" ,"0").toInt def path: String = uri.split('?').head def query = parseQuery(uri.diff(path+'?')) } object HttpHeader { val REQUEST_LINE_RE = """^([A-Z]+) +([^ ]+) +HTTP/([0-9\.]+)$""".r val HEADER_RE = """^(.*): +(.*)$""".r def apply(seq: Seq[Byte]): Option[HttpHeader] = try { val lines = new String(seq.toArray, US_ASCII).lines val REQUEST_LINE_RE(method, path, version) = lines.next() val headers = for(HEADER_RE(k, v) <- lines) yield (k, v) Option(new HttpHeader(Method(method), path, version, headers.toMap, seq.length)) } catch { case _ => None } } sealed abstract class Method case object GET extends Method case object POST extends Method case object HEAD extends Method object Method { def apply(s: String) = s match { case "GET" => GET; case "POST" => POST; case "HEAD" => HEAD; } }
HttpResponse.scala
package etc9.aokan import etc9.aokan.Util._ import java.nio.ByteBuffer import java.nio.channels.SocketChannel class HttpResponse(val status: HttpStatus, val content:Option[Content] = None, val headers: Map[HeaderField, String] = Map.empty) extends Res { def headerBytes = { val default: Map[HeaderField, String] = Map( ContentLength -> (content match { case Some(c) => c.length.toString case _ => "0"}), Connection -> "close", Server -> "Aokan", Date -> GMT.dateString(System.currentTimeMillis)) ("HTTP/1.1 " + status + "\r\n" + (default ++ headers).mkString("", "; ", "\r\n") + "\r\n").getBytes(US_ASCII) } val writer: Writer = new Writer { self => val b = ByteBuffer.wrap(headerBytes) def write(sc: SocketChannel): Writer = { if (b.hasRemaining) sc.write(b) content map { c => c.writer.write(sc) } self } def hasRemaining = b.hasRemaining || content.exists(c => c.writer.hasRemaining == true) } } object BadRequest extends HttpResponse(BAD_REQUEST) object NotFound extends HttpResponse(NOT_FOUND) object MethodNotAllowed extends HttpResponse(METHOD_NOT_ALLOWED) object MovedPermanently extends HttpResponse(MOVED_PERMANENTLY) object MovedTemporarily extends HttpResponse(MOVED_TEMPORARILY) sealed abstract class HttpStatus(code: Int, description: String) { override def toString = code + " " + description } case object OK_RESPONSE extends HttpStatus(code=200, description="OK") case object BAD_REQUEST extends HttpStatus(code=400, description="Bad Request") case object NOT_FOUND extends HttpStatus(code=404, description="Not Found") case object METHOD_NOT_ALLOWED extends HttpStatus(code=405, description="Method Not Allowed") case object MOVED_PERMANENTLY extends HttpStatus(code=301, description="Moved Permanently") case object MOVED_TEMPORARILY extends HttpStatus(code=302, description="Moved Temporarily")
IoBuffer.scala と Util.scala
package etc9.aokan import java.nio.channels.ReadableByteChannel import java.nio.ByteBuffer class IoBuffer(var buf: ByteBuffer) extends Seq[Byte] { def apply(idx: Int) = buf.get(idx) def update(n: Int, e: Byte) = buf.put(n, e) def length = buf.limit def remove(idx: Int) { buf.position(idx); buf.compact } def iterator: Iterator[Byte] = new Iterator[Byte] { val b = buf.asReadOnlyBuffer; b.flip def next() = b.get def hasNext = b.hasRemaining } def read[C <: ReadableByteChannel](channel: C): Int = { var count = channel.read(buf) while(buf.capacity < buf.position + 2) { expand count += channel.read(buf) } count } private def expand { val save = (buf.position, buf.limit) buf.clear buf = ByteBuffer.allocate(nextCapacity()).put(buf) buf.position(save._1); buf.limit(save._2) } private def nextCapacity(current: Int = buf.capacity) = { val next = Integer.highestOneBit(current) << 1 if (next < 0) Integer.MAX_VALUE else next } } object IoBuffer { def wrap(ioBuf: IoBuffer) = new IoBuffer(ioBuf.buf) def wrap(buf: ByteBuffer) = new IoBuffer(buf) def wrap(array: Array[Byte]) = new IoBuffer(ByteBuffer.wrap(array)) def apply(capacity: Int) = new IoBuffer(ByteBuffer.allocate(capacity)) }
package etc9.aokan object Util { val UTF_8 = java.nio.charset.Charset.forName("UTF-8") val US_ASCII = java.nio.charset.Charset.forName("US-ASCII") // Protocol Charset val ISO_8859_1 = java.nio.charset.Charset.forName("ISO-8859-1") // http body Charset object GMT { val fmt = new java.text.SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss", java.util.Locale.US) val offset = java.util.TimeZone.getDefault.getRawOffset def dateString(mills : Long): String = fmt.synchronized { fmt.format(new java.util.Date(mills - offset)) + " GMT" } } def parseQuery(str: String): Map[String, String] = (for { q <- str.split('&'); if(q.indexOf('=') > 0) kv = q.split('=') } yield (decode(kv.head), decode(kv.tail.mkString))).toMap import java.net.{URLEncoder, URLDecoder} def decode(str: String) = URLDecoder.decode(str , UTF_8.name) def encode(str: String) = URLEncoder.encode(str , UTF_8.name) abstract class HeaderField(val name: String) { override def toString = name } case object Accept extends HeaderField("Accept") case object AcceptCharset extends HeaderField("Accept-Charset") case object AcceptLanguage extends HeaderField("Accept-Language") case object Age extends HeaderField("Age") case object Allow extends HeaderField("Allow") case object Authorization extends HeaderField("Authorization") case object CacheControl extends HeaderField("Cache-Control") case object Connection extends HeaderField("Connection") case object ContentLength extends HeaderField("Content-Length") case object ContentType extends HeaderField("Content-Type") case object Date extends HeaderField("Date") case object Expect extends HeaderField("Expect") case object Expires extends HeaderField("Expires") case object Host extends HeaderField("Host") case object IfModifiedSince extends HeaderField("If-Modified-Since") case object LastModified extends HeaderField("Last-Modified") case object Location extends HeaderField("Location") case object Pragma extends HeaderField("Pragma") case object Referer extends HeaderField("Referer") case object Server extends HeaderField("Server") case object UserAgent extends HeaderField("User-Agent") ・・ }