Akka Streamãããæãã«åæ¢ããã
Akka Streamã使ã£ããµã¼ãã¼ãæ¸ãã¦ãã¦ããµã¼ãã¼ã®çµäºå¦çãè¡ãæã«streamãåæ¢ãããå¿ è¦ãåºã¦ãã¾ãããã©ããããè¯ãã§ããããã
ãããªã ActorSystem#terminate
ã¨ããããAkka Streamèªä½ã®çµäºã®ããæ¹ãããã¯ç¡çãã ActorSystem#terminate
ãå¼ãã§ãæ¢ã¾ããã¨ã¯æ¢ã¾ãã¾ãã
implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1)) val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _) val result = src.runWith(sink) Thread.sleep(1000) system.terminate() logger.info("result:" + Await.result(result, Duration.Inf)))
ã§ããã以ä¸ã®ãããªã¨ã©ã¼ãåºã¦ãã¾ãã¾ãããã¡ããã§ãã
Exception in thread "main" akka.stream.AbruptStageTerminationException: GraphStage [akka.stream.impl.HeadOptionStage$$anon$3@783971c3] terminated abruptly, caused by for example materializer or actor system termination.
KillSwitch
ã¨ããããã§é©å½ã«ã°ã°ã㨠https://stackoverflow.com/a/38326082 ã¨ããStackOverflowã®åçãããã«åºã¦ãã¾ããã©ãããKillSwitchã¨ããã®ã使ãã°è¯ãããã§ããï¼Konrad ããã®åçãªã®ã§å®å¿ï¼ã¡ãªã¿ã«ã¡ããã¨Akkaã®ããã¥ã¡ã³ãã«ãè¼ã£ã¦ãã¾ãã
ãããªæã
implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1)) val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _) val (killSwitch, result) = src .viaMat(KillSwitches.single)(Keep.right) .toMat(sink)(Keep.both) .run() Thread.sleep(1000) killSwitch.shutdown() system.terminate() logger.info("result: " + Await.result(result, Duration.Inf))
KillSwitchã§ã¾ãã¹ããªã¼ã ãçµäºãããããããActorSystemãçµäºãããã°ã¨ã©ã¼ã¯èµ·ãã¾ããã
JVMã®shutdown hookã使ã
ãã¦ããµã¼ãã¼ã®çµäºæã«Akka Streamãåæ¢ããããã®ã§ Runtime.getRuntime.addShutdownHook
ã使ããã¨ã«ãã¾ããã
implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1)) val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _) val (killSwitch, result) = src .viaMat(KillSwitches.single)(Keep.right) .toMat(sink)(Keep.both) .run() Runtime.getRuntime.addShutdownHook(new Thread(() => { killSwitch.shutdown() logger.info("result: " + Await.result(result, Duration.Inf)) system.terminate() }))
ä¸è¦è¯ãããã§ãããããã¯ã¨ã©ã¼ã«ãªããã¨ãããã¾ãã
Exception in thread "Thread-1" akka.stream.AbruptStageTerminationException: GraphStage [akka.stream.impl.HeadOptionStage$$anon$3@773caeb2] terminated abruptly, caused by for example materializer or actor system termination.
ãªãï¼
CoordinatedShutdown
ä¸æè°ã«æã£ã¦ActorSystemã®ã³ã¼ããèªãã§ããããActorSystemã®ã¤ã³ã¹ã¿ã³ã¹ãä½ãããæã«ã¯ãJVMã®shutdown hookã«ActorSystemã®çµäºå¦çã追å ããã¦ãããã¨ãç¥ãã¾ããããããåå ã§ActorSystemã«ä¾åãããããªå¦çãä¾ãã°Akka StreamãJVMã®shutdown hookã§è¡ããã¨ãã¦ããã¿ã¤ãã³ã°ã«ãã£ã¦ã¯å ã«ActorSystemãçµäºããã¾ããããªãããã§ãã
ActorSystemã®shutdown hookã¯CoordinatedShutdownã¨ããã¯ã©ã¹ã§ç»é²ããã¦ãã¾ããããã¦ActorSystemã«ä¾åããå¦çã®ShutdownãCoordinatedShutdownã使ããã¨ã§å®å ¨ã«è¡ãã¾ãã次ã®ã³ã¼ãããã®ä¾ã§ãActorSystemãçµäºããåï¼CoordinatedShtudown.PhaseBeforeActorSystemTerminateã¨ããå¤ã§æå®ãã¦ãã)ã«Akka Streamã®çµäºå¦çãè¡ã£ã¦ãã¾ãã
implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() val src: Source[Int, NotUsed] = Source.fromIterator[Int](() => Iterator.continually(1)) val sink: Sink[Int, Future[Int]] = Sink.fold(0)(_ + _) val (killSwitch, result) = src .viaMat(KillSwitches.single)(Keep.right) .toMat(sink)(Keep.both) .run() CoordinatedShutdown(system).addTask( CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "app-shutdown") { () => killSwitch.shutdown() logger.info("result: " + Await.result(result, Duration.Inf)) // ActorSystemã¯CoordinatedShutdownã使ã£ã¦ããã°åæã«çµäºãã // system.terminate() Future.successful(akka.Done) }
ã¡ãªã¿ã«CoordinatedShutdownã«ããçµäºå¦çã¯JVMã®shutdown hookã«ä»»ããã«èªåã§å¼ã¶ãã¨ãå¯è½ã§ãã
CoordinatedShutdown(actorSystem) .run(CoordinatedShutdown.JvmExitReason)
CoordinatedShutdownã¯ä¸»ã«akka-clusterã®ããã®æ©è½ã®ããã§ãä¸è¦ã§ããã°ä½¿ããªãããã«è¨å®ãããã¨ãã§ãã¾ãã