I couldn't find any documentation about this behaviour, so I hope someone can explain it to me: why does the KafkaConsumerState
class, which handles offset commits, increment each offset sent in a TopicPartitionOffsetMetadata
by one?
Here is the code:
for (TopicPartitionOffsetMetadata t : offsetCommitRequest.getOffsets()) {
  if (t.getMetadata() == null) {
    offsetMap.put(
        new TopicPartition(t.getTopic(), t.getPartition()),
        new OffsetAndMetadata(t.getOffset() + 1));
  } else {
    offsetMap.put(
        new TopicPartition(t.getTopic(), t.getPartition()),
        new OffsetAndMetadata(t.getOffset() + 1, t.getMetadata()));
  }
}
With this behaviour, to commit a given offset for a partition you have to send that offset minus 1. For example, to set the committed offset to 0, I need to send this body:
{
  "offsets": [
    {
      "topic": "test-topic",
      "partition": 0,
      "offset": -1
    }
  ]
}
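To make the arithmetic concrete, here is a minimal, self-contained sketch of the mapping as I understand it from the loop above. It does not use the real classes: `OffsetCommitSketch`, `RequestedOffset`, `toCommittedOffset` and `toRequestOffset` are hypothetical names made up for illustration, and a plain map keyed by "topic-partition" stands in for the `TopicPartition` to `OffsetAndMetadata` map. One possible reason for the `+ 1`, which is only my assumption and not confirmed here, is that Kafka's Java consumer treats a committed offset as the offset of the next record to read (last processed offset + 1), so the request body may be expected to carry the offset of the last consumed record.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the actual KafkaConsumerState code.
final class OffsetCommitSketch {

    // Hypothetical stand-in for one entry of the "offsets" array in the request body.
    static final class RequestedOffset {
        final String topic;
        final int partition;
        final long offset;

        RequestedOffset(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Mirrors the "+ 1" in the quoted loop: the value sent in the request
    // becomes (value + 1) in the offset that is actually committed.
    static long toCommittedOffset(long requestOffset) {
        return requestOffset + 1;
    }

    // Inverse mapping: what a client has to send to end up with a given committed offset.
    static long toRequestOffset(long desiredCommittedOffset) {
        return desiredCommittedOffset - 1;
    }

    // Builds a simplified offset map keyed by "topic-partition".
    static Map<String, Long> buildOffsetMap(List<RequestedOffset> requested) {
        Map<String, Long> offsetMap = new LinkedHashMap<>();
        for (RequestedOffset r : requested) {
            offsetMap.put(r.topic + "-" + r.partition, toCommittedOffset(r.offset));
        }
        return offsetMap;
    }

    public static void main(String[] args) {
        // Same values as the request body above: topic "test-topic", partition 0, offset -1.
        List<RequestedOffset> body =
                Collections.singletonList(new RequestedOffset("test-topic", 0, -1L));
        System.out.println(buildOffsetMap(body)); // prints {test-topic-0=0}
    }
}
```

Under that reading, sending the offset of the last record I processed would commit the position I want to resume from, while sending the resume position itself would skip one record.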