...
All other requests may be throttled if the rate exceeds the configured quota. Produce/Fetch requests will return the total throttling time in the response. All other requests that may be throttled will have an additional field request_throttle_time_ms
to indicate to the client that the request was throttled. The versions of these requests will be incremented. The existing field throttle_time_ms
in produce/fetch responses will be renamed to bandwidth_throttle_time_ms
to return byte-rate quota throttling times, while the new field returns request throttling time based on request processing time.
Fetch and produce requests will continue to be throttled based on byte rates and may also be throttled based on request handler thread utilization. Fetch requests used for replication will not be throttled based on request times since it is possible to configure replica.fetch.wait.max.ms
and use the existing replication byte rate quotas to limit replication rate.
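The throttling rules described above can be summarized as a small decision function. This is a minimal sketch, not broker code: the function name `applicable_throttles`, the request type strings, the `is_replica_fetch` flag, and the returned labels are all hypothetical names chosen for illustration, and exemptions from request throttling are not modelled.

```python
def applicable_throttles(request_type, is_replica_fetch=False):
    """Return the kinds of quota that may throttle a request.

    Hypothetical helper that mirrors the rules stated in the text.
    """
    if request_type == "fetch" and is_replica_fetch:
        # Replication fetches are not throttled on request time; they are
        # limited by the existing replication byte-rate quotas instead.
        return {"replication_bandwidth"}
    if request_type in ("produce", "fetch"):
        # Client produce/fetch: byte-rate quota plus request-time quota.
        return {"bandwidth", "request_time"}
    # Other throttled requests: request-time quota only.
    return {"request_time"}
```

For example, `applicable_throttles("fetch", is_replica_fetch=True)` yields only the replication byte-rate quota, while a client fetch is subject to both byte-rate and request-time throttling.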
...
Clients will expose average and maximum request throttle time as JMX metrics similar to the current produce/fetch throttle time metrics. These metrics will reflect the total throttle time for produce and fetch requests including both byte-rate throttling and processing time throttling. Two new metrics request-throttle-time-max
and request-throttle-time-min
will be added to reflect total request processing time based throttling while the existing metrics will continue to reflect bandwidth throttling for all requests.
Tools
kafka-configs.sh
will be extended to support request quotas. A new quota property will be added, which can be applied to <client-id>, <user> or <user, client-id>:
...
All client requests which are not exempt from request throttling will have a new field containing the time in milliseconds that the request was throttled for.
Produce Response (Version: 3) => request_throttle_time_ms bandwidth_throttle_time_ms [responses]
  request_throttle_time_ms => INT32 (new)
  bandwidth_throttle_time_ms => INT32 (moved and renamed)
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition error_code base_offset log_append_time
      partition => INT32
      error_code => INT16
      base_offset => INT64
      log_append_time => INT64
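To make the field order concrete, here is a minimal sketch of decoding a Produce Response (Version: 3) body laid out exactly as proposed in the schema above. It assumes the standard Kafka wire encoding (big-endian integers, STRING as an INT16 length followed by UTF-8 bytes, arrays as an INT32 count followed by the elements) and that the response header has already been stripped. The function name `decode_produce_response_v3` is hypothetical, and the layout follows this proposal rather than any particular released broker.

```python
import struct


def decode_produce_response_v3(buf):
    """Decode a response body that follows the proposed v3 layout."""
    pos = 0

    def read(fmt):
        # Read one big-endian primitive and advance the cursor.
        nonlocal pos
        (value,) = struct.unpack_from(fmt, buf, pos)
        pos += struct.calcsize(fmt)
        return value

    def read_string():
        # STRING: INT16 length followed by that many UTF-8 bytes.
        nonlocal pos
        length = read(">h")
        text = buf[pos:pos + length].decode("utf-8")
        pos += length
        return text

    request_throttle_time_ms = read(">i")    # new field
    bandwidth_throttle_time_ms = read(">i")  # moved and renamed field
    responses = []
    for _ in range(read(">i")):              # [responses]
        topic = read_string()
        partitions = []
        for _ in range(read(">i")):          # [partition_responses]
            partition = read(">i")
            error_code = read(">h")
            base_offset = read(">q")
            log_append_time = read(">q")
            partitions.append({
                "partition": partition,
                "error_code": error_code,
                "base_offset": base_offset,
                "log_append_time": log_append_time,
            })
        responses.append({"topic": topic, "partition_responses": partitions})
    return {
        "request_throttle_time_ms": request_throttle_time_ms,
        "bandwidth_throttle_time_ms": bandwidth_throttle_time_ms,
        "responses": responses,
    }
```

A client could report the two throttle times separately, since the first reflects request processing time and the second reflects byte-rate quota throttling.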
...
Fetch Response
...
in milliseconds that the request was throttled for.
Offsets Response (Version: 2) => request_throttle_time_ms [responses]
  request_throttle_time_ms => INT32 (new)
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition error_code timestamp offset
      partition => INT32
      error_code => INT16
      timestamp => INT64
      offset => INT64
Metadata Response (Version: 3) => request_throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]
  request_throttle_time_ms => INT32 (new)
  brokers => node_id host port rack
    node_id => INT32
    host => STRING
    port => INT32
    rack => NULLABLE_STRING
  cluster_id => NULLABLE_STRING
  controller_id => INT32
  topic_metadata => topic_error_code topic is_internal [partition_metadata]
    topic_error_code => INT16
    topic => STRING
    is_internal => BOOLEAN
    partition_metadata => partition_error_code partition_id leader [replicas] [isr]
      partition_error_code => INT16
      partition_id => INT32
      leader => INT32
      replicas => INT32
      isr => INT32
OffsetCommit Response (Version: 3) => request_throttle_time_ms [responses]
  request_throttle_time_ms => INT32 (new)
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition error_code
      partition => INT32
      error_code => INT16
OffsetFetch Response (Version: 3) => request_throttle_time_ms [responses] error_code
  request_throttle_time_ms => INT32 (new)
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition offset metadata error_code
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING
      error_code => INT16
  error_code => INT16
GroupCoordinator Response (Version: 1) => request_throttle_time_ms error_code coordinator
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  coordinator => node_id host port
    node_id => INT32
    host => STRING
    port => INT32
JoinGroup Response (Version: 2) => request_throttle_time_ms error_code generation_id group_protocol leader_id member_id [members]
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  generation_id => INT32
  group_protocol => STRING
  leader_id => STRING
  member_id => STRING
  members => member_id member_metadata
    member_id => STRING
    member_metadata => BYTES
Heartbeat Response (Version: 1) => request_throttle_time_ms error_code
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
LeaveGroup Response (Version: 1) => request_throttle_time_ms error_code
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
SyncGroup Response (Version: 1) => request_throttle_time_ms error_code member_assignment
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  member_assignment => BYTES
DescribeGroups Response (Version: 1) => request_throttle_time_ms [groups]
  request_throttle_time_ms => INT32 (new)
  groups => error_code group_id state protocol_type protocol [members]
    error_code => INT16
    group_id => STRING
    state => STRING
    protocol_type => STRING
    protocol => STRING
    members => member_id client_id client_host member_metadata member_assignment
      member_id => STRING
      client_id => STRING
      client_host => STRING
      member_metadata => BYTES
      member_assignment => BYTES
ListGroups Response (Version: 1) => request_throttle_time_ms error_code [groups]
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  groups => group_id protocol_type
    group_id => STRING
    protocol_type => STRING
ApiVersions Response (Version: 1) => request_throttle_time_ms error_code [api_versions]
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  api_versions => api_key min_version max_version
    api_key => INT16
    min_version => INT16
    max_version => INT16
CreateTopics Response (Version: 2) => request_throttle_time_ms [topic_errors]
  request_throttle_time_ms => INT32 (new)
  topic_errors => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING
DeleteTopics Response (Version: 1) => request_throttle_time_ms [topic_error_codes]
  request_throttle_time_ms => INT32 (new)
  topic_error_codes => topic error_code
    topic => STRING
    error_code => INT16
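In every response layout listed above, request_throttle_time_ms is the first field of the response body. Under that assumption, a client could read the throttle time without decoding the rest of the response. The sketch below is illustrative only: the helper name `leading_request_throttle_time_ms` is hypothetical, and it expects the response body with the response header already removed.

```python
import struct


def leading_request_throttle_time_ms(body):
    """Read the INT32 throttle time that leads a response body.

    Assumes the body follows one of the layouts above, where
    request_throttle_time_ms is the first field.
    """
    if len(body) < 4:
        raise ValueError("response body too short to hold an INT32")
    (throttle_ms,) = struct.unpack_from(">i", body, 0)
    return throttle_ms
```

This only applies to response versions that include the new field; older versions would need to be decoded with their original layouts.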
...
On the client side, a new sensor named request-throttle-time
will be added to track total request throttle time returned in all responses. This is in addition to the similar sensor used to track produce/fetch throttle times, which will continue to be supported. Existing produce/fetch throttle times will include total throttling time for both bandwidth and utilization for produce/fetch requests. Maximum and average throttle times for request time based throttling will be exposed as metrics in addition to throttle time metrics for produce/fetch byte rates.
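As a rough illustration of what such a sensor tracks, the sketch below records throttle times and exposes their average and maximum. It is a simplified stand-in, not the client metrics implementation: the class name `ThrottleTimeSensor` and its methods are hypothetical, and it keeps lifetime totals rather than the time-windowed samples a real metrics library would typically use.

```python
class ThrottleTimeSensor:
    """Tracks average and maximum throttle time in milliseconds."""

    def __init__(self, name):
        self.name = name
        self._count = 0
        self._total_ms = 0
        self._max_ms = 0

    def record(self, throttle_time_ms):
        # Called once per response with the throttle time it reported.
        self._count += 1
        self._total_ms += throttle_time_ms
        self._max_ms = max(self._max_ms, throttle_time_ms)

    def avg_ms(self):
        # Average over all recorded responses; 0.0 before any are recorded.
        return self._total_ms / self._count if self._count else 0.0

    def max_ms(self):
        return self._max_ms
```

A client would call `record` with the request_throttle_time_ms value from each response and publish `avg_ms()` and `max_ms()` as metrics.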
Future Work
...
- Quota limits for request processing time can be configured dynamically if required. Older versions of brokers will ignore request time quotas.
- If request quotas are configured on the broker, throttle time will be returned in the response to clients only if the client supports the new version of requests being throttled.
- If request quotas are configured, client produce/fetch throttle-time metrics will reflect total throttle time including bandwidth and utilization based throttling of these requests. The throttle time returned in produce/fetch responses will include this total throttle time.
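The version rule in the compatibility notes above can be sketched as a lookup. The version numbers below are taken from the response schemas in this proposal; the table name, the function name `response_includes_request_throttle_time`, and the API name strings are hypothetical. Fetch is omitted because its schema is not shown here.

```python
# First response version that carries request_throttle_time_ms,
# as listed in the response schemas of this proposal.
FIRST_VERSION_WITH_REQUEST_THROTTLE_TIME = {
    "Produce": 3,
    "Offsets": 2,
    "Metadata": 3,
    "OffsetCommit": 3,
    "OffsetFetch": 3,
    "GroupCoordinator": 1,
    "JoinGroup": 2,
    "Heartbeat": 1,
    "LeaveGroup": 1,
    "SyncGroup": 1,
    "DescribeGroups": 1,
    "ListGroups": 1,
    "ApiVersions": 1,
    "CreateTopics": 2,
    "DeleteTopics": 1,
}


def response_includes_request_throttle_time(api, request_version):
    """Return True if a response at this version has the new field.

    APIs missing from the table are treated as not carrying the field.
    """
    first = FIRST_VERSION_WITH_REQUEST_THROTTLE_TIME.get(api)
    return first is not None and request_version >= first
```

A client that sends an older request version would therefore receive a response without the field, even when request quotas are configured on the broker.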
Test Plan
One set of integration and system tests will be added for request throttling. Since most of the code can be reused from the existing producer/consumer quota implementation, and since quota tests take a significant amount of time to run, a single test that exercises the full path should be sufficient.
...