Commit a1c783b

refactor(kafka): reconfigure consumers to allow different config (#11869)

david-leifker authored Nov 19, 2024
1 parent 4601aac commit a1c783b

This change replaces the single shared "kafkaEventConsumer" Kafka listener container factory with per-consumer factories (mcpEventConsumer, mclEventConsumer, platformEventConsumer, plus the original kafkaEventConsumer as the default), each of which can set its own auto.offset.reset; the blanket SPRING_KAFKA_CONSUMER_AUTO_OFFSET_RESET override is removed from the quickstart compose file accordingly.
Showing 10 changed files with 116 additions and 15 deletions.

docker/profiles/docker-compose.gms.yml (1 change: 0 additions & 1 deletion)

@@ -40,7 +40,6 @@ x-kafka-env: &kafka-env
   # KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081
   SCHEMA_REGISTRY_TYPE: INTERNAL
   KAFKA_SCHEMAREGISTRY_URL: http://datahub-gms:8080/schema-registry/api/
-  SPRING_KAFKA_CONSUMER_AUTO_OFFSET_RESET: ${SPRING_KAFKA_CONSUMER_AUTO_OFFSET_RESET:-earliest}
 
 x-datahub-quickstart-telemetry-env: &datahub-quickstart-telemetry-env
   DATAHUB_SERVER_TYPE: ${DATAHUB_SERVER_TYPE:-quickstart}

MCLKafkaListenerRegistrar.java

@@ -1,5 +1,7 @@
 package com.linkedin.metadata.kafka;
 
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCL_EVENT_CONSUMER_NAME;
+
 import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition;
 import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
 import com.linkedin.mxe.Topics;
@@ -39,7 +41,7 @@ public class MCLKafkaListenerRegistrar implements InitializingBean {
   @Autowired private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
 
   @Autowired
-  @Qualifier("kafkaEventConsumer")
+  @Qualifier(MCL_EVENT_CONSUMER_NAME)
   private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;
 
   @Value("${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}")

MetadataChangeEventsProcessor.java

@@ -1,5 +1,7 @@
 package com.linkedin.metadata.kafka;
 
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME;
+
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
@@ -60,7 +62,7 @@ public class MetadataChangeEventsProcessor {
       "${METADATA_CHANGE_EVENT_NAME:${KAFKA_MCE_TOPIC_NAME:"
           + Topics.METADATA_CHANGE_EVENT
           + "}}",
-      containerFactory = "kafkaEventConsumer")
+      containerFactory = DEFAULT_EVENT_CONSUMER_NAME)
   @Deprecated
   public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
     try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {

(MetadataChangeProposal consumer processor)

@@ -4,6 +4,7 @@
 import static com.linkedin.metadata.Constants.MDC_CHANGE_TYPE;
 import static com.linkedin.metadata.Constants.MDC_ENTITY_TYPE;
 import static com.linkedin.metadata.Constants.MDC_ENTITY_URN;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCP_EVENT_CONSUMER_NAME;
 
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
@@ -116,7 +117,7 @@ public void registerConsumerThrottle() {
   @KafkaListener(
       id = CONSUMER_GROUP_ID_VALUE,
       topics = "${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL + "}",
-      containerFactory = "kafkaEventConsumer")
+      containerFactory = MCP_EVENT_CONSUMER_NAME)
   public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
     try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "consume").time()) {
       kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());

PlatformEventProcessor.java

@@ -1,5 +1,7 @@
 package com.datahub.event;
 
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.PE_EVENT_CONSUMER_NAME;
+
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
@@ -56,7 +58,7 @@ public PlatformEventProcessor(
   @KafkaListener(
       id = "${PLATFORM_EVENT_KAFKA_CONSUMER_GROUP_ID:generic-platform-event-job-client}",
       topics = {"${PLATFORM_EVENT_TOPIC_NAME:" + Topics.PLATFORM_EVENT + "}"},
-      containerFactory = "kafkaEventConsumer")
+      containerFactory = PE_EVENT_CONSUMER_NAME)
   public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
     try (Timer.Context i = MetricUtils.timer(this.getClass(), "consume").time()) {
 

ConsumerConfiguration.java

@@ -8,4 +8,13 @@ public class ConsumerConfiguration {
   private int maxPartitionFetchBytes;
   private boolean stopOnDeserializationError;
   private boolean healthCheckEnabled;
+
+  private ConsumerOptions mcp;
+  private ConsumerOptions mcl;
+  private ConsumerOptions pe;
+
+  @Data
+  public static class ConsumerOptions {
+    private String autoOffsetReset;
+  }
 }
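
A note for readers who don't use Lombok: the @Data annotation on the nested class generates the accessors that the factory code later in this commit relies on. A hand-expanded equivalent, as a sketch only and not the actual source:

  // Hand-expanded equivalent of the Lombok @Data nested class above (sketch).
  // @Data would also generate equals, hashCode, and toString, omitted here.
  public static class ConsumerOptions {
    private String autoOffsetReset;

    public String getAutoOffsetReset() {
      return autoOffsetReset;
    }

    public void setAutoOffsetReset(String autoOffsetReset) {
      this.autoOffsetReset = autoOffsetReset;
    }
  }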

KafkaConfiguration.java

@@ -20,6 +20,10 @@ public class KafkaConfiguration {
       "spring.deserializer.key.delegate.class";
   public static final String VALUE_DESERIALIZER_DELEGATE_CLASS =
       "spring.deserializer.value.delegate.class";
+  public static final String MCP_EVENT_CONSUMER_NAME = "mcpEventConsumer";
+  public static final String MCL_EVENT_CONSUMER_NAME = "mclEventConsumer";
+  public static final String PE_EVENT_CONSUMER_NAME = "platformEventConsumer";
+  public static final String DEFAULT_EVENT_CONSUMER_NAME = "kafkaEventConsumer";
 
   private String bootstrapServers;
 
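
These bean names live in public static final String constants rather than repeated string literals because Java annotation attributes, such as @Qualifier's value and @KafkaListener's containerFactory, must be compile-time constant expressions; a shared constant lets the factory definitions and the listener sites use one spelling without drift. A minimal illustration with invented names (DemoWiring, DemoClient, and DEMO_CONSUMER_NAME are hypothetical, not DataHub code):

  import org.springframework.beans.factory.annotation.Autowired;
  import org.springframework.beans.factory.annotation.Qualifier;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;

  // Hypothetical sketch: define and inject a bean by a shared constant name.
  @Configuration
  class DemoWiring {
    static final String DEMO_CONSUMER_NAME = "demoConsumer";

    @Bean(name = DEMO_CONSUMER_NAME) // legal: annotation values must be compile-time constants
    Object demoConsumer() {
      return new Object();
    }
  }

  class DemoClient {
    @Autowired
    @Qualifier(DemoWiring.DEMO_CONSUMER_NAME) // same constant, resolves the bean above
    Object consumer;
  }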

(application configuration yaml: kafka consumer settings)

@@ -289,6 +289,13 @@ kafka:
     maxPartitionFetchBytes: ${KAFKA_CONSUMER_MAX_PARTITION_FETCH_BYTES:5242880} # the max bytes consumed per partition
     stopOnDeserializationError: ${KAFKA_CONSUMER_STOP_ON_DESERIALIZATION_ERROR:true} # Stops kafka listener container on deserialization error, allows user to fix problems before moving past problematic offset. If false will log and move forward past the offset
     healthCheckEnabled: ${KAFKA_CONSUMER_HEALTH_CHECK_ENABLED:true} # Sets the health indicator to down when a message listener container has stopped due to a deserialization failure, will force consumer apps to restart through k8s and docker-compose health mechanisms
+    mcp:
+      autoOffsetReset: ${KAFKA_CONSUMER_MCP_AUTO_OFFSET_RESET:earliest}
+    mcl:
+      autoOffsetReset: ${KAFKA_CONSUMER_MCL_AUTO_OFFSET_RESET:earliest}
+    pe:
+      autoOffsetReset: ${KAFKA_CONSUMER_PE_AUTO_OFFSET_RESET:latest}
+
   schemaRegistry:
     type: ${SCHEMA_REGISTRY_TYPE:KAFKA} # INTERNAL or KAFKA or AWS_GLUE
     url: ${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}
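
The ${ENV_VAR:default} placeholders above follow Spring's usual resolution rule: use the environment variable when it is set, otherwise fall back to the default after the colon. Note that the pe (platform event) consumer defaults to latest, while mcp and mcl keep earliest. As a rough plain-Java stand-in for that resolution (illustrative only, not how Spring implements it):

  // Rough stand-in for Spring's ${ENV_VAR:default} placeholder resolution,
  // applied to the three new per-consumer settings. Illustrative only.
  public final class OffsetResetDefaults {
    static String resolve(String envVar, String fallback) {
      String v = System.getenv(envVar);
      return (v == null || v.isBlank()) ? fallback : v;
    }

    public static void main(String[] args) {
      System.out.println(resolve("KAFKA_CONSUMER_MCP_AUTO_OFFSET_RESET", "earliest"));
      System.out.println(resolve("KAFKA_CONSUMER_MCL_AUTO_OFFSET_RESET", "earliest"));
      System.out.println(resolve("KAFKA_CONSUMER_PE_AUTO_OFFSET_RESET", "latest"));
    }
  }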

KafkaEventConsumerFactory.java

@@ -1,10 +1,18 @@
 package com.linkedin.gms.factory.kafka;
 
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCL_EVENT_CONSUMER_NAME;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCP_EVENT_CONSUMER_NAME;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.PE_EVENT_CONSUMER_NAME;
+
 import com.linkedin.gms.factory.config.ConfigurationProvider;
+import com.linkedin.metadata.config.kafka.ConsumerConfiguration;
 import com.linkedin.metadata.config.kafka.KafkaConfiguration;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
+import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -23,7 +31,6 @@
 @Slf4j
 @Configuration
 public class KafkaEventConsumerFactory {
-
   private int kafkaEventConsumerConcurrency;
 
   @Bean(name = "kafkaConsumerFactory")
@@ -87,33 +94,100 @@ private static Map<String, Object> buildCustomizedProperties(
     return customizedProperties;
   }
 
-  @Bean(name = "kafkaEventConsumer")
+  @Bean(name = PE_EVENT_CONSUMER_NAME)
+  protected KafkaListenerContainerFactory<?> platformEventConsumer(
+      @Qualifier("kafkaConsumerFactory")
+          DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
+      @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
+
+    return buildDefaultKafkaListenerContainerFactory(
+        PE_EVENT_CONSUMER_NAME,
+        kafkaConsumerFactory,
+        configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(),
+        configurationProvider.getKafka().getConsumer().getPe());
+  }
+
+  @Bean(name = MCP_EVENT_CONSUMER_NAME)
+  protected KafkaListenerContainerFactory<?> mcpEventConsumer(
+      @Qualifier("kafkaConsumerFactory")
+          DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
+      @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
+
+    return buildDefaultKafkaListenerContainerFactory(
+        MCP_EVENT_CONSUMER_NAME,
+        kafkaConsumerFactory,
+        configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(),
+        configurationProvider.getKafka().getConsumer().getMcp());
+  }
+
+  @Bean(name = MCL_EVENT_CONSUMER_NAME)
+  protected KafkaListenerContainerFactory<?> mclEventConsumer(
+      @Qualifier("kafkaConsumerFactory")
+          DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
+      @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
+
+    return buildDefaultKafkaListenerContainerFactory(
+        MCL_EVENT_CONSUMER_NAME,
+        kafkaConsumerFactory,
+        configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(),
+        configurationProvider.getKafka().getConsumer().getMcl());
+  }
+
+  @Bean(name = DEFAULT_EVENT_CONSUMER_NAME)
   protected KafkaListenerContainerFactory<?> kafkaEventConsumer(
       @Qualifier("kafkaConsumerFactory")
           DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
       @Qualifier("configurationProvider") ConfigurationProvider configurationProvider) {
 
+    return buildDefaultKafkaListenerContainerFactory(
+        DEFAULT_EVENT_CONSUMER_NAME,
+        kafkaConsumerFactory,
+        configurationProvider.getKafka().getConsumer().isStopOnDeserializationError(),
+        null);
+  }
+
+  private KafkaListenerContainerFactory<?> buildDefaultKafkaListenerContainerFactory(
+      String consumerFactoryName,
+      DefaultKafkaConsumerFactory<String, GenericRecord> kafkaConsumerFactory,
+      boolean isStopOnDeserializationError,
+      @Nullable ConsumerConfiguration.ConsumerOptions consumerOptions) {
+
+    final DefaultKafkaConsumerFactory<String, GenericRecord> factoryWithOverrides;
+    if (consumerOptions != null) {
+      // Copy the base config
+      Map<String, Object> props = new HashMap<>(kafkaConsumerFactory.getConfigurationProperties());
+      // Override just the auto.offset.reset
+      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerOptions.getAutoOffsetReset());
+      factoryWithOverrides =
+          new DefaultKafkaConsumerFactory<>(
+              props,
+              kafkaConsumerFactory.getKeyDeserializer(),
+              kafkaConsumerFactory.getValueDeserializer());
+    } else {
+      factoryWithOverrides = kafkaConsumerFactory;
+    }
+
     ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
         new ConcurrentKafkaListenerContainerFactory<>();
-    factory.setConsumerFactory(kafkaConsumerFactory);
+    factory.setConsumerFactory(factoryWithOverrides);
     factory.setContainerCustomizer(new ThreadPoolContainerCustomizer());
     factory.setConcurrency(kafkaEventConsumerConcurrency);
 
     /* Sets up a delegating error handler for Deserialization errors, if disabled will
     use DefaultErrorHandler (does back-off retry and then logs) rather than stopping the container. Stopping the container
     prevents lost messages until the error can be examined, disabling this will allow progress, but may lose data
     */
-    if (configurationProvider.getKafka().getConsumer().isStopOnDeserializationError()) {
+    if (isStopOnDeserializationError) {
       CommonDelegatingErrorHandler delegatingErrorHandler =
           new CommonDelegatingErrorHandler(new DefaultErrorHandler());
       delegatingErrorHandler.addDelegate(
           DeserializationException.class, new CommonContainerStoppingErrorHandler());
       factory.setCommonErrorHandler(delegatingErrorHandler);
     }
     log.info(
-        String.format(
-            "Event-based KafkaListenerContainerFactory built successfully. Consumer concurrency = %s",
-            kafkaEventConsumerConcurrency));
+        "Event-based {} KafkaListenerContainerFactory built successfully. Consumer concurrency = {}",
+        consumerFactoryName,
+        kafkaEventConsumerConcurrency);
 
     return factory;
   }
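
The heart of the refactor is the copy-then-override step in buildDefaultKafkaListenerContainerFactory: the shared kafkaConsumerFactory's properties are copied into a fresh map before auto.offset.reset is replaced, so per-consumer overrides never mutate the shared factory, and a null ConsumerOptions (the default consumer) means no override at all. A self-contained sketch of just that rule (EffectivePropsSketch is invented for illustration):

  import java.util.HashMap;
  import java.util.Map;

  // Sketch of the per-consumer override rule: copy the shared consumer
  // properties, then replace only auto.offset.reset when options are present.
  public final class EffectivePropsSketch {
    static Map<String, Object> effectiveProps(Map<String, Object> base, String autoOffsetReset) {
      Map<String, Object> props = new HashMap<>(base); // copy; the shared map stays untouched
      if (autoOffsetReset != null) {
        props.put("auto.offset.reset", autoOffsetReset); // ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
      }
      return props;
    }

    public static void main(String[] args) {
      Map<String, Object> base =
          Map.of("auto.offset.reset", "earliest", "max.partition.fetch.bytes", 5242880);
      System.out.println(effectiveProps(base, "latest")); // e.g. the pe consumer
      System.out.println(effectiveProps(base, null)); // default consumer: base config unchanged
    }
  }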

(Kafka consumer integration test, package io.datahubproject.openapi.test)

@@ -1,6 +1,7 @@
 package io.datahubproject.openapi.test;
 
 import static com.linkedin.metadata.Constants.*;
+import static com.linkedin.metadata.config.kafka.KafkaConfiguration.DEFAULT_EVENT_CONSUMER_NAME;
 import static org.testng.Assert.*;
 
 import com.linkedin.common.urn.Urn;
@@ -199,7 +200,7 @@ public void testPEConsumption()
   @KafkaListener(
       id = "test-mcp-consumer",
       topics = Topics.METADATA_CHANGE_PROPOSAL,
-      containerFactory = "kafkaEventConsumer",
+      containerFactory = DEFAULT_EVENT_CONSUMER_NAME,
       properties = {"auto.offset.reset:earliest"})
   public void receiveMCP(ConsumerRecord<String, GenericRecord> consumerRecord) {
 
@@ -216,7 +217,7 @@ public void receiveMCP(ConsumerRecord<String, GenericRecord> consumerRecord) {
   @KafkaListener(
       id = "test-mcl-consumer",
       topics = Topics.METADATA_CHANGE_LOG_VERSIONED,
-      containerFactory = "kafkaEventConsumer",
+      containerFactory = DEFAULT_EVENT_CONSUMER_NAME,
       properties = {"auto.offset.reset:earliest"})
   public void receiveMCL(ConsumerRecord<String, GenericRecord> consumerRecord) {
 
@@ -232,7 +233,7 @@ public void receiveMCL(ConsumerRecord<String, GenericRecord> consumerRecord) {
   @KafkaListener(
       id = "test-pe-consumer",
       topics = Topics.PLATFORM_EVENT,
-      containerFactory = "kafkaEventConsumer",
+      containerFactory = DEFAULT_EVENT_CONSUMER_NAME,
       properties = {"auto.offset.reset:earliest"})
   public void receivePE(ConsumerRecord<String, GenericRecord> consumerRecord) {
 
