I had a vague idea of what Reactive Streams was but had never actually used it. I kept meaning to try it someday, and this post is about what happened when I finally did.
While browsing Hatena Bookmark for posts about the Mizuho Bank outages, I came across an article on non-stop systems, and it was fascinating.
That got me thinking about distributed systems, which reminded me of Akka for Scala. Akka Streams lets you do Reactive Streams, so I decided to give it a try. Akka Streams is built on top of Akka, the actor framework.
The handiest stream-processing task I could think of was FizzBuzz, so I decided to implement it with Akka Streams. A plain FizzBuzz would be boring, so I worked in a few Akka Streams features.
- What is Reactive Streams, anyway?
- Spec for Reactive FizzBuzz
- Implementing Reactive FizzBuzz
- Adding more features: error handling
- Introducing Cats
- Summary
- References
What is Reactive Streams, anyway?
I'll leave the details to the page linked below and start with a brief explanation of what Reactive Streams is.
As background: the world we live in is always producing huge amounts of data, and it is getting too large to pull everything onto one machine and process it the way we used to. Load it all into memory at once and you get an OutOfMemoryError; at this scale you often cannot even fit it on local storage.
So the idea arises: why not process the data in order as it arrives, and write the processed data out somewhere as we go? That is stream processing.
Stream processing has its own problems, though. For example, if the receiver is slower than the sender, the receiver's buffer can overflow and data gets lost. There are other challenges too: an API is rate-limited so you can only process at a fixed rate, you want to send data to a distributed environment, you want to transfer it over a network, and so on.
Reactive Streams standardizes these troublesome parts of streaming in order to reduce the burden of interoperability. Its headline feature is backpressure: when the receiver is slow, the sender only sends as much data as the receiver can handle. This keeps buffers bounded and makes efficient data processing possible.
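To make backpressure a little more concrete, here is a minimal sketch, assuming Akka 2.6 (where an implicit `ActorSystem` is enough to run a stream). The object name `BackpressureSketch` and the 10 ms delay are my own illustrative choices. The producer is an unbounded iterator, yet nothing overflows, because elements are only pulled when the slow downstream stage asks for them.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BackpressureSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("BackpressureSketch")
    try {
      val result = Source
        .fromIterator(() => Iterator.from(1)) // unbounded producer
        .map { n => Thread.sleep(10); n }     // deliberately slow stage
        .take(20)                             // stop after 20 elements
        .runWith(Sink.seq)
      Await.result(result, 10.seconds)
    } finally system.terminate()
  }
}
```

The iterator is never drained eagerly; it is advanced only in response to downstream demand.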
Akka Streams, the Scala implementation of Reactive Streams
Incidentally, Reactive Streams does not depend on any particular implementation, so anything implemented to the specification can interoperate, even across different libraries and languages.
And the Scala implementation of Reactive Streams is Akka Streams, mentioned earlier. Akka Streams uses Akka actors as its foundation and follows the actor-style let-it-crash philosophy (when something goes wrong, let the component fail and isolate it so the failure does not spread), which makes stream processing robust and scalable.
Purely from a user's point of view, too, the error handling is solid, so I think it is a handy library for things like batch processing.
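Spec for Reactive FizzBuzz
When trying something out, the custom is to start with FizzBuzz. That alone would be boring, though, so to put Akka Streams to work, let's think up a reactive FizzBuzz.
This time we build a FizzBuzz to the following spec.
- Print each FizzBuzz result on its own line in the form `got: ***`.
- Process 1000 FizzBuzz items.
- The rate at which the initial numbers arrive is limited.
  - Only 60 numbers arrive per second.
- Occasionally, processing fails and crashes.
  - We pretend that the part that prints to the screen is what breaks.
There are many things one could do to make it feel more reactive, but for now I'll implement these two points.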
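As a baseline for the spec above, here is a plain, non-streaming sketch of the expected output, without the rate limit or the crashes. `PlainFizzBuzz`, `fizzbuzz`, and `lines` are hypothetical names of my own; the stream version later in the post should print the same lines.

```scala
object PlainFizzBuzz {
  // Concatenate "fizz" and "buzz" as applicable; fall back to the number itself.
  def fizzbuzz(n: Int): String = {
    val word = (if (n % 3 == 0) "fizz" else "") + (if (n % 5 == 0) "buzz" else "")
    if (word.nonEmpty) word else n.toString
  }

  // Each result formatted the way the spec asks for.
  def lines(upTo: Int): Seq[String] =
    (1 to upTo).map(n => s"got: ${fizzbuzz(n)}")
}
```

Calling `PlainFizzBuzz.lines(1000).foreach(println)` would print the reference output in one go.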
Implementing Reactive FizzBuzz
Run `sbt run` on the source below and it works.
I'll paste the full source first; it is reproduced below.

    package example

    import scala.concurrent._
    import akka._
    import akka.actor._
    import akka.stream._
    import akka.util._

    /** A reactive fizzbuzz, implemented with Akka Streams. */
    object ReactiveFizzBuzz extends App {
      // Declarations needed to create actors.
      // (ActorMaterializer is deprecated since Akka 2.6, where an implicit ActorSystem suffices.)
      implicit val system = ActorSystem("NumSys")
      implicit val materializer = ActorMaterializer()
      import system.dispatcher

      // Bring Source and the rest of the DSL into scope.
      import akka.stream.scaladsl._

      // Build a nonlinear stream with GraphDSL.
      val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
        // Bring ~> and the rest of the graph DSL into scope.
        import GraphDSL.Implicits._

        // A source that emits 1 through 1000.
        // throttle limits it to 60 elements per second, so elements are emitted at a steady pace.
        import scala.concurrent.duration._
        import scala.language.postfixOps
        val src = Source(1 to 1000).throttle(60, 1 second)

        // Split the input three ways. It receives Int, hence Broadcast[Int].
        // Only the input type needs to be given; the output type is inferred.
        val bcast = builder.add(Broadcast[Int](3))

        // A flow that receives an Int and emits "fizz" if it is divisible by 3, "" otherwise.
        // buzz works the same way for 5.
        val fizz = Flow[Int].map {
          case n if n % 3 == 0 => "fizz"
          case n               => ""
        }
        val buzz = Flow[Int].map {
          case n if n % 5 == 0 => "buzz"
          case n               => ""
        }

        // A ZipWith that receives two Strings and emits their concatenation.
        // Zip followed by a Flow from Tuple2 to String would also work, but ZipWith is simpler.
        // The type parameters are [input 1, input 2, output].
        val zipJoinString = builder.add(ZipWith[String, String, String]((lhs, rhs) => lhs + rhs))

        // Receives two inputs (lhs, rhs) and emits lhs if it is non-empty, rhs otherwise.
        // lhs is one of "fizz", "buzz", "fizzbuzz", "".
        // rhs is the Int from src converted to a string.
        // The type parameters are [input 1, input 2, output].
        // Zip components wait until both inputs are available, so neither is ever missing.
        val zipTakeFirstIfNotEmpty = builder.add(ZipWith[String, String, String] {
          case ("", rhs) => rhs
          case (lhs, _)  => lhs
        })

        // A Flow that just converts an Int to a string.
        // map is used because we want to pass the input to a function.
        val stringify = Flow[Int].map(_.toString())

        // A Sink that receives a string and prints it in a fixed format.
        val sink = Sink.foreach[String] { elem =>
          // Alarmingly, the sink crashes every now and then. Left alone, the stream would
          // stop here, but a supervision strategy set below makes it skip and continue.
          if (Math.random() > 0.99) {
            system.log.error("sink dead")
            throw new RuntimeException("Boom!")
          }
          println(s"got: $elem")
        }

        // Connect the components.
        // Use ~> to connect them; ~> is translated into via or to automatically.
        // Strictly, bcast has separate inlets and outlets (bcast.in, bcast.out(n)),
        // but they can be omitted where the meaning is obvious.
        // Inputs and outputs can also be chained in one expression.
        // For zip components the target inlet is not obvious, so it must be explicit (zip.in0).
        // zip.out must be written out as well; I don't know why.
        // format: off
        src ~> bcast ~> fizz ~> zipJoinString.in0
               bcast ~> buzz ~> zipJoinString.in1
        zipJoinString.out       ~> zipTakeFirstIfNotEmpty.in0
               bcast ~> stringify ~> zipTakeFirstIfNotEmpty.in1
        zipTakeFirstIfNotEmpty.out ~> sink
        // format: on

        // Return ClosedShape to indicate a complete, runnable graph.
        ClosedShape
      })

      // Set the recovery strategy for errors here.
      // If the thrown error is a RuntimeException, drop that element and carry on.
      // Doing this at a branching point can induce a deadlock, so take care where it is used.
      val decider: Supervision.Decider = {
        case _: RuntimeException => Supervision.Resume
        case _                   => Supervision.Stop
      }

      g.withAttributes(ActorAttributes.supervisionStrategy(decider)).run()
    }

Dependency definition
To use Akka Streams, define the following dependency in `build.sbt`.

    libraryDependencies += "com.typesafe.akka" %% "akka-stream" % AkkaVersion

Here `AkkaVersion` is 2.6.16.
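For reference, a complete `build.sbt` might look roughly like the sketch below. The project name and `scalaVersion` are assumptions of mine, not values from the post; only the `akka-stream` dependency and the 2.6.16 version come from the text above. The Cats section later on would additionally need a `cats-core` dependency at a version of your choosing.

```scala
// build.sbt (sketch; name and scalaVersion are assumptions)
val AkkaVersion = "2.6.16"

lazy val root = (project in file("."))
  .settings(
    name := "reactive-fizzbuzz",
    scalaVersion := "2.13.6",
    libraryDependencies += "com.typesafe.akka" %% "akka-stream" % AkkaVersion
  )
```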
Source / Flow / Sink
In Akka Streams, the basic flow is: take the three basic building blocks (Source, Flow, and Sink), optionally apply additional settings called operators, connect them into a Graph, and run it. Nothing is actually processed while you are assembling the parts; processing starts only when you tell the Graph to run.
To create and run Sources, Flows, and Sinks, you need the following declarations. They give you a glimpse of the fact that Akka Streams is built on actors.

    // Declarations needed to create actors.
    implicit val system = ActorSystem("NumSys")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    // Bring Source and the rest of the DSL into scope.
    import akka.stream.scaladsl._

The first half is what is needed to actually create actors in memory; for example, `ActorSystem` is needed to manage the messages passed between actors.
The second half enables the DSL for defining the building blocks, so that constructors such as `Source` and `Flow` become available.
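As a side note, `ActorMaterializer()` is deprecated as of Akka 2.6, where an implicit `ActorSystem` in scope is enough to run a stream. Here is a minimal sketch of that leaner setup; the object name `MinimalSetup` is mine, and it uses `Sink.seq` just to have a value to return.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MinimalSetup {
  def run(): Seq[Int] = {
    // No explicit materializer: one is derived from the implicit ActorSystem.
    implicit val system: ActorSystem = ActorSystem("MinimalSetup")
    try Await.result(Source(1 to 3).runWith(Sink.seq), 5.seconds)
    finally system.terminate() // shut the actor system down so the JVM can exit
  }
}
```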
Source
A Source is the starting point of a stream. It might emit zeros forever, return rows from a database 1000 at a time, read a CSV file line by line, or feed in the numbers from 1 to 1000 in order.
This time we define a Source that emits 1 through 1000.

    import scala.concurrent.duration._
    import scala.language.postfixOps
    val src = Source(1 to 1000).throttle(60, 1 second)

For reasons unspecified, the premise is that only 60 numbers arrive per second, so `throttle` is used to impose the limit. `throttle` can be applied almost anywhere, and is useful for things like respecting an API rate limit.
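To illustrate that `throttle` is not specific to Sources, here is a small sketch that puts it on a Flow instead, as one might do in front of a rate-limited API. The `request-N` strings are a made-up stand-in for real API calls, and `ThrottledFlow` and the limit of 20 per second are my own choices.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ThrottledFlow {
  // Lets at most 20 elements per second through, then "calls the API".
  val rateLimited = Flow[Int]
    .throttle(20, 1.second)
    .map(n => s"request-$n")

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("ThrottledFlow")
    try Await.result(Source(1 to 5).via(rateLimited).runWith(Sink.seq), 10.seconds)
    finally system.terminate()
  }
}
```

Thanks to backpressure, the upstream Source simply slows down to match the throttled Flow.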
A Source can also be run on its own.

    src.runForeach(println)

    1
    2
    3
    ...
    ...
    ...
    999
    1000

Flow
A Flow is a component that takes input, processes it in some way, and emits output. Sitting between a Source and a Sink, it transforms data or does some kind of heavy work. It can also fail.
This time we implement `fizz`, a Flow that outputs "fizz" if the input number is divisible by 3 and the empty string otherwise; `buzz`, which behaves the same way for 5; and `stringify`, which converts an `Int` to a `String`.

    val fizz = Flow[Int].map {
      case n if n % 3 == 0 => "fizz"
      case n               => ""
    }
    val stringify = Flow[Int].map(_.toString())

A Flow can be connected to a Source with `via`.

    src.via(stringify).runForeach(println)

    1
    2
    3
    ...
    ...
    ...
    999
    1000

Sink
A Sink is the end point of a stream. It writes data out somewhere, or perhaps does nothing at all. It may also be rate-limited, or it may break.
This time we have it print results in a fixed format.

    // A Sink that receives a string and prints it in a fixed format.
    val sink = Sink.foreach[String] { elem =>
      // Alarmingly, the sink crashes every now and then. Left alone, the stream would
      // stop here, but a supervision strategy set later makes it skip and continue.
      if (Math.random() > 0.99) {
        system.log.error("sink dead")
        throw new RuntimeException("Boom!")
      }
      println(s"got: $elem")
    }

Here the Sink crashes with a probability of 1%, for reasons beyond our control.
A Sink can be connected to a Source or Flow with `to`.

    val graph = src.via(stringify).to(sink)
    graph.run()

A Graph connected end to end from Source to Sink can be run as is (it is called a `RunnableGraph`), because no ports between the parts are left unconnected.

    got: 1
    got: 2
    ...
    ...
    ...
    got: 999
    got: 1000

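One thing worth knowing about `to`: it keeps the Source's materialized value (`NotUsed` here), so `graph.run()` gives you nothing to wait on. If you want to know when the stream has finished, one option is `toMat` with `Keep.right`, which keeps the Sink's `Future[Done]` instead. A minimal sketch, with names of my own choosing and a buffer in place of `println` so the result can be inspected:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object RunnableGraphDemo {
  def run(): List[String] = {
    implicit val system: ActorSystem = ActorSystem("RunnableGraphDemo")
    val seen = ListBuffer.empty[String]
    val stringify = Flow[Int].map(_.toString)
    val sink = Sink.foreach[String](elem => seen += s"got: $elem")

    // Keep.right selects the Sink's materialized value, a Future[Done].
    val graph = Source(1 to 3).via(stringify).toMat(sink)(Keep.right)
    val done: Future[Done] = graph.run()

    try { Await.result(done, 5.seconds); seen.toList }
    finally system.terminate()
  }
}
```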
Operator
By the way, remember that we used `throttle` when defining `src`? These collection-style methods are collectively called operators, and they modify part of the behavior of a Source, Flow, or Sink.
Akka Streams defines a wide variety of operators.
For example, `filter` works just like the collection method of the same name.

    val graph = src.filter(_ % 3 == 0).via(stringify).to(sink)
    graph.run()

    got: 3
    got: 6
    got: 9
    ...
    got: 996
    got: 999

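Operators chain just like collection methods. The following sketch strings a few common ones together (`filter`, `map`, `take`, `grouped`); the pipeline itself is an arbitrary example of mine, not part of the FizzBuzz.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object OperatorChain {
  def run(): Seq[Seq[Int]] = {
    implicit val system: ActorSystem = ActorSystem("OperatorChain")
    val result = Source(1 to 20)
      .filter(_ % 3 == 0) // 3, 6, 9, 12, 15, 18
      .map(_ * 10)        // 30, 60, 90, 120, 150, 180
      .take(4)            // 30, 60, 90, 120
      .grouped(2)         // Seq(30, 60), Seq(90, 120)
      .runWith(Sink.seq)
    try Await.result(result, 5.seconds)
    finally system.terminate()
  }
}
```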
Nonlinear graphs
The Graphs introduced so far were connected in a single straight path from Source to Sink. In Akka Streams terminology, this is called a linear Graph.
Happily, Akka Streams supports branching and merging of streams, which means you can build Graphs that fork partway through. Akka Streams calls these nonlinear graphs, and provides a special DSL to support them.
The Reactive FizzBuzz we want to build will also be defined as a nonlinear graph.
Incidentally, you can also define two or more Sources or Sinks, which allows tricks like writing to separate files at the same time. It may be worth remembering.
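As a rough sketch of the "two Sinks at once" idea, the example below sends every element to a main Sink and, via `alsoToMat`, also to a side Sink that keeps only multiples of 3. Both Sinks are `Sink.seq` here as a stand-in for writing to two different files; `TwoSinks` and the filter are my own illustrative choices. The GraphDSL described next is the more general tool, but for a simple tee like this the `alsoTo` family is enough.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object TwoSinks {
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("TwoSinks")
    // Side sink: collects only multiples of 3.
    val multiplesOfThree = Flow[Int].filter(_ % 3 == 0).toMat(Sink.seq[Int])(Keep.right)

    val (side, main) = Source(1 to 6)
      .alsoToMat(multiplesOfThree)(Keep.right) // tee into the side sink
      .toMat(Sink.seq[Int])(Keep.both)         // main sink collects everything
      .run()

    try (Await.result(side, 5.seconds), Await.result(main, 5.seconds))
    finally system.terminate()
  }
}
```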
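The whole of FizzBuzz is expressed by the following graph.
It branches and merges repeatedly along the way, which shows that it is a nonlinear graph.
- `bcast` is a `Broadcast`: it fans out, delivering each input element to every output at once.
- `zipJoinString` is a `ZipWith`: it merges its inputs by passing them to a function and emitting one output. Here it concatenates strings.
  - In the fizzbuzz case, it is what glues "fizz" and "buzz" together.
- `zipTakeFirstIfNotEmpty` is also a `ZipWith`, but it takes the first input as is when that is not the empty string, and otherwise outputs the second input.
  - When the number is neither fizz nor buzz, it is what lets the number itself through.
Turning this into an implementation gives the source code shown at the beginning.
Picking out just the key points, it looks like this: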

    val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      // Bring ~> and the rest of the graph DSL into scope.
      import GraphDSL.Implicits._

      // Create the various components here.

      // Connect the components here.
      // Use ~> to connect them; ~> is translated into via or to automatically.
      // format: off
      src ~> ............. ~> sink
      // format: on

      // Return ClosedShape to indicate a complete, runnable graph.
      ClosedShape
    })

`GraphDSL.create()` hands you a `Builder`, and you use it to build up the graph mutably. That alone does not say whether the Graph is runnable, so `RunnableGraph.fromGraph` is used to explicitly make it runnable. With that, `g` can be run.

    g.run()

    got: 1
    got: 2
    got: fizz
    got: 4
    got: buzz
    got: fizz
    got: 7
    got: 8
    got: fizz
    got: buzz
    got: 11
    got: fizz
    got: 13
    got: 14
    got: fizzbuzz
    ...

Hooray! FizzBuzz works!
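Eyeballing console output is fine for a toy, but if you wanted to check the graph automatically, one approach is to swap the printing Sink for `Sink.seq` and hand it to `GraphDSL.create` so that its materialized value is exposed. The sketch below is a simplified variant of the graph under that assumption (no throttle, no random crash); `FizzBuzzGraphCheck`, `join`, and `pick` are names I made up.

```scala
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, ZipWith}
import scala.concurrent.Await
import scala.concurrent.duration._

object FizzBuzzGraphCheck {
  def run(upTo: Int): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("FizzBuzzGraphCheck")
    // Passing the sink to create() makes its Future[Seq[String]] the graph's materialized value.
    val g = RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[String]) { implicit builder => out =>
      import GraphDSL.Implicits._
      val bcast     = builder.add(Broadcast[Int](3))
      val fizz      = Flow[Int].map(n => if (n % 3 == 0) "fizz" else "")
      val buzz      = Flow[Int].map(n => if (n % 5 == 0) "buzz" else "")
      val stringify = Flow[Int].map(_.toString)
      val join = builder.add(ZipWith[String, String, String](_ + _))
      val pick = builder.add(ZipWith[String, String, String]((l, r) => if (l.nonEmpty) l else r))

      Source(1 to upTo) ~> bcast ~> fizz ~> join.in0
      bcast ~> buzz ~> join.in1
      join.out ~> pick.in0
      bcast ~> stringify ~> pick.in1
      pick.out ~> out.in
      ClosedShape
    })
    try Await.result(g.run(), 10.seconds)
    finally system.terminate()
  }
}
```

`FizzBuzzGraphCheck.run(15)` returns the first fifteen results as a sequence, which can then be compared against expected values in a test.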
Mini summary
- We built a nonlinear graph with GraphDSL.
- Numbers fed in from the Source pass through Flows, branching and merging along the way, get converted into FizzBuzz results, and are printed to the screen in a formatted form.
Adding more features: error handling
We have a working FizzBuzz, but this alone does not make good use of what Akka Streams can do.
So let's build error handling into the graph.
As mentioned earlier, `sink`, which formats and prints the FizzBuzz results, raises an error with low probability. We want processing to continue even when an error occurs, so we run the graph with the strategy "drop any element that caused an error".
The strategy applied when an exception occurs in a graph is called a Supervision Strategy in Akka Streams. Setting one on the graph lets you configure what happens when an exception is thrown.

    // Set the recovery strategy for errors here.
    // If the thrown error is a RuntimeException, drop that element and carry on.
    // Doing this at a branching point can induce a deadlock, so take care where it is used.
    val decider: Supervision.Decider = {
      case _: RuntimeException => Supervision.Resume
      case _                   => Supervision.Stop
    }

    g.withAttributes(ActorAttributes.supervisionStrategy(decider)).run()

When an exception occurs, it is first passed to `decider`. Based on the type of the exception, `decider` decides whether to continue running or to fail and stop. Here we gave it a strategy of continuing for `RuntimeException` and stopping for any other exception.
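Because the crash in `sink` is random, it is hard to see exactly what `Resume` does from a single run. Here is a deterministic sketch of the same idea: a hypothetical `flaky` stage that always fails on 3, with the same `decider` attached to just that stage. The names `ResumeDemo` and `flaky` are mine.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ResumeDemo {
  val decider: Supervision.Decider = {
    case _: RuntimeException => Supervision.Resume
    case _                   => Supervision.Stop
  }

  // Fails on 3 every time; the attribute applies the decider to this stage.
  val flaky = Flow[Int]
    .map(n => if (n == 3) throw new RuntimeException("Boom!") else n)
    .withAttributes(ActorAttributes.supervisionStrategy(decider))

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("ResumeDemo")
    try Await.result(Source(1 to 5).via(flaky).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

The failing element is dropped and the stream carries on with the next one, so 3 is simply missing from the result.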
When an exception actually occurs, the behavior looks like this.

    .....
    got: 22
    got: 23
    got: fizz
    got: 26
    [ERROR] [10/14/2021 16:42:29.709] [NumSys-akka.actor.default-dispatcher-5] [akka.actor.ActorSystemImpl(NumSys)] sink dead
    got: fizz
    got: 28
    got: 29
    .....

The error was raised and reported while processing 25 (the log is written asynchronously, so its position in the output does not necessarily match). 24 was printed as fizz, the result for 25 (which would have been buzz) was discarded, and processing carried on with 26 and then 27 (fizz). This is the `Supervision.Resume` strategy.
This is only one example; Akka Streams offers various other ways of dealing with errors.
There are circuit breakers and more, so read up on them if you are interested.
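As one example of those other options, the `recover` operator lets a stream emit one final fallback element and then complete normally instead of failing. A minimal sketch, with a deterministic failure on 3 and names of my own choosing:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object RecoverDemo {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("RecoverDemo")
    val result = Source(1 to 5)
      .map(n => if (n == 3) throw new RuntimeException("Boom!") else n.toString)
      // On failure, emit one last element and complete the stream.
      .recover { case e: RuntimeException => s"failed: ${e.getMessage}" }
      .runWith(Sink.seq)
    try Await.result(result, 5.seconds)
    finally system.terminate()
  }
}
```

Unlike `Supervision.Resume`, the elements after the failure (4 and 5) are never processed.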
Introducing Cats
FizzBuzz is more or less complete at this point, but I'll use Cats to tidy up a few slightly verbose spots.
Returning `None` instead of the empty string
First, modify the nonlinear graph as follows.
So far `fizz` / `buzz` have returned either "fizz" ("buzz") or "". Change them to return `Some("fizz")` / `None` instead.

    // A flow that receives an Int and emits Some("fizz") if it is divisible by 3, None otherwise.
    // buzz works the same way for 5.
    // First bind PartialFunctions to values.
    type -->[A, B] = PartialFunction[A, B]
    val fizzpf: Int --> String = { case n if n % 3 == 0 => "fizz" }
    val buzzpf: Int --> String = { case n if n % 5 == 0 => "buzz" }
    // lift turns each into an Int => Option[String].
    val List(fizz, buzz) = List(fizzpf.lift, buzzpf.lift).map(Flow[Int].map)

Combining `PartialFunction` with `lift` makes the code somewhat more concise.
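In case `lift` is unfamiliar: it turns a `PartialFunction[A, B]` into a total function `A => Option[B]` that returns `None` wherever the partial function is undefined. A standalone sketch in plain Scala (no Akka needed; `LiftDemo` is just an illustrative name):

```scala
object LiftDemo {
  // Defined only for multiples of 3.
  val fizzpf: PartialFunction[Int, String] = { case n if n % 3 == 0 => "fizz" }

  // lift makes it total: Some(...) where defined, None elsewhere.
  val fizz: Int => Option[String] = fizzpf.lift

  val results: List[Option[String]] = List(3, 4, 6).map(fizz)
}
```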
Make `stringify` return `Some(string)` in the same way, using `.some` from Cats.

    val stringify = Flow[Int].map(_.toString().some)

Now the fizz, buzz, and pass-the-number-through Flows all return `Option[String]`.
Joining the strings properly
`fizz` and `buzz` are each connected to `zipJoinString`. Until now plain strings arrived, so simple concatenation was enough, but this time the inputs are `Option[String]`.
So we want a Flow like this:
- Combine two `Option[String]` values and return an `Option[String]`.
- If either one is `None`, return the one that is not `None`.
- If both are `None`, return `None`.
This is the same as the behavior of Monoid's combine, so we treat it as a Monoid. As it turned out, Semigroup was enough.

    import cats.Semigroup
    import cats.implicits._
    val zipJoinString = builder.add(ZipWith(Semigroup[Option[String]].combine(_, _)))

While we are at it, let's use the binary-operator notation. Semigroup's combine is written `|+|`.

    import cats.Semigroup
    import cats.implicits._
    val zipJoinString = builder.add(ZipWith((lhs: Option[String], rhs: Option[String]) => lhs |+| rhs))

We now have a Flow that joins `Some` strings together.
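To see the rules for `|+|` on `Option[String]` in isolation, here is a tiny sketch that needs only cats-core on the classpath. `CombineDemo` and its value names are mine; `.some` and `none[String]` are the Cats helpers for building `Option` values with the right static type.

```scala
import cats.implicits._

object CombineDemo {
  // Both present: the inner strings are concatenated.
  val both: Option[String] = "fizz".some |+| "buzz".some
  // One side missing: the present side wins.
  val leftOnly: Option[String]  = "fizz".some |+| none[String]
  val rightOnly: Option[String] = none[String] |+| "buzz".some
  // Both missing: still None.
  val neither: Option[String] = none[String] |+| none[String]
}
```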
Returning the first input only when it is a Some
We know that the `zipJoinString` we just built returns `Some("fizz")`, `Some("buzz")`, `Some("fizzbuzz")`, or `None`.
And we know that `stringify` always returns `Some("<number>")`.
To satisfy the FizzBuzz logic, we should adopt the result of `stringify` only when `zipJoinString` returns `None`.
To summarize:
- `zipJoinString` is `Some` → output it
- `zipJoinString` is `None` → output the result of `stringify`
This is the same as the behavior of SemigroupK's combineK, so we treat it as a SemigroupK.
Here too we use the binary-operator notation. SemigroupK's combineK is written `<+>`.

    import cats.SemigroupK
    val zipTakeFirstIfNotEmpty = builder.add(
      ZipWith((lhs: Option[String], rhs: Option[String]) => (lhs <+> rhs).get)
    )

The next stage is the Sink, and we know `stringify` always returns `Some`, so `get` is used to extract the `String`.
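For comparison with `|+|`, here is a small sketch of how `<+>` behaves on `Option[String]`: it keeps the first `Some` and never touches the inner strings. The `pick` helper is hypothetical, and it uses `getOrElse("")` rather than `get` purely as a defensive variation of mine, so that a `None` on both sides would yield an empty string instead of throwing.

```scala
import cats.implicits._

object CombineKDemo {
  // Hypothetical helper mirroring zipTakeFirstIfNotEmpty, minus the unsafe get.
  def pick(word: Option[String], number: Option[String]): String =
    (word <+> number).getOrElse("")

  // <+> keeps the first Some as is...
  val firstWins: Option[String] = "fizz".some <+> "7".some
  // ...whereas |+| would have concatenated the contents.
  val concatenated: Option[String] = "fizz".some |+| "7".some
}
```

For `Option`, `<+>` gives the same result as the standard library's `orElse`, so `word.orElse(number)` would work here too if pulling in SemigroupK feels like overkill.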
And that completes a FizzBuzz with a little bit of Cats mixed in.
Summary
- We implemented FizzBuzz with Akka Streams.
- We implemented throttling and error handling, both features of Akka Streams.
- Using Cats, we applied some functional programming and made the code more concise.
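References
The official page.
Descriptions of each class used in this post.
A more practical article that deals with databases and CSV files.
An explanation of the basics.
At a larger scale, an explanation of Alpakka, which makes it easy to connect Akka Streams to other systems.
A superb book, though it is a few years old and the version it covers is a bit dated.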