...
- Clients send requests too quickly - e.g. a consumer with fetch.max.wait.ms=0 that polls continuously. Fetch byte rate quotas are not sufficient here; either request count quotas or request processing time quotas are required.
- DoS attack from clients that overload brokers with continuous authorized or unauthorized requests. Either request count quotas or request processing time quotas that limit all unauthorized requests and all non-broker (client) requests are required.
- Client sends produce requests with compressed messages where decompression takes a long time, blocking the request handler thread. Request processing time quotas are required since neither produce byte rate quotas nor request count quotas will be sufficient to limit the broker resources allocated to users/clients in this case.
- Consumer group starts with 10 instances and then increases to 20 instances. Number of requests may double, so request counts increase, even though the load on the broker doesn't double since the number of partitions per fetch request has halved. Quotas based on request count per second may not be easy to configure in this case.
- Some requests may use more of their quota on the network threads rather than the request handler threads (eg. disk read for fetches happen on the network threads). While quotas of processing time on the request handler thread limit the request rate in many cases above, for a complete request rate quota solution, network thread utilization also needs to be taken into account.
- In clusters that enable both TLS and non-TLS endpoints, the load on the network thread is significantly higher for TLS. Network thread utilization needs to be taken into account to address this.
Goals
This KIP proposes to control request handler (I/O) thread utilization using request processing time quotas that limit the amount of time within each quota window that can be used by users/clients. Only I/O thread utilization will be taken into account in this KIP (Scenarios 1-4).
Limitations
- Network thread utilization (Scenarios 5, 6) will be addressed separately in a future KIP since that is a lot more complex.
- This KIP attempts to avoid unintended denial-of-service scenarios where a misconfigured application (e.g. zero heartbeat interval) or a client library with a bug causes broker overload. While it can reduce the load in some DoS scenarios, it does not completely protect against DoS attacks from malicious clients. A DDoS attack with large numbers of connections resulting in a large amount of expensive authentication can still overwhelm the broker.
Public Interfaces
Request quotas
Request quotas will be configured as a fraction of time a client can spend on request handler (I/O) threads within each quota window. Each request handler thread is represented as one request handler unit, giving a total capacity of num.io.threads units. Each request quota will be the units allocated to a user/client. The limits will be applied to the same quota window configuration (quota.window.size.seconds with 1 second default) as existing produce/fetch quotas. This approach keeps the code consistent with the existing quota implementation, while making it easy for administrators to allocate a slice of each quota window to users/clients to control request handler thread utilization on the broker. If a client/user exceeds the request processing time limit, responses will be delayed by an amount that brings the rate within the limit. The maximum delay applied will be the quota window size.
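As a concrete illustration of this accounting (a minimal sketch with hypothetical helper names, not broker code): a quota expressed in request handler units translates directly into a thread-time budget per quota window.

```python
# Hypothetical sketch: one request handler unit equals the full quota
# window on one I/O thread, so a fractional quota is a time budget.

def time_budget_ms(quota_units: float, window_ms: int = 1000) -> float:
    """Thread time (ms) a user/client may use per quota window."""
    return quota_units * window_ms

def total_capacity_units(num_io_threads: int) -> int:
    """Total request handler units available on the broker."""
    return num_io_threads

# A quota of 0.5 units allows 500 ms of request handler time per 1 s
# window; a broker with num.io.threads=8 has 8 units in total to share.
print(time_budget_ms(0.5))        # 500.0
print(total_capacity_units(8))    # 8
```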
...
- StopReplica
- ControlledShutdown
- LeaderAndIsr
- UpdateMetadata
To ensure that these exempt requests cannot be used by clients to launch a DoS attack, these requests will be throttled on quota violation if ClusterAction authorization fails. The SaslHandshake request will not be throttled when used for authentication, but will be throttled on quota violation if used at any other time.
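The exemption rule just described might be sketched as follows (illustrative only; the function and its parameters are hypothetical simplifications of the broker's authorization checks, not actual code):

```python
# Requests exempt from request-time throttling when the principal is
# authorized for ClusterAction (i.e. genuine inter-broker traffic).
EXEMPT_BROKER_APIS = {"StopReplica", "ControlledShutdown",
                      "LeaderAndIsr", "UpdateMetadata"}

def may_be_throttled(api: str, cluster_action_authorized: bool,
                     authenticating: bool = False) -> bool:
    """Return True if this request is subject to request-time throttling."""
    if api in EXEMPT_BROKER_APIS:
        # Exempt only for brokers; clients that fail ClusterAction
        # authorization are still throttled on quota violation.
        return not cluster_action_authorized
    if api == "SaslHandshake":
        # Not throttled during authentication, throttled at any other time.
        return not authenticating
    return True
```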
All other requests may be throttled if the rate exceeds the configured quota. All requests that may be throttled will have an additional field request_throttle_time_ms to indicate to the client that the request was throttled. The versions of these requests will be incremented. The existing field throttle_time_ms in produce/fetch responses will be renamed to bandwidth_throttle_time_ms to return byte-rate quota throttling times, while the new field returns request throttling time based on request processing time.
...
Two new metrics and corresponding sensors will be added to the broker for tracking request-time and throttle-time of each quota entity for the new quota type IOThread. These will be handled similarly to the metrics and sensors for Produce/Fetch. A delay queue sensor with queue-size for the new quota type IOThread will also be added, similar to the delay queue sensor for Produce/Fetch.
All the metrics and sensors for request time throttling will be of a similar format to the existing produce/fetch metrics and sensors for consistency, but with a new group/name indicating the new quota type IOThread, keeping these separate from existing metrics/sensors.
Clients will expose average and maximum request throttle time as JMX metrics similar to the current produce/fetch throttle time metrics. Two new metrics request-throttle-time-max and request-throttle-time-avg will reflect request processing time based throttling, while the existing metrics will continue to reflect bandwidth throttling.
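Conceptually, these client metrics behave like the following toy sketch (a hypothetical stand-in, not the actual Kafka metrics classes), which tracks the average and maximum of recorded throttle times:

```python
class ThrottleTimeSensor:
    """Toy stand-in for the client's request-throttle-time avg/max metrics."""

    def __init__(self) -> None:
        self._samples: list[float] = []

    def record(self, throttle_time_ms: float) -> None:
        self._samples.append(throttle_time_ms)

    def avg(self) -> float:
        return sum(self._samples) / len(self._samples) if self._samples else 0.0

    def max(self) -> float:
        return max(self._samples, default=0.0)

sensor = ThrottleTimeSensor()
for t in (0.0, 50.0, 250.0):
    sensor.record(t)
print(sensor.avg(), sensor.max())  # 100.0 250.0
```

The real client metrics are windowed rather than unbounded; this sketch only illustrates the avg/max semantics.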
Tools
kafka-configs.sh will be extended to support request quotas. A new quota property will be added, which can be applied to <client-id>, <user> or <user, client-id>:
io_thread_units: The fractional units of time per quota window (out of a total of num.io.threads units) for requests from the user or client, above which the request may be throttled.
...
bin/kafka-configs --zookeeper localhost:2181 --alter --add-config 'io_thread_units=0.1' --entity-name user1 --entity-type users
...
bin/kafka-configs --zookeeper localhost:2181 --alter --add-config 'io_thread_units=2.0' --entity-type users
...
Protocol changes
All client requests which are not exempt from request throttling will have a new field containing the time that the request was throttled for.
Produce Response (Version: 3) => request_throttle_time_ms bandwidth_throttle_time_ms [responses]
request_throttle_time_ms => INT32 (new)
bandwidth_throttle_time_ms => INT32 (moved and renamed)
responses => topic [partition_responses]
topic => STRING
partition_responses => partition error_code base_offset log_append_time
partition => INT32
error_code => INT16
base_offset => INT64
log_append_time => INT64
Fetch Response (Version: 4) => request_throttle_time_ms bandwidth_throttle_time_ms [responses]
request_throttle_time_ms => INT32 (new)
bandwidth_throttle_time_ms => INT32 (renamed)
responses => topic [partition_responses]
topic => STRING
partition_responses => partition_header record_set
partition_header => partition error_code high_watermark
partition => INT32
error_code => INT16
high_watermark => INT64
record_set => RECORDS
Offsets Response (Version: 2) => request_throttle_time_ms [responses]
request_throttle_time_ms => INT32 (new)
responses => topic [partition_responses]
topic => STRING
partition_responses => partition error_code timestamp offset
partition => INT32
error_code => INT16
timestamp => INT64
offset => INT64
Metadata Response (Version: 3) => request_throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]
request_throttle_time_ms => INT32 (new)
brokers => node_id host port rack
node_id => INT32
host => STRING
port => INT32
rack => NULLABLE_STRING
cluster_id => NULLABLE_STRING
controller_id => INT32
topic_metadata => topic_error_code topic is_internal [partition_metadata]
topic_error_code => INT16
topic => STRING
is_internal => BOOLEAN
partition_metadata => partition_error_code partition_id leader [replicas] [isr]
partition_error_code => INT16
partition_id => INT32
leader => INT32
replicas => INT32
isr => INT32
OffsetCommit Response (Version: 3) => request_throttle_time_ms [responses]
request_throttle_time_ms => INT32 (new)
responses => topic [partition_responses]
topic => STRING
partition_responses => partition error_code
partition => INT32
error_code => INT16
OffsetFetch Response (Version: 3) => request_throttle_time_ms [responses] error_code
request_throttle_time_ms => INT32 (new)
responses => topic [partition_responses]
topic => STRING
partition_responses => partition offset metadata error_code
partition => INT32
offset => INT64
metadata => NULLABLE_STRING
error_code => INT16
error_code => INT16
GroupCoordinator Response (Version: 1) => request_throttle_time_ms error_code coordinator
request_throttle_time_ms => INT32 (new)
error_code => INT16
coordinator => node_id host port
node_id => INT32
host => STRING
port => INT32
JoinGroup Response (Version: 2) => request_throttle_time_ms error_code generation_id group_protocol leader_id member_id [members]
request_throttle_time_ms => INT32 (new)
error_code => INT16
generation_id => INT32
group_protocol => STRING
leader_id => STRING
member_id => STRING
members => member_id member_metadata
member_id => STRING
member_metadata => BYTES
Heartbeat Response (Version: 1) => request_throttle_time_ms error_code
request_throttle_time_ms => INT32 (new)
error_code => INT16
LeaveGroup Response (Version: 1) => request_throttle_time_ms error_code
request_throttle_time_ms => INT32 (new)
error_code => INT16
SyncGroup Response (Version: 1) => request_throttle_time_ms error_code member_assignment
request_throttle_time_ms => INT32 (new)
error_code => INT16
member_assignment => BYTES
DescribeGroups Response (Version: 1) => request_throttle_time_ms [groups]
request_throttle_time_ms => INT32 (new)
groups => error_code group_id state protocol_type protocol [members]
error_code => INT16
group_id => STRING
state => STRING
protocol_type => STRING
protocol => STRING
members => member_id client_id client_host member_metadata member_assignment
member_id => STRING
client_id => STRING
client_host => STRING
member_metadata => BYTES
member_assignment => BYTES
ListGroups Response (Version: 1) => request_throttle_time_ms error_code [groups]
request_throttle_time_ms => INT32 (new)
error_code => INT16
groups => group_id protocol_type
group_id => STRING
protocol_type => STRING
ApiVersions Response (Version: 1) => request_throttle_time_ms error_code [api_versions]
request_throttle_time_ms => INT32 (new)
error_code => INT16
api_versions => api_key min_version max_version
api_key => INT16
min_version => INT16
max_version => INT16
CreateTopics Response (Version: 2) => request_throttle_time_ms [topic_errors]
request_throttle_time_ms => INT32 (new)
topic_errors => topic error_code error_message
topic => STRING
error_code => INT16
error_message => NULLABLE_STRING
DeleteTopics Response (Version: 1) => request_throttle_time_ms [topic_error_codes]
request_throttle_time_ms => INT32 (new)
topic_error_codes => topic error_code
topic => STRING
error_code => INT16
Proposed Changes
Quota entity
Request quotas will be supported for <client-id>
, <user>
and <user, client-id>
similar to the existing produce/fetch byte rate quotas. In addition to produce and fetch rates, an additional quota property will be added for throttling based on I/O thread utilization. As with produce/fetch quotas, request quotas will be per-broker. Defaults can be configured using the dynamic default properties at <client-id>, <user> and <user, client-id> levels.
Request quotas
Quotas for requests will be configured as a fraction of time within a quota window that a client is allowed to use across all of the I/O threads. The total I/O thread capacity of num.io.threads units can be distributed between clients/users. For example, with the default configuration of 1 second quota window size, if user alice has an I/O thread quota of 0.1, the total time all clients of alice can spend in the request handler threads in any one second window is 100 milliseconds. When this time is exceeded, a delay is added to the response to bring alice's usage within the configured quota. The maximum delay added to any response will be the window size. The calculation of delay will be the same as the current calculation used for throttling produce/fetch requests:
- If O is the observed usage and T is the target usage over a window of W, to bring O down to T, we need to add a delay of X to W such that: O * W / (W + X) = T.
- Solving for X, we get X = (O - T)/T * W.
- The response will be throttled by min(X, W).
The maximum throttle time for any single request will be the quota window size (one second by default). This ensures that timing-sensitive requests like heartbeats are not delayed for extended durations. For example, if a user has a quota of 0.001 and a stop-the-world GC pause takes 100ms during the processing of the user's request, we don't want all the requests from the user to be delayed by 100 seconds. By limiting the maximum delay, we reduce the impact of GC pauses and single large requests. To exploit this limit to bypass quota limits, clients would need to generate requests that take significantly longer than the quota limit. If R is the amount of time taken to process one request, the maximum amount of time a user/client can use in the long term per quota window is max(quota, num.io.threads * R). In practice, quotas are expected to be much larger than the time taken to process individual requests, and hence this limit should be sufficient. Byte rate quotas will additionally help to increase throttling in the case where large produce/fetch requests result in larger per-request time.
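The delay computation above can be sketched directly (an illustrative helper using the same symbols O, T and W as in the text, not broker code):

```python
def throttle_delay_ms(observed_ms: float, target_ms: float,
                      window_ms: float) -> float:
    """Delay X that brings observed usage O down to target T over a
    window W, capped at the window size: min((O - T)/T * W, W)."""
    if observed_ms <= target_ms:
        return 0.0
    x = (observed_ms - target_ms) / target_ms * window_ms
    return min(x, window_ms)

# With alice's quota of 0.1 units, T = 100 ms per 1000 ms window.
# Using 150 ms of thread time yields a 500 ms delay; extreme
# overuse is capped at the window size.
print(throttle_delay_ms(150.0, 100.0, 1000.0))     # 500.0
print(throttle_delay_ms(10_000.0, 100.0, 1000.0))  # 1000.0
```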
...
Sample configuration in Zookeeper
...