The first half of these notes is in a separate post.
Chapter 9: Message Routing
This chapter covers the router pattern from the Enterprise Integration Patterns (EIP). When the reason for routing messages is performance or scaling, use the optimized routers built into Akka. When the content of the message is the main concern, it is considered better to use ordinary actors and dispatch messages by role.
- Router: the component that dispatches messages.
- Routee: a task the router can select, that is, the destination a message is routed to.
Load balancing with Akka routers
To improve system performance, load is distributed across different actors. Akka has two kinds of built-in routers.
- Pool: the router manages the routees. In other words, the routees are created by the router.
- Group: a group router does not manage its routees. The routees must be created elsewhere, and the router uses ActorSelection to find them. Use it, for example, when the routee life cycle has to be controlled in a special way.
Pool router
A pool router can be created from a configuration file as follows.
val router = system.actorOf(
  FromConfig.props(Props(new Rootie(actorRef))),
  "poolRouter"
)

akka.actor.deployment {
  /poolRouter {
    router = balancing-pool
    nr-of-instances = 5
  }
}
To create routees on remote nodes there are several options; a simple one is to use RemoteRouterConfig in place of FromConfig.
To change the number of routees dynamically, the resizer options in the configuration file can be customized to specify the resize conditions in detail.
Because the router creates the routees, it is also their supervisor. With the default router, a routee's failure is always escalated to the supervisor. When the router escalates the failure further, the router itself is restarted, not just the failed routee, and so every routee is restarted. Giving the router its own strategy when it is created changes this so that only the failed routee is restarted.
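The custom strategy mentioned above can be passed to the pool when it is built in code. The following is only a minimal sketch, assuming classic Akka actors and a `RoundRobinPool`; the `Worker` actor, the `"boom"` message and the router name are hypothetical and exist only for illustration.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.routing.RoundRobinPool

// Hypothetical routee used only for this illustration.
class Worker extends Actor {
  def receive: Receive = {
    case "boom" => throw new IllegalStateException("simulated failure")
    case msg    => sender() ! msg
  }
}

object PoolWithStrategy {
  // Restart only the failing routee instead of escalating the failure.
  // Exceptions not matched here are still escalated.
  val restartFailedRoutee: SupervisorStrategy =
    OneForOneStrategy() {
      case _: IllegalStateException => SupervisorStrategy.Restart
    }

  def create(system: ActorSystem): ActorRef =
    system.actorOf(
      RoundRobinPool(5, supervisorStrategy = restartFailedRoutee).props(Props(new Worker)),
      "poolRouterWithStrategy"
    )
}
```

With this strategy a routee that throws `IllegalStateException` is restarted on its own, while the other four routees keep running.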
Group router
With a group router you instantiate the routees yourself and explicitly place them under the router's management. This lets you control when the routees are created.
For a group, the routees are specified as a list of routee paths. For example, when a Creator actor creates two Rootie actors, Rootie1 and Rootie2, and those two actors are to be the routees, the configuration is as follows.
akka.actor.deployment {
  /groupRouter {
    router = round-robin-group
    routees.paths = [
      "/user/Creator/Rootie1",
      "/user/Creator/Rootie2"
    ]
  }
}
Even after a routee terminates, the group router keeps sending messages to it, so the actor that manages the routees has to handle the termination of its child actors.
Other notes
I checked the definition of the become method, which comes up often. As the definition shows, passing a Receive changes how the actor behaves when it receives a message, which is why it frequently appears in actors that hold state.
/**
 * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
 * Replaces the current behavior on the top of the behavior stack.
 */
def become(behavior: Actor.Receive): Unit = become(behavior, discardOld = true)
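As an illustration of how become lets an actor carry state, here is a minimal sketch of a counter written against the classic actor API. The `Increment` and `GetCount` messages and the `Counter` actor are hypothetical names, not taken from the book.

```scala
import akka.actor.Actor

// Hypothetical messages for this sketch.
case object Increment
case object GetCount

class Counter extends Actor {
  def receive: Receive = counting(0)

  // Each behavior closes over the current count, so no mutable field is needed.
  private def counting(count: Int): Receive = {
    case Increment => context.become(counting(count + 1))
    case GetCount  => sender() ! count
  }
}
```

Every `Increment` replaces the current behavior with one that remembers the new count, which is the "state held in the behavior" style the definition above enables.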
Chapter 10: Message Channels
- Point-to-point channel: every actor handled so far uses this.
- Publish/subscribe channel: the sender sends a message to multiple receivers without knowing which receivers need it.
Publish/subscribe channel
The channel makes sure every subscriber receives the message. Because receivers subscribe themselves, the number of receivers can change dynamically, which keeps the setup flexible.
The requirements to be met are:
- The sender must be able to publish messages.
- The receiver must be able to subscribe to and unsubscribe from the channel.
Akka's event stream
In Akka, EventStream provides publish/subscribe. Every actor system has exactly one EventStream, and it is available from any actor. An actor can subscribe to a specific message type and receives a message whenever someone publishes one of that type.
The subscription does not have to be set up by the sending or receiving actors themselves. Only the following two things are needed, so a subscription can be made from anywhere.
- A reference to the actor that subscribes
- A reference to the EventStream on which the subscription is made
system.eventStream.subscribe( actorRef, classOf[Message] )
Publishing is done as follows.
system.eventStream.publish(msg)
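EventStream is useful as a way to send messages from anywhere in the local system and collect them in one place. For example, ActorLogging internally uses EventStream to collect logs from the whole system. Below is the apply function of Logging, which ActorLogging calls to obtain its LoggingAdapter.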
def apply[T: LogSource](system: ActorSystem, logSource: T): LoggingAdapter = {
  val (str, clazz) = LogSource(logSource, system)
  new BusLogging(system.eventStream, str, clazz, system.asInstanceOf[ExtendedActorSystem].logFilter)
}
Custom event bus
Consider the case "send a message only when condition x holds". EventStream decides whether to deliver based on the message type, so it cannot filter on content. Filtering could be done on the receiving side, but let's look at another approach: creating our own publish/subscribe channel.
This can be built with the EventBus interface. As for what an EventBus actually is, it appears to be a mechanism that lets components notify each other in both directions without needing to know about each other.
An Eventbus is a mechanism that allows different components to communicate with each other without knowing about each other. A component can send an Event to the Eventbus without knowing who will pick it up or how many others will pick it up. Components can also listen to Events on an Eventbus, without knowing who sent the Events. That way, components can communicate without depending on each other. Also, it is very easy to substitute a component. As long as the new component understands the Events that are being sent and received, the other components will never know.
EventBus requires three entities to be specified.
- Event: the type of all events published on the bus.
- Subscriber: the type of the subscribers registered on the event bus. In Akka this is ActorRef, and it is usually set by mixing in ActorEventBus.
- Classifier: the type of the classifier used to select subscribers when an event is sent.
Mixing in a classification trait provides the classify method, which extracts the Classifier from an Event.
/**
 * Returns the Classifier associated with the given Event
 */
protected def classify(event: Event): Classifier
In other words:
eventBus.subscribe(actorRef, classifier) // register the subscriber under this classifier
// if msg is classified as the classifier subscribed above, the subscriber is selected and msg is actually delivered
eventBus.publish(msg)
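To make the three entities concrete, the following is a minimal sketch of a custom bus built on `LookupClassification` and `ActorEventBus` from `akka.event`. The `Order` event and the "more than one item" rule are assumptions made for illustration, not the author's code.

```scala
import akka.actor.ActorRef
import akka.event.{ActorEventBus, EventBus, LookupClassification}

// Hypothetical event type for this sketch.
final case class Order(customerId: String, productId: String, number: Int)

// The classifier is a Boolean: "is this an order for more than one item?"
class OrderMessageBus extends EventBus with LookupClassification with ActorEventBus {
  type Event = Order
  type Classifier = Boolean

  // Expected number of distinct classifiers (true and false).
  protected def mapSize(): Int = 2

  // Extracts the classifier from an event.
  protected def classify(event: Order): Boolean = event.number > 1

  // Delivers the event to one subscriber registered under the matching classifier.
  protected def publish(event: Order, subscriber: ActorRef): Unit = subscriber ! event
}
```

An actor subscribed with `bus.subscribe(ref, true)` would then receive `Order("me", "book", 3)` but not `Order("me", "book", 1)`, which is the content-based filtering that EventStream alone cannot express.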
Special channels
Dead-letter channel
A channel that contains every message that could not be processed or delivered (also called a dead-message queue).
Monitoring this channel shows which messages are not being processed.
Inside Akka, DeadLetterListener is implemented by subscribing to the EventStream for DeadLetter in preStart.
override def preStart(): Unit = eventStream.subscribe(self, classOf[DeadLetter])
Guaranteed-delivery channel
With remote actors, messages are lost if the network in between is down. ReliableProxy addresses this problem and raises the chance that a message reaches the remote actor.
Looking at the code now, it is annotated with @deprecated("Use AtLeastOnceDelivery instead", "2.5.0"), so it appears to be deprecated.
Chapter 11: Finite State Machines and Agents
Two new ways of handling state are introduced.
Finite State Machine (FSM)
Quoted from the Akka documentation:
An FSM can be described as a set of relations of the form:
State(S) x Event(E) -> Actions (A), State(S')
These relations are interpreted as meaning:
If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S'.
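The relation above can be written down directly as a pure function, independent of Akka's FSM trait. This is only a sketch; the states, events and actions are hypothetical names chosen to resemble a small inventory workflow.

```scala
object InventoryFsm {
  sealed trait State
  case object WaitForRequests extends State
  case object WaitForPublisher extends State

  sealed trait Event
  case object BookRequest extends Event
  case object BookSupply extends Event

  sealed trait Action
  case object AskPublisher extends Action
  case object SendBooks extends Action

  // State(S) x Event(E) -> Actions(A), State(S')
  def transition(state: State, event: Event): (List[Action], State) =
    (state, event) match {
      case (WaitForRequests, BookRequest) => (List(AskPublisher), WaitForPublisher)
      case (WaitForPublisher, BookSupply) => (List(SendBooks), WaitForRequests)
      case (s, _)                         => (Nil, s) // unhandled events leave the state unchanged
    }
}
```

Each `case` line is one relation from the set: the left side is the pair (S, E) and the right side is the pair (A, S').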
Implementing shared state with agents
Agents are now deprecated, and using Akka Typed is recommended instead.
2.6.0-RC1でstableになったAkka Typedを試してみる - Candy, Vitamin or Painkiller
Chapter 12: Streaming
A data stream is an endless sequence of elements. It exists while:
- a producer supplies elements to the stream
- a consumer reads elements from the stream
akka-stream provides a way to process unbounded streams with bounded buffers. akka-http uses akka-stream internally.
Basic stream processing
Consider the following three parts.
- A producer of elements
- Processing nodes
- A consumer of elements
Using source and sink
Using akka-stream takes the following steps.
- Define the processing flow: define a graph of stream-processing components.
- Run the processing flow: run it on an actor system. The graph is converted into actors.
Consider a simple example that copies a file. Connecting a Source, which supplies elements, to a Sink, which absorbs them, defines a graph that sends data straight from the source to the sink.
val source: Source[ByteString, Future[IOResult]] =
  FileIO.fromPath(inputFile)

val sink: Sink[ByteString, Future[IOResult]] =
  FileIO.toPath(outputFile, Set(CREATE, WRITE, APPEND))

val runnableGraph: RunnableGraph[Future[IOResult]] =
  source to sink
What materialization is
runnableGraph.run() requires an implicit Materializer. The materializer converts the RunnableGraph into actors and runs the graph. Specifically, it:
- checks that every input and output in the graph is connected
- creates the actors backing the Source and the Sink
- sets up a publish/subscribe relationship between those actors
In addition, the publisher never sends more messages than the subscriber has requested, so elements are exchanged without overflowing memory.
A Source and a Sink each provide an auxiliary value when the graph is materialized. For files this is a Future[IOResult]. Keep specifies which of these materialized values is retained.
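Processing events with flows
A Flow is a component used between a Source and a Sink, and it captures all of the stream logic. Individual processing steps are combined to make up a Flow.
Looking at the definition of Flow, it has exactly one input and one output. The third type parameter is the materialized value, so it can be thought of as the auxiliary value the Flow yields (I think of it as something like a side effect).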
/**
 * A `Flow` is a set of stream processing steps that has one open input and one open output.
 */
final class Flow[-In, +Out, +Mat]
For example, consider receiving ByteStrings from a file and parsing them. The components of the Flow then look like this.
// Flow that splits the ByteStrings read from the file into lines and decodes them to Strings
val frame: Flow[ByteString, String, NotUsed] =
  Framing.delimiter(ByteString("\n"), maxLine)
    .map(_.decodeString("UTF8"))

// Flow that maps each String to the Event case class
val parse: Flow[String, Event, NotUsed] =
  Flow[String].map(LogStreamProcessor.parseLineEx)
    .collect { case Some(e) => e }
    .withAttributes(ActorAttributes.supervisionStrategy(decider))

// The composed flow
val composedFlow: Flow[ByteString, Event, NotUsed] =
  frame via parse

// Build the runnable graph
val runnableGraph: RunnableGraph[Future[IOResult]] =
  source.via(composedFlow).toMat(sink)(Keep.right)
Error handling in streams
When an error occurs in a Flow like the one above, the Future completes with a failure. Suppose instead that when parsing a line fails we want to ignore that exception and keep parsing the other lines. Streams can also define a supervision strategy.
val decider: Supervision.Decider = {
  case _: LogParseException => Supervision.Resume
  case _                    => Supervision.Stop
}

val parse: Flow[String, Event, NotUsed] =
  Flow[String].map(LogStreamProcessor.parseLineEx)
    .collect { case Some(e) => e }
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
What BidiFlow is
A graph component with two inputs and two outputs; the name is probably short for bidirectional. The diagram in the source code makes it easy to understand.
/**
 * Wraps two Flows to create a ''BidiFlow''. The materialized value of the resulting BidiFlow is determined
 * by the combiner function passed in the second argument list.
 *
 * {{{
 *     +----------------------------+
 *     | Resulting BidiFlow         |
 *     |                            |
 *     |  +----------------------+  |
 * I1 ~~> |        Flow1         | ~~> O1
 *     |  +----------------------+  |
 *     |                            |
 *     |  +----------------------+  |
 * O2 <~~ |        Flow2         | <~~ I2
 *     |  +----------------------+  |
 *     +----------------------------+
 * }}}
 *
 */
In other words, I1 is the input of the whole component and O2 is its output. Joining a Flow whose interface matches O1 and I2 lets any flow be plugged in. It is used as follows.
val bidiFlow = BidiFlow.fromFlows(inFlow, outFlow)
val flow = bidiFlow.join(filterFlow)
Streaming HTTP
This part showed how POST and GET are actually handled with akka-http. For details, see the book's source code.
Marshalling and unmarshalling
Marshalling is converting an object used in the application into the data format of the HTTP response when returning a response. The reverse is called unmarshalling.
Graph DSL
The graph DSL allows any number of inputs and outputs. For example, receiving Events and handling the following five outputs each with a separate Flow looks like the code below.
- Return the ByteString converted to JSON
- Log output for ok
- Log output for warning
- Log output for error
- Log output for critical
type FlowLike = Graph[FlowShape[Event, ByteString], NotUsed]

def processStates(logId: String): FlowLike = {
  val jsFlow = LogJson.jsonOutFlow
  Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      // all logs, ok, warning, error, critical, so 5 outputs
      val bcast = builder.add(Broadcast[Event](5))
      val js = builder.add(jsFlow)

      val ok = Flow[Event].filter(_.state == Ok)
      val warning = Flow[Event].filter(_.state == Warning)
      val error = Flow[Event].filter(_.state == Error)
      val critical = Flow[Event].filter(_.state == Critical)

      bcast ~> js.in
      bcast ~> ok ~> jsFlow ~> logFileSink(logId, Ok)
      bcast ~> warning ~> jsFlow ~> logFileSink(logId, Warning)
      bcast ~> error ~> jsFlow ~> logFileSink(logId, Error)
      bcast ~> critical ~> jsFlow ~> logFileSink(logId, Critical)

      FlowShape(bcast.in, js.out)
    })
}
At this point it may feel odd that the return type is not Flow[Event, ByteString, NotUsed], but Flow extends Graph. In other words, a Flow is the special case of a graph whose shape has exactly one input and one output.
Mediating between consumers and producers
This is about how to balance supply and demand, which is solved with buffers.
Backpressure is enabled by default when a stream is processed.
Quoted from "Akka Streamsで実装するリアクティブストリーム | Think IT(シンクイット)":
The backpressure protocol is defined in terms of the number of elements the downstream subscriber can receive and buffer. Backpressure guarantees that the publisher never publishes more elements than the subscriber can handle.
This behavior can be adjusted with methods such as buffer and expand.
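As a rough illustration of those two methods, the sketch below defines three flows over `Int` elements using `buffer` with an `OverflowStrategy` and `expand` from akka-stream. The buffer sizes and the `BufferedFlows` object are arbitrary choices for this example, and nothing is run here; the flows would still need a Source, a Sink and a materializer.

```scala
import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Flow

object BufferedFlows {
  // Keep up to 100 elements; when the buffer is full, backpressure the upstream (no data loss).
  val buffered: Flow[Int, Int, NotUsed] =
    Flow[Int].buffer(100, OverflowStrategy.backpressure)

  // Keep only the 10 most recent elements; when the buffer is full, drop the oldest one.
  val latestOnly: Flow[Int, Int, NotUsed] =
    Flow[Int].buffer(10, OverflowStrategy.dropHead)

  // When the consumer is faster than the producer, repeat the last element seen.
  val repeated: Flow[Int, Int, NotUsed] =
    Flow[Int].expand(elem => Iterator.continually(elem))
}
```

The choice of strategy is a trade-off: `backpressure` slows the producer down, while `dropHead` keeps the producer at full speed and sacrifices old elements.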
Chapter 13: System Integration
- Use Alpakka to integrate with external systems.
- Use akka-http to support the HTTP protocol.
Message endpoints
Normalizer pattern
- Converts many kinds of messages into one common, standardized message.
- The system can process messages without caring that they come from various external systems.
- In one variant, messages go through a router to a translator, which normalizes them into the common message before passing them on.
Canonical data model pattern
As the number of connections between systems grows, so does the number of endpoints, and the normalizer pattern becomes complicated. In the canonical data model pattern every system implements a common interface and uses a common message, which reduces the number of endpoints.
An endpoint receives a message in the common format and then converts it into its own message. In the other direction, the endpoint converts its own message into the common format and sends it through the common interface.
The difference is that the canonical data model pattern provides a level of indirection between each application's own data format and the formats used by external systems, whereas the normalizer pattern is confined to a single application. The benefit of this indirection is that when a new application is added to the system, only a translator that handles the common message has to be provided. No change to the existing systems is needed.
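The indirection described above can be sketched in plain Scala. Everything here is hypothetical: the fields of `Order`, the `Translator` trait and the two native formats (a comma-separated line and a key/value map) are assumptions made only to show that each endpoint needs to know just the canonical message.

```scala
object CanonicalModel {
  // Common (canonical) message shared by every system; the fields are hypothetical.
  final case class Order(customerId: String, productId: String, number: Int)

  // Each endpoint only translates between its own format and the canonical one.
  trait Translator[Native] {
    def toCanonical(native: Native): Order
    def fromCanonical(order: Order): Native
  }

  // Endpoint whose native format is a comma-separated line.
  object CsvTranslator extends Translator[String] {
    def toCanonical(native: String): Order = {
      val Array(customer, product, number) = native.split(",").map(_.trim)
      Order(customer, product, number.toInt)
    }
    def fromCanonical(order: Order): String =
      s"${order.customerId},${order.productId},${order.number}"
  }

  // Endpoint whose native format is a key/value map.
  object MapTranslator extends Translator[Map[String, String]] {
    def toCanonical(native: Map[String, String]): Order =
      Order(native("customer"), native("product"), native("number").toInt)
    def fromCanonical(order: Order): Map[String, String] =
      Map(
        "customer" -> order.customerId,
        "product"  -> order.productId,
        "number"   -> order.number.toString
      )
  }
}
```

Adding a third system means writing one more `Translator`; `CsvTranslator` and `MapTranslator` stay untouched, which is the benefit the paragraph above describes.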
Implementing endpoints with Alpakka
The components provided by Alpakka make it easy to connect to external systems.
The examples covered detecting file changes in a directory and sending messages over AMQP. Here the common message format is the Order class. Each example converts its events into a Source, so turning it into a RunnableGraph with toMat makes stream processing possible.
Receiving messages from an external system
Alpakka is used to create a Source that detects file changes.
object FileXmlOrderSource {
  def watch(dirPath: Path): Source[Order, NotUsed] =
    DirectoryChangesSource(dirPath, pollInterval = 500.millis, maxBufferSize = 1000)
      .collect { case (path, DirectoryChange.Creation) => path }
      .map(_.toFile)
      .filter(file => file.isFile && file.canRead)
      .map(scala.io.Source.fromFile(_).mkString)
      .via(parseOrderXmlFlow)
}

// can now be processed as a stream
val consumer: RunnableGraph[Future[Order]] =
  FileXmlOrderSource.watch(dir.toPath)
    .toMat(Sink.head[Order])(Keep.right)
For AMQP
object AmqpXmlOrderSource {
  def apply(amqpSourceSettings: AmqpSourceSettings): Source[Order, NotUsed] =
    AmqpSource.atMostOnceSource(amqpSourceSettings, bufferSize = 10)
      .map(_.bytes.utf8String)
      .via(parseOrderXmlFlow)
}

// can now be processed as a stream
val consumer: RunnableGraph[Future[Order]] =
  AmqpXmlOrderSource(amqpSourceSettings)
    .toMat(Sink.head)(Keep.right)
I had no idea what AMQP itself was, so I read the following article.
Advanced Message Queuing Protocol - Wikipedia
Sending messages to an external system
Sending to an external system works like receiving: this time AmqpSink creates a Sink that goes through the common message.
HTTP
Here akka-http is used to implement a REST interface.
The ~ operator combines route definitions and directives. The following example can be read as "match either getOrder or postOrders".
val routes = getOrder ~ postOrders
The definition of ~ is as follows.
/**
 * Returns a Route that chains two Routes. If the first Route rejects the request the second route is given a
 * chance to act upon the request.
 */
def ~(other: Route): Route = ...
Let's trace how the directives actually handle a request. Below are the details of getOrder.
// get is one of the MethodDirectives; it rejects everything except GET requests.
def getOrder = get {
  // pathPrefix is one of the PathDirectives; it takes a PathMatcher and
  // applies it to the part of the path after the slash that has not been matched yet.
  pathPrefix("orders" / IntNumber) { id =>
    // IntNumber is a NumberMatcher, which extends PathMatcher; it extracts a number from the request path.
    // onSuccess is one of the FutureDirectives; it unwraps the Future's value and runs the route in the inner scope.
    onSuccess(processOrders.ask(OrderId(id))) {
      case result: TrackingOrder =>
        // complete is one of the RouteDirectives; it completes the request from its argument.
        complete(
          <statusResponse>
            <id>{result.id}</id>
            <status>{result.status}</status>
          </statusResponse>
        )
      case result: NoSuchOrder =>
        complete(StatusCodes.NotFound)
    }
  }
}
Chapter 14: Clustering
Chapter 6 built a distributed application with a fixed number of nodes. The details are in an earlier post.
With a cluster, the number of nodes used by a distributed application can also be increased or decreased dynamically.
Why use clustering
A cluster is a dynamic group of nodes, each of which runs an actor system. The list of member nodes is maintained as the current state of the cluster, and the actor systems pass this information to one another. Specifically, a cluster provides the following.
Cluster membership
A cluster consists of seed nodes, master nodes and worker nodes.
- Seed node: required to start the cluster. It is the starting point of the cluster and acts as the first point of contact for other nodes.
- Master node: controls and supervises jobs.
- Worker node: requests work from the master and returns the results to the master.
Once the seed nodes are up, all other nodes can be started independently, with no dependencies between them.
Separately from these roles there is the responsibility of leader. The leader decides whether a member node's state is Up or Down, and actually makes nodes join or leave the cluster. Any node in the cluster can become the leader.
When a failure occurs
When a failure occurs, the node enters the Unreachable state. The cluster detects unreachable nodes. As long as there is an unreachable node the leader cannot perform any actions, so the unreachable node first has to be marked Down.
To be notified of failures on nodes in the cluster, an actor can subscribe to the cluster events and receive them as notifications.
Processing clustered jobs
The master first has to create the workers and then broadcast a message to them. This is done with a router. To combine a cluster with a router, an existing valid pool has to be passed to ClusterRouterPool. The flow is as follows.
- The master uses ClusterRouterPool and BroadcastPool to act as the router and creates the workers as routees.
- The master (the router) broadcasts a "start the job" message to the workers (the routees).
- When the master receives a "ready to start a task" request from a worker, it sends the messages needed for the task. (This does not use the router; they are sent to each worker individually.)
A worker declares that it joins the job by sending an Enlist message to the master. The master uses the ActorRef received in the Enlist message to watch the worker and to stop all workers when the job finishes.
Chapter 15: Actor Persistence
This chapter is about persisting actor state with the akka-persistence module. With akka-persistence you can build applications that keep working even when a cluster node fails or is replaced.
There are two cluster extensions.
- Cluster singleton
- Cluster sharding
Event sourcing
What event sourcing is
Every successful operation is stored in a journal as an event. The current value is obtained by replaying the operations recorded in that sequence of events.
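The idea of deriving the value from the event sequence can be shown without Akka at all. The following is a minimal sketch in plain Scala; the calculator events and the `EventReplay` object are hypothetical and only mirror the kind of events a calculator would journal.

```scala
object EventReplay {
  // Hypothetical events for a small calculator.
  sealed trait Event
  final case class Added(value: Double) extends Event
  final case class Subtracted(value: Double) extends Event
  final case class Multiplied(value: Double) extends Event
  case object Reset extends Event

  // Applying one event to the current result gives the next result.
  def update(result: Double, event: Event): Double = event match {
    case Added(v)      => result + v
    case Subtracted(v) => result - v
    case Multiplied(v) => result * v
    case Reset         => 0.0
  }

  // The state is never stored directly; it is rebuilt by replaying the journal in order.
  def replay(journal: Seq[Event]): Double = journal.foldLeft(0.0)(update)
}
```

Replaying `Seq(Added(3), Multiplied(4), Subtracted(2))` walks through 3, 12 and 10, so the journal alone is enough to reconstruct the result.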
Event sourcing for actors
A major benefit of event sourcing is that database writes and database reads are clearly separated. The journal is read only when an actor recovers.
Persistent actors
A persistent actor runs in one of two modes: 1) recovering its state from events, or 2) processing commands.
- Event: a record left as evidence that the actor performed an operation correctly.
- Command: a message sent to the actor to make it perform an operation.
Persistent actors also have the following characteristics.
A persistent actor uses the PersistentActor trait and, instead of a receive method, has to define two methods, receiveCommand and receiveRecover. receiveRecover receives past events and snapshots while the actor is recovering.
The persist method persists a command as an event. Its second argument is a function that handles the persisted event. Here updateState updates the actor's calculation result.
val receiveCommand: Receive = {
  case Add(value)      => persist(Added(value))(updateState)
  case Subtract(value) => persist(Subtracted(value))(updateState)
  case Divide(value)   => if (value != 0) persist(Divided(value))(updateState)
  case Multiply(value) => persist(Multiplied(value))(updateState)
  case PrintResult     => println(s"the result is: ${state.result}")
  case GetResult       => sender() ! state.result
  case Clear           => persist(Reset)(updateState)
}
The function that handles the persisted event is called asynchronously, but akka-persistence makes sure the next command is not processed until that function has completed. Some messages have to be stashed for this, which adds a small performance overhead.
receiveRecover must perform exactly the same processing as when the commands were handled successfully. Events stored in the journal under the same persistenceId as the recovering actor are processed so that they yield the same result as before. The receiveRecover method is used when the actor starts or restarts.
val receiveRecover: Receive = {
  case event: Event      => updateState(event)
  case RecoveryCompleted => log.info("Calculator recovery completed")
}
Snapshots
Snapshots can shorten the time an actor needs to recover. Snapshots are stored in a separate SnapshotStore. On recovery the actor is first given the latest snapshot and then the events that occurred after that snapshot was saved. Events older than the latest snapshot are not delivered.
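The recovery order described above (latest snapshot first, then only the later events) can be sketched in plain Scala. The `Added` and `Snapshot` types and their sequence numbers are hypothetical stand-ins for what a journal and snapshot store would hold; this is not the akka-persistence API.

```scala
object SnapshotRecovery {
  // Hypothetical journal entry: a sequence number plus the amount added by that event.
  final case class Added(sequenceNr: Long, value: Int)
  // Hypothetical snapshot: the state as of a given sequence number.
  final case class Snapshot(sequenceNr: Long, total: Int)

  def recover(snapshot: Option[Snapshot], journal: Seq[Added]): Int = {
    // Without a snapshot, recovery starts from the empty state.
    val start = snapshot.getOrElse(Snapshot(0L, 0))
    journal
      .filter(_.sequenceNr > start.sequenceNr) // events covered by the snapshot are skipped
      .foldLeft(start.total)((total, event) => total + event.value)
  }
}
```

With a journal of three events and a snapshot taken after the second one, only the third event is replayed, yet the result equals a full replay from the beginning.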
Persistence query
A module for querying the journal outside of a persistent actor's recovery. Its best use case is reading events from persistent actors continuously and storing them in another database in a shape suited to queries. There are basically two kinds of query. Let's look at the difference between eventsByPersistenceId and currentEventsByPersistenceId.
- eventsByPersistenceId: returns the events received by a PersistentActor in order. The stream never completes; when the actor receives a new event, that event is pushed.
- currentEventsByPersistenceId: behaves basically like eventsByPersistenceId, but the stream completes once it has reached the current state.
Serialization
A custom serializer is set up in the configuration file.
akka {
  actor {
    serializers {
      basket = "aia.persistence.BasketEventSerializer" // register the custom serializers
      basketSnapshot = "aia.persistence.BasketSnapshotSerializer"
    }
    serialization-bindings {
      "aia.persistence.Basket$Event" = basket // bind each class that needs serializing to its serializer
      "aia.persistence.Basket$Snapshot" = basketSnapshot
    }
  }
}
Cluster singleton and cluster sharding
Concepts
Cluster singleton: runs an actor while guaranteeing that exactly one instance of it exists among the nodes with the same role in the Akka cluster. If the cluster singleton stops, it is started on another node.
I found the mechanics of singletons and sharding hard to follow, so I read the following articles, which were very clear.
Akka Clusterで超レジリエンスを手に入れる | Think IT(シンクイット)
Akka Clusterで超レジリエンスを手に入れる(その2) | Think IT(シンクイット)
A cluster contains several nodes, and shards are further divided among them. (The figure is quoted from "Akka Clusterで超レジリエンスを手に入れる(その2)".)
A ShardRegion receives a message. It then asks the ShardCoordinator where the shard is located, and using the ShardCoordinator's answer the message is delivered to the target sharded entity.
There should be only one ShardCoordinator in the cluster, so it is implemented as an Akka cluster singleton. The ShardCoordinator also has to be able to restore its information, so it uses Akka Distributed Data to recover its data.
Sharding in Akka in Action (Akka実践バイブル)
ShardedShoppers starts ClusterSharding and obtains a reference to the ShardRegion. When the ShardRegion receives a message, it forwards it to a ShardedShopper, the sharded actor. ShardedShopper defines how to extract the entityId that identifies it uniquely and the shardId that decides which shard it is placed in. These are passed when ClusterSharding is started and are needed to keep track of what is placed where.
class ShardedShoppers extends Actor {
  ClusterSharding(context.system).start(
    ShardedShopper.shardName,
    ShardedShopper.props,
    ClusterShardingSettings(context.system),
    ShardedShopper.extractEntityId, // id that uniquely identifies the actor
    ShardedShopper.extractShardId   // id that decides which shard the actor is placed in
  )

  def shardedShopper = {
    // obtain the reference to the ShardRegion
    ClusterSharding(context.system).shardRegion(ShardedShopper.shardName)
  }

  def receive = {
    case cmd: Shopper.Command => shardedShopper forward cmd
  }
}
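The two extractor functions passed to start are not shown in these notes. As a rough idea of what they compute, here is a plain-Scala sketch; the `Command` type, the shard count of 10 and the modulo rule are all assumptions for illustration, and the real Akka extractors have different types (a partial function returning the entity id with the message, and a function from message to shard id).

```scala
object ShardIds {
  // Hypothetical command type: every command carries the id of the shopper it targets.
  final case class Command(shopperId: Long, payload: String)

  // Assumed number of shards, chosen only for this illustration.
  val numberOfShards = 10

  // Entity id: one entity per shopper.
  def entityId(cmd: Command): String = cmd.shopperId.toString

  // Shard id: a stable function of the shopper id, so the same shopper always maps to the same shard.
  // floorMod keeps the result in 0 until numberOfShards even for negative ids.
  def shardId(cmd: Command): String =
    Math.floorMod(cmd.shopperId, numberOfShards.toLong).toString
}
```

Because the shard id depends only on the shopper id, any node can compute where a command belongs without coordination, and the ShardCoordinator only has to track where each shard currently lives.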