Skip to content

DRAFT! WIP! DBZ-7996 Add AbstractChangeEventSink with shared logic for JDBC sink and MongoDB sink connectors#5918

Open
rk3rn3r wants to merge 2 commits intodebezium:mainfrom
rk3rn3r:DBZ-7996-unify-jdbc-and-mongodb-sinks
Open

DRAFT! WIP! DBZ-7996 Add AbstractChangeEventSink with shared logic for JDBC sink and MongoDB sink connectors#5918
rk3rn3r wants to merge 2 commits intodebezium:mainfrom
rk3rn3r:DBZ-7996-unify-jdbc-and-mongodb-sinks

Conversation

@rk3rn3r
Copy link
Member

@rk3rn3r rk3rn3r commented Oct 4, 2024

@rk3rn3r
Copy link
Member Author

rk3rn3r commented Oct 4, 2024

@jpechane A bit late, but for a first look here's my PR including all changes so far.

JDBC sink connector is using the old logic (can be switched to new one using internal.enable.sces option later).

It is missing the final Buffer and Batch logic (not yet finished).
MongoDB sink is also not using the new code yet.

@rk3rn3r rk3rn3r force-pushed the DBZ-7996-unify-jdbc-and-mongodb-sinks branch 4 times, most recently from 27b5d89 to f0d3474 Compare October 4, 2024 16:40
@rk3rn3r rk3rn3r force-pushed the DBZ-7996-unify-jdbc-and-mongodb-sinks branch 3 times, most recently from cde20c7 to 2d5c240 Compare November 6, 2024 08:56
@rk3rn3r rk3rn3r force-pushed the DBZ-7996-unify-jdbc-and-mongodb-sinks branch from 2d5c240 to 49cfed9 Compare November 22, 2024 09:55
import java.util.List;
import java.util.Map;

public class LinkedHashMapExtractor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a new class? I think it can go into io.debezium.util.Collect.


public Optional<CollectionId> getCollectionId(String collectionName) {
return Optional.of(new CollectionId(collectionName));
public CollectionId getCollectionId(String collectionName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the change from Optional? I amnot against it I' like to know the motivation.


protected final SinkRecord originalKafkaRecord;

private final Object key;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this attrubute used anywhere?

public abstract class AbstractBuffer implements Buffer {

protected final Map<CollectionId, LinkedHashMap<Object, DebeziumSinkRecord>> records = new LinkedHashMap<>();
protected final LinkedHashMap<Object, DebeziumSinkRecord> records = new LinkedHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is possible to use java.util.SequencedMap in the decalration here and in other parts of the code?

@Naros
Copy link
Member

Naros commented Jan 6, 2026

❌ Developer Certificate of Origin (DCO) check failed.

Hi @rk3rn3r, please sign off all commits with:

git commit -s

If pull request commits are not signed off, the pull request cannot be merged. For more information about why this is required, please see our blog about contribution requirement changes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants