
enable handling tombstones#5997

Open
msillence wants to merge 1 commit into debezium:main from jhc-systems:tombstone

Conversation

@msillence
Contributor

No description provided.

@github-actions

Hi @msillence, thanks for your contribution. Please prefix the commit message(s) with the DBZ-xxx JIRA issue key.

@jpechane
Contributor

@vjuranek @msillence Please take a look. Feel free to approve/merge if/when you are happy with it.

@jpechane
Contributor

@msillence Could you please rebase on the latest main?


public Struct getAfterStruct() {
    if (isTombstone()) {
        return ((Struct) record.key());
Member

Doing this here could lead to potential issues in the non-tombstone cases, don't you think?

Contributor Author

I can see your concern; this is generic code that is used in other places. However, if this is a non-tombstone event, this if condition will be skipped and the original path will be executed. If this is a tombstone, then there is no value struct (that's part of the isTombstone test), so without this code we would return null.
I also checked, and there's no after structure when we are dealing with a key. I think this is safe based on my testing.
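The fallback described above can be sketched in isolation. This is a minimal, self-contained illustration under assumed names, not Debezium's actual classes: SinkEvent stands in for the sink record and plain maps stand in for Kafka Connect's Struct.

```java
import java.util.Map;

public class TombstoneFallbackSketch {

    // Hypothetical stand-in for the sink record: a tombstone has a key
    // but a null value, mirroring Kafka Connect tombstone semantics.
    record SinkEvent(Map<String, Object> key, Map<String, Object> value) {
        boolean isTombstone() {
            return key != null && value == null;
        }
    }

    // Sketch of the proposed guard: for a tombstone there is no value
    // ("after") struct, so fall back to the key; non-tombstone events
    // take the original path untouched.
    static Map<String, Object> getAfterStruct(SinkEvent record) {
        if (record.isTombstone()) {
            return record.key();
        }
        return record.value();
    }

    public static void main(String[] args) {
        SinkEvent tombstone = new SinkEvent(Map.of("id", 1), null);
        SinkEvent update = new SinkEvent(Map.of("id", 1), Map.of("id", 1, "name", "a"));
        System.out.println(getAfterStruct(tombstone));       // falls back to the key
        System.out.println(getAfterStruct(update).size());   // original path, unchanged
    }
}
```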

Contributor Author

this is no longer required


public boolean processDelete(JdbcSinkRecord record, final CollectionId collectionId,
                             final Map<CollectionId, Buffer> upsertBufferByTable, final Map<CollectionId, Buffer> deleteBufferByTable) {
    if (!config.isDeleteEnabled()) {
Member

I would move this check out, avoiding the return value, since, as the method name suggests, the method will process the delete.

@msillence
Contributor Author

msillence commented Nov 21, 2024

This might not be ready now that I've moved this to main. I'm having a little difficulty with the snapshot dependencies, so this is what I think it should look like. Tracing through the code, I believe that delete now just uses the key, so the complication of getting the struct has gone.

won't compile with formatter 2.20.0
@msillence
Contributor Author

OK, it turns out the Maven formatter plugin was preventing me from building with Maven 3.9.9 and Java 21.
Now that I can build it, I can confirm it all works, and I've fixed the formatting.
The fix is much cleaner in trunk thanks to the changes to the SQL delete path, which already uses the key fields.

LOGGER.debug("Skipping tombstone record {}", record);
LOGGER.debug("processing tombstone record {}", record);

if (!config.isDeleteEnabled()) {
Member

@jpechane @Naros I am currently hitting the question of why deletes are disabled by default.

Member

@rk3rn3r, I aligned this with Confluent's implementation to minimize the configuration changes for those who move from one to the other.

To me this also applies to other configurations like schema.evolution being disabled by default and the insert.mode defaulting to insert rather than upsert. Again, mirroring the same config defaults from Confluent.


final CollectionId collectionId = optionalCollectionId.get();

if (record.isTombstone()) {
Member

@rk3rn3r rk3rn3r Nov 22, 2024

@msillence Should we maybe move the below code for if (record.isDelete()) { ... } up here? record.isDelete() covers the case of tombstones and Debezium delete change event records. The code below can be deleted.

Also @jpechane @mfvitale @Naros Please mind that this new behavior will cause 2 deletes to be executed, because tombstones.on.delete in source connectors, and even the deprecated delete.handling.mode option in the ExtractNewRecordState SMT, COULD cause 2 messages for a single delete event happening in the source datastore (one Debezium DELETE change event and one tombstone), depending on those configs. The implementation I am working on will de-duplicate those events, so this is only temporarily an issue. The reason for the previous code was that we avoided duplicate delete events for the same row/source event, but it excluded deletes for source connectors using an ExtractNewRecordState SMT that drops the Debezium DELETE events and keeps tombstones (which might be semantically the best way to represent deletes in Kafka imho).

@jpechane As this code will eventually be moved to the AbstractChangeEventSink with my current work, we should maybe evaluate whether to close this PR, as those lines will be dropped soon?
Also, we should then have tests for those scenarios with this PR (single tombstone, single Debezium DELETE change event, and DELETE event followed by tombstone). FYI @msillence

Member

@msillence After writing the above, maybe we can just drop these lines for if (record.isTombstone()) {...} entirely and keep the if (record.isDelete()) { ... } below, as it handles both tombstones and Debezium DELETE change event messages, yielding the previously mentioned issue of duplicate deletes issued to the destination datastore. The idea of the previous code was to skip those duplicate delete events in the default scenario, but it excludes deletes for certain ExtractNewRecordState SMT configs (which imho are even syntactically and semantically good and correct)!
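The point about coverage can be shown with a tiny stand-in model. This is illustrative only: the op field convention and class names are assumptions, not Debezium's JdbcSinkRecord API. The key observation is that isDelete() subsumes isTombstone(), so keeping only the isDelete() branch handles both message shapes.

```java
import java.util.Map;

public class DeleteCoverageSketch {

    // Illustrative model: a Debezium DELETE change event carries op = "d";
    // a tombstone has no value at all. Not the real Debezium record classes.
    record SinkEvent(Map<String, Object> key, Map<String, Object> value) {
        boolean isTombstone() {
            return value == null;
        }
        boolean isDelete() {
            // Covers both tombstones and Debezium DELETE change events,
            // which is why a separate isTombstone() branch is redundant.
            return isTombstone() || "d".equals(value.get("op"));
        }
    }

    public static void main(String[] args) {
        System.out.println(new SinkEvent(Map.of("id", 1), null).isDelete());              // tombstone
        System.out.println(new SinkEvent(Map.of("id", 1), Map.of("op", "d")).isDelete()); // DELETE event
        System.out.println(new SinkEvent(Map.of("id", 1), Map.of("op", "u")).isDelete()); // update
    }
}
```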

Contributor

@jpechane I am sorry for coming late to the party.
@rk3rn3r I'd like to get this into 3.0 as it is the next downstream version. Looking at the calendar, I'd suppose your changes will go into 3.1.

Generally, the new solution must satisfy the following:

  • Exactly one DELETE statement should be issued if the SMT is not in place
  • Exactly one DELETE statement should be issued if SMT is in place and delete.tombstone.handling.mode = drop
  • Exactly one DELETE statement should be issued if SMT is in place and delete.tombstone.handling.mode = rewrite-with-tombstone

All three cases should be tested. If this requires a change in either core or the SMT, then it should be done. The legacy config options could be ignored, and their behaviour in this case treated as undefined.
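The three scenarios can be modelled with a small sketch. The class names and the duplicate-suppression rule here are assumptions for illustration, not the actual Debezium implementation: a tombstone that directly follows a Debezium DELETE for the same key is suppressed, so each configuration leaves exactly one DELETE statement regardless of whether the stream contains the DELETE event, the tombstone, or both.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DeleteDedupSketch {

    // Hypothetical event model (not Debezium's API): op = "d" marks a
    // Debezium DELETE change event, a null value marks a tombstone.
    record SinkEvent(Object key, Map<String, Object> value) {
        boolean isTombstone() { return value == null; }
        boolean isDelete() { return isTombstone() || "d".equals(value.get("op")); }
    }

    // One DELETE statement per source delete: a tombstone that directly
    // follows a Debezium DELETE change event for the same key is treated
    // as a duplicate and skipped.
    static List<String> toDeleteStatements(List<SinkEvent> events) {
        List<String> statements = new ArrayList<>();
        SinkEvent previous = null;
        for (SinkEvent event : events) {
            if (event.isDelete()) {
                boolean duplicateTombstone = event.isTombstone()
                        && previous != null
                        && previous.isDelete() && !previous.isTombstone()
                        && previous.key().equals(event.key());
                if (!duplicateTombstone) {
                    statements.add("DELETE WHERE key=" + event.key());
                }
            }
            previous = event;
        }
        return statements;
    }

    public static void main(String[] args) {
        SinkEvent delete = new SinkEvent(1, Map.of("op", "d"));
        SinkEvent tombstone = new SinkEvent(1, null);
        // Both messages present -> one statement
        System.out.println(toDeleteStatements(List.of(delete, tombstone)).size());
        // Tombstone only -> one statement
        System.out.println(toDeleteStatements(List.of(tombstone)).size());
        // DELETE change event only -> one statement
        System.out.println(toDeleteStatements(List.of(delete)).size());
    }
}
```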

@mfvitale
Member

Hi @msillence, it is not clear to me how this is going to work.

I have the feeling that with primary.key.mode set to record_value, a tombstone event from Debezium (both value and valueSchema set to null) will lead to an error.

Can you please add some tests (in AbstractJdbcSinkDeleteEnabledTest.java) to cover the following cases?

  1. primary.key.mode=record_key and a Debezium tombstone
  2. primary.key.mode=record_key and an ExtractNewRecordState tombstone (only value null)
  3. primary.key.mode=record_value and a Debezium tombstone
  4. primary.key.mode=record_value and an ExtractNewRecordState tombstone (only value null)
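The record_value concern can be illustrated with a minimal sketch. The mode names follow the connector's primary.key.mode values, but the classes are stand-ins, not Debezium code: with record_value, a tombstone's null value leaves nothing to derive the primary key from.

```java
import java.util.Map;

public class KeyModeSketch {

    // Hypothetical stand-in for the sink record; a tombstone has a null value.
    record SinkEvent(Map<String, Object> key, Map<String, Object> value) {}

    // Sketch of key resolution for the two modes under discussion. With
    // record_value, a tombstone (null value) has no fields to resolve the
    // primary key from, which is the failure anticipated above.
    static Map<String, Object> resolveKey(SinkEvent record, String primaryKeyMode) {
        return switch (primaryKeyMode) {
            case "record_key" -> record.key();
            case "record_value" -> {
                if (record.value() == null) {
                    throw new IllegalStateException("Tombstone has no value to derive the primary key from");
                }
                yield record.value();
            }
            default -> throw new IllegalArgumentException(primaryKeyMode);
        };
    }

    public static void main(String[] args) {
        SinkEvent tombstone = new SinkEvent(Map.of("id", 1), null);
        System.out.println(resolveKey(tombstone, "record_key"));  // works: key is available
        try {
            resolveKey(tombstone, "record_value");
        }
        catch (IllegalStateException e) {
            System.out.println("record_value fails: " + e.getMessage());
        }
    }
}
```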

@Naros
Member

Naros commented Jan 6, 2026

❌ Developer Certificate of Origin (DCO) check failed.

Hi @msillence, please sign off all commits with:

git commit -s

If pull request commits are not signed off, the pull request cannot be merged. For more information about why this is required, please see our blog about contribution requirement changes.

6 participants