Spring Boot Starter Apache Kafka Wrapper для обработки сообщений в архитектуре MVC.
- Автоматически настроенные компоненты для удобной отправки и приема сообщений через Kafka.
- Абстракция
kafka endpoints
поверх kafka топиков для взаимодействия как в стандартном rest http подходе. - Автоматически настроенные
request
иreply
топики для каждого клиента и поддержка семантики синхронных request-reply - TraceId логи сообщений из коробки
-
Сериализация и десериализация:
- Интерфейсы и реализации для сериализации и десериализации сообщений Kafka, такие как
KafkaRequestDeserializer
иKafkaResponseDeserializer
(например,KafkaRequestDeserializerImpl
иKafkaResponseDeserializerImpl
). - Эти классы обрабатывают преобразование сообщений в объекты Java и обратно.
- Интерфейсы и реализации для сериализации и десериализации сообщений Kafka, такие как
-
Аннотации:
- Аннотации для маркировки методов и классов, например,
@KafkaMvcController
,@KafkaMvcMapping
,@ExceptionHandler
.
- Аннотации для маркировки методов и классов, например,
-
Конфигурация:
- Классы, такие как
KafkaMvcConsumerAutoconfiguration
иKafkaMvcProducerAutoconfiguration
, отвечают за настройку потребителей и производителей Kafka. - Эти классы создают фабрики потребителей и производителей, а также шаблоны Kafka для отправки и получения сообщений.
- Классы, такие как
-
Обработка сообщений:
KafkaMvcConsumer
иKafkaMvcProducer
обрабатывают входящие и исходящие сообщения.KafkaMvcConsumer
используетRequestGateway
для управления потоками и обработки задач.
-
Исключения и обработка ошибок:
KafkaMvcExceptionHandlerBean
и связанные с ним аннотации обрабатывают исключения, возникающие при обработке сообщений.KafkaSerializationException
используется для обработки ошибок сериализации.
-
Утилиты и вспомогательные классы:
KafkaMvcRequestCreator
иKafkaMvcRequestBuilder
помогают в создании и отправке запросов.KafkaAdminProvider
управляет настройками Kafka Admin.
-
Добавление зависимости:
- Установка локально
git clone https://github.com/owpk/springboot-kafka-mvc-starter cd springboot-kafka-mvc-starter ./mvnw clean install
- maven
<dependency> <groupId>ru.owpk.kafkamvc</groupId> <artifactId>springboot-kafka-mvc-starter</artifactId> <version>1.8.0-17</version> </dependency>
- gradle
repositories { mavenLocal() } dependencies { implementation 'ru.owpk.kafkamvc:springboot-kafka-mvc-starter:1.8.0-17' }
-
Конфигурация:
- В
application.properties
илиapplication.yml
укажите настройки для Kafka-mvc, такие какkafka-mvc.bootstrap-servers
,kafka-mvc.consumer.name
,kafka-mvc.producer.replyTopic
- В
-
Аннотации:
- Используйте аннотацию
@EnableKafkaMvcConsumer
для включения функциональности потребителя Kafka. - Используйте аннотацию
@EnableKafkaMvcProducer
для включения функциональности производителя Kafka.
- Используйте аннотацию
-
Создание контроллеров:
- Создайте классы, аннотированные
@KafkaMvcController
, чтобы определить обработчики сообщений Kafka. Укажитеtopic
иidleInterval
в аннотации. - Внутри контроллера используйте аннотацию
@KafkaMvcMapping
для методов, которые будут обрабатывать определенные действия.
- Создайте классы, аннотированные
-
Обработка исключений:
- Создайте классы, аннотированные
@KafkaMvcExceptionHandler
, для обработки исключений, возникающих при обработке сообщений.
- Создайте классы, аннотированные
-
Отправка сообщений:
-
Используйте
KafkaMvcRequestCreator
для отправки сообщений. Вы можете отправлять сообщения синхронно или асинхронно, используя методыsend
иsendAsync
. -
По умолчанию bean
KafkaMvcRequestCreator
не создается автоконфигурацией, для этого вы должны создать его вручную:@Configuration public class BeanConfig { @Bean KafkaMvcRequestCreator requestCreator(KafkaMvcProducer producer) { return new KafkaMvcRequestCreator(producer); } }
-
-
Пример использования:
-
application.yml
kafka-mvc: bootstrap-servers: localhost:9092 consumer: name: "service-a-consumer" threads: max: 50 start: 10 producer: replyTopic: "service.a.response" timeout: 10000
-
Пример конфигурации:
import ru.owpk.kafkamvc.annotation.EnableKafkaMvcConsumer; import ru.owpk.kafkamvc.annotation.EnableKafkaMvcProducer; import ru.owpk.kafkamvc.producer.KafkaMvcProducer; import ru.owpk.kafkamvc.utils.KafkaMvcRequestCreator; @Configuration @EnableKafkaMvcProducer @EnableKafkaMvcConsumer public class BeanConfig { @Bean public KafkaMvcRequestCreator requestCreator(KafkaMvcProducer kafkaSparuralProducer) { return new KafkaMvcRequestCreator(kafkaSparuralProducer); } }
-
Пример продюссера:
@Service public class ExampleService { @Autowired private KafkaMvcRequestCreator requestCreator; // Async example public String fooAsync(MyRequestDtoObj req) { var asyncResponse = requestCreator.createRequestBuilder() .withAction("/exampleEndpoint") .withTopicName("service-b") .withRequestBody(req) .sendAsync(); System.out.println(asyncResponse); } // Sync example public void fooSync(MyRequestDtoObj req) { OtherResponseDtoObj syncResponse = requestCreator.createRequestBuilder() .withAction("/exampleEndpoint") .withTopicName("service-b") .withRequestBody(req) .sendForEntity(OtherResponseDtoObj.class); System.out.println(syncResponse); } }
-
Пример контроллера:
import ru.owpk.kafkamvc.annotation.Payload; import ru.owpk.kafkamvc.annotation.RequestParam; @KafkaMvcController(topic = "example-topic") public class ExampleController { @KafkaMvcMapping("/exampleEndpoint") public OtherResponseDtoObj bar(@Payload MyRequestDtoObj request) { // Логика обработки } // @RequestParam - "?key=value" analog @KafkaMvcMapping("/exampleEndpointWithRequestVariables") public OtherResponseDtoObj bar(@Payload MyRequestDtoObj request, @RequestParam Integer count) { // Логика обработки } }
-
Полный пример можно посмотреть тут: https://github.com/owpk/ocrv-kafka-demo