Skip to content

Commit 2d5c240

Browse files
committed
DBZ-7996 Add AbstractChangeEventSink with shared logic for JDBC sink and MongoDB sink connectors
closes https://issues.redhat.com/browse/DBZ-7996
1 parent 4d148d1 commit 2d5c240

File tree

100 files changed

+2149
-1832
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

100 files changed

+2149
-1832
lines changed

.github/workflows/debezium-workflow-pr.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ jobs:
7474
uses: tj-actions/[email protected]
7575
with:
7676
files: |
77+
debezium-sink/**
7778
debezium-connector-mongodb/**
7879
7980
- name: Get modified files (MySQL)
@@ -118,6 +119,7 @@ jobs:
118119
uses: tj-actions/[email protected]
119120
with:
120121
files: |
122+
debezium-sink/**
121123
debezium-connector-jdbc/**
122124
123125
- name: Get modified files (Quarkus Outbox)

debezium-api/src/main/java/io/debezium/engine/StopEngineException.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
*/
66
package io.debezium.engine;
77

8+
import java.io.Serial;
9+
810
import io.debezium.DebeziumException;
911

1012
/**
@@ -16,6 +18,7 @@
1618
*/
1719
public class StopEngineException extends DebeziumException {
1820

21+
@Serial
1922
private static final long serialVersionUID = 1L;
2023

2124
public StopEngineException(String msg) {

debezium-bom/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,11 @@
621621
<artifactId>debezium-core</artifactId>
622622
<version>${project.version}</version>
623623
</dependency>
624+
<dependency>
625+
<groupId>io.debezium</groupId>
626+
<artifactId>debezium-sink</artifactId>
627+
<version>${project.version}</version>
628+
</dependency>
624629
<dependency>
625630
<groupId>io.debezium</groupId>
626631
<artifactId>debezium-storage-jdbc</artifactId>

debezium-connector-jdbc/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@
4242
<groupId>io.debezium</groupId>
4343
<artifactId>debezium-core</artifactId>
4444
</dependency>
45+
<dependency>
46+
<groupId>io.debezium</groupId>
47+
<artifactId>debezium-sink</artifactId>
48+
</dependency>
4549

4650
<!-- Kafka-->
4751
<dependency>

debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/Buffer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@
1515
public interface Buffer {
1616

1717
/**
18-
* to add a {@link SinkRecordDescriptor} to the internal buffer and
18+
* to add a {@link JdbcSinkRecord} to the internal buffer and
1919
* call the {@link Buffer#flush()} when buffer size >= {@link JdbcSinkConnectorConfig#getBatchSize()}
20-
* @param recordDescriptor the Sink record descriptor
20+
* @param record the Debezium sink record
2121
* @return the buffer records
2222
*/
23-
List<SinkRecordDescriptor> add(SinkRecordDescriptor recordDescriptor);
23+
List<JdbcSinkRecord> add(JdbcSinkRecord record);
2424

2525
/**
2626
* to clear and flush the internal buffer
27-
* @return {@link SinkRecordDescriptor} the flushed buffer records.
27+
* @return {@link JdbcSinkRecord} the flushed buffer records.
2828
*/
29-
List<SinkRecordDescriptor> flush();
29+
List<JdbcSinkRecord> flush();
3030

3131
/**
3232
* to check whether buffer is empty or not.

debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java

Lines changed: 91 additions & 145 deletions
Large diffs are not rendered by default.
debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcKafkaSinkRecord.java

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.connector.jdbc;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;

import io.debezium.annotation.Immutable;
import io.debezium.bindings.kafka.KafkaDebeziumSinkRecord;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.data.Envelope;
import io.debezium.sink.SinkConnectorConfig.PrimaryKeyMode;
import io.debezium.sink.filter.FieldFilterFactory.FieldNameFilter;

/**
 * An immutable representation of a {@link SinkRecord}.
 * <p>
 * At construction time the record's key and value schemas are inspected (unless the record is a
 * non-flattened truncate event) and the resolved key/non-key field names and their
 * {@link FieldDescriptor}s are captured into the internal collections exposed via
 * {@link #keyFieldNames()}, {@link #getNonKeyFieldNames()} and {@link #allFields()}.
 *
 * @author Chris Cranford
 * @author rk3rn3r
 */
@Immutable
public class JdbcKafkaSinkRecord extends KafkaDebeziumSinkRecord implements JdbcSinkRecord {

    // Synthetic column names used when the primary key is derived from the Kafka coordinates
    // (PrimaryKeyMode.KAFKA).
    private static final String CONNECT_TOPIC = "__connect_topic";
    private static final String CONNECT_PARTITION = "__connect_partition";
    private static final String CONNECT_OFFSET = "__connect_offset";

    private final PrimaryKeyMode primaryKeyMode;
    private final Set<String> primaryKeyFields;
    private final FieldNameFilter fieldsFilter;
    private final DatabaseDialect dialect;
    private final List<String> keyFieldNames = new ArrayList<>();
    private final List<String> nonKeyFieldNames = new ArrayList<>();
    private final Map<String, FieldDescriptor> allFields = new LinkedHashMap<>();

    /**
     * Creates the record representation and eagerly resolves its key and non-key fields.
     *
     * @param record the Kafka Connect sink record; must not be {@code null}
     * @param primaryKeyMode how the primary key columns are derived; must not be {@code null}
     * @param primaryKeyFields explicit primary key field names; may be empty to accept all key fields
     * @param fieldsFilter filter deciding which (topic, field) pairs are included
     * @param dialect the database dialect used to build {@link FieldDescriptor}s
     */
    public JdbcKafkaSinkRecord(SinkRecord record, PrimaryKeyMode primaryKeyMode, Set<String> primaryKeyFields, FieldNameFilter fieldsFilter, DatabaseDialect dialect) {
        // Validate inside the super invocation: a check placed after super(record) would run
        // only after the superclass has already received (and possibly dereferenced) the record.
        super(Objects.requireNonNull(record, "The sink record must be provided."));

        Objects.requireNonNull(primaryKeyMode, "The primary key mode must be provided.");

        this.primaryKeyMode = primaryKeyMode;
        this.primaryKeyFields = primaryKeyFields;
        this.fieldsFilter = fieldsFilter;
        this.dialect = dialect;

        final var flattened = isFlattened();
        // Non-flattened truncate events carry no row data, so there are no fields to resolve.
        final boolean truncated = !flattened && isTruncate();
        if (!truncated) {
            readSinkRecordKeyData(flattened);
            readSinkRecordNonKeyData(flattened);
        }
    }

    public List<String> keyFieldNames() {
        return keyFieldNames;
    }

    public List<String> getNonKeyFieldNames() {
        return nonKeyFieldNames;
    }

    public Map<String, FieldDescriptor> allFields() {
        return allFields;
    }

    /**
     * Resolves the key fields according to the configured {@link PrimaryKeyMode}.
     *
     * @param flattened whether the event value is a flattened (non-envelope) type
     */
    private void readSinkRecordKeyData(boolean flattened) {
        switch (primaryKeyMode) {
            case NONE:
                // does nothing
                break;
            case KAFKA:
                applyKafkaCoordinatesAsPrimaryKey();
                break;
            case RECORD_KEY:
                applyRecordKeyAsPrimaryKey();
                break;
            case RECORD_HEADER:
                applyRecordHeaderAsPrimaryKey();
                break;
            case RECORD_VALUE:
                applyRecordValueAsPrimaryKey(flattened);
                break;
            default:
                throw new ConnectException("Unexpected primary key mode: " + primaryKeyMode);
        }
    }

    /**
     * Resolves the non-key fields from the record's value schema.
     *
     * @param flattened whether the event value is a flattened (non-envelope) type
     */
    private void readSinkRecordNonKeyData(boolean flattened) {
        final Schema valueSchema = valueSchema();
        if (valueSchema != null) {
            if (flattened) {
                // In a flattened event type, it's safe to read the field names directly
                // from the schema as this isn't a complex Debezium message type.
                applyNonKeyFields(topicName(), valueSchema);
            }
            else {
                // In a non-flattened event type, this is a complex Debezium type.
                // We want to source the field names strictly from the 'after' block.
                final Field after = valueSchema.field(Envelope.FieldName.AFTER);
                if (after == null) {
                    throw new ConnectException("Received an unexpected message type that does not have an 'after' Debezium block");
                }
                applyNonKeyFields(topicName(), after.schema());
            }
        }
    }

    /**
     * Uses the Kafka topic/partition/offset coordinates as the primary key columns.
     */
    private void applyKafkaCoordinatesAsPrimaryKey() {
        // CONNECT_TOPIC
        keyFieldNames.add(CONNECT_TOPIC);
        allFields.put(CONNECT_TOPIC, new FieldDescriptor(Schema.STRING_SCHEMA, CONNECT_TOPIC, true, dialect));

        // CONNECT_PARTITION
        keyFieldNames.add(CONNECT_PARTITION);
        allFields.put(CONNECT_PARTITION, new FieldDescriptor(Schema.INT32_SCHEMA, CONNECT_PARTITION, true, dialect));

        // CONNECT_OFFSET
        keyFieldNames.add(CONNECT_OFFSET);
        allFields.put(CONNECT_OFFSET, new FieldDescriptor(Schema.INT64_SCHEMA, CONNECT_OFFSET, true, dialect));
    }

    /**
     * Uses the record key (primitive or struct) as the primary key.
     *
     * @throws ConnectException if the key schema is {@code null} or of an unsupported type
     */
    private void applyRecordKeyAsPrimaryKey() {
        final Schema keySchema = keySchema();
        if (keySchema == null) {
            throw new ConnectException("Configured primary key mode 'record_key' cannot have null schema");
        }
        else if (keySchema.type().isPrimitive()) {
            applyPrimitiveRecordKeyAsPrimaryKey(keySchema);
        }
        else if (Schema.Type.STRUCT.equals(keySchema.type())) {
            applyRecordKeyAsPrimaryKey(topicName(), keySchema);
        }
        else {
            throw new ConnectException("An unsupported record key schema type detected: " + keySchema.type());
        }
    }

    /**
     * Uses the record headers as the primary key by synthesizing a struct schema from them.
     *
     * @throws ConnectException if the record has no headers
     */
    private void applyRecordHeaderAsPrimaryKey() {
        if (originalKafkaRecord.headers() == null || originalKafkaRecord.headers().isEmpty()) {
            throw new ConnectException("Configured primary key mode 'record_header' cannot have null or empty schema");
        }

        final SchemaBuilder headerSchemaBuilder = SchemaBuilder.struct();
        originalKafkaRecord.headers().forEach((Header header) -> headerSchemaBuilder.field(header.key(), header.schema()));
        final Schema headerSchema = headerSchemaBuilder.build();
        applyRecordKeyAsPrimaryKey(topicName(), headerSchema);
    }

    /**
     * Uses fields from the record value (optionally restricted to {@code primaryKeyFields})
     * as the primary key.
     *
     * @param flattened whether the event value is a flattened (non-envelope) type
     * @throws ConnectException if the value schema is {@code null} or the envelope has no 'after' block
     */
    private void applyRecordValueAsPrimaryKey(boolean flattened) {

        final Schema valueSchema = valueSchema();
        if (valueSchema == null) {
            throw new ConnectException("Configured primary key mode 'record_value' cannot have null schema");
        }

        Stream<Field> recordFields;
        if (flattened) {
            recordFields = valueSchema.fields().stream();
        }
        else {
            // Fail with a clear message rather than an NPE when the envelope carries no
            // 'after' block (e.g. a delete event).
            final Struct after = ((Struct) value()).getStruct(Envelope.FieldName.AFTER);
            if (after == null) {
                throw new ConnectException("Configured primary key mode 'record_value' requires an 'after' Debezium block");
            }
            recordFields = after.schema().fields().stream();
        }

        if (!primaryKeyFields.isEmpty()) {
            recordFields = recordFields.filter(field -> primaryKeyFields.contains(field.name()));
        }

        recordFields.forEach(field -> addKeyField(topicName(), field));
    }

    /**
     * Registers a primitive record key under the single configured primary key field name.
     *
     * @throws ConnectException if no primary key field name was configured
     */
    private void applyPrimitiveRecordKeyAsPrimaryKey(Schema keySchema) {
        if (primaryKeyFields.isEmpty()) {
            throw new ConnectException("The " + JdbcSinkConnectorConfig.PRIMARY_KEY_FIELDS +
                    " configuration must be specified when using a primitive key.");
        }
        addKeyField(primaryKeyFields.iterator().next(), keySchema);
    }

    /**
     * Registers every field of a struct key schema that is either unrestricted or listed
     * in {@code primaryKeyFields}.
     */
    private void applyRecordKeyAsPrimaryKey(String topic, Schema keySchema) {
        for (Field field : keySchema.fields()) {
            if (primaryKeyFields.isEmpty() || primaryKeyFields.contains(field.name())) {
                addKeyField(topic, field);
            }
        }
    }

    private void addKeyField(String topic, Field field) {
        if (fieldsFilter.matches(topic, field.name())) {
            addKeyField(field.name(), field.schema());
        }
    }

    private void addKeyField(String name, Schema schema) {
        FieldDescriptor fieldDescriptor = new FieldDescriptor(schema, name, true, dialect);
        keyFieldNames.add(fieldDescriptor.getName());
        allFields.put(fieldDescriptor.getName(), fieldDescriptor);
    }

    /**
     * Registers every schema field that is not already a key field and passes the field filter.
     */
    private void applyNonKeyFields(String topic, Schema schema) {
        for (Field field : schema.fields()) {
            if (!keyFieldNames.contains(field.name())) {
                if (fieldsFilter.matches(topic, field.name())) {
                    applyNonKeyField(field.name(), field.schema());
                }
            }
        }
    }

    private void applyNonKeyField(String name, Schema schema) {
        FieldDescriptor fieldDescriptor = new FieldDescriptor(schema, name, false, dialect);
        nonKeyFieldNames.add(fieldDescriptor.getName());
        allFields.put(fieldDescriptor.getName(), fieldDescriptor);
    }

}

0 commit comments

Comments
 (0)