
DBZ-6984 Add multi-task support for MongoDB connectors#5925

Open
adityanair-stripe wants to merge 2 commits into debezium:main from adityanair-stripe:DBZ-6984

Conversation

@adityanair-stripe

@jpechane jpechane requested review from ani-sha and jcechace October 8, 2024 06:31
this.signalEnabledChannels = getSignalEnabledChannels(config);
this.skippedOperations = determineSkippedOperations(config);
this.taskId = config.getString(TASK_ID);
this.maxTasks = config.getInteger(TASKS_MAX, 1);
Contributor

Is it really guaranteed that Kafka Connect will always propagate the connector infra-level config to the connector configuration?
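For context on this question: in Kafka Connect, a task's configuration is whatever map the connector's `taskConfigs(int)` implementation returns, so connector-level values such as `tasks.max` reach a task only if the connector copies them in itself. A minimal sketch of that copying, with property names that are assumptions rather than the PR's actual keys:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigSketch {

    // Each task sees only the map the connector built for it, so values such
    // as tasks.max and a per-task id must be copied in explicitly here.
    // Property names are illustrative, not the PR's actual keys.
    static List<Map<String, String>> taskConfigs(Map<String, String> connectorConfig, int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(connectorConfig);
            taskConfig.put("mongodb.task.id", String.valueOf(i));
            taskConfig.put("tasks.max", String.valueOf(maxTasks));
            configs.add(taskConfig);
        }
        return configs;
    }
}
```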


"for data change, schema change, transaction, heartbeat event etc.")
.withDefault(DefaultTopicNamingStrategy.class.getName());

public static final Field MONGODB_MULTI_TASK_ENABLED = Field.create("mongodb.multi.task.enabled")
Contributor

Do we really need to have such a config option? IMHO just setting tasks.max > 1 should trigger the behaviour.
But as a safety measure (given we are going to merge it to a stable branch) I'd keep it, yet use Field.createInternal. It would be false by default in 3.0 and true in 3.1. The option could then be removed completely.

Member

I would definitely keep an option that specifically enables THIS TYPE of multi-tasking, as in the future we might want to do multi-tasking based on e.g. databases.

Agree with @jcechace on this :) There is also an existing replset based multi-tasking for Mongo, so we wanted to be very explicit about having a flag to turn the feature on/off.

Member

@ning-stripe the replica-set-based one was removed as it was highly problematic

.withDefault(DefaultTopicNamingStrategy.class.getName());

public static final Field MONGODB_MULTI_TASK_ENABLED = Field.create("mongodb.multi.task.enabled")
.withDisplayName("Enable Mongo DB multi-task")
Contributor

Here and anywhere else, it is MongoDB not Mongo DB

.withValidation(Field::isInteger)
.withDescription("Define multi-task generation to be enabled on MongoDB instance");

public static final Field MONGODB_MULTI_TASK_PREV_TASKS = Field.create("mongodb.multi.prev.tasks.max")
Contributor

Are the prev and gen options really necessary? Can't we store the number of tasks in the offsets and detect the change by comparing the value from the offsets with the one configured in the connector config?

Author

@adityanair-stripe Oct 8, 2024

prev_gen is here because we might not want to start at generation - 1. Suppose there was some problem with the current generation; we might want to start a new generation from where an older generation left off. Ex: generation 0 is fine, generation 1 is problematic, so we start generation 2 from generation 0's offsets. For this to work, gen and prev_gen need to be explicitly defined.

Moving gen into the offset creates an issue in the above scenario because if we don't include the generation in the partition then we won't be able to get an older generation's offsets.
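The scenario described above can be sketched as follows, assuming offsets are keyed by a partition map that includes the generation (field names mirror the snippet below but are not guaranteed to match the PR exactly):

```java
import java.util.HashMap;
import java.util.Map;

public class GenerationOffsetsSketch {

    // Build the source-partition key for (taskId, generation, taskCount).
    static Map<String, String> partition(int taskId, int generation, int taskCount) {
        Map<String, String> p = new HashMap<>();
        p.put("task_id", String.valueOf(taskId));
        p.put("multi_task_gen", String.valueOf(generation));
        p.put("task_count", String.valueOf(taskCount));
        return p;
    }

    // Because the generation is part of the partition key, a new generation
    // can be pointed at ANY older generation's offsets via prev_gen,
    // not just generation - 1.
    static String resumeToken(Map<Map<String, String>, String> offsetStore,
                              int taskId, int prevGen, int prevTaskCount) {
        return offsetStore.get(partition(taskId, prevGen, prevTaskCount));
    }
}
```

With this keying, starting generation 2 from generation 0's offsets is just a lookup with prev_gen = 0, skipping the problematic generation 1 entirely.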

if (this.multiTaskEnabled) {
partition.put(TASK_ID_KEY, String.valueOf(this.taskId));
partition.put(MULTI_TASK_GEN_KEY, String.valueOf(this.multiTaskGen));
partition.put(TASK_COUNT_KEY, String.valueOf(taskCount));
Contributor

Why do we need to use task count?

Author

We included task count in the partition to guard against accidental misconfiguration. Ex: if generation 0 has 3 tasks, and generation 1 is configured with prev_tasks=1, the connector will fail to find any offsets and exit instead of silently using the incorrect configuration.
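The guard described above could look roughly like this; a sketch under the assumption that offsets are keyed by (task id, generation, task count), with hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;

public class TaskCountGuardSketch {

    static Map<String, String> partition(int taskId, int generation, int taskCount) {
        Map<String, String> p = new HashMap<>();
        p.put("task_id", String.valueOf(taskId));
        p.put("multi_task_gen", String.valueOf(generation));
        p.put("task_count", String.valueOf(taskCount));
        return p;
    }

    // If prev_tasks does not match the task count the previous generation
    // actually ran with, the lookup misses and we fail fast instead of
    // silently streaming from scratch.
    static String resumeTokenOrFail(Map<Map<String, String>, String> store,
                                    int taskId, int prevGen, int prevTasks) {
        String token = store.get(partition(taskId, prevGen, prevTasks));
        if (token == null) {
            throw new IllegalStateException("No offsets for generation " + prevGen
                    + " with " + prevTasks + " tasks; check prev_tasks");
        }
        return token;
    }
}
```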

@jcechace
Member

jcechace commented Oct 8, 2024

I'm going over this. However before this gets merged I would very much appreciate at least some basic docs describing how the system works.


import io.debezium.data.Envelope;

public class MongoDbMultiTaskConnectorIT extends AbstractMongoConnectorIT {

Amazing work! The tests look so much more thorough and less hacky :D

@jcechace
Member

jcechace commented Oct 9, 2024

@adityanair-stripe I'm slowly crunching through the PR. Would it be possible to schedule a call for next week to discuss the concept? I know we touched on it sometime last year but a lot of things have changed since then -- for one, the buffering cursor did not exist.

The way I see this being helpful is only when there is a significant write load on a single collection and Debezium is not able to keep up in a single task.

If the writes were spread across databases/collections then multi-tasking based on databases/collections seems to make more sense. If the load is really on a single collection, wouldn't it then make more sense to introduce parallel event processing within a single task?

I'm trying to understand where exactly the bottleneck was and whether this is addressing it at the right spot -- although this is likely a feasible solution, it also adds a lot of complexity to the code.

*
* @author Anthony Noakes
*/
public class MultiTaskOffsetHandler {
Member

The methods here definitely need javadocs.

Member

I would probably consider a different name. E.g. "TimeIntervalHandler" as these don't really seem to be offsets (in context of Kafka).

Author

Sure, I'll call it MultiTaskWindowHandler.
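For readers following along, the time-window idea being named here can be sketched as follows: each task owns every N-th window of the oplog timeline. The window length and the round-robin assignment rule are assumptions about the design, not the PR's exact code:

```java
public class MultiTaskWindowSketch {

    // A task owns an event if the event's timestamp falls into a window
    // whose index, modulo the task count, equals the task's id.
    // hopSeconds is the assumed window length.
    static boolean ownsTimestamp(long epochSecond, long epochStart,
                                 long hopSeconds, int taskId, int taskCount) {
        long windowIndex = (epochSecond - epochStart) / hopSeconds;
        return windowIndex % taskCount == taskId;
    }
}
```

With 3 tasks and 10-second windows starting at epoch second 0, seconds 0-9 belong to task 0, 10-19 to task 1, 20-29 to task 2, 30-39 to task 0 again, and so on; together the tasks cover the whole timeline without overlap.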

Comment on lines +340 to +361
do {
try (MongoChangeStreamCursor<ChangeStreamDocument<TResult>> cursor = stream.cursor()) {
cursorRef.compareAndSet(null, cursor);
running.set(true);
noMessageIterations = 0;
fetchEvents(cursor);
stream = streamManager.updateStream(stream);
}
catch (InterruptedException e) {
LOGGER.error("Fetcher thread interrupted", e);
Thread.currentThread().interrupt();
throw new DebeziumException("Fetcher thread interrupted", e);
}
catch (Throwable e) {
error.set(e);
LOGGER.error("Fetcher thread has failed", e);
close();
}
finally {
cursorRef.set(null);
}
} while (isRunning());
Member

Why is this needed? Having a loop inside a loop (since the fetchEvents call already does this) doesn't seem right. I also wonder whether it can pose a synchronisation issue and accidentally restart the streaming.

Author

The outer loop is to move the multi-task window whenever we finish streaming all records from the current window. Within fetchEvents we call shouldUpdateStream on every new event and break out of the inner loop when it returns true (when we get an event outside the current window). Then we move the window, clean up the cursor, update the stream, and open a new cursor before starting to iterate over events again.

cursorRef.set(null);
}
} while (isRunning());
close();
Member

This definitely shouldn't be here.

Comment on lines +113 to +114
try (var cursor = multiTaskEnabled ? BufferingChangeStreamCursor.fromIterable(stream, multiTaskOffsetHandler, taskContext, streamingMetrics, clock).start()
: BufferingChangeStreamCursor.fromIterable(stream, taskContext, streamingMetrics, clock).start()) {
Member

Avoid the ternary operator in try-with-resources, please. It's really difficult to read. Passing a null argument and redirecting to an overload is perfectly fine.
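The refactor being asked for, in generic form (class names are hypothetical, not the PR's actual types):

```java
public class CursorFactorySketch {

    interface Cursor extends AutoCloseable {
        @Override
        void close();
    }

    static class PlainCursor implements Cursor {
        public void close() {
        }
    }

    static class MultiTaskCursor implements Cursor {
        public void close() {
        }
    }

    // Hoisting the conditional into a small factory keeps the
    // try-with-resources header readable; an overload taking fewer
    // arguments (or tolerating a null handler) covers the single-task case.
    static Cursor openCursor(boolean multiTaskEnabled) {
        return multiTaskEnabled ? new MultiTaskCursor() : new PlainCursor();
    }

    public static void main(String[] args) {
        try (Cursor cursor = openCursor(true)) {
            // stream events...
        }
    }
}
```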

return Module.name();
}

public boolean getMultiTaskEnabled() {
Member

isMultiTaskEnabled

@github-actions

Hi @adityanair-stripe, thanks for your contribution. Please prefix the commit message(s) with the DBZ-xxx JIRA issue key.

@adityanair-stripe
Author

@jcechace

However before this gets merged I would very much appreciate at least some basic docs describing how the system works.

I've collected some of our docs and combined them into this doc. Let me know if there are any other details about the design that you'd like me to elaborate on.

Would it be possible to schedule a call for the next week to discuss the concept

Sent an email, let me know what time works for you.

The way I see this being helpful is only when there is a significant write into a single collection

Yeah, this is a common occurrence for us - one of the goals of this feature is to solve the 'hot collection' issue described here. Distributing events to tasks based on db/collection doesn't really address the issue.


@jcechace
Member

this doc

This seems not to be accessible. Can you either make it public or share with me directly?

@github-actions

github-actions bot commented Nov 8, 2024

Hi @adityanair-stripe, thanks for your contribution. Please prefix the commit message(s) with the DBZ-xxx JIRA issue key.

2 similar comments

@Naros
Member

Naros commented Jan 6, 2026

❌ Developer Certificate of Origin (DCO) check failed.

Hi @adityanair-stripe, please sign off all commits with:

git commit -s

If pull request commits are not signed off, the pull request cannot be merged. For more information about why this is required, please see our blog about contribution requirement changes.

@Naros Naros requested review from jpechane and removed request for ani-sha March 10, 2026 17:23