
@jxie-1
Contributor

@jxie-1 jxie-1 commented Dec 11, 2025

This PR addresses two issues:

When combining bulk shard responses, item ids were used as indices into an array. This only works when there is one shard: a BulkOperation starts with sequential item ids [0, 1, 2...], but it gets split into multiple BulkShardRequests, so the ids within any one BulkShardRequest are no longer contiguous.

For instance, in a scenario with 2 shards, a BulkOperation with 10 item requests could get split into two BulkShardRequests with item ids [0, 1, 3, 7, 9] and [2, 4, 5, 6, 8] respectively. After the first BulkShardRequest is split further for resharding and the responses come back, the current implementation would create an array of length 5 to combine them and hit an index out of bounds exception when indexing with item id 7.

The proposed fix is to combine the bulk responses into a map of item id -> item response. Then we can use the original request, mapping each item request to its corresponding item response to create an item response list.
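To illustrate the idea outside the Elasticsearch codebase, here is a minimal sketch of the map-based combine. `ItemRequest`, `ItemResponse`, and `combine` are hypothetical stand-ins made up for this example, not the real `BulkItemRequest`/`BulkItemResponse` types or the method changed in this PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; not the Elasticsearch classes.
final class BulkCombineSketch {
    record ItemRequest(int id) {}
    record ItemResponse(int itemId, String result) {}

    // Merge the responses of all split requests, then emit them in the
    // order of the original item requests.
    static ItemResponse[] combine(List<ItemRequest> originalItems,
                                  List<List<ItemResponse>> splitResponses) {
        // Key by item id rather than using the id as an array index:
        // ids such as 7 or 9 would overflow an array of length 5.
        Map<Integer, ItemResponse> byItemId = new HashMap<>();
        for (List<ItemResponse> responses : splitResponses) {
            for (ItemResponse response : responses) {
                byItemId.put(response.itemId(), response);
            }
        }

        ItemResponse[] ordered = new ItemResponse[originalItems.size()];
        for (int i = 0; i < ordered.length; i++) {
            int id = originalItems.get(i).id();
            ItemResponse response = byItemId.get(id);
            if (response == null) {
                throw new IllegalStateException("missing response for item id " + id);
            }
            ordered[i] = response;
        }
        return ordered;
    }

    public static void main(String[] args) {
        // Item ids from the example above: [0, 1, 3, 7, 9], split in two.
        List<ItemRequest> original = List.of(
            new ItemRequest(0), new ItemRequest(1), new ItemRequest(3),
            new ItemRequest(7), new ItemRequest(9));
        List<List<ItemResponse>> split = List.of(
            List.of(new ItemResponse(7, "ok"), new ItemResponse(0, "ok")),
            List.of(new ItemResponse(9, "ok"), new ItemResponse(3, "ok"),
                    new ItemResponse(1, "ok")));
        for (ItemResponse response : combine(original, split)) {
            System.out.println(response);
        }
    }
}
```

The position in the output array comes from the position of the item in the original request, so the item id is only ever used as a lookup key.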

The second issue is that the source shard executes the original bulk shard request instead of the split one. I added a parameter to doPrimaryRequest for the request to execute.

@jxie-1 jxie-1 added >non-issue :Distributed Indexing/Distributed A catch all label for anything in the Distributed Indexing Area. Please avoid if you can. Team:Distributed Indexing Meta label for Distributed Indexing team labels Dec 11, 2025
@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)

}
});
// Item responses should match the order of the original item requests
BulkItemResponse[] bulkItemResponses = Arrays.stream(originalRequest.items())
Contributor

@lkts lkts Dec 11, 2025

Streams allocate, and given that we don't win much in readability here (IMO of course), I would go with a for loop.

* @param <E> the type of anticipated exception
*/
@FunctionalInterface
public interface CheckedTriConsumer<S, T, U, E extends Exception> {
Contributor

I don't know if I like this. IMO, once we are at this number of parameters we would benefit from names, so we could introduce a specific interface where it is needed (in this case SplitRequestExecutor or something similar). It can still be a @FunctionalInterface.
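A rough sketch of what such a named interface might look like follows. The parameter types, the `execute` method name, and the `runAll` helper are assumptions made up for this example; only the name SplitRequestExecutor comes from the comment above, and the real signature would depend on the types used at the call site.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SplitExecutorSketch {
    // Hypothetical named interface: each parameter has a role-specific name
    // instead of the generic S, T, U of a tri-consumer. Strings stand in for
    // the real request, shard, and listener types.
    @FunctionalInterface
    interface SplitRequestExecutor {
        void execute(String splitRequest, String primaryShard, Consumer<String> listener)
            throws Exception;
    }

    // Hypothetical caller: runs every split request through the executor
    // and collects whatever the executor reports to the listener.
    static List<String> runAll(List<String> splitRequests, String primaryShard,
                               SplitRequestExecutor executor) throws Exception {
        List<String> results = new ArrayList<>();
        for (String request : splitRequests) {
            executor.execute(request, primaryShard, results::add);
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        // Being a functional interface, it still accepts a lambda.
        List<String> results = runAll(List.of("a", "b"), "shard-0",
            (request, shard, listener) -> listener.accept(shard + ":" + request));
        System.out.println(results);
    }
}
```

The call site reads the same as with a generic tri-consumer; the difference is that the declaration documents what each argument is.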

jxie-1 added a commit to jxie-1/elasticsearch that referenced this pull request Dec 12, 2025
@jxie-1 jxie-1 merged commit c3f07ad into elastic:main Dec 16, 2025
34 checks passed

Labels

:Distributed Indexing/Distributed A catch all label for anything in the Distributed Indexing Area. Please avoid if you can. >non-issue Team:Distributed Indexing Meta label for Distributed Indexing team v9.3.0


3 participants