Red Hat 㧠Solution Architect ã¨ã㦠Quarkus ãæ å½ãã¦ããä¼è¤ã¡ã²ãï¼@chiroitoï¼ã§ãã
ãã®è¨äºã¯ãQuarkus.io ã®ããã°è¨äºãHow to Use Kafka, Schema Registry and Avro with Quarkus ã®ç¿»è¨³è¨äºã§ãã
Kafka ã®ä¸çã§ã¯ãApache Avro ã¯å§åçã«æã使ç¨ããã¦ããã·ãªã¢ã©ã¤ã¼ã¼ã·ã§ã³ãããã³ã«ã§ããAvro ã¯ãã¼ã¿ã·ãªã¢ã©ã¤ãºã·ã¹ãã ã§ããKafka ã¨çµã¿åããããã¨ã§ãã¹ãã¼ããã¼ã¹ã®å ç¢ã§é«éãªãã¤ããªã·ãªã¢ã©ã¤ãºãå®ç¾ãã¾ãã
ãã®ããã°è¨äºã§ã¯ãQuarkus ã¢ããªã±ã¼ã·ã§ã³ã§ã¹ãã¼ãã¬ã¸ã¹ããªã使ç¨ã㦠Avro ã使ç¨ããæ¹æ³ãè¦ã¦ããã¾ãããã®ããã°ã§ã¯ãJVM ã¢ã¼ããä¸å¿ã«ç´¹ä»ãã¦ãã¾ãããã¤ãã£ãã¢ã¼ãã«ã¤ãã¦ã¯å¥ã®è¨äºã§åãä¸ãã¾ãã
ç§ãã¡ã¯ç°¡åãªã¢ããªã±ã¼ã·ã§ã³ãæ¸ãã¦ããã¾ãããã㯠HTTP ãªã¯ã¨ã¹ããåä¿¡ãã¦ãKafka ã«ãã¤ãã¼ããæ¸ãè¾¼ãã§ãKafka ããèªã¿åºãã¨ããã¢ããªã§ããç°¡åã«è¨ãã¨ãåãã¢ããªã±ã¼ã·ã§ã³ã Kafka ã«æ¸ãè¾¼ãã§ããããèªãã¨ãããã¨ã«ãªãã¾ããã§ãããæããã«ç¾å®ä¸çã§ã¯å¥ã®ã¢ããªã±ã¼ã·ã§ã³ã«ãªãã§ãããã
éå§æ¹æ³
ã§ã¯ãæåããå§ãã¦ã¿ã¾ããããhttps://code.quarkus.io ã«ã¢ã¯ã»ã¹ãã¦ããã¸ã§ã¯ããä½æãã以ä¸ã®æ¡å¼µæ©è½ãé¸æãã¾ãã
- RESTEasy JSON-B
- SmallRye Reactive Messaging - Kafka Connector
- Apache Avro
ããã¸ã§ã¯ãããã¦ã³ãã¼ããã¦ãã好ã㪠IDE ã§éãã¦ãã ããã
çæããã pom.xml
ã«ããã¤ãã®ã³ã³ãã³ãã追å ããå¿
è¦ãããã¾ãã pom.xml
ãã¡ã¤ã«ãéãã以ä¸ã®dependency
ã追å ãã¾ãã
<dependency> <groupId>io.apicurio</groupId> <artifactId>apicurio-registry-utils-serde</artifactId> <version>1.2.2.Final</version> <exclusions> <exclusion> <groupId>org.jboss.spec.javax.interceptor</groupId> <artifactId>jboss-interceptors-api_1.2_spec</artifactId> </exclusion> </exclusions> </dependency>
ãã®ä¾åé¢ä¿ã¯ãAvro ã·ãªã¢ã©ã¤ã¶ã¨ãã·ãªã¢ã©ã¤ã¶ãæä¾ãã¾ãããã®ã·ãªã¢ã©ã¤ã¶/ãã·ãªã¢ã©ã¤ã¶ã«ã¯è¤æ°ã®ãã¼ã¸ã§ã³ãããã¾ãããã®ããã°è¨äºã§ã¯ãApicurio ããæä¾ããããã®ã使ç¨ãã¦ãã¾ããConfluent ã®ãã®ã使ããã¨ãã§ãã¾ãï¼ã¢ã¼ãã£ãã¡ã¯ã㯠Maven Central ã«ã¯ãªãã®ã§ã追å ã®ãªãã¸ããªã追å ããå¿ è¦ãããã¾ãï¼ã
ã¾ããavro-maven-plugin
ã追å ããå¿
è¦ãããã¾ãã <build><plugins>
ã®ä¸ã«ã以ä¸ã追å ãã¾ãã
<plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.9.2</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>src/main/avro</sourceDirectory> <outputDirectory>${project.build.directory}/generated-sources</outputDirectory> <stringType>String</stringType> </configuration> </execution> </executions> </plugin>
ãã®ãã©ã°ã¤ã³ã¯ãsrc/main/avro
ãã£ã¬ã¯ããªã«ãã Avro ã¹ãã¼ã ãã¡ã¤ã«ããã³ã¼ããçæãã¾ãããã®è¨å®ã§ãããããã³ã¼ããæ¸ãå§ãããã¾ãã
Avro ã¹ãã¼ã
ã¾ããKafka ã§èªã¿æ¸ããããªãã¸ã§ã¯ãã表ãã¹ãã¼ããæ¸ãå¿
è¦ãããã¾ãã以ä¸ã®å
容㮠src/main/avro/movie.avsc
ãã¡ã¤ã«ãä½æãã¾ãã
{ "namespace": "me.escoffier.quarkus", "type": "record", "name": "Movie", "fields": [ { "name": "title", "type": "string" }, { "name": "year", "type": "int" } ] }
pom.xml
ãã¡ã¤ã«ã§è¨å®ããã avro-maven-plugin
ã¯ãtitle
㨠year
ã®å±æ§ãæ㤠me.escoffier.quarkus.Movie
ã¯ã©ã¹ãçæãã¾ããã¯ã©ã¹ãçæããã«ã¯ã以ä¸ãå®è¡ãã¾ãã
mvn generate-sources
Movie ãªã½ã¼ã¹
æåã«æ¸ãã¯ã©ã¹ã¯ HTTP ãªã¯ã¨ã¹ããåãåããï¼Movie ã®ï¼ãã¼ã¿ãKafkaã«æ¸ãè¾¼ã¿ã¾ãã src/main/java/me/escoffier/MovieResource.java
ã以ä¸ã®å
容ã§ä½æãã¾ãã
package me.escoffier; import me.escoffier.quarkus.Movie; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; import org.jboss.logging.Logger; import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @Path("/movies") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) public class MovieResource { private static final Logger LOGGER = Logger.getLogger("MovieResource"); @Inject @Channel("movies") Emitter<Movie> emitter; @POST public Response enqueueMovie(Movie movie) { LOGGER.infof("Sending movie %s to Kafka", movie.getTitle() ); emitter.send(movie); return Response.accepted().build(); } }
ãã® JAX-RS ãªã½ã¼ã¹ã¯ç°¡åã§ããããã¯åä¸ã®ã¨ã³ããã¤ã³ãã¡ã½ãããæã¡ã/movies
㧠JSON ãã¤ãã¼ããåä¿¡ãã¾ããRESTEasy 㯠JSON ããã¥ã¡ã³ããèªåçã« Movie
ãªãã¸ã§ã¯ãã«ãããã³ã°ãã¾ãã avsc
ãã¡ã¤ã«ã§èª¬æããã¦ããããã«ãæå¾
ããã JSON ã«ã¯ title
㨠year
ã® 2 ã¤ã®ãã£ã¼ã«ããå«ã¾ãã¦ãã¾ãã
ãªã¢ã¯ãã£ãã¡ãã»ã¼ã¸ã³ã°ã§ Quarkus ã使ç¨ããå ´åãKafka ã¨ç´æ¥ããã¨ããããã¨ã¯ããã¾ãããããªã㯠Emitter
ã注å
¥ãã¾ããããã¯ãªãã¸ã§ã¯ãï¼ç§ãã¡ã®movieï¼ããã£ãã«ã«éãã¾ããã¢ããªã±ã¼ã·ã§ã³ã®è¨å®ã§ã¯ããã®ãã£ãã«ã Kafka ãããã¯ã«ãããã³ã°ãã¾ãã
è¨å®ã¨ããã°ãsrc/main/resources/application.properties
ãéãã¦ã以ä¸ã追å ãã¾ãã
mp.messaging.connector.smallrye-kafka.apicurio.registry.url=http://localhost:8081/api mp.messaging.outgoing.movies.connector=smallrye-kafka mp.messaging.outgoing.movies.topic=movies mp.messaging.outgoing.movies.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer mp.messaging.outgoing.movies.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy mp.messaging.outgoing.movies.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy mp.messaging.outgoing.movies.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
ãã®è¨å®ã¯å°ã説æãå¿
è¦ã§ããã¾ããmp.messaging.connector.smallrye-kafka.apicurio.registry.url
ã§ã¹ãã¼ãã¬ã¸ã¹ããªã® URL ãè¨å®ãã¾ããApicurio ã®ä»£ããã« Confluent ã®ã·ãªã¢ã©ã¤ã¶/ãã·ãªã¢ã©ã¤ã¶ã使ç¨ããå ´åãmp.messaging.connector.smallrye-kafka.schema.registry.url
ã¨ããååã®ããããã£ã§ãã
mp.messaging.outgoing.movies
㯠movies
ãã£ã³ãã«ãè¨å®ãã¾ãã connector
å±æ§ã¯ãSmallRye Kafka ã³ãã¯ã¿ããã£ãã«ã管çãã¦ãããã¨ã示ãã¾ãã topic
å±æ§ã¯ãããã¯ã®ååãæå®ãã¾ãï¼ããã¯ãã£ã³ãã«åã¨ä¸è´ããã®ã§ããã®å ´åã¯çç¥ã§ãã¾ãï¼ ãvalue.serializer
ã¯ä½¿ç¨ããã·ãªã¢ã©ã¤ã¶ãè¨å®ãã¾ããããã§ã¯ãApicurio ãæä¾ãã io.apicurio.registry.utils.serde.AvroKafkaSerializer
ã使ç¨ãã¾ãã registry.*
ããããã£ã¯ãã¬ã¸ã¹ããªãã¹ãã¼ããã©ã®ããã«æ±ãããè¨å®ãã¾ãã
Movie ã®ä½¿ç¨
å¾åã®ã¢ããªã¯ããã«ã·ã³ãã«ã«ãªã£ã¦ãã¾ããåä¿¡ããæ ç»ããã°ã«è¨é²ããã ãã§ãã
src/main/java/me/escoffier/MovieConsumer.java
ã以ä¸ã®å
容ã§ä½æãã¾ãã
package me.escoffier; import me.escoffier.quarkus.Movie; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.jboss.logging.Logger; import javax.enterprise.context.ApplicationScoped; @ApplicationScoped public class MovieConsumer { private static final Logger LOGGER = Logger.getLogger("MovieConsumer"); @Incoming("movies-from-kafka") public void receive(Movie movie) { LOGGER.infof("Received movie: %s (%d)", movie.getTitle(), movie.getYear()); } }
@Incoming
ã¢ããã¼ã·ã§ã³ã¯ãmovies-from-kafka
ãã£ãã«ä¸ãé·ç§»ããå Movie
ãªãã¸ã§ã¯ãã«å¯¾ãã¦ã¡ã½ãããå¼ã³åºããããã¨ã示ãã¦ãã¾ããä»åã¯ã±ã¼ã¹ã§ã¯ãåç´ã«ãã°ã¡ãã»ã¼ã¸ãæ¸ãã¾ãã
ããããããã¾ã§æ¥ã¦ãã¾ããKafka ããã®åä¿¡ãè¨å®ããå¿
è¦ãããã¾ãã application.properties
ãå度éãã以ä¸ã追å ãã¾ãã
mp.messaging.incoming.movies-from-kafka.connector=smallrye-kafka mp.messaging.incoming.movies-from-kafka.topic=movies mp.messaging.incoming.movies-from-kafka.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer mp.messaging.incoming.movies-from-kafka.auto.offset.reset=earliest mp.messaging.incoming.movies-from-kafka.enable.auto.commit=false mp.messaging.incoming.movies-from-kafka.apicurio.registry.avro-datum-provider=io.apicurio.registry.utils.serde.avro.ReflectAvroDatumProvider
ãããã®ããããã£ã¯ movies-from-kafka
ã movies
Kafka ãããã¯ã«å¯¾å¿ä»ãã¾ããã¾ãããã·ãªã¢ã©ã¤ã¶ï¼io.apicurio.registry.utils.serde.AvroKafkaDeserializer
ï¼ã®è¨å®ãè¡ãã¾ããKafka ã®èªåã³ãããï¼enable.auto.commit=false
ï¼ãç¡å¹ã«ãã¦ãã¾ããããã¯ãªã¢ã¯ãã£ãã¡ãã»ã¼ã¸ã³ã°ããªãã»ããã³ããããå¦çãã¦ãããããã§ãã
éä¿¡è ã¨åä¿¡è ãåãã¢ããªã±ã¼ã·ã§ã³å ã«åå¨ãã¦ãã¾ãããã®ãããåããã£ãã«åã使ç¨ãããã¨ã¯ã§ãã¾ããã
ã¡ãã£ã¨ããã¤ã³ãã©
ã¢ããªã±ã¼ã·ã§ã³ãå®è¡ããåã«å¿ è¦ãªã®ã¯ä»¥ä¸ã§ãã
- Kafka ã®ããã¼ã«ã¼
- Apicurio ã®ã¹ãã¼ãã¬ã¸ã¹ããª
ããã¸ã§ã¯ãã®ã«ã¼ãã«ä»¥ä¸ã®å
容㮠docker-compose.yaml
ãã¡ã¤ã«ãä½æãã¾ãã
version: '2' services: zookeeper: image: strimzi/kafka:0.11.3-kafka-2.1.0 command: [ "sh", "-c", "bin/zookeeper-server-start.sh config/zookeeper.properties" ] ports: - "2181:2181" environment: LOG_DIR: /tmp/logs kafka: image: strimzi/kafka:0.11.3-kafka-2.1.0 command: [ "sh", "-c", "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" ] depends_on: - zookeeper ports: - "9092:9092" environment: LOG_DIR: "/tmp/logs" KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 schema-registry: image: apicurio/apicurio-registry-mem:1.2.2.Final ports: - 8081:8080 depends_on: - kafka environment: QUARKUS_PROFILE: prod KAFKA_BOOTSTRAP_SERVERS: localhost:9092 APPLICATION_ID: registry_id APPLICATION_SERVER: localhost:9000
ãã® docker-compose
ãã¡ã¤ã«ã¯å¿
è¦ãªãã®ããã¹ã¦èµ·åãã¾ããApicurio ã®ã¬ã¸ã¹ããªã®ããããã£ãæ°ã«ãªãæ¹ãããã£ãããããããã¾ãããå®ã¯ Apicurio ã®ã¬ã¸ã¹ããªã Quarkus ã®ã¢ããªã§ãã
å®è¡ããæé
ã§ã¯ãå§ãã¾ããããã¾ãã¯ãã¤ã³ãã©ã®æ´åãä¸ç·ã«å§ãã¾ãããã
docker-compose up -d
docker-compose down; docker-compose rm ã§ã¤ã³ãã©ãåæ¢ãã¾ãã ããã¦ãã¢ããªã±ã¼ã·ã§ã³ãèµ·åãã¾ãã
mvn compile quarkus:dev
ä¸åº¦èµ·åããããå¥ã®ç«¯æ«ãéãã¦æ ç»ãæ稿ãã¾ãã
curl --header "Content-Type: application/json" \ --request POST \ --data '{"title":"The Shawshank Redemption","year":1994}' \ http://localhost:8080/movies curl --header "Content-Type: application/json" \ --request POST \ --data '{"title":"The Godfather","year":1972}' \ http://localhost:8080/movies curl --header "Content-Type: application/json" \ --request POST \ --data '{"title":"The Dark Knight","year":2008}' \ http://localhost:8080/movies curl --header "Content-Type: application/json" \ --request POST \ --data '{"title":"12 Angry Men","year":1957}' \ http://localhost:8080/movies
ã¢ããªã±ã¼ã·ã§ã³ãã°ã§ã¯ã以ä¸ã®ããã«è¡¨ç¤ºããã¦ããã¯ãã§ãã
2020-09-11 16:42:22,597 INFO [MovieResource] (executor-thread-1) Sending movie The Shawshank Redemption to Kafka 2020-09-11 16:42:22,619 INFO [MovieResource] (executor-thread-1) Sending movie The Godfather to Kafka 2020-09-11 16:42:22,624 INFO [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Shawshank Redemption (1994) 2020-09-11 16:42:22,641 INFO [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Godfather (1972) 2020-09-11 16:42:22,644 INFO [MovieResource] (executor-thread-1) Sending movie The Dark Knight to Kafka 2020-09-11 16:42:22,663 INFO [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: The Dark Knight (2008) 2020-09-11 16:42:22,669 INFO [MovieResource] (executor-thread-1) Sending movie 12 Angry Men to Kafka 2020-09-11 16:42:22,688 INFO [MovieConsumer] (vert.x-eventloop-thread-0) Received movie: 12 Angry Men (1957)
çµè«
ããã¾ããï¼æ°è¡ã®ã³ã¼ãã¨å°ãã®è¨å®ã§ãKafkaãAvroãã¹ãã¼ãã¬ã¸ã¹ããªã Quarkus ã¢ããªã±ã¼ã·ã§ã³ã«çµ±åã§ãã¾ãããã®ãã¢ã®ã³ã¼ã㯠https://github.com/cescoffier/quarkus-kafka-and-avro ã«ããã¾ããreadme ãã¡ã¤ã«ã«ã¯ããããå®è¡ããããã®æé ãè¨è¼ããã¦ãã¾ãã
Quarkus 1.9 ã§ã¯ãã¡ãã»ã¼ã¸ã³ã°ããªã¢ã¯ãã£ãå ¨è¬ã«ã¤ãã¦ã®æ°ããæ©è½ããããã追å ããã¾ãããæå¾ ãã ããï¼