
DBZ-8430 Use list of columns in incremental snapshot select stmt#6000

Closed
wukachn wants to merge 2 commits into debezium:main from wukachn:DBZ-8430

Conversation

@Naros
Member

Naros commented Nov 19, 2024

Hi @wukachn, it seems the test failures are related. Could you take a look?

@wukachn
Contributor Author

wukachn commented Nov 20, 2024

I'll take a look at the ITs at some point today.

@wukachn
Contributor Author

wukachn commented Nov 20, 2024

Looks like the initial change has made MySQL schema changes not supported at all during an incremental snapshot. If that's true, then schema changes must never have been supported during a MySQL incremental snapshot if the user was using column filters. @Naros, was the team aware of this nuance?

I haven't looked too deeply into how to get it working again, but I imagine it'd take some time for me to gain the confidence to make a meaningful change. I'm happy to take suggestions from someone who has more knowledge of the codebase.

@nathan-smit-1
Contributor

nathan-smit-1 commented Nov 24, 2024

Hey @wukachn, I've been tracking this issue as it relates to a problem we had in prod with MySQL. I think you may be right and that incremental snapshots have possibly just never worked with a column filter applied for some of the sources. As far as I can tell, there isn't a test in place for incremental snapshots with column filters applied.

I will ask about it in Zulip as well, as I guess that would be unrelated to the specific issue of handling invisible columns, but if I set column.include.list to include a subset of columns for my table and then run an incremental or blocking snapshot, I get this warning:

    {"timestamp":"2024-11-24T13:36:04.785293758+02:00","sequence":991,"loggerClassName":"org.slf4j.impl.Slf4jLogger","loggerName":"io.debezium.config.CommonConnectorConfig","level":"WARN","message":"The signal event 'Struct{}' should have 3 fields but has 0"

This seems to come from the code below in CommonConnectorConfig. It would be cool if you could sanity-check whether you see the same behaviour in 3.0.1 for SQL Server. I'm also not sure why that would cause the schema change to time out, though, which is what I saw when I ran tests using your code.

Edit: I believe your Oracle test passed. I wonder if this relates to sources that use db.schema.table for snapshots, like SQL Server and Oracle, vs. sources that use just database.table, like MySQL.

 public Optional<String[]> parseSignallingMessage(Struct value, String fieldName) {
        final Struct event = value.getStruct(fieldName);
        if (event == null) {
            LOGGER.warn("Field {} part of signal '{}' is missing", fieldName, value);
            return Optional.empty();
        }
        List<org.apache.kafka.connect.data.Field> fields = event.schema().fields();
        if (fields.size() != 3) {
            LOGGER.warn("The signal event '{}' should have 3 fields but has {}", event, fields.size());
            return Optional.empty();
        }
        return Optional.of(new String[]{
                event.getString(fields.get(0).name()),
                event.getString(fields.get(1).name()),
                event.getString(fields.get(2).name())
        });
    }

@mfvitale
Member

mfvitale commented Nov 25, 2024

Looks like the initial change has made MySQL schema changes not supported at all during an incremental snapshot. If that's true, then schema changes must never have been supported during a MySQL incremental snapshot if the user was using column filters. @Naros, was the team aware of this nuance?

I tried to set the column filters in the schemaChanges test and it works with the old version.

With your changes, it works if you specify the column filters and does not work if you don't.

The scenario is

  1. The table initially has two columns: a, b
  2. Some records are inserted
  3. Then the table is modified with a third column: c
  4. Some records are inserted
  5. The column c is removed
  6. Some records are inserted

So during the incremental snapshot it can happen that, when processing some chunk, the last known table definition still had the column c, but the table no longer has it.

This did not happen before, since the * always matches the current table columns.
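This failure mode can be illustrated with a minimal sketch (hypothetical table and column names; this is not the actual Debezium query builder): a chunk query built from a column list captured before the DDL change keeps naming the dropped column, while a * projection is resolved by the server at execution time.

```java
import java.util.List;

// Sketch only (hypothetical names, not Debezium code): contrast a '*' projection
// with an explicit column list that was captured before a schema change.
public class ChunkQuerySketch {

    // Old behaviour: the projection is resolved by the database at execution
    // time, so it always matches the current table definition.
    static String starChunkQuery(String table) {
        return "SELECT * FROM " + table + " ORDER BY id LIMIT 1024";
    }

    // New behaviour: the projection is fixed when the column list is captured.
    static String columnListChunkQuery(String table, List<String> cachedColumns) {
        return "SELECT " + String.join(", ", cachedColumns)
                + " FROM " + table + " ORDER BY id LIMIT 1024";
    }

    public static void main(String[] args) {
        // Column list cached while table 'a' still had column 'c'.
        List<String> cached = List.of("a", "b", "c");

        // After 'ALTER TABLE a DROP COLUMN c' the server resolves '*' to (a, b),
        // but the cached list still names 'c', which is what produces
        // "Unknown column 'c' in 'field list'" when the chunk query runs.
        System.out.println(starChunkQuery("a"));
        System.out.println(columnListChunkQuery("a", cached));
    }
}
```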

io.debezium.DebeziumException: Database error while executing incremental snapshot for table 'CollectionId{id=incremental_snapshot-test_1hbn78h.a, additionalCondition=, surrogateKey=}'
	at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:345)
	at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.closeWindow(AbstractIncrementalSnapshotChangeEventSource.java:114)
	at io.debezium.pipeline.signal.actions.snapshotting.CloseIncrementalSnapshotWindow.arrived(CloseIncrementalSnapshotWindow.java:27)
	at io.debezium.pipeline.signal.SignalProcessor.processSignal(SignalProcessor.java:191)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:722)
	at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
	at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
	at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
	at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
	at io.debezium.pipeline.signal.SignalProcessor.lambda$processSourceSignal$4(SignalProcessor.java:155)
	at io.debezium.pipeline.signal.SignalProcessor.executeWithSemaphore(SignalProcessor.java:165)
	at io.debezium.pipeline.signal.SignalProcessor.processSourceSignal(SignalProcessor.java:149)
	at io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:294)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:79)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:47)
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:275)
	at io.debezium.connector.binlog.BinlogStreamingChangeEventSource.lambda$handleInsert$27(BinlogStreamingChangeEventSource.java:835)
	at io.debezium.connector.binlog.BinlogStreamingChangeEventSource.handleChange(BinlogStreamingChangeEventSource.java:1104)
	at io.debezium.connector.binlog.BinlogStreamingChangeEventSource.handleInsert(BinlogStreamingChangeEventSource.java:832)
	at io.debezium.connector.binlog.BinlogStreamingChangeEventSource.lambda$execute$8(BinlogStreamingChangeEventSource.java:184)
	at io.debezium.connector.binlog.BinlogStreamingChangeEventSource.handleEvent(BinlogStreamingChangeEventSource.java:571)
	at io.debezium.connector.binlog.EventBuffer.completeTransaction(EventBuffer.java:233)
	at io.debezium.connector.binlog.EventBuffer.add(EventBuffer.java:121)
	at io.debezium.connector.binlog.BinlogStreamingChangeEventSource.lambda$execute$18(BinlogStreamingChangeEventSource.java:213)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1281)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1103)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:657)
	at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:959)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: io.debezium.DebeziumException: Snapshotting of table incremental_snapshot-test_1hbn78h.a failed
	at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readSchema(AbstractIncrementalSnapshotChangeEventSource.java:413)
	at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.verifySchemaUnchanged(AbstractIncrementalSnapshotChangeEventSource.java:397)
	at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.schemaHistoryIsUpToDate(AbstractIncrementalSnapshotChangeEventSource.java:389)
	at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:266)
	... 36 common frames omitted
Caused by: java.sql.SQLSyntaxErrorException: Unknown column 'c' in 'field list'
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:112)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:114)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:988)
	at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1056)
	at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readSchema(AbstractIncrementalSnapshotChangeEventSource.java:409)
	... 39 common frames omitted

Or can happen this

org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
	at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:290)
	at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:319)
	at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitReadRecord(RelationalChangeRecordEmitter.java:87)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:50)
	at io.debezium.pipeline.EventDispatcher.dispatchSnapshotEvent(EventDispatcher.java:218)
	at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.sendEvent(AbstractIncrementalSnapshotChangeEventSource.java:179)
	at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.sendWindowEvents(AbstractIncrementalSnapshotChangeEventSource.java:170)
	at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.closeWindow(AbstractIncrementalSnapshotChangeEventSource.java:113)
	at io.debezium.pipeline.signal.actions.snapshotting.CloseIncrementalSnapshotWindow.arrived(CloseIncrementalSnapshotWindow.java:27)

Records will be emitted with the new column before the schema change event has been processed.

What I think is wrong here is that we are using the select statement for the chunk query also to read the table in readSchema.

@Naros @jpechane any idea?

@nathan-smit-1
Contributor

@mfvitale I asked what I guess is a related question here: https://debezium.zulipchat.com/#narrow/channel/348104-community-mysql-mariadb/topic/Expected.20behaviour.20of.20column.2Einclude.2Elist.20for.20multiple.20table

Basically I'm just trying to understand the intended behaviour of the column filter, as it appears to apply globally, even to the debezium_signal table (i.e. in RelationalDatabaseSchema it excludes the signal table columns and you then can't read signals from it), so if you specify include columns you need to make sure to include every column across all of your tables, which is not ideal.

@wukachn
Contributor Author

wukachn commented Nov 25, 2024

@mfvitale If you have a column.exclude.list that doesn't have any excluded columns in the table which is experiencing the schema change, the schemaChanges test will fail.

Using the column.include.list would mean that the test passes because you are already limiting the connector to look at a subset of columns.


From looking at the code this weekend, I came to the conclusion that a change is needed to readSchema. What that looks like... I haven't looked into it yet 👍

@nathan-smit-1
Contributor

What I think is wrong here is that we are using the select statement for chunk query also for read the table in readSchema.

Hey @mfvitale, I just wanted to follow up on what you said above, as I've been tracking this issue. Is there any downside to just always having the select statement for readSchema be select * from table with conditions?

I'm not sure what the least hacky way to do that would be, though. Maybe a new method, so instead of calling buildChunkQuery in readSchema you call a new buildSchemaQuery method, or something like that, so you can have something more similar to the old logic where you select * unless there is an include/exclude column list?
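A minimal sketch of what that shape could look like (names invented here; buildSchemaQuery is not an existing Debezium API): narrow the projection only when a column filter is actually configured, and otherwise keep select * so the schema read always tracks the live table definition.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the buildSchemaQuery idea (not actual Debezium code):
// fall back to 'SELECT *' unless an include/exclude column filter is in effect.
public class SchemaQuerySketch {

    static String buildSchemaQuery(String table, Optional<List<String>> filteredColumns) {
        String projection = filteredColumns
                .map(cols -> String.join(", ", cols))
                .orElse("*");
        // WHERE 1 = 0 guard: only result-set metadata is needed, not rows.
        return "SELECT " + projection + " FROM " + table + " WHERE 1 = 0";
    }

    public static void main(String[] args) {
        // No column filter configured: '*' survives schema changes.
        System.out.println(buildSchemaQuery("t", Optional.empty()));
        // Filter configured: the user already limited the connector to these columns.
        System.out.println(buildSchemaQuery("t", Optional.of(List.of("a", "b"))));
    }
}
```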

@Naros
Member

Naros commented Dec 10, 2024

So during the incremental snapshot it can happen that, when processing some chunk, the last known table definition still had the column c, but the table no longer has it.

There is a window between the schema change validation step and the execution of the query where I could imagine this error happening, and we should likely guard against this race condition. The question is whether or not we can efficiently guard against this 🤔 .

What I think is wrong here is that we are using the select statement for chunk query also for read the table in readSchema.

I believe we need both; it's what we do for the initial snapshots, I believe, right?

@Naros
Member

Naros commented Dec 10, 2024

Basically just trying to understand the intended behaviour of the column filter as it appears to apply globally even to the debezium_signal table (i.e. in RelationalDatabaseSchema it excludes the signal table columns and you can't then read signals from it) and so if you specify include columns you need to make sure to include every column across all of your tables which is is not ideal

In that case why not use column.exclude.list and remove the opposing data set @nathan-smit-1 ?

@Naros
Member

Naros commented Dec 10, 2024

Not sure what the least hacky way to do that would be though. Maybe a new method so instead of calling buildChunkQuery in readSchema you call a new buildSchemaQuery method or something like that so you can have something more similar to the old logic where you select * unless there is include/exclude column list?

I believe the idea, @nathan-smit-1, was to always apply a list of columns. For example, in the initial snapshot, if we find the user supplies a SELECT * based override or there is no override, we turn that into a SELECT <explicit list of columns> query. It's my understanding that we're attempting to mimic that exactly.

@nathan-smit-1
Contributor

nathan-smit-1 commented Dec 10, 2024

I'm not sure what the least hacky way to do that would be, though. Maybe a new method, so instead of calling buildChunkQuery in readSchema you call a new buildSchemaQuery method, or something like that, so you can have something more similar to the old logic where you select * unless there is an include/exclude column list?

I believe the idea, @nathan-smit-1, was to always apply a list of columns. For example, in the initial snapshot, if we find the user supplies a SELECT * based override or there is no override, we turn that into a SELECT <explicit list of columns> query. It's my understanding that we're attempting to mimic that exactly.

@Naros Apologies, I ended up deleting some responses for clarity. As I understand the issue, when a column is dropped, and we explicitly list out the columns, readSchema fails because it does select a, b, c from table to generate the schema but c no longer exists. So the process that verifies whether the schema has changed is itself failing because of the changed schema, if that makes sense.

I was just thinking now, though, that I recently added readSchemaForTable as part of DBZ-4903, and I'm wondering whether that can be adapted to do the schema read instead? I'm busy running some tests at the moment.

EDIT: So the above doesn't work because the schema generated by jdbcConnection.readSchema is subtly different from the schema from context.getSchema()

@mfvitale
Member

What I think is wrong here is that we are using the select statement for chunk query also for read the table in readSchema.

I believe we need both; it's what we do for the initial snapshots, I believe, right?

The main difference with the initial snapshot is that we lock tables while reading the schema.

@nathan-smit-1
Contributor

@mfvitale @Naros I don't want to monopolise the discussion too much on this, as it's not my pull request, but FWIW, I did find that a fallback like the one below in the case of a SQL exception passed the MySQL tests at least, and does then allow snapshots on invisible columns. The below is also kind of what I was referring to as a "hacky" solution, though (i.e. checking the current schema in the event of a SQL exception). Definitely a problem that is much trickier than it initially seemed.

  private Table readSchema() {
       final String selectStatement = chunkQueryBuilder.buildChunkQuery(context, currentTable, 0, Optional.empty());
       LOGGER.debug("Reading schema for table '{}' using select statement: '{}'", currentTable.id(), selectStatement);

       try (PreparedStatement statement = chunkQueryBuilder.readTableChunkStatement(context, currentTable, selectStatement);
               ResultSet rs = statement.executeQuery()) {
           return getTable(rs);
       }
       catch (SQLException initialException) {
           LOGGER.warn("Initial schema read failed. Attempting alternative schema retrieval.", initialException);

           try {
               // Try reading schema using the alternative method
               Table alternativeTable = readSchemaForTable(currentTable.id());

               // Retry the original query with the alternative table
               final String retrySelectStatement = chunkQueryBuilder.buildChunkQuery(context, alternativeTable, 0, Optional.empty());
               try (PreparedStatement retryStatement = chunkQueryBuilder.readTableChunkStatement(context, alternativeTable, retrySelectStatement);
                       ResultSet retryRs = retryStatement.executeQuery()) {
                   return getTable(retryRs);
               }
           }
           catch (SQLException secondaryException) {
               // If both attempts fail, throw a comprehensive exception
               LOGGER.error("Failed to read schema for table '{}' after multiple attempts", currentTable.id(), secondaryException);
               throw new DebeziumException("Snapshotting of table " + currentTable.id() + " failed after multiple attempts", initialException);
           }
       }
   }

@mfvitale added the "blocked/needs more thinking" label (Pull requests that require some more consideration) on Dec 13, 2024
@mfvitale
Member

@mfvitale @Naros I don't want to monopolise the discussion too much on this, as it's not my pull request, but FWIW, I did find that a fallback like the one below in the case of a SQL exception passed the MySQL tests at least, and does then allow snapshots on invisible columns. The below is also kind of what I was referring to as a "hacky" solution, though (i.e. checking the current schema in the event of a SQL exception). Definitely a problem that is much trickier than it initially seemed.


@nathan-smit-1 the problem with the hack is that you can effectively have a table schema without the hidden column and then read the data with the hidden column.

I added the "blocked/needs more thinking" label, as the solution will not be easy here. If you want to propose your approach, feel free to work on it.

@nathan-smit-1
Contributor

@mfvitale that makes sense. Just to put another idea out there: what if this were configurable, such that you could choose to have Debezium list the columns but accept the risk that you can't have schema changes during the snapshot? So the equivalent of setting snapshot.locking.mode to none.

@mfvitale
Member

@mfvitale that makes sense. Just to put another idea out there: what if this were configurable, such that you could choose to have Debezium list the columns but accept the risk that you can't have schema changes during the snapshot? So the equivalent of setting snapshot.locking.mode to none.

@nathan-smit-1 I think this is not going to work, since that's exactly what is happening in the tests. The schema must match; otherwise you will get Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema.

@Naros
Member

Naros commented Jan 23, 2025

My apologies for circling back to this relatively late @mfvitale and @nathan-smit-1.

So I came across a similar situation with Oracle recently during streaming where any GENERATED ALWAYS columns are excluded and not part of the transaction log's change record; however, the relational table metadata tracks such columns.

This makes me wonder whether it makes more sense to consider whether hidden/generated columns should be part of the change event payload, particularly given that they're often not easily obtainable or even recorded in some vendor transaction logs.

Perhaps once we introduce connector-specific relational model implementations, that might help here.

@mfvitale
Member

So I came across a similar situation with Oracle recently during streaming where any GENERATED ALWAYS columns are excluded and not part of the transaction log's change record; however, the relational table metadata tracks such columns.

Well, this is interesting. It would be good if we could put down all these findings somewhere (DDD?) so we can also discuss the solution better.

This makes me wonder whether it makes more sense to consider whether hidden/generated columns should be part of the change event payload, particularly given that they're often not easily obtainable or even recorded in some vendor transaction logs.

In the case of generated columns, which are not in the transaction logs, it sounds good, but I have some doubts about hidden ones. Think of use cases where you just want to do same-database replication.

Perhaps once we introduce connector-specific relational model implementations, that might help here.

Also here, do you think there is any chance to somehow exclude, or optionally exclude, those columns while we get the table metadata?

@Naros
Member

Naros commented Jan 24, 2025

In the case of generated columns, which are not in the transaction logs, it sounds good, but I have some doubts about hidden ones. Think of use cases where you just want to do same-database replication.

@mfvitale, absolutely. I think this highlights that it could be useful to introduce a visibility attribute or mechanism for tracking column visibility in the relational model, since I suspect other databases are similar to Oracle, where virtual/generated is tracked independently of the column's visibility setting for generalized queries.

do you think there is any chance to somehow exclude, or optionally exclude, those columns while we get the table metadata?

We could, but column position details would also need to change, with what I suspect may be a database-specific hook. In Oracle, for example, the driver by default provides generated columns first, but I wonder if this is somehow influenced by NULL ordering, since in some databases you can set NULLs to be first or last.

Whatever we do, we'd need to take it connector by connector. For MariaDB, MySQL, and Oracle, combining a code change in JdbcConnection and updating the ANTLR grammar where needed to capture and track this detail would probably suffice. I can't speak for SQL Server, as I don't know that area of the code well. For PostgreSQL, we'd face the same dilemma we do today with optionality: using an out-of-band query during relational messages to refresh the schema, which may or may not find the table if it's been dropped.

One big open question is whether or not it would be possible to make such a change for existing connectors, particularly when we rehydrate the schema history, etc. Is all the information truly there or not?
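The visibility attribute discussed above could be sketched roughly as follows (purely illustrative; these types and names are invented here and are not part of Debezium's relational model):

```java
// Illustrative only: a visibility flag tracked alongside the existing
// generated attribute in a relational column model, so query builders
// can skip columns the server will not return for a plain SELECT.
public class ColumnVisibilitySketch {

    enum Visibility { VISIBLE, HIDDEN }

    record Column(String name, boolean generated, Visibility visibility) { }

    // A chunk or schema query could then project only queryable columns.
    static boolean includeInSelect(Column c) {
        return !c.generated() && c.visibility() == Visibility.VISIBLE;
    }

    public static void main(String[] args) {
        Column a = new Column("a", false, Visibility.VISIBLE);
        Column rowId = new Column("row_id", true, Visibility.HIDDEN);
        System.out.println(includeInSelect(a));      // true
        System.out.println(includeInSelect(rowId));  // false
    }
}
```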

@wukachn
Contributor Author

wukachn commented Sep 1, 2025

Closing due to inactivity.

@wukachn wukachn closed this Sep 1, 2025