Lambdaカクテル

京都在住Webエンジニアの日記です

Invite link for Scalaわいわいランド

Akka StreamsでReactive FizzBuzzした 〜Akka Streams入門〜

f:id:Windymelt:20211014162046p:plain

Reactive streamという概念をなんとなく知っていたが、実際には使ったことがなかったので、そのうち使ってみたいと思っていた矢先の話。

みずほ銀行の文脈ではてブを眺めてたら、無停止システムに関する記事が出てきて、これが面白かった。

koduki.hatenablog.com

それで、分散システムとかのことを考えていたらScalaのAkkaの事を思い出した。Akka StreamsっていうやつでReactive Streamをやれるので、チャンレンジしてみるか、と思った。 Akka Streamsは、アクターフレームワークであるAkkaを基盤技術に用いている。

ぱっと思い付く手頃なストリーム処理はFizzBuzzなので、Akka Streamで実装してみようと思った。ただのFizzBuzzではつまらないので、Akka Streamsの機能をいくつか盛り込むことにした。

そもそもReactive Streamとは?

takezoe.hatenablog.com

詳細は↑のページに譲るとして、まず簡単にReactive Streamが何なのかについて説明する。

世界的な背景として、まず我々が暮らしている世界には常に大量のデータがあるけれど、これまでみたいに全部一旦手元に置いて処理できるような規模ではなくなりつつある。一気にメモリに載せて処理しようとすればOutOfMemoryが発生したり、そもそも手元のストレージに置くことすらままならない、といった規模になっている。

そこで、データを受け取りながら順に処理して、処理したデータをまたどこかに出力していけばよさそう、という発想が出てきた。これがストリーム処理。

しかし、ストリーム処理では、例えば受信側の処理能力が送信側よりも劣っていて、受信側のバッファが溢れてしまい、データをロストしてしまう、といった問題が発生してしまう。他にも、APIにレート制限があるので一定速度でしか処理できないとか、分散させた環境に同じデータを送信したいとか、ネットワークごしに転送したいとか、様々な課題がある。

Reactive StreamはそうしたStreamの面倒な部分をうまく規格化することで、相互運用の負担を減らしましょう、というもの。筆頭となる機能はバックプレッシャ(背圧制御)で、受信側の処理能力が低いとき、処理できる分しかデータを送信させないようにする、といった制御が可能になる。これにより、効率的なデータ処理が可能になる。

Reactive StreamのScala実装がAkka Streams

ちなみにReactive Streamは特定の実装に依存しないため、Reactive Streamに則って実装されていれば、異なる言語間でも相互乗り入れ可能になっている。

そしてReactive StreamsのScala実装が、先程も触れたAkka Streamsだ。Akka Streamsは基盤技術としてAkkaのActorを採用しており、Let-it-crash(なにかあったら壊して切り離し、障害の波及を防ぐ)というアクター的な思想に基いて、頑健かつスケーラブルにストリーム処理を行うことができる。

単純に使い手側の目線においても、エラーハンドリングがしっかりしているのでバッチ処理とかに便利だとか、便利でよくできたライブラリだと思う。

Reactive FizzBuzzの仕様

なにかを試したいとき、まずFizzBuzzするのが世の習い。しかしそれだけだとつまらないので、Akka Streamsの力を活かすために、ReactiveなFizzBuzzを考える。

今回は、以下のような仕様に基いたFizzBuzzを作成する。

  • FizzBuzzの結果を、got: ***という形式で毎行表示する
  • 1000件のFizzBuzzを処理させる
  • 最初の数字の入力速度に制限がある
    • 1秒あたり60個しかやってこない
  • たまに、処理に失敗してクラッシュしてしまう
    • 画面に出力する箇所が壊れるという設定でいくことにした

Reactiveっぽさを出すためにいろいろ考えられるけれど、ひとまずこの2点を実装してみた。

Reactive FizzBuzzの実装

↓のソースでsbt runしたら動くよ

github.com

先にソースを貼り付けておく。以下、転記。

package example

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.util._

/** Reactive な fizzbuzz Akka Streamを用いて実装された fizzbuzz 。
  */
object ReactiveFizzBuzz extends App {
  // Actorを作るために必要な宣言。
  implicit val system = ActorSystem("NumSys")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  // Source といった諸々のDSLを使えるようにする。
  import akka.stream.scaladsl._

  // GraphDSL を用いて、非直線な Stream を構築する。
  val g = RunnableGraph.fromGraph(GraphDSL.create() {
    implicit builder: GraphDSL.Builder[NotUsed] =>
      // ~> といった DSL を使えるようにする。
      import GraphDSL.Implicits._

      // 1から1000まで出力する source 。
      // throttle 機能により、1秒あたり60入力しか受け付けないように設定したため、適切なペースで出力されていく。
      import scala.concurrent.duration._
      import scala.language.postfixOps
      val src = Source(1 to 1000).throttle(60, 1 second)

      // 入力を3分岐させる。 Int を受け取るので Broadcast[Int] となる。基本的に型パラメータは入力の型を与えればよく、出力の型は自動的に推論される。
      val bcast = builder.add(Broadcast[Int](3))

      // Int を受け取り、それが3で割りきれるなら、 "fizz" を、さもなくば "" を出力する flow 。 buzzも同様。
      val fizz = Flow[Int].map {
        case n if n % 3 == 0 => "fizz"
        case n               => ""
      }
      val buzz = Flow[Int].map {
        case n if n % 5 == 0 => "buzz"
        case n               => ""
      }

      // 2つの String を受け取り、結合して String を出力する ZipWith 。 Zip に加えて、 Tuple2 を 受け取り String を返す Flow の組み合わせでも実現できるが、 ZipWith を使ったほうが簡便。
      // ここでは型パラメータは[入力1, 入力2, 出力]になっている。
      val zipJoinString =
        builder.add(ZipWith[String, String, String]((lhs, rhs) => lhs + rhs))

      // 2つの入力 (lhs, rhs とする) を受け取り、 lhs が非-空文字列ならそれを、さもなくば rhs を出力する Flow 。
      // lhs は "fizz" "buzz" "fizzbuzz" "" のいずれかの値をとる。
      // rhs は src から渡ってくる Int を文字列化したものが与えられる。
      // ここでは型パラメータは[入力1, 入力2, 出力]になっている。
      // Zip 系コンポーネントは2つの入力を待機し、それぞれが揃うようにするので、どちらかが欠けることはない。
      val zipTakeFirstIfNotEmpty = builder.add(ZipWith[String, String, String] {
        case ("", rhs) => rhs
        case (lhs, _)  => lhs
      })

      // Int を入力に取り、文字列に変形するだけの Flow 。
      // 入力を関数に渡したいので map を使っている。
      val stringify = Flow[Int].map(_.toString())

      // 文字列を入力に取り、それを一定の書式で表示する Sink 。
      val sink = Sink.foreach[String] { elem =>
        // 恐るべきことに、 sink は低頻度でクラッシュしてしまう。何もしなければそこで中断して終了するが、後で実行戦略を設定するため無視して続行される。
        if (Math.random() > 0.99) {
          system.log.error("sink dead")
          throw new RuntimeException("Boom!")
        }
        println(s"got: $elem")
      }

      // ここで、各コンポーネントを結合する。
      // 結合には ~> を使う。 ~> を使うと、自動的に via や to に変換される。
      // bcast は本来は入出力の別に合わせてそれぞれ bcast.in と bcast.out(n) のように書く必要があるが、記述する上で自明な場合は省略できる。
      // さらに、入力と出力を連続させた記述も可能である。
      // zip系は、どの入力先に割り当てたいのかが自明にならないため、 zip.in0 のように明示する必要がある。
      // zip.out も明記する必要があるが、なぜなのかは不明。
      // format: off
      src ~> bcast ~> fizz ~> zipJoinString.in0
             bcast ~> buzz ~> zipJoinString.in1
                              zipJoinString.out ~> zipTakeFirstIfNotEmpty.in0
             bcast ~> stringify                 ~> zipTakeFirstIfNotEmpty.in1
                                                   zipTakeFirstIfNotEmpty.out ~> sink
      // format: on

      // 実行可能な完結したグラフであることを示すために ClosedShape を返す必要がある。
      ClosedShape
  })

  // エラー発生時の回復戦略をここで設定する。
  // throwされたエラーがRuntimeExceptionなら、その要素は捨て、無視して続行する。
  // ただし分岐点にかかわる箇所でこれを行うとデッドロックを誘発するので、使う箇所に注意が必要である。
  val decider: Supervision.Decider = {
    case _: RuntimeException => Supervision.Resume
    case _                   => Supervision.Stop
  }

  g.withAttributes(ActorAttributes.supervisionStrategy(decider)).run()
}

依存性定義

Akka Streamsを使うには、build.sbtに以下のような依存性を定義する。

libraryDependencies += "com.typesafe.akka" %% "akka-stream" % AkkaVersion

ここでは、AkkaVersionは2.6.16を使っている。

Source / Flow / Sink

Akka Streamsでは、SourceとFlowとSinkという3つの基本的な部品に、必要に応じてこれらにオペレータと呼ばれる追加的な設定を行い、これらを結合してGraphという部品を形成し、実行する、というのが基本的な流れ。 部品を組み合わせる段階では実際の処理は発生せず、Graphの実行を明示することで動作が始まる。

Source、Flow、Sinkなどを作成し、実行するためには、以下の宣言が必要となる。Akka StreamsがActorをベースにしていることが垣間見えると思う。

// Actorを作るために必要な宣言。
implicit val system = ActorSystem("NumSys")
implicit val materializer = ActorMaterializer()
import system.dispatcher

// Source といった諸々のDSLを使えるようにする。
import akka.stream.scaladsl._

前半部分はActorを実際にメモリ上に作成するために必要な処理で、例えばActorSystemはActor間のメッセージの管理に必要である。

後半は部品を定義するDSLを有効化するもので、SourceやFlowといったコンストラクタが使えるようになる。

Source

Sourceはストリームの起点で、無限に0を返したり、DBからデータを1000個ずつ返してくれたり、CSVから1行ずつ読み込んだり、1から順に1000まで送り込んできたりする。

今回は1から1000まで返してもらうSourceを定義することにした。

import scala.concurrent.duration._
import scala.language.postfixOps
val src = Source(1 to 1000).throttle(60, 1 second)

なんらかの事情により、1秒に60個しか数字はやってこないという設定なので、throttleを用いて制限をかけている。throttleは、だいたいどこにでもかけることができて、例えばAPIのレートリミット対応といった用途がある。

Sourceは単体でも実行できる。

src.runForeach(println)
1
2
3
...
...
...
999
1000

Flow

Flowは、入力をもとになにか加工して出力してくれるコンポーネント。SourceとSinkの間で、データを変換したり、なにがしかの重い処理をしたりする。失敗することもある。

f:id:Windymelt:20211014154555p:plain
図

今回は、「入力された数値が3で割り切れるなら"fizz"を出力し、そうでないなら空文字列を出力する」Flowであるfizzと、fizz同様にふるまうbuzzと、IntをStringに変換するstringifyとを実装した。

      val fizz = Flow[Int].map {
        case n if n % 3 == 0 => "fizz"
        case n               => ""
      }
      val stringify = Flow[Int].map(_.toString())

Flowは、via を使ってSourceと接続できる。

src.via(stringify).runForeach(println)
1
2
3
...
...
...
999
1000

Sink

Sinkとは、ストリームの終点。データをどこかに出力したり、もしくは何もしなかったりする。たいてい速度制限があったり、故障したりする。

f:id:Windymelt:20211014154753p:plain
図

今回は一定の書式で結果を出力してもらうことにした。

      // 文字列を入力に取り、それを一定の書式で表示する Sink 。
      val sink = Sink.foreach[String] { elem =>
        // 恐るべきことに、 sink は低頻度でクラッシュしてしまう。何もしなければそこで中断して終了するが、後で実行戦略を設定するため無視して続行される。
        if (Math.random() > 0.99) {
          system.log.error("sink dead")
          throw new RuntimeException("Boom!")
        }
        println(s"got: $elem")
      }

ここでは、Sinkは1%の確率で、やんごとなき事情によりクラッシュしてしまう。

Sinkは、toを使ってSourceやFlowと接続できる。

val graph = src.via(stringify).to(sink)
graph.run()

SourceからSinkまでが一気通貫して接続されたGraphは、そのまま実行可能になる(RunnableGraphと呼ばれる)。これは部品間に未接続の箇所が無いため。

got: 1
got: 2
...
...
...
got: 999
got: 1000

Operator

ところで、srcを定義したときにthrottleを使ったのを覚えているだろうか。このコレクションインターフェイス風のメソッドは オペレータ と総称されており、Source / Flow / Sinkの挙動を一部変更する役割を持つ。

Akka Streamsには多種多様なオペレータが定義されている。

doc.akka.io

例えばfilterは、コレクションメソッド同様の働きをする。

val graph = src.filter(_ % 3 == 0).via(stringify).to(sink)
graph.run()
got: 3
got: 6
got: 9
...
got: 996
got: 999

非線形グラフ

これまで紹介したGraphは、SourceからSinkまでがただ一つの経路で一直線に接続されていた。これをAkka Streamsの用語では線形なGraphと呼ぶ。

嬉しいことに、Akka Streamはストリームの分岐と合流をサポートしている。途中で枝分れしたGraphを作ることができるという意味だ。こうしたグラフのことを、Akka Streamsでは 非線形グラフ と呼び、これをサポートするための特別なDSLが用意されている。

f:id:Windymelt:20211014160852p:plain
"非線形"グラフ

今回作りたいReactive FizzBuzzも、この非線形グラフとして定義していく。

ちなみにSourceもSinkも2つ以上定義することができて、同時に別のファイルに書き出す、といった芸当も可能なので覚えておくと良いかもしれない。

FizzBuzzの全貌は以下のグラフで表現される。

f:id:Windymelt:20211014162046p:plain
FizzBuzz

途中で分岐と合流を繰り返しているため、非線形グラフであることがわかる。

  • bcastはBroadcastという部品で、入力を分岐させて各出力に一斉に同報する。
  • zipJoinStringはZipWithという部品で、各入力を合流させて関数に渡し、1つの出力を得る。ここでは文字列結合の役割。
    • fizzbuzzであるような場合に、fizzとbuzzとをくっつける役割。
  • zipTakeFirstIfNotEmptyã‚‚ZipWithだが、「1つめの入力が空文字列でなければそれをそのまま採用し、空文字列ならば2つめの入力を採用して出力する」という役割にしている。
    • fizzでもbuzzでもなかった場合は数字をそのまま出力させる、という役割。

これを実装に起こすと冒頭で提示したソースコードのようになる。

要点だけかいつまんで紹介すると、

  val g = RunnableGraph.fromGraph(GraphDSL.create() {
    implicit builder: GraphDSL.Builder[NotUsed] =>
      // ~> といった DSL を使えるようにする。
      import GraphDSL.Implicits._

      // ここでいろいろコンポーネントを作成する

      // ここで、各コンポーネントを結合する。
      // 結合には ~> を使う。 ~> を使うと、自動的に via や to に変換される。
      // format: off
      src ~> ............. ~> sink
      // format: on

      // 実行可能な完結したグラフであることを示すために ClosedShape を返す必要がある。
      ClosedShape
  })

以上のようになる。GraphDSL.create()がBuilderを渡してくれるので、これを使ってmutableにグラフを作成していく、という流れになっている。これだけではGraphが実行可能かは考慮されないので、RunnableGraph.fromGraphを用いて明示的に実行可能な状態にしてあげることで、gは実行可能になる。

g.run()
got: 1
got: 2
got: fizz
got: 4
got: buzz
got: fizz
got: 7
got: 8
got: fizz
got: buzz
got: 11
got: fizz
got: 13
got: 14
got: fizzbuzz
...

やった!FizzBuzzが動いた!

小まとめ

  • GraphDSLを用いて非線形グラフを構築できた。
  • Sourceから入力された数列がFlowを通過し、分岐と合流を行いつつFizzBuzzの結果に変換され、画面にフォーマットした状態で出力することができた。

おもしろ機能の追加: エラー処理

FizzBuzzの実装ができたけれど、これだけではAkka Streamsの能力をあまり使えていない。

ここではエラー処理をグラフに組込むことにしよう。

前述した通り、FizzBuzzの結果をフォーマットして出力するsinkは、低確率でエラーを起こしてしまう。エラーが起こっても処理は継続してほしいので、「エラーが起こった要素は捨ててください」という戦略で実行してもらう。

グラフで例外が発生したときにとりうる戦略のことを、Akka StreamsではSupervision Strategyと呼ぶ。これをグラフに設定することで、例外送出時の挙動を設定することができる。

  // エラー発生時の回復戦略をここで設定する。
  // throwされたエラーがRuntimeExceptionなら、その要素は捨て、無視して続行する。
  // ただし分岐点にかかわる箇所でこれを行うとデッドロックを誘発するので、使う箇所に注意が必要である。
  val decider: Supervision.Decider = {
    case _: RuntimeException => Supervision.Resume
    case _                   => Supervision.Stop
  }

  g.withAttributes(ActorAttributes.supervisionStrategy(decider)).run()

例外が発生すると、まずdeciderに例外情報が伝えられる。deciderは例外の型に基いて、実行を継続するのか、エラーを起こして停止するのか、といった判断を行う。今回はRuntimeExceptionに対しては継続実行する戦略を、そしてそれ以外の例外に対しては実行を停止するという戦略を与えた。

実際に例外が発生すると以下のような挙動になる。

.....
got: 22
got: 23
got: fizz
got: 26
[ERROR] [10/14/2021 16:42:29.709] [NumSys-akka.actor.default-dispatcher-5] [akka.actor.ActorSystemImpl(NumSys)] sink dead
got: fizz
got: 28
got: 29
.....

24の時点でエラーが発生して報告されている(非同期にログが表示されているので、表示上の位置とは必ずしも一致しない)。そして24の結果は破棄され、そのまま25(fizz)、26へと進んでいることがわかる。これがSupervision.Resume戦略である。

あくまでこれは一例で、他にもAkka Streamは様々なエラー対策を施すことができる。

doc.akka.io

サーキットブレーカーなどもあるので興味があれば読んでほしい。

Catså°Žå…¥

一応これでFizzBuzzは完成したのだけれど、ちょっと冗長な箇所をCatsで手直しする。

空文字列の代わりにNoneを返すようにする

まず、非線形グラフを以下のように修正する。

f:id:Windymelt:20211014164910p:plain
StringのかわりにOption[String]を使う

fizz/buzzは今のところ"fizz(buzz)"か""かを返している。これをSome("fizz")/Noneを返すように修正する。

// Int を受け取り、それが3で割りきれるなら、 Some("fizz") を、さもなくば None を出力する flow 。 buzzも同様。
// いったん変数にPartialFunctionを受ける
type -->[A, B] = PartialFunction[A, B]
val fizzpf: Int --> String = { case n if n % 3 == 0 => "fizz" }
val buzzpf: Int --> String = { case n if n % 5 == 0 => "buzz" }
// liftして Int => Option[String] に変換する
val List(fizz, buzz) = List(fizzpf.lift, buzzpf.lift).map(Flow[Int].map)

PartialFunctionとliftとを組み合わせればたやすい。記述が簡潔になった。

stringifyも同様にSome(string)を返すようにする。catsの.someを使う。

val stringify = Flow[Int].map(_.toString().some)

これで、fizz/buzz/数字をそのまま返すFlowは、Option[String]を返すようになった。

かしこく文字列結合する

fizzとbuzzとは、それぞれzipJoinString に接続している。これまでは文字列が渡ってきていたのでそのまま結合すればよかったが、今回はOption[String]が入力される。

したがって、次のようなFlowを作りたい。

  • 2つのOption[String]を結合し、Option[String]を返したい
  • どちらかがNoneであれば、Noneではないほうを返したい
  • 両方Noneであれば、Noneを返したい

この挙動はMonoidのcombineの挙動と同じであるから、Monoidとして扱う。 Semigroupで良かった。

import cats.Semigroup
import cats.implicits._
val zipJoinString = builder.add(ZipWith(Semigroup[Option[String]].combine(_, _)))

よりわかりやすく、二項演算子の表記を使ってもよい。Semigroupのcombineは|+|で表現される。

import cats.Semigroup
import cats.implicits._
val zipJoinString = builder.add(ZipWith((lhs: Option[String], rhs: Option[String]) => lhs |+| rhs))

これで、Someな文字列同士を結合するFlowを作ることができた。

1つめの入力がSomeである場合に限って1つめの入力を返す

先程作成したzipJoinStringは、Some("fizz")かSome("buzz")かSome("fizzbuzz")かNoneを返すことがわかっている。

そして、stringifyは、Some("数字")を常に返すことがわかっている。

fizzbuzzのロジックを満たすには、zipJoinStringがNoneを返している場合に限って、stringifyの結果を採用すれば良い。

整理すると次のようになる。

  • zipJoinStringがSome → それを出力する
  • zipJoinStringがNone → stringify の結果を出力する

この挙動はSemigroupKのcombineKの挙動と同じであるから、SemigroupKとして扱う。

こちらも二項演算子の表記を使って実装した。SemigroupKのcombineKは<+>で表現される。

import cats.SemigroupK
val zipTakeFirstIfNotEmpty = builder.add(
  ZipWith((lhs: Option[String], rhs: Option[String]) => (lhs <+> rhs).get)
)

次段はSinkであり、なおかつstringifyは必ずSomeを返すことが分かっているので、getを使ってStringを取り出している。

これで、Catsの機能をちょっと取り入れたFizzBuzzの完成。

まとめ

  • Akka Streamsを使ってfizzbuzzを実装することができた。
  • Akka Streamsの機能である、スロットリング、エラー処理を実装できた。
  • Catsの機能を用いて、関数型プログラミングを行い、簡潔な記述ができた。

参考文献

doc.akka.io

公式ページ。

typelevel.org

typelevel.org

今回使った型クラスの説明。

dev.classmethod.jp

より実践的に、DBやCSVを扱う記事。

qiita.com

基礎の説明。

open8tech.hatenablog.com

より大きなレベルで、別のシステムとAkka Streamとを簡単に接続できるようにするAlpakkaの説明。

最高の本だけどちょっとバージョンが古い。数年前の本。

★記事をRTしてもらえると喜びます
Webアプリケーション開発関連の記事を投稿しています.読者になってみませんか?