seratch's weblog in Japanese

About Scala, Java and Ruby programming in Japaense. If you need English information, go to http://blog.seratch.net/

Akka Actor 1.x 導入

対象とするバージョン

この記事は、Akka 1.1.3 を対象に書かれています。

Akkaとは?

  • Scale up (Concurrency)
  • Scale out (Remoting)
  • Fault tolerance

Akka は Scala/Java でイベント駆動型のアプリケーションを書くためのフレームワークです。

Scala 標準の Actor よりもインタフェースが抽象化されており、また、耐障害性に優れたアプリケーションを書くことができます。

ライセンスはApache Lisence 2.0です。

http://akka.io/docs/akka/1.1.3/intro/why-akka.html

利用方法

Akka は機能毎に別々の jar ファイルとしてリリースされています。詳細は以下のページにある一覧をご確認ください。

http://akka.io/docs/akka/1.1.3/intro/getting-started.html

ソースコード

ソースコードは GitHub で公開されています。

https://github.com/jboner/akka

実績

チャットサーバがサンプルとしてよく挙げられますが、様々な用途で既に実績があるようです。

http://akka.io/docs/akka/1.1.3/intro/use-cases.html

akka.conf の読み込み

akka.conf の読み込みをしなかった場合は Actor の生成時に以下のような warning メッセージが出力されますが、この場合はデフォルトの設定で動作します。

Can't load 'akka.conf'.
One of the three ways of locating the 'akka.conf' file needs to be defined:
        1. Define the '-Dakka.config=...' system property option.
        2. Put the 'akka.conf' file on the classpath.
        3. Define 'AKKA_HOME' environment variable pointing to the root of the A
kka distribution.
I have no way of finding the 'akka.conf' configuration file.
Using default values everywhere.

上記のメッセージの通り、3通りの設定方法がありますが、2.の「akka.conf」をクラスパスを含めるやり方はたとえば以下のようになります。

// カレントディレクトリにakka.confがある想定
scala -cp akka--actor-1.1.3.jar:.
// Loading config [akka.conf] from the application classpath.
// ※Windowsの場合は scala -cp akka--actor-1.1.3.jar:.

akka.conf の設定方法はドキュメントをご覧ください。

http://akka.io/docs/akka/1.1.3/general/configuration.html

// akka.conf
akka {
}

scala.actors.Actor と akka.actor.Actor は別物である件

Scala標準のActorと特に互換性はないので、メッセージを送信し合う事はできません。

class MyAkkaActor extends akka.actor.Actor {
  def receive = {
    case msg => self.reply(msg.toString * 2)
  }
}
case class ScalaActor(ref: akka.actor.ActorRef) extends actors.Actor {
  def act = loop {
    react {
      case msg => ref ! msg
    }
  }
}
val akkaActor = akka.actor.Actor.actorOf[MyAkkaActor].start
val scalaActor = new ScalaActor(akkaActor).start

scalaActor ! "test"

// akka.actor.IllegalActorStateException:
//         No sender in scope, can't reply.
//         You have probably:
//                 1. Sent a message to an Actor from an instance that is NOT an Actor.
//                 2. Invoked a method on an TypedActor from an instance NOT an TypedActor.
//         Else you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope
//         at akka.actor.ScalaActorRef$class.reply(ActorRef.scala:1382)

Scala 標準の Actor と Akka Actor の違い

同じ Actor を Scala 標準の Actor と Akka の Actor で書いてみると以下のような違いがあります。

  • scala.actors.Actor と akka.actor.Actor
  • act メソッドではなく receive というメソッドを実装する
  • loop にしなくてもデフォルトでループになっている
  • Akka では start などの自身に定義されたメソッドを呼ぶときにself.が必要
  • 同期は「!?」ではなく「!!」、Futureは「!!」ではなく「!!!」
  • 同期での受け取りは Option 型、Future は akka.dispatch.Future 型
import actors.Actor
class MyActor extends Actor {
  start
  def act() = loop { 
    react { 
      case msg => { 
        println(msg + " by Actor")
        sender ! (msg.toString * 2)
      } 
    } 
  }
}
val myActor = new MyActor
myActor ! "test" // 非同期
val res = myActor !? "test" // 同期
val f = myActor !! "test" // Future
// scala -cp akka-actor-1.1.3.jar:.
import akka.actor.Actor
class MyAkkaActor extends Actor {
  self.start
  def receive = { 
    case msg => { 
      println(msg + " by Akka Actor")
      self.reply(msg.toString * 2)
    }
  }
}

val myAkkaActor = Actor.actorOf[MyAkkaActor] // Can't load 'akka.conf'.
myAkkaActor ! "test" // 非同期 ... IllegalActorStateExceptionが発生
val resultOption = myAkkaActor !! "test" // 同期
resultOption match { 
  case Some(result) => println(result)  // "testtest"
  case _ => println("None") 
}
val resultFuture = myAkkaActor !!! "test" // akka.dispatch.Future
resultFuture.result match { 
  case Some(result) => println(result)  // "testtest"
  case _ => println("None") 
}

同期処理で応答がなくてもだんまりにならない Akka Actor

同期処理で受け手側がメッセージを送り返さない場合、普通の Actor は待ち続けてしまいますが・・

import actors.Actor
class MyActor extends Actor {
  def act() = loop { 
    react { 
      case msg => println(msg + " by Actor")
    } 
  }
}
val myActor = new MyActor().start
val res = myActor !? "test" // 応答がないのに待ち続ける

Akka の Actor はタイムアウト(デフォルトは5000ミリ秒)して None を返します。

import akka.actor.Actor
class MyAkkaActor extends Actor {
  def receive = {
    case msg => println(msg + " by Akka Actor and no reply")
  }
}
val myAkkaActor = Actor.actorOf[MyAkkaActor].start
val res = myAkkaActor !! "test" // 応答がない場合はNoneが返る

sender が Actor でない場合のreplyの「!」に対する挙動の違い

通常の Actor の場合、reply 先が Actor でなかった場合、「!」へのreplyはそのまま捨てられるだけですが・・

class ReplySample extends actors.Actor {
  def act() = loop { 
    react { 
      // case msg => sender ! msg.toString * 2 
      case msg => reply(msg.toString * 2)
    } 
  }
}
val r = new ReplySample().start
r ! "test" // replyは捨てられるだけ

Akka Actor の場合は IllegalActorStateException が throw されます。

import akka.actor.Actor
class ReplyAkkaSample extends Actor {
  def receive = { case msg => self.reply(msg.toString * 2) }
}
import akka.actor.Actor._
val r = actorOf[ReplyAkkaSample].start
r ! "test"

// akka.actor.IllegalActorStateException:
//   No sender in scope, can't reply.
//   You have probably:
//     1. Sent a message to an Actor from an instance that is NOT an Actor.
//     2. Invoked a method on an TypedActor from an instance NOT an TypedActor.
//   Else you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope

「reply」ではなく「reply_?」や「replySafe」を使うとこの例外が発生しなくなります。

import akka.actor.{Actor,ActorRef}
case class Response(msg: String)
class ReplyAkkaSample2 extends Actor {
  def receive = { 
    case msg:String => {
      self.reply_?(Response(msg.toString * 2))
    }
  }
}
import akka.actor.Actor._
// senderがActorでない場合
val r = actorOf[ReplyAkkaSample2].start
r ! "test" // 何も出力されない
// senderがActorの場合
case class Client(ref:ActorRef) extends Actor {
  def receive = { 
    case Response(msg) => println(msg)
    case msg => ref ! msg 
  }
}
val c = actorOf(Client(r)).start
c ! "test" // "testtest"

Akka Actor の sender への参照は Option になっているので以下のように制御することもできます。

import akka.actor.{Actor,ActorRef}
case class Response(msg: String)
class ReplyAkkaSample3 extends Actor {
  def receive = { 
    case msg => {
      self.sender match {
        case Some(sender) => sender ! (msg.toString * 2)
        case _ => println("no sender")
      }
    }
  }
}
import akka.actor.Actor._
val r = actorOf[ReplyAkkaSample3].start
r ! "test" // "no sender"

公式ドキュメントでは Option#foreach を呼んで None のパターンを無視するサンプル例があります。

http://akka.io/docs/akka/1.1.3/scala/actors.html

import akka.actor.{Actor,ActorRef}
case class Response(msg: String)
class ReplyAkkaSample3 extends Actor {
  def receive = { 
    case msg => self.sender.foreach(_ ! Response(msg.toString * 2))
  }
}
import akka.actor.Actor._
val r = actorOf[ReplyAkkaSample3].start
r ! "test" // 何も出力されない(IllegalActorStateExceptionがthrowされない)
// senderがActorの場合
case class Client(ref:ActorRef) extends Actor {
  def receive = { 
    case Response(msg) => println(msg)
    case msg => ref ! msg 
  }
}
val c = actorOf(Client(r)).start
c ! "test" // "testtest"

メッセージを forward する

別の Actor にメッセージをフォワードすることができます。

import akka.actor.{Actor,ActorRef}
class Printer extends Actor {
  def receive = { case msg => println(msg) }
}
case class Forwarder(next: ActorRef) extends Actor {
  def receive = { case msg => next forward (msg.toString*2) }
}
import akka.actor.Actor._
val ref = actorOf[Printer].start
val f = actorOf(Forwarder(ref)).start
f ! "test" // "testtest"

起動停止のタイミングをフックする

以下の4つのタイミングに呼び出される callback を override してフックすることができます。

  • preStart
  • postStop
  • preRestart
  • postRestart
import akka.actor.Actor
class Hooked extends Actor { 
  def receive = { case msg => println(msg) } 
  override def preStart() = println("preStart")
}
val h = Actor.actorOf[Hooked].start
// "preStart"

EventHandler でロギング

EventHandler は Akka のロギングシステムです。

import akka.actor.Actor
import akka.event.EventHandler
class Worker extends Actor {
  def receive = { 
    case msg => EventHandler.info(this, msg)
  }
}
val worker = Actor.actorOf[Worker].start
worker ! "test"
// [INFO]    [8/3/11 0:02 AM] [akka:event-driven:dispatcher:global-1] [Worker] test

Scheduler でスケジューリング

Scheduler にレシーバとなる Actor を登録しておくと等間隔に実行することができます。

/*
 // 一度だけ起動するようスケジューリング
 Scheduler.scheduleOnce(
   receiverActor, 
   messageToBeSent, 
   initialDelayBeforeSending, 
   timeUnit
 )
 // 等間隔での起動をスケジューリング
 Scheduler.schedule(
   receiverActor, 
   messageToBeSent, 
   initialDelayBeforeSending, 
   delayBetweenMessages, 
   timeUnit
 )
*/

import akka.actor.Actor
import akka.actor.Scheduler
import java.util.concurrent.TimeUnit

class Recv extends Actor {
  self.start
  def receive = { case "test" => println("ok") }
}
Scheduler.scheduleOnce(Actor.actorOf[Recv],"test",0,TimeUnit.SECONDS)
// "ok"
Scheduler.schedule(Actor.actorOf[Recv],"test",0,3,TimeUnit.SECONDS)
// "ok" "ok" "ok" ...

Supervisor による耐障害性

http://akka.io/docs/akka/1.1.3/scala/fault-tolerance.html

Erlang を開発した Joe Armstrong が提唱する "Let It Crash" という設計思想をベースに、分散された環境での並行処理においても高い耐障害性を得ることができます。

"Let It Crash" の考え方では、個別に例外処理を行うより例外もアプリケーションのライフサイクルの中では自然に発生しうる状態の一つと捉えて Supervisor によるリスタートで自動復旧するよう設計します。

Supervisor(監督役)

Supervisor の役割は子プロセスの監視(monitoring its child processes)です。

Supervisor は子プロセスが異常終了した場合、必要に応じて子プロセスをリスタートさせます。

import akka.actor.Actor
import akka.actor.Actor._
import akka.actor.Supervisor
import akka.config.Supervision._

class MyAkkaActor extends Actor {
  self.start()
  var isFirstMessage = true
  def receive = {
    case msg => {
      if ( isFirstMessage  ) println(msg + " is the first message!") 
      else println(msg + " is NOT the first message!")
      isFirstMessage = false
      throw new RuntimeException("test")
    }
  }
  override def postRestart(reason: Throwable) = {
    println("Restarted!")
    isFirstMessage = true
  }
}
val myActor = actorOf[MyAkkaActor]
myActor ! "test" 
// test is the first message!
// RuntimeExceptionのスタックトレース
myActor ! "test" 
// test is NOT the first message!
// RuntimeExceptionのスタックトレース

val myActor2 = actorOf[MyAkkaActor]
val supervisor = Supervisor(
  SupervisorConfig(
    AllForOneStrategy(List(classOf[Exception]), 3, 1000),
    Supervise(myActor2 , Permanent) :: Nil
  )
).start
myActor2 ! "test" 
// test is the first message!
// RuntimeExceptionのスタックトレース
// "Restarted!"
myActor2 ! "test" 
// test is the first message!
// RuntimeExceptionのスタックトレース
// "Restarted!"
Supervisorによるリスタートの種類

Akka の Supervisor には「One-For-One」と「All-For-One」の二つの Restart Strategy があります。

これらは FaultHandlingStrategy 型のオブジェクトです。

  • One-For-One (OneForOneStrategy)

もしいずれかの子プロセスが異常終了したら、そのプロセスのみをリスタートさせます。

  • All-For-One (AllForOneStrategy)

もしいずれかの子プロセスが異常終了したら、全てのプロセスをリスタートさせます。

なお、Restart Strategory については Erlang のドキュメントも併せてご参照ください。 http://www.erlang.org/doc/design_principles/sup_princ.html

// new OneForOneStrategy {
new AllForOneStrategy { 
  new Class[]{ Exception.class }, // ハンドル対象の例外
  3, // リスタートの最大リトライ回数
  5000 // 指定時間(ミリ秒)
}

Remote Actor でリモート呼び出し

ポートを指定してサーバを立ち上げると、リモートの Actor からリクエストすることができます。

// scala -cp .:akka-actor-1.1.3.jar:akka-remote-1.1.3.jar:protobuf-java-2.3.0.jar:netty-3.2.4.Final.jar;commons-io-2.0.1.jar
// ※Windowsの場合はセミコロンで区切る scala -cp .;akka-actor-1.1.3.jar;...
class EchoServer extends akka.actor.Actor {
  def receive = {
    case msg => self.reply(msg.toString.toUpperCase)
  }
}
import akka.actor.Actor._
remote.start("localhost",999).register("echoServer",actorOf[EchoServer].start)

クライアントは以下のような実装になります。

// scala -cp .:akka-actor-1.1.3.jar:akka-remote-1.1.3.jar:protobuf-java-2.3.0.jar:netty-3.2.4.Final.jar;commons-io-2.0.1.jar
import akka.actor.Actor._
val client = remote.actorFor("echoServer", "localhost", 999)
val resultOption = client !! "Hello World!"
resultOption foreach { res => println(res) }
// HELLO WORLD!