Skip to content

Conversation

@nandini12396
Copy link
Contributor

@nandini12396 nandini12396 commented Dec 8, 2025

###Problem:

  1. Race condition and quota leaks: Multiple threads could check quota before any recorded usage, allowing all to bypass limits simultaneously. Additionally, in multi-partition fetches, quota was reserved per-partition but could leak if some partitions failed or were throttled, leading to quota exhaustion over time.

  2. Startup race condition: RemoteLogManager initialized with default quotas (Long.MAX_VALUE = unlimited) and relied on dynamic config updates to apply correct values, creating a window where operations could exceed configured quotas.

###Solution:

  1. Atomic quota reservation

    • Added RLMQuotaManager.recordAndGetThrottleTimeMs() to atomically record usage and check quota in a single synchronized operation
    • Added quotaReservedBytes field to RemoteStorageFetchInfo to track per-partition reservations
    • Modified ReplicaManager to call recordAndCheckFetchQuota() BEFORE dispatching remote fetch, ensuring quota is reserved atomically based on adjustedMaxBytes
    • If throttled, immediately release the reservation since fetch won't execute
    • RemoteLogReader adjusts quota using delta (actual - reserved) after fetch completes
    • On error, releases the full reservation to prevent leaks
  2. Eager startup quota initialization

    • Ensures quotas are correct before broker starts serving requests
    • Added BrokerServer.applyRemoteLogQuotas() to eagerly apply quota configs immediately after RemoteLogManager creation

Problem:
1. Race condition and quota leaks: Multiple threads could check quota before
   any recorded usage, allowing all to bypass limits simultaneously. Additionally,
   in multi-partition fetches, quota was reserved per-partition but could leak if
   some partitions failed or were throttled, leading to quota exhaustion over time.

2. Startup race condition: RemoteLogManager initialized with default quotas
   (Long.MAX_VALUE = unlimited) and relied on dynamic config updates to apply
   correct values, creating a window (100ms-5s) where operations could exceed
   configured quotas.

Solution:
1. Atomic quota reservation
   - Added RLMQuotaManager.recordAndGetThrottleTimeMs() to atomically record
     usage and check quota in a single synchronized operation
   - Added quotaReservedBytes field to RemoteStorageFetchInfo to track per-partition
     reservations
   - Modified ReplicaManager to call recordAndCheckFetchQuota() BEFORE dispatching
     remote fetch, ensuring quota is reserved atomically based on adjustedMaxBytes
   - If throttled, immediately release the reservation since fetch won't execute
   - RemoteLogReader adjusts quota using delta (actual - reserved) after fetch
     completes
   - On error, releases the full reservation to prevent leaks

2. Eager startup quota initialization
   - Ensures quotas are correct before broker starts serving requests
   - Added BrokerServer.applyRemoteLogQuotas() to eagerly apply quota configs
       immediately after RemoteLogManager creation
@github-actions github-actions bot added triage PRs from the community core Kafka Broker storage Pull requests that target the storage module tiered-storage Related to the Tiered Storage feature labels Dec 8, 2025
@showuon
Copy link
Member

showuon commented Dec 10, 2025

@abhijeetk88 @kamalcph @satishd , I think the issue is valid. WDYT?

@github-actions
Copy link

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

@kamalcph
Copy link
Contributor

Thanks for the PR! Went over first pass, have below questions:

Startup race condition: RemoteLogManager initialized with default quotas (Long.MAX_VALUE = unlimited) and relied on dynamic config updates to apply correct values, creating a window where operations could exceed configured quotas.

Could you add a unit test to cover this case?

Multiple threads could check quota before any recorded usage, allowing all to bypass limits simultaneously.

Also, cover this case with a unit test to understand the issue.

Modified ReplicaManager to call recordAndCheckFetchQuota() BEFORE dispatching remote fetch,

If the remoteFetchQuotaBytesPerSecond is set to 25 Mbps and there is a message with 30 MB size, will the consumer get stuck? The previous behaviour was to allow the consumption to continue.

On error, releases the full reservation to prevent leaks

How can Kafka differentiate whether the error is from remote storage or there is an error in processing the response? I think this is a good improvement. Usually, we don't expect errors in RLMM or any Kafka components while processing the response.

@nandini12396
Copy link
Contributor Author

nandini12396 commented Dec 16, 2025

Thanks so much for the review and reply! I've updated the PR with the tests.

If the remoteFetchQuotaBytesPerSecond is set to 25 Mbps and there is a message with 30 MB size, will the consumer get stuck? The previous behaviour was to allow the consumption to continue.

No. The behavior follows Kafka's standard quota handling pattern used in other fetch paths - we allow at least one fetch to proceed even if it exceeds the quota, but subsequent requests will be throttled to bring the average back within limits.

How can Kafka differentiate whether the error is from remote storage or there is an error in processing the response?

You're right that this is an improvement worth discussing. Currently, the implementation releases the quota reservation on any error during remote fetch processing. In practice, most errors happen after bandwidth usage? We could refine this to only release on specific error types. What's your preference?

@github-actions
Copy link

A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Kafka Broker needs-attention storage Pull requests that target the storage module tiered-storage Related to the Tiered Storage feature triage PRs from the community

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants