Red Hat 㧠Solution Architect ã¨ã㦠Quarkus ãæ å½ãã¦ããä¼è¤ã¡ã²ãï¼@chiroitoï¼ã§ãã
ãã®è¨äºã¯ãQuarkus.io ã®ããã°è¨äºãCombining Apache Kafka and the Rest client ã®ç¿»è¨³è¨äºã§ãã
ä»é±ãã¾ãé¢ç½ã質åãããã¾ãããä»é±ã¯ãKafka 㨠Rest Client ã®çµã¿åããã«ã¤ãã¦èª°ãã«èããã¾ãããããã¯ç¹°ãè¿ãã«ãªãã¾ãããã»ã¨ãã©ã®å ´åã次ã®ãããªããã»ã¹ãç®æ¨ã«ãã¦ãã¾ãã
ã¤ã¾ããåä¿¡ãã Kafka ã¡ãã»ã¼ã¸ãã¨ã«ãªã¢ã¼ããµã¼ãã¹ãå¼ã³åºãããã®ã§ãããã®ãããèªã¿åããã¦ãããã¼ã¿ãå«ãæåã®ãããã¯ï¼ãinãï¼ãä¾ãã°ããã©ã³ã¶ã¯ã·ã§ã³ããããã¾ãã次ã«ãã¢ã¼ããã¯ãã£ã®ä¸å¿çãªé¨åãããã¾ããããã¯å¦çã³ã³ãã¼ãã³ãï¼å³ã®Processing Compornentï¼ã§ããããã¯ãçä¿¡ãããã©ã³ã¶ã¯ã·ã§ã³ãèªã¿åãããããã®ããããã«ã¤ãã¦ããªã¢ã¼ããµã¼ãã¹ãå¼ã³åºãã¾ããã¾ããããã¯ï¼ãªã¢ã¼ããµã¼ãã¹ã«ãã£ã¦çæãããï¼ã¬ã¹ãã³ã¹ãå¥ã® Kafka ããã㯠"out" ã«æ¸ãè¾¼ã¿ã¾ãã
Quarkus ã§ãããå®è£ ããã®ã¯ç°¡åã§ãããã®è¨äºã§ã¯ããã«ã¤ãã¦èª¬æãã¾ããReactive Messaging 㨠Rest Client ã®ãããã§ãããã«ã¯ 20 è¡ä»¥ä¸ã®ã³ã¼ããå¿ è¦ã¨ãã¾ããã
ãªã¢ã¼ããµã¼ãã¹
ã¾ãã¯ãªã¢ã¼ããµã¼ãã¹ããå§ãã¦ã¿ã¾ããããQuarkus ã«ã¯ããªã¢ã¼ã HTTP ãµã¼ãã¹ãå¼ã³åºãããã®è¤æ°ã®æ¹æ³ãããã¾ããã§ãããHTTP ã®ä½ã¬ãã«ã®è©³ç´°ãå¦çãããã¨ãªã HTTP ãµã¼ãã¹ã¨å¯¾è©±ããåªããæ¹æ³ãæä¾ãã¦ããã®ã§ãRest Client ã使ç¨ãã¦ã¿ã¾ãããã
ã¿ãªãã㯠HTTP API ã使ããã¨ãã§ãã¾ããã§ãããã·ã³ãã«ã«è¨ãã¨ãç°¡åãªãªã¢ã¼ããµã¼ãã¹ãèãã¦ã¿ã¾ãããã次ã®ããã«ãªãã¾ãã
@RegisterRestClient(configKey = "transaction-service") @Produces(MediaType.APPLICATION\_JSON) @Consumes(MediaType.APPLICATION\_JSON) public interface TransactionService { @Path("/transactions") @POST TransactionResult postSync(Transaction transaction); @Path("/transactions") @POST Uni<TransactionResult> postAsync(Transaction transaction); }
ãã®ãµã¼ãã¹ã«ã¯ãåã HTTP ã¨ã³ããã¤ã³ããå¼ã³åºã 2 ã¤ã®ã¡ã½ãããå«ã¾ãã¦ãã¾ããæåã®ãã®ã¯åæçãªãã®ã§ãããã®ãããå¿çãåãåãã¾ã§å¼ã³åºãå
ã®ã¹ã¬ããããããã¯ãã¾ãã2ã¤ç®ã¯éåæã§ããè¿ããã Uni
ã¯åä¿¡æã«ã¬ã¹ãã³ã¹ãåå¾ãã¾ãããã®å ´åãå¼ã³åºãå
ã®ã¹ã¬ããã¯ãããã¯ããã¾ããããã®ãããä»ã®ãã¨ãã§ãã¾ãããããã®ã¡ã½ããã®ä½¿ãæ¹ã¯å¾ã»ã©è¦ã¦ããã¾ããã¾ãã¯ããã®åã«å°ãè¨å®ãè¦ã¾ãããã application.properties
ã«è¿½å ãã¾ãã
# Configure the transaction-service (rest client) transaction-service/mp-rest/url=http://localhost:8080
ãã¡ãããURLãæ´æ°ãã¾ããhttps://quarkus.io/guides/rest-client ã§ã¯ãRest Client ã®ä½¿ç¨æ¹æ³ãè¨å®ã«ã¤ãã¦è©³ãã説æãã¦ãã¾ãã
çä¿¡ãã©ã³ã¶ã¯ã·ã§ã³ãã¨ã«ãµã¼ãã¹ãå¼ã³åºã
OKãç§ãã¡ã¯ãµã¼ãã¹ãå¼ã³åºããã¨ã¯ã§ãã¾ããã§ãããè¦ãã¦ããã¦ä¸ãããç§ãã¡ã¯ãããçä¿¡ãã©ã³ã¶ã¯ã·ã§ã³ãã¨ã«å¼ã³åºãããã®ã§ããããã¦ãããã®ãã©ã³ã¶ã¯ã·ã§ã³ã¯ Kafka ã®ãããã¯ããæ¥ã¾ããReactive Messaging ã使ãã°ãä»ããã« Kafka ã«å¯¾å¦ããå¿
è¦ã¯ããã¾ãããç§ãã¡ã¯ãã¸ãã¯ã«éä¸ã§ãã¾ããç§ãã¡ã¯ãã£ãã«ï¼ãã¼ã¿ã®ã¹ããªã¼ã ï¼ãæã£ã¦ããã¨ãã¾ããããããã¯ãã©ã³ã¶ã¯ã·ã§ã³ã転éããããã®ç©ã§ãããããæåã®ãã£ã³ãã« in
ã¨å¼ã³ã¾ãã
ã¾ããç§ãã¡ã¯ãªã¢ã¼ããµã¼ãã¹ããã®ã¬ã¹ãã³ã¹ãå¥ã® Kafka ãããã¯ã«æ¸ãè¾¼ã¿ããã¨æãã¾ããå度è¨ãã¾ãããKafka ã«å¯¾å¦ããå¿
è¦ã¯ããã¾ãããã¬ã¹ãã³ã¹ã out
ã¨ããååã®ãã£ã³ãã«ã«æ¸ãè¾¼ãã ã¨ãã¾ãã
ããã§ã以ä¸ã®ãããªï¼ä¸å®å ¨ãªï¼ã³ã¼ããããã¾ãï¼
@ApplicationScoped public class TransactionProcessor { @Incoming("in") // æåã®ãã£ãã« - ç§ãã¡ã¯ããããèªã¿è¾¼ã @Outgoing("out") // 2 ã¤ç®ã®ãã£ãã« - ç§ãã¡ã¯ããã¸æ¸è¾¼ã public TransactionResult sendToTransactionService(Transaction transaction) { // ããã§ç§ãã¡ã®ãµã¼ãã¹ãèªã¿åºãå¿ è¦ããã } }
@Incoming
ã¯èªã¿è¾¼ã¿ãã£ã³ãã«ãè¨å®ãã¾ãã@Outgoing
ã¯æ¸ãè¾¼ã¿ãã£ã³ãã«ãè¨å®ãã¾ããããããä½ãã足ããªãã§ã...ã ç§ãã¡ã¯ãªã¢ã¼ããµã¼ãã¹ãå¼ã³åºãå¿
è¦ãããã¾ã
@ApplicationScoped public class TransactionProcessor { private static final Logger LOGGER = Logger.getLogger("TransactionProcessor"); @Inject @RestClient TransactionService service; @Incoming("in") @Outgoing("out") @Blocking public TransactionResult sendToTransactionService(Transaction transaction) { LOGGER.infof("Sending %s transaction service", transaction); return service.postSync(transaction); } }
ã¾ãæåã«ãRest Client ãæ³¨å ¥ãã¾ããããã¦ãç§ãã¡ã®ã¡ã½ããã®ä¸ã§ãããå¼ã³åºãã ãã§ãã
@Blocking
ã«ã¤ãã¦çåã«æããã¨ãããããããã¾ããããªã¢ã¯ãã£ãã¡ãã»ã¼ã¸ã³ã°ã§ã¯ãããããã³ã°ã³ã¼ãã使ç¨ãã¦ããå ´åã¯ãã®æ¨ã示ãå¿
è¦ãããã¾ããããã©ã«ãã§ã¯ããã¯ã¤ãã³ãã«ã¼ãã¢ã¼ããã¯ãã£ã使ç¨ããã¦ããããã§ãã便å©ãªåé¢ã@Blocking
ãä¹±ç¨ãã¹ãã§ã¯ããã¾ãããããã¯ãã¹ã¬ãããã¼ã«ã«ä¾åãã¦åæå®è¡æ§ãå¶éãããããã§ããããããããã¯ããªãã®ãã¸ãã¯ãåæããã¾ãã
éåææä½ã®ä½¿ç¨
TransactionService
ãå®è¡ãã 2 çªç®ã®ã¡ã½ãã postAsync
ã使ç¨ãããã¨ã§ãç§ãã¡ã¯ @Blocking
ã¢ããã¼ã·ã§ã³ãåãé¤ãã¾ãã
@ApplicationScoped public class TransactionProcessor { private static final Logger LOGGER = Logger.getLogger("TransactionProcessor"); @Inject @RestClient TransactionService service; @Incoming("in") @Outgoing("out") public Uni<TransactionResult> sendToTransactionService(Transaction transaction) { LOGGER.infof("Sending %s transaction service", transaction); return service.postAsync(transaction); } }
post
ã¡ã½ããã® async ããªã¢ã³ãã使ç¨ããã¨ã@Blocking
ãåé¤ã§ãã¾ãã Uni
ãç´æ¥è¿ãã¾ãããã® Uni
ã¯ãªã¢ã¼ããµã¼ãã¹ã®å¿çãåä¿¡ããã¨ããã®å¤ã out
ãã£ãã«ã«æ¸ãè¾¼ã¿ã¾ãã
ãã£ãã«ã Kafka ã«ãããã³ã°
ããã¾ã§ã¯é 調ã§ããç§ãã¡ã®ã³ã¼ãã Kafka ã«æ¥ç¶ããæãæ¥ã¾ããããªã¢ã¯ãã£ãã¡ãã»ã¼ã¸ã³ã°ã§ã¯ããã£ãã«ãã³ãã¯ã¿ã«ãããã³ã°ãã¾ããããã§ã¯ Kafka ã§ããããã§ãin
ãã£ã³ãã«ã¨ out
ãã£ã³ãã«ã Kafka ã®ãããã¯ã§ãããã¨ã示ãããã«ã¢ããªã±ã¼ã·ã§ã³ãè¨å®ããå¿
è¦ãããã¾ããããä¸åº¦ãapplication.properties
ãã¡ã¤ã«ãç·¨éãã追å ãã¾ãã
mp.messaging.incoming.in.connector=smallrye-kafka mp.messaging.incoming.in.topic=transactions mp.messaging.incoming.in.value.deserializer=org.acme.model.TransactionDeserializer mp.messaging.incoming.in.auto.offset.reset=earliest mp.messaging.incoming.in.enable.auto.commit=false mp.messaging.outgoing.out.connector=smallrye-kafka mp.messaging.outgoing.out.topic=output mp.messaging.outgoing.out.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
æåã®ãããã¯ã¯ in
ãã£ã³ãã«ã«ã¤ãã¦ã§ãã ãã㯠transactions
Kafkaã®ãããã¯ã«æ¥ç¶ãã¾ãããã¼ã¿ã¯ã«ã¹ã¿ã ãã·ãªã¢ã©ã¤ã¶ã§ãã·ãªã¢ã©ã¤ãºããã¾ããæå¾ã®ä»ã®ããããã£ã¯ããªã¼ãã³ãããï¼Reactive Messaging ã代ããã«ã³ããããå¦çãã¦ããã¾ãï¼ãç¡å¹ã«ãã¾ããããã¦ãæå¾ã«ã³ãããããããªãã»ãã以éã®ãã¼ã¿ãèªã¿è¾¼ã¿ã¾ãã
2 çªç®ã®ãããã¯ã§ã¯ãout
ãã£ãã«ãè¨å®ãã¾ãã output
Kafkaãããã¯ã¨æ¥ç¶ãã¾ããããã¦ãå¤ã®ã·ãªã¢ã©ã¤ã¶ãè¨å®ãã¾ãããã®ç°¡åãªä¾ã§ã¯ããã¼ã¿ãJSONã§æ¸ãã¾ãã
ãã©ã³ã¶ã¯ã·ã§ã³ã Kafka transaction
ãããã¯ã«æ¸ãè¾¼ã¾ããã¨ãããã¯ç§ãã¡ã®å¦çã³ã³ãã¼ãã³ãã§åä¿¡ããã¾ããããã¦ããªã¢ã¼ããµã¼ãã¹ã«éä¿¡ããããã®çµæã output
Kafka ãããã¯ã«æ¸ãè¾¼ã¾ãã¾ãã
2020-08-27 10:04:44,141 INFO [TransactionProcessor] (vert.x-eventloop-thread-0) Sending Transaction{name='MacroHard', amount=20} transaction service 2020-08-27 10:04:44,196 INFO [TransactionResource] (executor-thread-2) Handling transaction MacroHard / 20 2020-08-27 10:04:44,240 INFO [TransactionProcessor] (vert.x-eventloop-thread-0) Sending Transaction{name='BlueHat', amount=10} transaction service 2020-08-27 10:04:44,245 INFO [TransactionResource] (executor-thread-2) Handling transaction BlueHat / 10
output
ãããã¯ã®ä¸ãè¦ã¦ã¿ãã¨ãTransactionResult
ãæµãã¦ããã®ã確èªã§ãã¾ãã
çµããã¾ããï¼
æ°è¡ã®ã³ã¼ãã¨å°ãã®è¨å®ã§ãKafka ãããã¯ãããã¼ã¿ãèªã¿åãããªã¢ã¼ããµã¼ãã¹ãå¼ã³åºãããã®çµæãå¥ã® Kafka ãããã¯ã«æ¸ãè¾¼ãã¾ããåç´ãªè©±ã§ãã
èªåã§ãã£ã¦ã¿ããã¨æãã¾ããï¼ãã® GitHub ãªãã¸ããªã®ã³ã¼ãããã§ãã¯ã¢ã¦ããã¦ãreadme ã®æ示ã«å¾ã£ã¦ãã ããã
Reactive Messaging 㨠Rest ã¯ã©ã¤ã¢ã³ãã«ã¯ä»ã«ãå®ç³ãå«ã¾ãã¦ãã¾ããé¢é£ããã¬ã¤ããããã¥ã¡ã³ãã確èªãã¦è©³ç´°ã確èªãã¦ãã ããã