Status
Current state: "Accepted"
Discussion thread: here
JIRA: KAFKA-4195
...
This KIP proposes to control request handler (I/O) thread and network thread utilization using request processing time quotas that limit the amount of time within each quota window that can be used by users/clients.
Limitations
...
This KIP attempts to avoid unintended denial-of-service scenarios where a misconfigured application (e.g. zero heartbeat interval) or a buggy client library can cause broker overload. While it can reduce the load in some DoS scenarios, it does not completely protect against DoS attacks from malicious clients. A DDoS attack with large numbers of connections resulting in a large amount of expensive authentication or CPU-intensive requests can still overwhelm the broker.
Public Interfaces
Request quotas
Request quotas will be configured as the percentage of time a client can spend on request handler (I/O) threads and network threads within each quota window. A quota of n% represents n% of one thread, so the quota is out of a total capacity of ((num.io.threads + num.network.threads) * 100)%. Each request quota will be the total percentage across all request handler and network threads that a user/client may use in a quota window before being throttled. Since the number of I/O and network threads is typically based on the number of cores available on the broker host, request quotas represent the total percentage of CPU that may be used by the user/client. In future, if quotas are implemented for utilization of other types of threads, the same quota configuration can be used to limit the total utilization across all the threads monitored for quotas.
The limits will be applied to the same quota window configuration (quota.window.size.seconds
with 1 second default) as existing produce/fetch quotas. This approach keeps the code consistent with the existing quota implementation, while making it easy for administrators to allocate a slice of each quota window to users/clients to control CPU utilization on the broker. If a client/user exceeds the request processing time limit, responses will be delayed by an amount that brings the rate within the limit. The maximum delay applied will be the quota window size.
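As a concrete illustration of the quota arithmetic above, the following sketch (a hypothetical class, not broker code; the thread counts and quota value are only illustrative, though num.io.threads and num.network.threads are real broker configs) computes the total capacity and the per-window time a quota grants:

```java
// Sketch (not Kafka source) of the arithmetic behind request quotas.
public class RequestQuotaCapacity {

    // Total quota capacity: each quota-managed thread contributes 100%.
    static double totalCapacityPercent(int numIoThreads, int numNetworkThreads) {
        return (numIoThreads + numNetworkThreads) * 100.0;
    }

    // Time (ms) a user/client may spend on these threads per quota window,
    // given a quota of n% (n% of one thread).
    static double allowedTimeMsPerWindow(double quotaPercent, double windowMs) {
        return quotaPercent / 100.0 * windowMs;
    }

    public static void main(String[] args) {
        // e.g. num.io.threads=8, num.network.threads=3 (assumed values)
        System.out.println(totalCapacityPercent(8, 3));       // 1100.0
        // a quota of 50% with the default 1 second window
        System.out.println(allowedTimeMsPerWindow(50, 1000)); // 500.0
    }
}
```

A quota can therefore exceed 100% (up to the total capacity), since it is expressed per thread rather than relative to the whole pool.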
Default quotas
By default, clients will not be throttled based on request processing time, but defaults can be configured using the dynamic default properties at <client-id>, <user> and <user, client-id> levels. Defaults as well as overrides are stored as dynamic configuration properties in Zookeeper alongside the other rate limits.
Requests exempt from throttling
...
Requests that update cluster state will be throttled only if authorization for ClusterAction
fails. These are infrequent requests for cluster management, typically not used by clients:
...
To ensure that these exempt requests cannot be used by clients to launch a DoS attack, these requests will be throttled on quota violation if ClusterAction
authorization fails. SaslHandshake
request will not be throttled when used for authentication, but will be throttled on quota violation if used at any other time.
All other requests may be throttled if the rate exceeds the configured quota. Produce/Fetch requests will return the total throttling time reflecting both bandwidth and utilization based throttling in the response. All other requests that may be throttled will have an additional field throttle_time_ms
to indicate to the client that the request was throttled. The versions of these requests will be incremented.
Fetch and produce requests will continue to be throttled based on byte rates and may also be throttled based on request handler thread utilization. Fetch requests used for replication will not be throttled based on request times since it is possible to configure replica.fetch.wait.max.ms
and use the existing replication byte rate quotas to limit replication rate.
...
Two new metrics and corresponding sensors will be added to the broker for tracking request-time
and throttle-time
of each quota entity for the new quota type Request. These will be handled similarly to the metrics and sensors for Produce/Fetch. A delay queue sensor with queue-size
for the new quota type Request will also be added, similar to the delay queue sensor for Produce/Fetch. All the metrics and sensors for request time throttling will be of a similar format to the existing produce/fetch metrics and sensors for consistency, but with a new group/name indicating the new quota type Request, keeping these separate from existing metrics/sensors.
An additional metric exempt-request-time
will also be added for the quota type Request
to track the time spent processing requests which are exempt from throttling. This will capture the total time for requests from all users/clients that are exempt from throttling so that administrators can view the CPU utilization of exempt requests as well.
Producers and consumers currently expose average and maximum produce/fetch request throttle time as JMX metrics. These metrics will be updated to reflect the total throttle time for the producer or consumer, including both byte-rate throttling and request time throttling, for all requests of the producer/consumer. Similar metrics may be added for the admin client in future.
Tools
kafka-configs.sh
will be extended to support request quotas. A new quota property will be added, which can be applied to <client-id>, <user> or <user, client-id>:
request_percentage
: The percentage of time per quota window (out of a total of (num.io.threads + num.network.threads) * 100%) for requests from the user or client, above which the request may be throttled.
...
bin/kafka-configs --zookeeper localhost:2181 --alter --add-config 'request_percentage=50' --entity-name user1 --entity-type users
...
bin/kafka-configs --zookeeper localhost:2181 --alter --add-config 'request_percentage=200' --entity-type users
Protocol changes
All client requests which are not exempt from request throttling will have a new field containing the time in milliseconds that the request was throttled for.
Offsets Response (Version: 2) => throttle_time_ms [responses]
    throttle_time_ms => INT32 (new)
    responses => topic [partition_responses]
        topic => STRING
        partition_responses => partition error_code timestamp offset
            partition => INT32
            error_code => INT16
            timestamp => INT64
            offset => INT64

Metadata Response (Version: 3) => throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]
    throttle_time_ms => INT32 (new)
    brokers => node_id host port rack
        node_id => INT32
        host => STRING
        port => INT32
        rack => NULLABLE_STRING
    cluster_id => NULLABLE_STRING
    controller_id => INT32
    topic_metadata => topic_error_code topic is_internal [partition_metadata]
        topic_error_code => INT16
        topic => STRING
        is_internal => BOOLEAN
        partition_metadata => partition_error_code partition_id leader [replicas] [isr]
            partition_error_code => INT16
            partition_id => INT32
            leader => INT32
            replicas => INT32
            isr => INT32

OffsetCommit Response (Version: 3) => throttle_time_ms [responses]
    throttle_time_ms => INT32 (new)
    responses => topic [partition_responses]
        topic => STRING
        partition_responses => partition error_code
            partition => INT32
            error_code => INT16

OffsetFetch Response (Version: 3) => throttle_time_ms [responses] error_code
    throttle_time_ms => INT32 (new)
    responses => topic [partition_responses]
        topic => STRING
        partition_responses => partition offset metadata error_code
            partition => INT32
            offset => INT64
            metadata => NULLABLE_STRING
            error_code => INT16
    error_code => INT16

GroupCoordinator Response (Version: 1) => throttle_time_ms error_code coordinator
    throttle_time_ms => INT32 (new)
    error_code => INT16
    coordinator => node_id host port
        node_id => INT32
        host => STRING
        port => INT32

JoinGroup Response (Version: 2) => throttle_time_ms error_code generation_id group_protocol leader_id member_id [members]
    throttle_time_ms => INT32 (new)
    error_code => INT16
    generation_id => INT32
    group_protocol => STRING
    leader_id => STRING
    member_id => STRING
    members => member_id member_metadata
        member_id => STRING
        member_metadata => BYTES

Heartbeat Response (Version: 1) => throttle_time_ms error_code
    throttle_time_ms => INT32 (new)
    error_code => INT16

LeaveGroup Response (Version: 1) => throttle_time_ms error_code
    throttle_time_ms => INT32 (new)
    error_code => INT16

SyncGroup Response (Version: 1) => throttle_time_ms error_code member_assignment
    throttle_time_ms => INT32 (new)
    error_code => INT16
    member_assignment => BYTES

DescribeGroups Response (Version: 1) => throttle_time_ms [groups]
    throttle_time_ms => INT32 (new)
    groups => error_code group_id state protocol_type protocol [members]
        error_code => INT16
        group_id => STRING
        state => STRING
        protocol_type => STRING
        protocol => STRING
        members => member_id client_id client_host member_metadata member_assignment
            member_id => STRING
            client_id => STRING
            client_host => STRING
            member_metadata => BYTES
            member_assignment => BYTES

ListGroups Response (Version: 1) => throttle_time_ms error_code [groups]
    throttle_time_ms => INT32 (new)
    error_code => INT16
    groups => group_id protocol_type
        group_id => STRING
        protocol_type => STRING

ApiVersions Response (Version: 1) => error_code [api_versions] throttle_time_ms
    error_code => INT16
    api_versions => api_key min_version max_version
        api_key => INT16
        min_version => INT16
        max_version => INT16
    throttle_time_ms => INT32 (new)

CreateTopics Response (Version: 2) => throttle_time_ms [topic_errors]
    throttle_time_ms => INT32 (new)
    topic_errors => topic error_code error_message
        topic => STRING
        error_code => INT16
        error_message => NULLABLE_STRING

DeleteTopics Response (Version: 1) => throttle_time_ms [topic_error_codes]
    throttle_time_ms => INT32 (new)
    topic_error_codes => topic error_code
        topic => STRING
        error_code => INT16
Proposed Changes
Quota entity
Request quotas will be supported for <client-id>, <user> and <user, client-id> similar to the existing produce/fetch byte rate quotas. In addition to produce and fetch rates, an additional quota property will be added for throttling based on request handler and network thread utilization. As with produce/fetch quotas, request quotas will be per-broker. Defaults can be configured using the dynamic default properties at <client-id>, <user> and <user, client-id> levels.
...
Quotas for requests will be configured as the percentage of time within a quota window that a client is allowed to use across all of the I/O and network threads. The total capacity of ((num.io.threads + num.network.threads) * 100%) can be distributed between clients/users. For example, with the default configuration of a 1 second quota window size, if user alice has a request quota of 1%, the total time all clients of alice can spend in the request handler and network threads in any one second window is 10 milliseconds. When this time is exceeded, a delay is added to the response to bring alice's usage within the configured quota. The maximum delay added to any response will be the window size. The calculation of delay will be the same as the current calculation used for throttling produce/fetch requests:
- If O is the observed usage and T is the target usage over a window of W, to bring O down to T, we need to add a delay of X such that: O * W / (W + X) = T
- Solving for X, we get X = (O - T)/T * W
- The response will be throttled by min(X, W)
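The steps above can be sketched as a small helper (a hypothetical class for illustration, not the broker's implementation; O, T and W follow the definitions in the bullets):

```java
// Sketch of the throttle-delay formula described in the text.
public class ThrottleDelay {

    // Returns the delay in ms applied to a response, capped at the window size.
    static double throttleTimeMs(double observedMs, double targetMs, double windowMs) {
        if (observedMs <= targetMs)
            return 0.0; // within quota: no delay
        // X = (O - T) / T * W
        double x = (observedMs - targetMs) / targetMs * windowMs;
        // The response is throttled by min(X, W).
        return Math.min(x, windowMs);
    }

    public static void main(String[] args) {
        // alice's quota of 1% allows 10 ms per 1000 ms window; 20 ms observed
        // gives X = (20 - 10) / 10 * 1000 = 1000, capped at the window size.
        System.out.println(throttleTimeMs(20, 10, 1000)); // 1000.0
        // A smaller overage of 12 ms gives X of roughly 200 ms, below the cap.
        System.out.println(throttleTimeMs(12, 10, 1000));
    }
}
```

Note how the cap at W kicks in quickly: using twice the allowed time already saturates the maximum delay of one window.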
Network thread time will be recorded for each request without performing throttling when the time is recorded. When I/O thread time is recorded, throttling will be performed, taking into account the total processing time of the user/client in network threads and I/O threads in the quota window. This simplifies the handling of network thread utilization without integrating the throttling mechanism into the network layer.
The maximum throttle time for any single request will be the quota window size (one second by default). This ensures that timing-sensitive requests like heartbeats are not delayed for extended durations. For example, if a user has a quota of 1% and a stop-the-world GC pause takes 100ms during the processing of the user's request, we don't want all the requests from the user to be delayed by 100 seconds. By limiting the maximum delay, we reduce the impact of GC pauses and single large requests. To exploit this limit to bypass quota limits, clients would need to generate requests that take significantly longer than the quota limit. If R is the amount of time taken to process one request and the user has C active connections, the maximum amount of time a user/client can use per quota window in the long term is max(quota, C * R). In practice, quotas are expected to be much larger than the time taken to process individual requests and hence this limit is sufficient. Byte rate quotas will also additionally help to increase throttling in the case where large produce/fetch requests result in larger per-request time. DoS attacks using large numbers of connections are not addressed in this KIP.
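The long-term bound can be illustrated numerically (a hypothetical class with assumed values, not broker code):

```java
// Numeric illustration of the long-term usage bound max(quota, C * R)
// described in the text above.
public class LongTermUsageBound {

    // quotaMs: allowed ms per window; connections: C; perRequestMs: R.
    static double boundMsPerWindow(double quotaMs, int connections, double perRequestMs) {
        // Each connection can overshoot by at most one in-flight request per
        // window (its response is then delayed), so sustained usage cannot
        // exceed max(quota, C * R).
        return Math.max(quotaMs, connections * perRequestMs);
    }

    public static void main(String[] args) {
        // Typical case: per-request time well below the quota.
        System.out.println(boundMsPerWindow(10.0, 3, 2.0));  // 10.0
        // Pathological case: few connections but very expensive requests.
        System.out.println(boundMsPerWindow(10.0, 4, 50.0)); // 200.0
    }
}
```

The second case shows why byte rate quotas remain useful as an additional control when individual requests are large.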
...
// Quotas for user1
// Zookeeper persistence path /config/users/<encoded-user1>
{
    "version":1,
    "config": {
        "producer_byte_rate":"1024",
        "consumer_byte_rate":"2048",
        "request_percentage":"50"
    }
}
Co-existence of multiple quotas
...
Two new metrics and corresponding sensors will be added to track request-time
and throttle-time
of each quota entity for quota type Request. The request-time
sensor will be configured with the quota for the user/client so that quota violations can be used to add delays to the response. Quota window configuration (quota.window.size.seconds
) will be the same as the existing configuration for produce/fetch quotas: 1 second window with 11 samples retained in memory by default. A new delay queue sensor will also be added for quota type Request. All the new sensor names (Request-<quota-entity>,
RequestThrottleTime-<quota-entity>
and Request-delayQueue
) are prefixed by the quota type, making these sensors consistent with existing sensors for Produce/Fetch. The new metrics will be in the metrics group Request, distinguishing these from similar metrics for Produce/Fetch byte rates.
Metrics and sensors will be expired as they are today for Produce/Fetch quotas.
On the client side, a new sensor name request-throttle-time
will be added to track request throttle time returned in responses. This is in addition to the similar sensor used to track produce/fetch throttle times, which will continue to be supported. Maximum and average throttle times for request time based throttling will be exposed as metrics in addition to throttle time metrics for produce/fetch byte rates.
...
...
the existing produce and fetch sensors will track total throttle time of all requests from producers and consumers respectively. This will include both bandwidth as well as utilization based throttling. Throttle time recording will be moved to NetworkClient
with appropriate sensor parameters so that produce or fetch sensors are updated based on whether the client corresponds to a producer or consumer. This will also enable addition of similar sensors/metrics for admin clients in future.
Compatibility, Deprecation, and Migration Plan
What impact (if any) will there be on existing users?
- None, since by default clients will not be throttled on request processing time.
If we are changing behavior how will we phase out the older behavior?
- Quota limits for request processing time can be configured dynamically if required. Older versions of brokers will ignore request time quotas.
- If request quotas are configured on the broker, throttle time will be returned in the response to clients only if the client supports the new version of requests being throttled.
- If request quotas are configured, client produce/fetch throttle-time metrics will reflect total throttle time including bandwidth and utilization based throttling of these requests. The throttle time returned in produce/fetch responses will include this total throttle time.
Test Plan
One set of integration and system tests will be added for request throttling. Since most of the code can be reused from existing producer/consumer quota implementation and since quota tests take a significant amount of time to run, one test for testing the full path should be sufficient.
Rejected Alternatives
Use request rate instead of request processing time for quota bound
Produce and fetch quotas are configured as byte rates (e.g. 10 MB/sec) and enable throttling based on data volume. Requests could be throttled based on request rate (e.g. 10 requests/sec), making request quotas consistent with produce/fetch quotas. But the time taken for processing different requests can vary significantly and since the goal of the KIP is to enable fair allocation of broker resources between users/clients, request processing time is a better metric suited to this quota.
Use request time percentage across all threads instead of per-thread percentage for quota bound
The KIP proposes to use a quota that specifies the percentage of time allocated to a client/user as a per-thread value, out of a total capacity of ((num.io.threads + num.network.threads) * 100%), with the total request processing time measured across all I/O and network threads. An alternative would be to configure a relative percentage out of a fixed total capacity of 100%. The absolute quota was chosen to avoid automatic changes to client quota values when num.io.threads or num.network.threads
is modified. Since thread counts are typically based on the number of cores on the broker host, the per-thread quota percentage reflects the percentage of cores allocated to the client/user. This is consistent with other CPU quotas like cgroups and the way CPU usage is reported by commands like top.
Use fractional units of threads instead of percentage for quota bound
The KIP proposes to use a quota that specifies the total percentage of time within each quota window allocated to a client/user, out of a total capacity of ((num.io.threads + num.network.threads) * 100%). An alternative would be to model each thread as one unit and configure the quota as a fraction of the total number of available units. Percentage was chosen instead of fractional units of threads so that the same request_percentage
representing CPU utilization can continue to be applied even if other threads are added in future.
Allocate percentage of request handler pool as quota bound
...