DBZ-6984 Add multi-task support for MongoDB connectors #5925
adityanair-stripe wants to merge 2 commits into debezium:main
Conversation
this.signalEnabledChannels = getSignalEnabledChannels(config);
this.skippedOperations = determineSkippedOperations(config);
this.taskId = config.getString(TASK_ID);
this.maxTasks = config.getInteger(TASKS_MAX, 1);
Is it really guaranteed that Kafka Connect will always propagate the connector infra-level config to the connector configuration?
I don't think there's any documented guarantee. It is the actual behavior, though: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2106
"for data change, schema change, transaction, heartbeat event etc.")
.withDefault(DefaultTopicNamingStrategy.class.getName());

public static final Field MONGODB_MULTI_TASK_ENABLED = Field.create("mongodb.multi.task.enabled")
Do we really need such a config option? IMHO, just setting tasks.max > 1 should trigger the behaviour.
But as a safety measure (given we are going to merge this to a stable branch) I'd keep it, yet use Field.createInternal. It would be false by default in 3.0 and true in 3.1. The option could then be removed completely.
I would definitely keep an option that specifically enables this type of multi-tasking, as in the future we might want to do multi-tasking based on e.g. databases.
Agree with @jcechace on this :) There is also an existing replica-set-based multi-tasking for Mongo, so we wanted to be very explicit about having a flag to turn the feature on/off.
@ning-stripe The RS-based one was removed because it was highly problematic.
.withDefault(DefaultTopicNamingStrategy.class.getName());

public static final Field MONGODB_MULTI_TASK_ENABLED = Field.create("mongodb.multi.task.enabled")
    .withDisplayName("Enable Mongo DB multi-task")
Here and anywhere else: it is MongoDB, not Mongo DB.
.withValidation(Field::isInteger)
.withDescription("Define multi-task generation to be enabled on MongoDB instance");

public static final Field MONGODB_MULTI_TASK_PREV_TASKS = Field.create("mongodb.multi.prev.tasks.max")
Are the prev and even the gen options really necessary? Can't we store the number of tasks in the offsets and detect the change by comparing the value from the offsets with the one configured in the connector config?
prev_gen is here because we might not want to start at generation - 1. Suppose there was some problem with the current generation; we might want to start a new generation from where an earlier generation left off. Example: generation 0 is fine, generation 1 is problematic, so we start generation 2 from generation 0's offsets. For this to work, gen and prev_gen need to be explicitly defined.
Moving gen into the offset creates an issue in the above scenario, because if we don't include the generation in the partition then we won't be able to look up an older generation's offsets.
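The generation-in-the-partition argument can be condensed into a rough sketch. The key names and method names below are illustrative stand-ins, not the actual Debezium constants: the point is that the generation is part of the source-partition key, so a task starting generation 2 with prev_gen=0 can query the offset store under generation 0's partition.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: the generation is part of the partition key, so a new
// generation can be pointed at an arbitrary older generation's offsets,
// skipping a problematic generation in between.
public class GenerationPartitionSketch {

    // Partition map a task of a given generation stores its offsets under.
    public static Map<String, String> partition(int taskId, int generation, int taskCount) {
        Map<String, String> p = new LinkedHashMap<>();
        p.put("task_id", String.valueOf(taskId));
        p.put("multi_task_gen", String.valueOf(generation));
        p.put("task_count", String.valueOf(taskCount));
        return p;
    }

    // When starting a new generation, read offsets recorded under the
    // explicitly configured previous generation (prev_gen).
    public static Map<String, String> previousGenerationPartition(int taskId, int prevGen, int prevTaskCount) {
        return partition(taskId, prevGen, prevTaskCount);
    }
}
```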
if (this.multiTaskEnabled) {
    partition.put(TASK_ID_KEY, String.valueOf(this.taskId));
    partition.put(MULTI_TASK_GEN_KEY, String.valueOf(this.multiTaskGen));
    partition.put(TASK_COUNT_KEY, String.valueOf(taskCount));
Why do we need to use the task count?
We included the task count in the partition to guard against accidental misconfiguration. Example: if generation 0 has 3 tasks and generation 1 is configured with prev_tasks=1, the connector will fail to find any offsets and exit instead of silently using the incorrect configuration.
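A minimal sketch of that fail-fast guard, with illustrative names (not the actual connector code): probe the offset store for each task of the previous generation, and refuse to start if the configured prev task count yields no offsets at all.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.IntFunction;

// Hypothetical guard: if no offsets exist under the configured previous task
// count, assume misconfiguration and fail instead of silently starting over.
public class PrevGenOffsetGuard {

    public static void verify(int prevTasksMax, IntFunction<Optional<Map<String, ?>>> offsetLookup) {
        for (int taskId = 0; taskId < prevTasksMax; taskId++) {
            if (offsetLookup.apply(taskId).isPresent()) {
                return; // at least one offset found, configuration looks sane
            }
        }
        throw new IllegalStateException(
                "No offsets found for previous generation with prev_tasks=" + prevTasksMax
                        + "; refusing to start with a possibly incorrect configuration");
    }
}
```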
I'm going over this. However, before this gets merged I would very much appreciate at least some basic docs describing how the system works.
import io.debezium.data.Envelope;

public class MongoDbMultiTaskConnectorIT extends AbstractMongoConnectorIT {
Amazing work! The tests look so much more thorough and less hacky :D
@adityanair-stripe I'm slowly crunching through the PR. Would it be possible to schedule a call for next week to discuss the concept? I know we touched on it sometime last year, but a lot of things have changed since then -- for one, the buffering cursor did not exist. The way I see this being helpful is only when there is a significant write volume into a single collection and Debezium is not able to keep up in a single task. If the writes were spread across databases/collections, then multi-tasking based on databases/collections seems to make more sense. If the load really is on a single collection, wouldn't it then make more sense to introduce parallel event processing within a single task? I'm trying to understand where exactly the bottleneck was and whether this addresses it at the right spot -- although this is likely a feasible solution, it also adds a lot of complexity to the code.
 *
 * @author Anthony Noakes
 */
public class MultiTaskOffsetHandler {
The methods here definitely need javadocs.
I would probably consider a different name, e.g. "TimeIntervalHandler", as these don't really seem to be offsets (in the context of Kafka).
Sure, I'll call it MultiTaskWindowHandler.
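For context, a stripped-down sketch of what such a window handler computes under the assumed semantics (this is not the actual MultiTaskOffsetHandler, just an illustration): the change-stream timeline is sliced into fixed hops, and each task owns the slices whose index modulo the task count equals its task id.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical window math for slicing the timeline across tasks.
public class TaskWindowSketch {
    private final long hopMillis;
    private final int taskId;
    private final int taskCount;

    public TaskWindowSketch(Duration hop, int taskId, int taskCount) {
        this.hopMillis = hop.toMillis();
        this.taskId = taskId;
        this.taskCount = taskCount;
    }

    // Does an event with this timestamp fall into a window owned by this task?
    public boolean owns(Instant eventTime) {
        long slot = eventTime.toEpochMilli() / hopMillis;
        return slot % taskCount == taskId;
    }

    // Start of the next window owned by this task, at or after 'from'.
    public Instant nextOwnedWindowStart(Instant from) {
        long slot = from.toEpochMilli() / hopMillis;
        while (slot % taskCount != taskId) {
            slot++;
        }
        return Instant.ofEpochMilli(slot * hopMillis);
    }
}
```

With two tasks and a 10-second hop, task 0 would own windows [0s, 10s) and [20s, 30s), while task 1 owns [10s, 20s) and [30s, 40s), so every event is claimed by exactly one task.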
do {
    try (MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor = stream.cursor()) {
        cursorRef.compareAndSet(null, cursor);
        running.set(true);
        noMessageIterations = 0;
        fetchEvents(cursor);
        stream = streamManager.updateStream(stream);
    }
    catch (InterruptedException e) {
        LOGGER.error("Fetcher thread interrupted", e);
        Thread.currentThread().interrupt();
        throw new DebeziumException("Fetcher thread interrupted", e);
    }
    catch (Throwable e) {
        error.set(e);
        LOGGER.error("Fetcher thread has failed", e);
        close();
    }
    finally {
        cursorRef.set(null);
    }
} while (isRunning());
Why is this needed? Having a loop inside a loop (since the fetchEvents call already does this) doesn't seem right. I also wonder whether it can pose a synchronisation issue and accidentally restart the streaming.
The outer loop is to move the multi-task window whenever we finish streaming all records from the current window. Within fetchEvents we call shouldUpdateStream on every new event and break out of the inner loop when it returns true (when we get an event outside the current window). Then we move the window, clean up the cursor, update the stream, and open a new cursor before starting to iterate over events again.
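That control flow can be condensed into a toy model (illustrative only; the real code consumes a MongoDB change-stream cursor rather than an array of timestamps):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the nested loops: the inner loop plays the role of fetchEvents
// and stops once an event falls beyond the current window (shouldUpdateStream
// returning true); the outer loop then advances the window, which in the real
// code also means closing the cursor and reopening the stream.
public class WindowedFetchLoopSketch {

    public static List<Long> run(long[] eventTimes, long windowSize, long streamEnd) {
        List<Long> processed = new ArrayList<>();
        long windowEnd = windowSize;
        int next = 0;
        while (windowEnd <= streamEnd) {             // outer loop: one pass per window
            while (next < eventTimes.length) {       // inner loop: "fetchEvents"
                if (eventTimes[next] >= windowEnd) {
                    break;                           // event outside window: stop fetching
                }
                processed.add(eventTimes[next]);
                next++;
            }
            windowEnd += windowSize;                 // advance window, "update stream"
        }
        return processed;
    }
}
```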
        cursorRef.set(null);
    }
} while (isRunning());
close();
This definitely shouldn't be here.
try (var cursor = multiTaskEnabled ? BufferingChangeStreamCursor.fromIterable(stream, multiTaskOffsetHandler, taskContext, streamingMetrics, clock).start()
        : BufferingChangeStreamCursor.fromIterable(stream, taskContext, streamingMetrics, clock).start()) {
Avoid the ternary operator in try-with-resources, please. It's really difficult to read. Passing a null argument and redirecting to an overload is perfectly fine.
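A sketch of the suggested refactor, with simplified stand-in types (not the actual BufferingChangeStreamCursor API): the ternary moves out of the resource declaration, and a single overload treats a null handler as single-task mode.

```java
// Simplified stand-ins for the real classes, just to show the shape of the
// refactor: the call site stays flat and a null handler means single-task mode.
public class CursorFactorySketch {

    public static String fromIterable(String stream, Object multiTaskHandler) {
        return (multiTaskHandler == null ? "single-task:" : "multi-task:") + stream;
    }

    // Call site: no ternary inside a try-with-resources declaration.
    public static String open(String stream, boolean multiTaskEnabled, Object handler) {
        return fromIterable(stream, multiTaskEnabled ? handler : null);
    }
}
```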
return Module.name();
}

public boolean getMultiTaskEnabled() {
Hi @adityanair-stripe, thanks for your contribution. Please prefix the commit message(s) with the DBZ-xxx JIRA issue key.
I've collected some of our docs and combined them into this doc. Let me know if there are any other details about the design that you'd like me to elaborate on.
Sent an email, let me know what time works for you.
Yeah, this is a common occurrence for us -- one of the goals of this feature is to solve the 'hot collection' issue described there. Distributing events to tasks based on db/collection doesn't really address that issue.
This seems not to be accessible. Can you either make it public or share it with me directly?
Force-pushed 801dc64 to b38753b
❌ Developer Certificate of Origin (DCO) check failed. Hi @adityanair-stripe, please sign off all commits with: If pull request commits are not signed off, the pull request cannot be merged. For more information about why this is required, please see our blog about contribution requirement changes.
Signed-off-by: Aditya Nair <[email protected]>
Force-pushed b38753b to 24d07db
Signed-off-by: Aditya Nair <[email protected]>
Force-pushed 24d07db to e6a4bf4
https://issues.redhat.com/browse/DBZ-6984