Akka Actor 1.x 導入
対象とするバージョン
この記事は、Akka 1.1.3 を対象に書かれています。
Akkaとは?
- Scale up (Concurrency)
- Scale out (Remoting)
- Fault tolerance
Akka は Scala/Java でイベント駆動型のアプリケーションを書くためのフレームワークです。
Scala 標準の Actor よりもインタフェースが抽象化されており、また、耐障害性に優れたアプリケーションを書くことができます。
ライセンスはApache Lisence 2.0です。
http://akka.io/docs/akka/1.1.3/intro/why-akka.html
利用方法
Akka は機能毎に別々の jar ファイルとしてリリースされています。詳細は以下のページにある一覧をご確認ください。
akka.conf の読み込み
akka.conf の読み込みをしなかった場合は Actor の生成時に以下のような warning メッセージが出力されますが、この場合はデフォルトの設定で動作します。
Can't load 'akka.conf'. One of the three ways of locating the 'akka.conf' file needs to be defined: 1. Define the '-Dakka.config=...' system property option. 2. Put the 'akka.conf' file on the classpath. 3. Define 'AKKA_HOME' environment variable pointing to the root of the A kka distribution. I have no way of finding the 'akka.conf' configuration file. Using default values everywhere.
上記のメッセージの通り、3通りの設定方法がありますが、2.の「akka.conf」をクラスパスを含めるやり方はたとえば以下のようになります。
// カレントディレクトリにakka.confがある想定 scala -cp akka--actor-1.1.3.jar:. // Loading config [akka.conf] from the application classpath. // ※Windowsの場合は scala -cp akka--actor-1.1.3.jar:.
akka.conf の設定方法はドキュメントをご覧ください。
http://akka.io/docs/akka/1.1.3/general/configuration.html
// akka.conf
akka {
}
scala.actors.Actor と akka.actor.Actor は別物である件
Scala標準のActorと特に互換性はないので、メッセージを送信し合う事はできません。
class MyAkkaActor extends akka.actor.Actor { def receive = { case msg => self.reply(msg.toString * 2) } } case class ScalaActor(ref: akka.actor.ActorRef) extends actors.Actor { def act = loop { react { case msg => ref ! msg } } } val akkaActor = akka.actor.Actor.actorOf[MyAkkaActor].start val scalaActor = new ScalaActor(akkaActor).start scalaActor ! "test" // akka.actor.IllegalActorStateException: // No sender in scope, can't reply. // You have probably: // 1. Sent a message to an Actor from an instance that is NOT an Actor. // 2. Invoked a method on an TypedActor from an instance NOT an TypedActor. // Else you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope // at akka.actor.ScalaActorRef$class.reply(ActorRef.scala:1382)
Scala 標準の Actor と Akka Actor の違い
同じ Actor を Scala 標準の Actor と Akka の Actor で書いてみると以下のような違いがあります。
- scala.actors.Actor と akka.actor.Actor
- act メソッドではなく receive というメソッドを実装する
- loop にしなくてもデフォルトでループになっている
- Akka では start などの自身に定義されたメソッドを呼ぶときにself.が必要
- 同期は「!?」ではなく「!!」、Futureは「!!」ではなく「!!!」
- 同期での受け取りは Option 型、Future は akka.dispatch.Future 型
import actors.Actor class MyActor extends Actor { start def act() = loop { react { case msg => { println(msg + " by Actor") sender ! (msg.toString * 2) } } } } val myActor = new MyActor myActor ! "test" // 非同期 val res = myActor !? "test" // 同期 val f = myActor !! "test" // Future
// scala -cp akka-actor-1.1.3.jar:. import akka.actor.Actor class MyAkkaActor extends Actor { self.start def receive = { case msg => { println(msg + " by Akka Actor") self.reply(msg.toString * 2) } } } val myAkkaActor = Actor.actorOf[MyAkkaActor] // Can't load 'akka.conf'. myAkkaActor ! "test" // 非同期 ... IllegalActorStateExceptionが発生 val resultOption = myAkkaActor !! "test" // 同期 resultOption match { case Some(result) => println(result) // "testtest" case _ => println("None") } val resultFuture = myAkkaActor !!! "test" // akka.dispatch.Future resultFuture.result match { case Some(result) => println(result) // "testtest" case _ => println("None") }
同期処理で応答がなくてもだんまりにならない Akka Actor
同期処理で受け手側がメッセージを送り返さない場合、普通の Actor は待ち続けてしまいますが・・
import actors.Actor class MyActor extends Actor { def act() = loop { react { case msg => println(msg + " by Actor") } } } val myActor = new MyActor().start val res = myActor !? "test" // 応答がないのに待ち続ける
Akka の Actor はタイムアウト(デフォルトは5000ミリ秒)して None を返します。
import akka.actor.Actor class MyAkkaActor extends Actor { def receive = { case msg => println(msg + " by Akka Actor and no reply") } } val myAkkaActor = Actor.actorOf[MyAkkaActor].start val res = myAkkaActor !! "test" // 応答がない場合はNoneが返る
sender が Actor でない場合のreplyの「!」に対する挙動の違い
通常の Actor の場合、reply 先が Actor でなかった場合、「!」へのreplyはそのまま捨てられるだけですが・・
class ReplySample extends actors.Actor { def act() = loop { react { // case msg => sender ! msg.toString * 2 case msg => reply(msg.toString * 2) } } } val r = new ReplySample().start r ! "test" // replyは捨てられるだけ
Akka Actor の場合は IllegalActorStateException が throw されます。
import akka.actor.Actor class ReplyAkkaSample extends Actor { def receive = { case msg => self.reply(msg.toString * 2) } } import akka.actor.Actor._ val r = actorOf[ReplyAkkaSample].start r ! "test" // akka.actor.IllegalActorStateException: // No sender in scope, can't reply. // You have probably: // 1. Sent a message to an Actor from an instance that is NOT an Actor. // 2. Invoked a method on an TypedActor from an instance NOT an TypedActor. // Else you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope
「reply」ではなく「reply_?」や「replySafe」を使うとこの例外が発生しなくなります。
import akka.actor.{Actor,ActorRef} case class Response(msg: String) class ReplyAkkaSample2 extends Actor { def receive = { case msg:String => { self.reply_?(Response(msg.toString * 2)) } } } import akka.actor.Actor._ // senderがActorでない場合 val r = actorOf[ReplyAkkaSample2].start r ! "test" // 何も出力されない // senderがActorの場合 case class Client(ref:ActorRef) extends Actor { def receive = { case Response(msg) => println(msg) case msg => ref ! msg } } val c = actorOf(Client(r)).start c ! "test" // "testtest"
Akka Actor の sender への参照は Option になっているので以下のように制御することもできます。
import akka.actor.{Actor,ActorRef} case class Response(msg: String) class ReplyAkkaSample3 extends Actor { def receive = { case msg => { self.sender match { case Some(sender) => sender ! (msg.toString * 2) case _ => println("no sender") } } } } import akka.actor.Actor._ val r = actorOf[ReplyAkkaSample3].start r ! "test" // "no sender"
公式ドキュメントでは Option#foreach を呼んで None のパターンを無視するサンプル例があります。
http://akka.io/docs/akka/1.1.3/scala/actors.html
import akka.actor.{Actor,ActorRef} case class Response(msg: String) class ReplyAkkaSample3 extends Actor { def receive = { case msg => self.sender.foreach(_ ! Response(msg.toString * 2)) } } import akka.actor.Actor._ val r = actorOf[ReplyAkkaSample3].start r ! "test" // 何も出力されない(IllegalActorStateExceptionがthrowされない) // senderがActorの場合 case class Client(ref:ActorRef) extends Actor { def receive = { case Response(msg) => println(msg) case msg => ref ! msg } } val c = actorOf(Client(r)).start c ! "test" // "testtest"
メッセージを forward する
別の Actor にメッセージをフォワードすることができます。
import akka.actor.{Actor,ActorRef} class Printer extends Actor { def receive = { case msg => println(msg) } } case class Forwarder(next: ActorRef) extends Actor { def receive = { case msg => next forward (msg.toString*2) } } import akka.actor.Actor._ val ref = actorOf[Printer].start val f = actorOf(Forwarder(ref)).start f ! "test" // "testtest"
起動停止のタイミングをフックする
以下の4つのタイミングに呼び出される callback を override してフックすることができます。
- preStart
- postStop
- preRestart
- postRestart
import akka.actor.Actor class Hooked extends Actor { def receive = { case msg => println(msg) } override def preStart() = println("preStart") } val h = Actor.actorOf[Hooked].start // "preStart"
EventHandler でロギング
EventHandler は Akka のロギングシステムです。
import akka.actor.Actor import akka.event.EventHandler class Worker extends Actor { def receive = { case msg => EventHandler.info(this, msg) } } val worker = Actor.actorOf[Worker].start worker ! "test" // [INFO] [8/3/11 0:02 AM] [akka:event-driven:dispatcher:global-1] [Worker] test
Scheduler でスケジューリング
Scheduler にレシーバとなる Actor を登録しておくと等間隔に実行することができます。
/* // 一度だけ起動するようスケジューリング Scheduler.scheduleOnce( receiverActor, messageToBeSent, initialDelayBeforeSending, timeUnit ) // 等間隔での起動をスケジューリング Scheduler.schedule( receiverActor, messageToBeSent, initialDelayBeforeSending, delayBetweenMessages, timeUnit ) */ import akka.actor.Actor import akka.actor.Scheduler import java.util.concurrent.TimeUnit class Recv extends Actor { self.start def receive = { case "test" => println("ok") } } Scheduler.scheduleOnce(Actor.actorOf[Recv],"test",0,TimeUnit.SECONDS) // "ok" Scheduler.schedule(Actor.actorOf[Recv],"test",0,3,TimeUnit.SECONDS) // "ok" "ok" "ok" ...
Supervisor による耐障害性
http://akka.io/docs/akka/1.1.3/scala/fault-tolerance.html
Erlang を開発した Joe Armstrong が提唱する "Let It Crash" という設計思想をベースに、分散された環境での並行処理においても高い耐障害性を得ることができます。
"Let It Crash" の考え方では、個別に例外処理を行うより例外もアプリケーションのライフサイクルの中では自然に発生しうる状態の一つと捉えて Supervisor によるリスタートで自動復旧するよう設計します。
Supervisor(監督役)
Supervisor の役割は子プロセスの監視(monitoring its child processes)です。
Supervisor は子プロセスが異常終了した場合、必要に応じて子プロセスをリスタートさせます。
import akka.actor.Actor import akka.actor.Actor._ import akka.actor.Supervisor import akka.config.Supervision._ class MyAkkaActor extends Actor { self.start() var isFirstMessage = true def receive = { case msg => { if ( isFirstMessage ) println(msg + " is the first message!") else println(msg + " is NOT the first message!") isFirstMessage = false throw new RuntimeException("test") } } override def postRestart(reason: Throwable) = { println("Restarted!") isFirstMessage = true } } val myActor = actorOf[MyAkkaActor] myActor ! "test" // test is the first message! // RuntimeExceptionのスタックトレース myActor ! "test" // test is NOT the first message! // RuntimeExceptionのスタックトレース val myActor2 = actorOf[MyAkkaActor] val supervisor = Supervisor( SupervisorConfig( AllForOneStrategy(List(classOf[Exception]), 3, 1000), Supervise(myActor2 , Permanent) :: Nil ) ).start myActor2 ! "test" // test is the first message! // RuntimeExceptionのスタックトレース // "Restarted!" myActor2 ! "test" // test is the first message! // RuntimeExceptionのスタックトレース // "Restarted!"
Supervisorによるリスタートの種類
Akka の Supervisor には「One-For-One」と「All-For-One」の二つの Restart Strategy があります。
これらは FaultHandlingStrategy 型のオブジェクトです。
- One-For-One (OneForOneStrategy)
もしいずれかの子プロセスが異常終了したら、そのプロセスのみをリスタートさせます。
- All-For-One (AllForOneStrategy)
もしいずれかの子プロセスが異常終了したら、全てのプロセスをリスタートさせます。
なお、Restart Strategory については Erlang のドキュメントも併せてご参照ください。 http://www.erlang.org/doc/design_principles/sup_princ.html
// new OneForOneStrategy { new AllForOneStrategy { new Class[]{ Exception.class }, // ハンドル対象の例外 3, // リスタートの最大リトライ回数 5000 // 指定時間(ミリ秒) }
Remote Actor でリモート呼び出し
ポートを指定してサーバを立ち上げると、リモートの Actor からリクエストすることができます。
// scala -cp .:akka-actor-1.1.3.jar:akka-remote-1.1.3.jar:protobuf-java-2.3.0.jar:netty-3.2.4.Final.jar;commons-io-2.0.1.jar // ※Windowsの場合はセミコロンで区切る scala -cp .;akka-actor-1.1.3.jar;... class EchoServer extends akka.actor.Actor { def receive = { case msg => self.reply(msg.toString.toUpperCase) } } import akka.actor.Actor._ remote.start("localhost",999).register("echoServer",actorOf[EchoServer].start)
クライアントは以下のような実装になります。
// scala -cp .:akka-actor-1.1.3.jar:akka-remote-1.1.3.jar:protobuf-java-2.3.0.jar:netty-3.2.4.Final.jar;commons-io-2.0.1.jar import akka.actor.Actor._ val client = remote.actorFor("echoServer", "localhost", 999) val resultOption = client !! "Hello World!" resultOption foreach { res => println(res) } // HELLO WORLD!