KAFKA-19968: Fix tiered storage quota enforcement issues #21108
Conversation
Problem:
1. Race condition and quota leaks: Multiple threads could check the quota before
any usage was recorded, allowing all of them to bypass the limit simultaneously. Additionally,
in multi-partition fetches, quota was reserved per partition but could leak if
some partitions failed or were throttled, leading to quota exhaustion over time (see the sketch below).
2. Startup race condition: RemoteLogManager was initialized with default quotas
(Long.MAX_VALUE, i.e. unlimited) and relied on dynamic config updates to apply the
correct values, creating a window (100 ms to 5 s) where operations could exceed the
configured quotas.
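For illustration, a minimal sketch of the racy check-then-record pattern (hypothetical names, not the actual RLMQuotaManager API):

```java
// Hypothetical sketch of the racy pattern described above: the quota check and the
// usage recording are separate steps, so concurrent readers can all pass the check
// before any usage is recorded, and each of them dispatches a large remote fetch.
class RacyFetchPathSketch {

    interface QuotaLimiter {
        boolean isExceeded();       // reads current usage against the limit
        void record(long bytes);    // adds to the recorded usage
    }

    private final QuotaLimiter fetchQuota;

    RacyFetchPathSketch(QuotaLimiter fetchQuota) {
        this.fetchQuota = fetchQuota;
    }

    void maybeFetch(long fetchSizeBytes, Runnable remoteFetch) {
        if (!fetchQuota.isExceeded()) {         // threads A, B and C all observe "not exceeded"
            remoteFetch.run();                  // ...so all of them dispatch remote fetches
            fetchQuota.record(fetchSizeBytes);  // usage is only recorded afterwards
        }
    }
}
```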
Solution:
1. Atomic quota reservation (see the first sketch after this list)
- Added RLMQuotaManager.recordAndGetThrottleTimeMs() to atomically record
usage and check quota in a single synchronized operation
- Added quotaReservedBytes field to RemoteStorageFetchInfo to track per-partition
reservations
- Modified ReplicaManager to call recordAndCheckFetchQuota() BEFORE dispatching
remote fetch, ensuring quota is reserved atomically based on adjustedMaxBytes
- If throttled, immediately release the reservation since fetch won't execute
- RemoteLogReader adjusts quota using delta (actual - reserved) after fetch
completes
- On error, releases the full reservation to prevent leaks
2. Eager startup quota initialization (see the second sketch after this list)
- Added BrokerServer.applyRemoteLogQuotas() to eagerly apply quota configs
immediately after RemoteLogManager creation
- Ensures quotas are correct before the broker starts serving requests
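For illustration, a minimal sketch of the reservation flow described above. The names mirror the description (recordAndGetThrottleTimeMs, adjustedMaxBytes), but the simple rate window is a stand-in for Kafka's real quota sensors, not the actual implementation:

```java
// Illustrative sketch of the atomic record-then-check reservation and the caller-side
// reserve/release flow described in this PR; not the actual Kafka code.
class QuotaReservationSketch {

    static class SketchQuotaManager {
        private final long bytesPerSecond;
        private final long windowStartMs = System.currentTimeMillis();
        private long usedBytes = 0;

        SketchQuotaManager(long bytesPerSecond) {
            this.bytesPerSecond = bytesPerSecond;
        }

        // Record usage and compute the throttle time in a single synchronized step,
        // so no thread can observe "under quota" between the check and the record.
        synchronized long recordAndGetThrottleTimeMs(long bytes) {
            usedBytes += bytes;                                        // reserve first
            long elapsedMs = Math.max(1, System.currentTimeMillis() - windowStartMs);
            double allowedBytes = bytesPerSecond * (elapsedMs / 1000.0);
            if (usedBytes <= allowedBytes) {
                return 0;                                              // within quota
            }
            return (long) Math.ceil((usedBytes - allowedBytes) * 1000.0 / bytesPerSecond);
        }

        // Hand back an unused or over-counted reservation (throttled fetch, fetch error,
        // or the actual bytes read being smaller than the reserved adjustedMaxBytes).
        synchronized void release(long bytes) {
            usedBytes -= bytes;
        }
    }

    // Caller-side flow from the description: reserve before dispatching, release
    // immediately if throttled, adjust by the delta once the fetch completes, and
    // release the full reservation on error so nothing leaks.
    static void fetchWithReservation(SketchQuotaManager quota, long adjustedMaxBytes) {
        long throttleMs = quota.recordAndGetThrottleTimeMs(adjustedMaxBytes);
        if (throttleMs > 0) {
            quota.release(adjustedMaxBytes);        // fetch won't run; undo the reservation
            return;                                 // caller applies throttleMs instead
        }
        try {
            long actualBytes = simulateRemoteFetch(adjustedMaxBytes);
            quota.release(adjustedMaxBytes - actualBytes);             // delta adjustment
        } catch (RuntimeException e) {
            quota.release(adjustedMaxBytes);                           // full release on error
            throw e;
        }
    }

    // Stand-in for the remote fetch; here it just pretends to read half the budget.
    static long simulateRemoteFetch(long maxBytes) {
        return maxBytes / 2;
    }
}
```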
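And a hypothetical sketch of the eager startup initialization; the helper types and signatures here are illustrative, not the exact Kafka ones:

```java
// Hypothetical sketch of eager startup initialization: instead of starting with
// Long.MAX_VALUE and waiting for a dynamic-config callback, the broker pushes the
// configured quotas into the RemoteLogManager right after constructing it.
class BrokerStartupSketch {

    interface RemoteLogQuotaTarget {
        void updateCopyQuota(long bytesPerSecond);
        void updateFetchQuota(long bytesPerSecond);
    }

    static final class RemoteLogQuotaConfig {
        final long copyQuotaBytesPerSecond;
        final long fetchQuotaBytesPerSecond;

        RemoteLogQuotaConfig(long copy, long fetch) {
            this.copyQuotaBytesPerSecond = copy;
            this.fetchQuotaBytesPerSecond = fetch;
        }
    }

    // Called immediately after the RemoteLogManager is created, before the broker
    // starts serving traffic, so there is no window with unlimited quotas.
    static void applyRemoteLogQuotas(RemoteLogQuotaTarget remoteLogManager,
                                     RemoteLogQuotaConfig config) {
        remoteLogManager.updateCopyQuota(config.copyQuotaBytesPerSecond);
        remoteLogManager.updateFetchQuota(config.fetchQuotaBytesPerSecond);
    }
}
```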
@abhijeetk88 @kamalcph @satishd, I think the issue is valid. WDYT?
Thanks for the PR! I went over it in a first pass and have the questions below:
Could you add a unit test to cover this case?
Also, please cover this case with a unit test so the issue is easier to understand.
If remoteFetchQuotaBytesPerSecond is set to 25 MB/s and there is a 30 MB message, will the consumer get stuck? The previous behaviour was to allow consumption to continue.
How can Kafka differentiate whether the error is from remote storage or from processing the response? I think this is a good improvement. Usually, we don't expect errors in the RLMM or any Kafka component while processing the response.
Thanks so much for the review and reply! I've updated the PR with the tests.
No. The behavior follows Kafka's standard quota handling pattern used in other fetch paths - we allow at least one fetch to proceed even if it exceeds the quota, but subsequent requests will be throttled to bring the average back within limits.
You're right that this is an improvement worth discussing. Currently, the implementation releases the quota reservation on any error during remote fetch processing. In practice, most errors probably happen after the bandwidth has already been used, so releasing the reservation in those cases under-counts actual usage. We could refine this to only release on specific error types. What's your preference?
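For illustration, a toy sketch of the behaviour I mean (hypothetical code, not the actual quota implementation):

```java
// Toy sketch: the throttle decision looks only at usage recorded so far, so a single
// 30 MB fetch against a 25 MB/s quota is still served; it is the following requests
// that see the quota exceeded and get throttled until the rate drops back under the limit.
class OneLargeFetchSketch {

    private final double quotaBytesPerSecond;
    private final long startMs = System.currentTimeMillis();
    private double usedBytes = 0;

    OneLargeFetchSketch(double quotaBytesPerSecond) {
        this.quotaBytesPerSecond = quotaBytesPerSecond;
    }

    // Returns true if the fetch is allowed; the first oversized fetch passes because
    // nothing has been recorded yet, and only afterwards does the rate exceed the quota.
    synchronized boolean tryFetch(long fetchBytes) {
        double elapsedSec = Math.max(0.001, (System.currentTimeMillis() - startMs) / 1000.0);
        boolean throttled = usedBytes / elapsedSec > quotaBytesPerSecond;
        if (!throttled) {
            usedBytes += fetchBytes;   // the fetch is served and then counted
        }
        return !throttled;
    }
}
```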