コミケ告知

サークル活動の詳細は circle タグの記事へ。
ラベル Akka の投稿を表示しています。 すべての投稿を表示
ラベル Akka の投稿を表示しています。 すべての投稿を表示
2015年9月1日火曜日

Akkaのsenderは値ではない

久々に使うと忘れがちなのでメモ。

AkkaのActorは、メッセージ受信時にはその送信元を示すActorRefをsenderで取れます。メッセージを送り返したり、デバッグ時に表示させたり、何かと便利です。
def receive = {
  case x => sender ! x  // echo
}
しかしこのsender、Futureで使おうとすると望み通りの挙動をしません。表示させてみると、送信元とは違う値になっています。(deadLetters宛になる)
Future {
  sender ! myFunc(x)  // NG
}
こういうときは、一時的に別の定数に入れて使うか…
val dst = sender
Future {
  dst ! myFunc(x)  // OK
}
最後に結果を返したいだけなら、pipeToを用いるか。
Future {
  myFunc(x)
} pipeTo sender

senderとは何なのか

senderはcontext.senderを呼び出すようになっていて、さらに追っていくとActorCellの実装に辿り着きます。
  final def sender(): ActorRef = currentMessage match {
    case null                      ⇒ system.deadLetters
    case msg if msg.sender ne null ⇒ msg.sender
    case _                         ⇒ system.deadLetters
  }
これは変数ではなくメソッドであるので、sender と記述したところに制御が到達したタイミングで、初めて処理が行われます。ちなみにvar currentMessageを弄っているのも同じActorCell内で…
  final def invoke(messageHandle: Envelope): Unit = try {
    currentMessage = messageHandle
    cancelReceiveTimeout() // FIXME: leave this here???
    messageHandle.message match {
      case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle)
      case msg                      ⇒ receiveMessage(msg)
    }
    currentMessage = null // reset current message after successful invocation
  }
  //  以下略…
まずcurrentMessageがセットされたのち、システム側で処理されるAutoReceivedMessage以外であれば、receiveMessageからActorのおなじみreceiveメソッドの処理へ。終わったら戻ってきてnullに戻します。(普通にnull使うんだ… 初期値が _ だからnullで統一してるんだな)
Futureの処理がどこかで走るときには、このメッセージ受信による処理駆動パスとは異なるので、 currentMessage == null であり、 sender()はdeadLettersを返すのですね。

カッコ省略のデメリット?

仕組みが分かっていない段階で誤った使い方をしてしまった原因のひとつが、引数なしメソッドのカッコが省略されていることでした。サンプルでも基本的に省略されているから、そもそも最初はメソッドだと思っていなかったよ…(´・ω・`)
記述が簡潔!の思想に走りすぎるとデメリットもありますよ、ということで。
2014年10月11日土曜日

Akkaの挙動確認用の小物を作った

AkkaのActorスケジューリング周りの挙動を確認するための、ちょっとしたものを作りました。

概要

Actorの数、OSスレッドの数、Actor1個あたりに飛ばすメッセージ数を指定して回し、簡単なレポートを出力します。Actorのメッセージハンドリングが、どのスレッドでどんな順番で実行されたのかが表示されます。
適当に時間がかかるように、Actorはメッセージを受け取るごとにフィボナッチ数を求めています。この実験では別にsleepでも良いんですが、後々試したいことがいくつかあることと、OSやVMによるスレッドのスケジューリングに手を出したくないことから、この方法にしました。

Akkaの設定

参考:Configuration — Akka Documentation

スレッド数の設定

スレッド数は固定値を指定するものではなく「factor」「parallelism-min」「parallelism-max」の3項目と、実行環境で認識されたプロセッサー数によって決定します。まず最初に使われるのがfactorで、(プロセッサー数 * factor)を切り上げた値が用いられます。その後、parallelism-minparallelism-maxの指定範囲内にない場合は、指定範囲に収まるように値が補正されます。minよりmaxの方が小さい場合、maxが優先されるようです。

あくまでもfactorで計算された値が基本であるため、max設定値だけ大きくしてもスレッドは増えません
このテストプログラムでは、引数-apf, -apmin, -apmaxによって、この3項目を設定可能です。

スループットの指定

Actorがスレッドを手放すまでに処理するメッセージの数を設定するのが、akkaのthroughputです。パッと見では、用語から挙動を想像するのが難しいような気がします。

AkkaのActorスケジューリングに用いられる単位は、メッセージです。Actorが(receiveで)メッセージを受け取ると、どこかのスレッドに割り当てられ、receiveのブロックから抜けると、Akkaのシステム側に制御を委ねます。以下の例では、Msg1を受信したらdoFoo()を実行して終了、Msg2を受信したらdoBar1()とdoBar2()を実行して終了。
class HogeActor extends Actor {
  override def receive = {
    case Msg1 =>
      doFoo()
    case Msg2 =>
      doBar1()
      doBar2()
  }
}
スレッドを明け渡すまでに、メッセージをいくつ連続で処理するかを示す設定値がthroughputです。メッセージがActorのメールボックスに5個届いていたとして、throughputが5以上あれば、一気に全部処理します。throughputが1であれば、1個処理したところで、Akkaのシステムに一度制御を返します。他にメッセージ処理を待っているActorがいれば、そちらにスレッドを譲ることになります。
スレッド上で走るActorの入れ替えにはコストがかかるので、一気に走り続けた方が性能面では有利です。しかし、あるActorが居座ったままだと、他のActorがいつまで経っても処理を始められないかもしれません。Configuration説明ページのコメントに set to 1 for as fair as possible とあるように、このパラメーターはthroughputとfairnessのトレードオフです。ここまで説明してようやく、パラメーター名の意味がわかります。
throughputパラメーターは、冒頭のテストプログラムでは -at で設定できます。メッセージの数を多めに、スレッドの数をActorの数より少なく設定すると、何がどうなるのか観察できます。

補足

各WorkerActorの始動に1つメッセージを消費しているため、最初の1サイクルはメッセージ1個分ずれます。