Skip to content

Commit

Permalink
DBZ-2643 Only set filter info in offsets when snapshot.new.tables = p…
Browse files Browse the repository at this point in the history
…arallel
  • Loading branch information
johnjmartin authored and jpechane committed Oct 14, 2020
1 parent 3bd0758 commit 4fe4aed
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,7 @@ else if (!earliestBinlogFilename.endsWith("00001")) {
}
}
else {
if (!source.hasFilterInfo()) {
// if we don't have filter info, then either
// 1. the snapshot was taken in a version of debezium before the filter info was stored in the offsets, or
// 2. this connector previously had no filter information.
// either way, we have to assume that the filter information currently in the config accurately reflects
// the current state of the connector.
source.maybeSetFilterDataFromConfig(config);
}
source.maybeSetFilterDataFromConfig(config);
if (!rowBinlogEnabled) {
throw new ConnectException(
"The MySQL server does not appear to be using a full row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,15 @@ public void maybeSetFilterDataFromConfig(Configuration config) {
MySqlConnectorConfig.SnapshotNewTables.PARALLEL.getValue())) {
setFilterDataFromConfig(config);
}
else {
if (hasFilterInfo()) {
// Connector has filter info but snapshot.new.tables is disabled. Filter info should be unset.
this.databaseIncludeList = null;
this.databaseExcludeList = null;
this.tableIncludeList = null;
this.tableExcludeList = null;
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,29 @@ public void setOffsetFilterFromFilter() {
assertEquals(tableBlacklist, source.getTableExcludeList());
}

@Test
public void shouldRecoverSourceInfoFromOffsetWithoutFilterDataIfSnapshotNewTablesIsOff() {
final String databaseIncludeList = "a,b";
final String tableIncludeList = "c.foo,d.bar,d.baz";
Map<String, String> offset = offset(10, 10);
offset.put(SourceInfo.DATABASE_INCLUDE_LIST_KEY, databaseIncludeList);
offset.put(SourceInfo.TABLE_INCLUDE_LIST_KEY, tableIncludeList);

sourceWith(offset);
assertThat(source.hasFilterInfo()).isTrue();

final Configuration configuration = Configuration.create()
.with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, databaseIncludeList)
.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList)
.with(MySqlConnectorConfig.SNAPSHOT_NEW_TABLES, MySqlConnectorConfig.SnapshotNewTables.OFF)
.build();
source.maybeSetFilterDataFromConfig(configuration);

assertThat(source.hasFilterInfo()).isFalse();
assertThat(source.getDatabaseIncludeList()).isNull();
assertThat(source.getTableIncludeList()).isNull();
}

@Test
public void shouldStartSourceInfoFromBinlogCoordinatesWithGtidsAndZeroBinlogCoordinates() {
sourceWith(offset(GTID_SET, 0, 0, false));
Expand Down

0 comments on commit 4fe4aed

Please sign in to comment.