Skip to content

DBZ-4967 If sink is Kafka and offset storage is Kafka topic, then reuse connection properties from sink to offset storage also.#5782

Closed
PlugaruT wants to merge 1 commit intodebezium:mainfrom
PlugaruT:DBZ-4967
Closed

DBZ-4967 If sink is Kafka and offset storage is Kafka topic, then reuse connection properties from sink to offset storage also.#5782
PlugaruT wants to merge 1 commit intodebezium:mainfrom
PlugaruT:DBZ-4967

Conversation

@PlugaruT
Copy link
Contributor

@PlugaruT PlugaruT commented Aug 21, 2024

https://issues.redhat.com/browse/DBZ-4967

Example:
offset.storage.kafka.producer.bootstrap.servers value will also be available under bootstrap.servers. This is because KafkaOffsetBackingStore is expecting these properties to be available at the top level.

…se connection properties from sink to offset storage also.

Example:
`offset.storage.kafka.producer.bootstrap.servers` value will also be available under `bootstrap.servers`. This is because `KafkaOffsetBackingStore` is expecting these properties to be available at the top level.
@PlugaruT
Copy link
Contributor Author

@jpechane This is a naive implementation. The idea is that the KafkaOffsetBackingStore expects all properties to be present at the top level, see commit message. And this applied also to all other properties related to connection, like security.protocol, sasl.jaas.config, sasl.mechanism. Again, I think this only makes sense if both the sink and the offset storage is Kafka, hence the if check there. Let me know your thoughts.

@jpechane
Copy link
Contributor

Hi, this seems reasonable. Still I'm a bit reluctnat to base the if statement on the name in the config. I believe we need somethign safer probably reflected in method signature.

@PlugaruT
Copy link
Contributor Author

Hi, this seems reasonable. Still I'm a bit reluctnat to base the if statement on the name in the config. I believe we need somethign safer probably reflected in method signature.

The other way I can see it is to do something like

this.consumer.class.getName.equals(KafkaChangeConsumer.class.getName()) // but it's not possible due to the consumer being in `debezium-server` package which is not a dependency I think

Or, we can drop the if and have only this

String PREFIX = "offset.storage.kafka.producer.";
            adminProps.forEach((configName, value) -> {
                if (configName.startsWith(PREFIX)) {
                    adminProps.put(configName.substring(PREFIX.length()), value);
                }
            });

the only downside is that it will pollute the config object which get's send to the offset store and if I think about it, maybe it's ok?

I don't see how else to do this check when both sink and offset storage is the same, apart from implementing inside Debezium the Kafka offset storage, just like it is with the Redis one.

@PlugaruT
Copy link
Contributor Author

@jpechane can you please provide some feedback? I am ready to proceed with the implementation. Thanks

@jpechane
Copy link
Contributor

jpechane commented Oct 1, 2024

@PlugaruT Hi, I am sorry. I am somehow overlook this one. So there is really no good solution.
IMHO the best approach is to inttroduce "kafka" string as a constant probably somewhere in the core module. The if statement and KafkaChangeConsumer would share that constant to create a logical link. A comprehensive comment will be added to the if statement explaining why it is there. WFYT?

@PlugaruT
Copy link
Contributor Author

Closing and I'll work on it a bit later.

@PlugaruT PlugaruT closed this Oct 27, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants