ã¯ããã«
ããã«ã¡ã¯ããªã¼ãã³ã¨ã¤ãã®å±±å´ã§ãã
ä»åã¯ãAkka Streams 㧠Alpakka ã使ã£ã¦ AWS SQS ã¨æ¥ç¶ããã¼ã¿ãéåä¿¡ããæ¹æ³ã«ã¤ãã¦èª¬æãã¾ããã¾ãæ¬è¨äºå ã§ã¯ AWS SQS ããã¼ã«ã«ç°å¢ã§ã·ãã¥ã¬ã¼ãããããã« ElasticMQ ã使ç¨ãã¾ããElasticMQ ã®ã»ããã¢ããã«ã¤ãã¦ããããã¦ç°¡åã«èª¬æãã¾ããè¨äºå·çæã®ç°å¢ã¯ä»¥ä¸ã®éãã§ãã
- Scala 2.13.5
- sbt 1.4.7
- Akka 2.6.13
- Alpakka SQS 2.0.2
- ElasticMQ 1.1.0
ãªã¼ãã³ã¨ã¤ãã§ã¯ãSNS é ä¿¡å¹æ測å®ãµã¼ãã¹ Insight BRAIN ã«ããã¦å é¨ã®ã¸ã§ã管çã« SQS ãå©ç¨ãã¦ãã¾ããããã®æ¥ç¶å¦çã«ããã¦å®éã« Akka Streams 㨠Alpakka ã使ç¨ãã¦ãã¾ãã
Alpakka
Alpakka ã¨ã¯
Akka Streams ã§æ§ã ãªãªã½ã¼ã¹ã¨æ¥ç¶ããããã®ã³ã³ãã¼ãã³ã群ãæä¾ããããã¸ã§ã¯ãã Alpakka ã§ããAkka ã®å ¬å¼ããã¸ã§ã¯ãã¨ãã¦éçºã»æä¾ããã¦ãã¾ãããã¨ãã°ãã¼ã«ã«ã®ããã¹ããã¡ã¤ã«ãã¹ããªã¼ã å¦çãã Source ããå¦ççµæã S3 ã«ä¿åãã Flow ãªã©ãæ§ã ãªå ¥åã½ã¼ã¹ãåºåå ã Akka Streams ã®ã³ã³ãã¼ãã³ãã¨ãã¦æ±ç¨çã«ä½¿ããããã¡ã«ãã¦ããã¦ãã¾ãã
Alpakka Documentation
https://doc.akka.io/docs/alpakka/current/
å¤é¨ãµã¼ãã¹ã¨ã®é£æºå¦çãç°¡åã«æ§ç¯ã§ãã¦ãã¾ãã®ã§ãã¾ã Alpakka ãæ´»ç¨ã§ããã°éçºå¹çã®åä¸ãè¦è¾¼ããã§ãããã
ä»åã¯ãã®ä¸ãã AWS SQS ç¨ã®ã¢ã¸ã¥ã¼ã«ã使ç¨ãã¾ãã
AWS SQS ⢠Alpakka Documentation
https://doc.akka.io/docs/alpakka/current/sqs.html
ã¡ãªã¿ã«ãAlpakkaãã¨ããååã¯åç©ã®ã¢ã«ãã«ããåã£ã¦ãããã®ã¨æããã¾ããã¢ã«ãã«ã¯è±èªã®ç¶´ãã ã¨ãAlpacaãã§ãããã£ã³ã©ã³ãããã«ã¦ã§ã¼ã§ã¯ãAlpakkaãã¨è¡¨è¨ããããã§ãããAkkaãã¨ããåèªãå«ã¾ãã¦ãããããã®ååãæ¡ç¨ãããã®ããªã¨æãã®ã§ãããããã¥ã¡ã³ãã§ããã®ãããã¯ç¹ã«èª¬æããã¦ãªããããªã®ã§æ£ç¢ºãªã¨ããã¯ãããã¾ãããâ¦ã
ã»ããã¢ãã
Alpakka ã¯æ¥ç¶å¯¾è±¡ãã¨ã«ã¢ã¸ã¥ã¼ã«ãåããã¦ããã®ã§å¿
è¦ãªãã®ã ããé¸ãã§å°å
¥ãã¾ãããã¨ãã°ä»å㯠SQS ã¢ã¸ã¥ã¼ã«ãå¿
è¦ãªã®ã§ libraryDependencies
ã以ä¸ã®ããã«è¨è¿°ãã¾ãã
libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream" % "2.6.13", "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "2.0.2" )
å®éã®ã³ã¼ãã«ã¤ãã¦ã¯å¾è¿°ãã¾ãã
AWS SQS
AWS SQS ã¨ã¯
Amazon SQSï¼ãµã¼ãã¼ã¬ã¹ã¢ããªã®ããã®ã¡ãã»ã¼ã¸ãã¥ã¼ãµã¼ãã¹ï¼| AWS
https://aws.amazon.com/jp/sqs/
SQS (Simple Queue Service) 㯠AWS ãæä¾ããããã¼ã¸ãåã®ã¡ãã»ã¼ã¸ãã¥ã¼ãµã¼ãã¹ã§ããAWS ã§ã¯ MQãKinesisãMSK ãªã©è¤æ°ã®ã¡ãã»ã¼ã¸ãã¥ã¼ç³»ãµã¼ãã¹ãæä¾ããã¦ãã¾ãããååã®éãæãã·ã³ãã«ãªæ©è½ãæä¾ãã¦ããã®ã SQS ã§ãããããã©ã¤ã¨ã¿ãªãªãµã¼ãã¹ã§ãã AWS ã«ããã¯ã¤ã³ãããç¹ã¯ããã¯ã§ãããä½ã³ã¹ãã§å©ç¨ã§ããä¸ã«åç´ãªã¦ã¼ã¹ã±ã¼ã¹ãªãå¤§ä½ SQS ã§ã«ãã¼ã§ããã®ã§ã¯ã¨æãã¾ãã
ElasticMQ ã®ã»ããã¢ãã
SQS 㯠AWS ã®ãµã¼ãã¹ãªã®ã§ãéçºç¨ã«ãã¼ã«ã«ç°å¢ã§åä½ãããã¨ãã£ããã¨ãã§ããªãã®ã§ãããSQS äºæ API ãæä¾ãã ElasticMQ ã¨ããã¢ããªã±ã¼ã·ã§ã³ãå©ç¨ãããã¨ã§æ¬ä¼¼çã«ãã¼ã«ã«ç°å¢ã§ SQS ãåä½ããããã¨ãã§ãã¾ãã
softwaremill/elasticmq: In-memory message queue with an Amazon SQS-compatible interface. Runs stand-alone or embedded.
https://github.com/softwaremill/elasticmq
ãã¥ã¼ã®ãã¼ã¿ã¯ã¤ã³ã¡ã¢ãªã§æ±ãããããæ°¸ç¶åããã ElasticMQ ãµã¼ãã¹ãçµäºããã¨æ¶ãã¦ãã¾ãã¾ãããªã®ã§ããã¾ã§ãéçºç¨ã®ãã¼ã«ã¨ããä½ç½®ã¥ãã§æãã¦ããã®ãããããã§ãã
ElasticMQ 㯠Alpakka SQS ã®ãã¹ãã§ãå©ç¨ããã¦ããããã§ããæã ãéçºç°å¢ã§å©ç¨ãã¦ãã¾ãããããã¾ã§ã®ã¨ããå®éã® SQS ã¨æåãç°ãªã£ã¦å°ã£ãã¨ãããããªãã¨ã¯èµ·ãã¦ãã¾ããã
ã¡ãªã¿ã« ElasticMQ èªä½ã¯ Scala 製ã§ãããã ã Docker ã¤ã¡ã¼ã¸ã§ãæä¾ããã¦ããããç¹ã« Scala ãæèããã«å©ç¨ã§ãã¾ããJar åä½ã§å®è¡ãããã©ã¤ãã©ãªã¨ãã¦ã¢ããªã±ã¼ã·ã§ã³ã«çµã¿è¾¼ãã ãæ§ã ãªå©ç¨æ¹æ³ãæä¾ããã¦ãã¾ãããä»å㯠Docker ã¤ã¡ã¼ã¸ãå©ç¨ãããã¨ã«ãã¾ãã
docker-compose.yml
ãä½æã以ä¸ã®ããã«è¨è¿°ãã¾ãã
version: '3' services: elasticmq: image: softwaremill/elasticmq container_name: elasticmq_test ports: - 9324:9324 volumes: - ./elasticmq.conf:/opt/elasticmq.conf
è¨å®ãã¡ã¤ã« elasticmq.conf
ã以ä¸ã®ããã«ä½æãã¾ããä»å㯠test-queue.fifo
ã¨ããåå㧠FIFO ãã¥ã¼ãä½æãã¾ãã
include classpath("application.conf") aws { accountId = queue } queues { "test-queue.fifo" { defaultVisibilityTimeout = 10 seconds fifo = true contentBasedDeduplication = true } }
ä¸è¨ãã¡ã¤ã«ãç¨æã§ããã docker-compose
ã³ãã³ãã§ã³ã³ãããèµ·åãã¾ãã
docker-compose up -d
ã³ã³ãããèµ·åãã㨠test-queue.fifo
ã¨ãããã¥ã¼ãèªåçã«ä½æãã localhost:9324
ã§æ¥ç¶å¯è½ãªç¶æ
ã«ãªãã¾ãããã®ã¨ã³ããã¤ã³ããæå®ããã° AWS CLI ãªã©ã使ã£ã¦ SQS ã¨ãã¦æ±ããã¨ãå¯è½ã§ãã
# AWS CLI ã§ãã¥ã¼ããªã¹ããã¦ã¿ã aws sqs list-queues --endpoint-url 'http://localhost:9324'
{ "QueueUrls": [ "http://localhost:9324/queue/test-queue.fifo" ] }
ã¡ãªã¿ã«ããããããã°ã ã¨æãã®ã§ãããç¾è¡ãã¼ã¸ã§ã³ã ã¨æ¥ç¶ URL ã® AWS ã¢ã«ã¦ã³ãã®é¨å㯠queue
ã¨ããæååã§ãªãã¨ãã¾ãåä½ããªãããã§ãããã¨ãã° elasticmq.conf
㧠aws.accountId = aws01
ã®ããã«è¨è¿°ããã° http://localhost:9324/aws01/test-queue.fifo
ã§æ¥ç¶ã§ãããã«æããã®ã§ããæå
ã®ç°å¢ã§ã¯æ£ããåä½ãã¾ããã§ããã
Alpakka 㧠AWS SQS æ¥ç¶
æºåãæ´ã£ãã®ã§å®éã« Alpakka 㧠SQS (ElasticMQ) ã«æ¥ç¶ãã¦ã¿ã¾ãã
ã¾ãæåã« SqsAsyncClient
ãçæã㦠SQS æ¥ç¶æ
å ±ãè¨å®ãã¾ãã
import java.net.URI import akka.actor.ActorSystem import akka.stream.alpakka.sqs.SqsSourceSettings import com.github.matsluni.akkahttpspi.AkkaHttpClient import software.amazon.awssdk.auth.credentials.AwsBasicCredentials import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.sqs.SqsAsyncClient val endpoint = "http://localhost:9324" implicit val actorSystem = ActorSystem("example") implicit val sqsClient = SqsAsyncClient .builder() .credentialsProvider( StaticCredentialsProvider.create( AwsBasicCredentials.create("AK", "SK") // (1) ) ) .endpointOverride(URI.create(endpoint)) // (2) .region(Region.AP_NORTHEAST_1) .httpClient(AkkaHttpClient.builder() .withActorSystem(actorSystem).build()) .build()
(1) ä»åã¯æ¬ç©ã® SQS ã§ã¯ãªããã¼ã«ã«ã® ElasticMQ ã¸ã®æ¥ç¶ãªã®ã§ã¢ã¯ã»ã¹ãã¼ã¨ã·ã¼ã¯ã¬ãããã¼ã¯é©å½ãªå¤ãè¨è¿°ãã¦ããã¾ãã
(2) endpointOverride
ã§ã¨ã³ããã¤ã³ããæå®ãããã¨ã§æ¥ç¶å
ã AWS ã§ã¯ãªããã¼ã«ã«ã«åãããã¨ãã§ãã¾ãã
Subscribe
SQS ããã¡ãã»ã¼ã¸ãã¹ããªã¼ã åå¾ããã«ã¯ SqsSource
ã使ç¨ãã¾ãããã®ã¨ãå
ç¨çæãã SqsAsyncClient
ã implicit
ã§åç
§ããã¾ãããã® SqsSource
ã¯ãã®åã®éã Akka Streams ã® Source ãªã®ã§ Flow ã Sink ã¨ã¤ãªãã§ãã¼ã¿ããã¼ãæ§ç¯ã§ãã¾ãã
val queueUrl = endpoint + "/queue/test-queue.fifo" val settings = SqsSourceSettings() SqsSource(queueUrl, settings) .runWith(Sink.foreach { message: Message => val body = message.body println(s"received: ${body}") })
è¨å®çã«åé¡ããªããã°ãã㧠SQS ã¨æ¥ç¶ãããç¶æ ã«ãªãã¾ãã試ãã«ãã®ã¢ããªã±ã¼ã·ã§ã³ãå®è¡ããç¶æ 㧠AWS CLI ã使ã£ã¦ã¡ãã»ã¼ã¸ãéä¿¡ãã¦ã¿ã¾ããã°ã«ã¼ã ID ãå¿ è¦ã«ãªãã®ã§ããã¨ããããé©å½ãªå¤ã§å¤§ä¸å¤«ã§ãã
# AWS CLI 㧠"hello" ã¨ããã¡ãã»ã¼ã¸ãéä¿¡ãã¦ã¿ã aws sqs send-message \ --queue-url "http://localhost:9324/queue/test-queue.fifo" \ --endpoint-url "http://localhost:9324" \ --message-group-id "x" \ --message-body "hello"
æåãã㨠Scala ã¢ããªã±ã¼ã·ã§ã³å´ã®ã³ã³ã½ã¼ã«ã« received: hello
ã¨è¡¨ç¤ºãããã¯ãã§ãã
ã¡ãªã¿ã«ãã®ã¾ã¾ã 㨠SQS ä¸ã®ã¡ãã»ã¼ã¸ã¯æ¶ããã«æ®ãç¶ãã¾ãããªã®ã§å°ãæéãçµã¤ã¨ã¾ãåãã¡ãã»ã¼ã¸ãåä¿¡ãã¦ãã¾ãã¾ããä¸åº¦åä¿¡ããå¦çæ¸ã¿ã®ã¡ãã»ã¼ã¸ãåé¤ããã«ã¯ ACK ã¡ãã»ã¼ã¸ãéã£ã¦ã¡ãã»ã¼ã¸ãåé¤ããå¿
è¦ãããã¾ãããã®å ´åã以ä¸ã®ããã« SqsAckFlow
ã SqsAckSink
ã使ç¨ãã¾ãã
import akka.stream.alpakka.sqs.MessageAction import akka.stream.alpakka.sqs.SqsAckResult SqsSource(queueUrl, settings) .map(MessageAction.Delete(_)) .via(SqsAckFlow(queueUrl)) .runWith(Sink.foreach { res: SqsAckResult => val body = res.messageAction.message.body println(s"received: ${body}") })
ããã§ä¸åº¦åä¿¡ããã¡ãã»ã¼ã¸ã¯ SQS ä¸ããåé¤ããã¾ãã
Publish
SQS ã«ã¡ãã»ã¼ã¸ãéä¿¡ããå ´å㯠SqsPublishSink
ã SqsPublishFlow
ã使ãã¾ãã
import akka.stream.alpakka.sqs.scaladsl.SqsPublishSink import akka.stream.scaladsl.Source import software.amazon.awssdk.services.sqs.model.SendMessageRequest val queueUrl = endpoint + "/queue/test-queue.fifo" val message = SendMessageRequest.builder .messageBody("hello") .messageGroupId("x") .build Source.single(message) .runWith(SqsPublishSink.messageSink(queueUrl))
ãããå®è¡ãã㨠SQS ã«ã¡ãã»ã¼ã¸ã 1 件éä¿¡ããã¾ãã
AWS CLI ã§ç¢ºèªãããã¨ãã§ãã¾ãã
# AWS CLI ã§ãã¥ã¼ããã¡ãã»ã¼ã¸ãåå¾ãã¦ã¿ã aws sqs receive-message \ --queue-url "http://localhost:9324/queue/test-queue.fifo" \ --endpoint-url "http://localhost:9324"
{ "Messages": [ { "MessageId": "5be5527a-ec4e-4f3f-8ad2-67a713938673", "ReceiptHandle": "5be5527a-ec4e-4f3f-8ad2-67a713938673#2406b392-60b7-4019-b3b9-4a62620bdbcc", "MD5OfBody": "5d41402abc4b2a76b9719d911017c592", "Body": "hello" } ] }
ãããã«
Akka Streams ã¯ãã¸ãã¯ã SourceãFlowãSink ã¨ãã£ãããã¡ã§ã¢ã¸ã¥ã¼ã«åãã¦æ±ããã¨ãã§ããããã¢ã¸ã¥ã¼ã«ã®çµã¿åããã§ãã¼ã¿ããã¼ãæ§ç¯ã§ããç¹ãé常ã«å¼·åã ã¨æãã¦ãã¾ããAlpakka 㯠SQS æ¥ç¶ä»¥å¤ã«ãæ§ã ãªã¢ã¸ã¥ã¼ã«ãæä¾ãã¦ããã®ã§ãè¦ä»¶ããããããã°å¤é¨ãµã¼ãã¹ã¨ã®ãã¼ã¿é£æºé¨å㯠Alpakka ã§æ軽ã«æ§ç¯ãã¦ãã¾ããã¨ãã§ãã¾ãããã®åãã¸ãã¹ãã¸ãã¯ã®éçºã«éä¸ã§ããã®ã§ããã¾ãæ´»ç¨ãããã¨ã§éçºå¹çã®åä¸ãå³ããã®ã§ã¯ãªãã§ããããã
ã¨ãããã¨ã§ãä»åã¯ä»¥ä¸ã¨ãªãã¾ããé常ã«é§ã足ã®å 容ã§ã¯ããã¾ããæ¬è¨äºã Akka Streams æ´»ç¨ã®ä¸å©ã¨ãªãã°å¹¸ãã§ããæå¾ã¾ã§ãèªã¿ããã ããããã¨ããããã¾ããã
Akkaå®è·µãã¤ãã« ã¢ã¯ã¿ã¼ã¢ãã«ã«ãã並è¡ã»åæ£ã·ã¹ãã ã®å®ç¾
- ä½è :Raymond Roestenburg,Rob Bakker,Rob Williams
- çºå£²æ¥: 2017/12/13
- ã¡ãã£ã¢: åè¡æ¬ï¼ã½ããã«ãã¼ï¼
Scalaã¹ã±ã¼ã©ãã«ããã°ã©ãã³ã°ç¬¬3ç
- ä½è :Martin Odersky,Lex Spoon,Bill Venners
- çºå£²æ¥: 2016/09/20
- ã¡ãã£ã¢: åè¡æ¬ï¼ã½ããã«ãã¼ï¼