Conversation
Hi @msillence, thanks for your contribution. Please prefix the commit message(s) with the DBZ-xxx JIRA issue key.
debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java
@vjuranek @msillence Please take a look. Feel free to approve/merge if/when you are happy with it.
@msillence Could you please rebase on the latest main?
```java
public Struct getAfterStruct() {
    if (isTombstone()) {
        return ((Struct) record.key());
```
Doing this here could lead to potential issues in the non-tombstone cases, don't you think?
I can see your concern, since this is generic code that is used in other places. However, if this is a non-tombstone event, this if condition is skipped and the original path is executed. If this is a tombstone, then there is no value struct (that's part of the isTombstone test), so without this code we would return null.
I also checked, and there's no after structure when we are dealing with a key. Based on my testing, I think this is safe.
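The fallback being discussed can be sketched in isolation. This is a minimal, self-contained illustration using a hypothetical `Event` record in place of Debezium's `JdbcSinkRecord`/`Struct` types, not the actual connector code: a tombstone has a key but no value, so the key stands in for the "after" state.

```java
import java.util.Map;

public class TombstoneFallbackSketch {
    // Hypothetical stand-in for a change event record: a tombstone has a
    // key but no value.
    record Event(Map<String, Object> key, Map<String, Object> value) {
        boolean isTombstone() {
            return key != null && value == null;
        }

        // Mirrors the getAfterStruct() idea above: for a tombstone there is
        // no "after" state, so fall back to the key; otherwise return the value.
        Map<String, Object> afterOrKey() {
            return isTombstone() ? key : value;
        }
    }

    public static void main(String[] args) {
        Event tombstone = new Event(Map.of("id", 1), null);
        Event update = new Event(Map.of("id", 1), Map.of("id", 1, "name", "x"));
        System.out.println(tombstone.afterOrKey().equals(tombstone.key())); // true
        System.out.println(update.afterOrKey().equals(update.value()));     // true
    }
}
```

For a non-tombstone event the conditional is skipped entirely, which is the point of the reply above: the fallback only ever fires when the value is absent.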
this is no longer required
```java
public boolean processDelete(JdbcSinkRecord record, final CollectionId collectionId,
                             final Map<CollectionId, Buffer> upsertBufferByTable, final Map<CollectionId, Buffer> deleteBufferByTable) {
    if (!config.isDeleteEnabled()) {
```
I would move this check out of the method and avoid the return value, since, as the method name suggests, it should just process the delete.
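For illustration, the guard under discussion can be sketched standalone. This is a hypothetical, minimal model (the names mimic, but are not, the real connector code): when deletes are disabled, `processDelete()` drops the record instead of buffering a `DELETE`.

```java
public class DeleteGuardSketch {
    private final boolean deleteEnabled; // stands in for config.isDeleteEnabled()
    private int deletesBuffered = 0;

    DeleteGuardSketch(boolean deleteEnabled) {
        this.deleteEnabled = deleteEnabled;
    }

    // Returns true only when a DELETE was actually buffered for the record.
    boolean processDelete(String key) {
        if (!deleteEnabled) {
            return false; // deletes disabled: skip the record entirely
        }
        deletesBuffered++;
        return true;
    }

    int deletesBuffered() {
        return deletesBuffered;
    }

    public static void main(String[] args) {
        System.out.println(new DeleteGuardSketch(false).processDelete("id=1")); // false
        System.out.println(new DeleteGuardSketch(true).processDelete("id=1"));  // true
    }
}
```

The review suggestion above would hoist the `deleteEnabled` check to the caller, so the method body could assume deletes are enabled and would not need a boolean return at all.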
This might not be ready now that I've moved this to main. I'm having a little difficulty with the snapshot dependencies, so this is what I think it should look like. Tracing through the code, I believe delete now just uses the key, so the complication of getting the struct has gone.
Won't compile with formatter 2.20.0.
OK, it turns out the Maven formatter plugin was preventing me from building (Maven 3.9.9, Java 21).
```diff
- LOGGER.debug("Skipping tombstone record {}", record);
+ LOGGER.debug("processing tombstone record {}", record);
```
```java
if (!config.isDeleteEnabled()) {
```
@rk3rn3r, I aligned this with Confluent's implementation to minimize the configuration changes for those who move from one to the other.
To me this also applies to other configurations, like `schema.evolution` being disabled by default and `insert.mode` defaulting to `insert` rather than `upsert`. Again, this mirrors the same config defaults from Confluent.
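For illustration, the defaults being discussed would look roughly like this in a sink connector configuration (property names as used by the Debezium JDBC sink connector; since these are the proposed defaults, listing them explicitly would be optional):

```properties
# Defaults mirroring the Confluent JDBC sink behavior discussed above.
# Deletes are not applied unless explicitly enabled:
delete.enabled=false
# No automatic schema evolution:
schema.evolution=none
# Plain INSERTs rather than upserts:
insert.mode=insert
```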
```java
final CollectionId collectionId = optionalCollectionId.get();
```
```java
if (record.isTombstone()) {
```
@msillence Should we maybe move the code below for `if (record.isDelete()) { ... }` up here? `record.isDelete()` covers both tombstones and Debezium delete change event records, so the code below can be deleted.
Also @jpechane @mfvitale @Naros Please mind that this new behavior will cause 2 deletes to be executed, because `tombstones.on.delete` in source connectors (and even the deprecated `delete.handling.mode` option in the ExtractNewRecordState SMT) COULD produce 2 messages for a single delete event in the source datastore (one Debezium DELETE change event and one tombstone), depending on those configs. The implementation I am working on will de-duplicate those events, so this is only a temporary issue. The reason for the previous code was that we avoided duplicate delete events for the same row/source event, but it excluded deletes for source connectors using the ExtractNewRecordState SMT configured to drop the Debezium DELETE events and keep tombstones (which might be semantically the best way to represent deletes in Kafka imho).
@jpechane As this code eventually moves to AbstractChangeEventSink with my current work, should we maybe evaluate whether to close this PR, since those lines will be dropped soon?
Also, we should then have tests for those scenarios with this PR (single tombstone, single Debezium DELETE change event, and DELETE event followed by tombstone). FYI @msillence
@msillence After writing the above, maybe we can just drop these `if (record.isTombstone()) {...}` lines entirely and keep the `if (record.isDelete()) { ... }` below, as it handles both tombstones and Debezium DELETE change event messages, yielding the previously mentioned duplicate deletes issued to the destination datastore. The idea of the previous code was to skip those duplicate delete events in the default scenario, but it excluded deletes for certain ExtractNewRecordState SMT configs (which imho are even syntactically and semantically good and correct)!
@jpechane I am sorry for coming late to the party.
@rk3rn3r I'd like to get this into 3.0 as it is the next downstream version. Looking at the calendar, I'd suppose your changes will go into 3.1.
Generally the new solution must ensure:
- Exactly one `DELETE` statement should be issued if the SMT is not in place
- Exactly one `DELETE` statement should be issued if the SMT is in place and `delete.tombstone.handling.mode = drop`
- Exactly one `DELETE` statement should be issued if the SMT is in place and `delete.tombstone.handling.mode = rewrite-with-tombstone`
All three cases should be tested. If this requires a change in either core or the SMT, then it should be done. The legacy config options can be ignored and their behaviour in this case treated as undefined.
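The de-duplication idea behind these requirements can be sketched with a toy model. This is a hedged illustration using hypothetical types, not the real connector or SMT code: a tombstone that directly follows a DELETE event for the same key is collapsed, so each scenario yields exactly one logical delete.

```java
import java.util.List;

public class DeleteScenariosSketch {
    enum Kind { DELETE, TOMBSTONE }

    // Hypothetical minimal change event: just a key and a kind.
    record Event(String key, Kind kind) {}

    // Count logical deletes, skipping a tombstone that immediately follows
    // a DELETE for the same key (the duplicate pair discussed above).
    static int countDeletes(List<Event> events) {
        int deletes = 0;
        Event previous = null;
        for (Event e : events) {
            boolean duplicateTombstone = e.kind() == Kind.TOMBSTONE
                    && previous != null
                    && previous.kind() == Kind.DELETE
                    && previous.key().equals(e.key());
            if (!duplicateTombstone) {
                deletes++;
            }
            previous = e;
        }
        return deletes;
    }

    public static void main(String[] args) {
        // DELETE event followed by its tombstone -> one logical delete
        System.out.println(countDeletes(List.of(
                new Event("id=1", Kind.DELETE), new Event("id=1", Kind.TOMBSTONE)))); // 1
        // A lone DELETE event -> one logical delete
        System.out.println(countDeletes(List.of(new Event("id=1", Kind.DELETE)))); // 1
        // A lone tombstone (e.g. the SMT dropped the DELETE) -> one logical delete
        System.out.println(countDeletes(List.of(new Event("id=1", Kind.TOMBSTONE)))); // 1
    }
}
```

Which of the three configuration scenarios produces which event sequence depends on the source connector and SMT settings; the point of the sketch is only that all three sequences must collapse to a single `DELETE` downstream.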
Hi @msillence, it is not clear to me how this is going to work. I have the feeling that with the … Can you please add some tests (in …)?
❌ Developer Certificate of Origin (DCO) check failed. Hi @msillence, please sign off all commits. If pull request commits are not signed off, the pull request cannot be merged. For more information about why this is required, please see our blog about contribution requirement changes.