...

  1. Clients send requests too quickly, e.g. a consumer with fetch.max.wait.ms=0 that polls continuously. Fetch byte rate quotas are not sufficient here; either request count quotas or request processing time quotas are required.
  2. DoS attack from clients that overload brokers with continuous authorized or unauthorized requests. Either request count quotas or request processing time quotas that limit all unauthorized requests and all non-broker (client) requests are required.
  3. A client sends produce requests with compressed messages where decompression takes a long time, blocking the request handler thread. Request processing time quotas are required since neither produce byte rate quotas nor request count quotas will be sufficient to limit the broker resources allocated to users/clients in this case.
  4. A consumer group starts with 10 instances and then increases to 20 instances. The number of requests may double, so request counts increase, even though the load on the broker doesn't double since the number of partitions per fetch request has halved. Quotas based on request count per second may not be easy to configure in this case.
  5. Some requests may use more of their quota on the network threads rather than the request handler threads (e.g. disk reads for fetches happen on the network threads). While quotas of processing time on the request handler thread limit the request rate in many cases above, for a complete request rate quota solution, network thread utilization also needs to be taken into account.
  6. In clusters that enable both TLS and non-TLS endpoints, the load on the network thread is significantly higher for TLS. Network thread utilization needs to be taken into account to address this.

Goals

This KIP proposes to control request handler (I/O) thread utilization using request processing time quotas that limit the amount of time within each quota window that can be used by users/clients. Only I/O thread utilization will be taken into account in this KIP (Scenarios 1-4).

Limitations

  1. Network thread utilization (Scenarios 5, 6) will be addressed separately in a future KIP since that is a lot more complex.

  2. This KIP attempts to avoid unintended denial-of-service scenarios in which a misconfigured application (e.g. zero heartbeat interval) or a client library with a bug causes broker overload. While it can reduce the load in some DoS scenarios, it does not completely protect against DoS attacks from malicious clients. A DDoS attack with large numbers of connections resulting in a large amount of expensive authentication can still overwhelm the broker.

Public Interfaces

Request quotas

Request quotas will be configured as a fraction of time a client can spend on request handler (I/O) threads within each quota window. Each request handler thread is represented as one request handler unit, giving a total capacity of num.io.threads units. Each request quota will be the units allocated to a user/client. The limits will be applied to the same quota window configuration (quota.window.size.seconds with 1 second default) as existing produce/fetch quotas. This approach keeps the code consistent with the existing quota implementation, while making it easy for administrators to allocate a slice of each quota window to users/clients to control request handler thread utilization on the broker. If a client/user exceeds the request processing time limit, responses will be delayed by an amount that brings the rate within the limit. The maximum delay applied will be the quota window size.
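As a rough illustration of the unit arithmetic described above, the following Python sketch converts a quota expressed in request handler units into a per-window time budget and into a share of total broker capacity. The function names and the example values are hypothetical and chosen only for illustration; num.io.threads and quota.window.size.seconds are the broker settings named in the text.

```python
def request_time_budget_ms(io_thread_units: float, window_size_seconds: float = 1.0) -> float:
    """Time in milliseconds that a user/client may spend on request handler
    threads within one quota window, given its quota in handler units."""
    if io_thread_units < 0 or window_size_seconds <= 0:
        raise ValueError("quota must be non-negative and window size positive")
    return io_thread_units * window_size_seconds * 1000.0


def capacity_share(io_thread_units: float, num_io_threads: int) -> float:
    """Fraction of the broker's total capacity (num.io.threads units)
    that the quota represents."""
    if num_io_threads <= 0:
        raise ValueError("num_io_threads must be positive")
    return io_thread_units / num_io_threads


if __name__ == "__main__":
    # Assumed example: a quota of 0.5 units on a broker with 8 I/O threads.
    print(request_time_budget_ms(0.5))
    print(capacity_share(0.5, 8))
```

Under these assumptions, a quota of one full unit corresponds to the entire time of one request handler thread in each window.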

...

  1. StopReplica
  2. ControlledShutdown
  3. LeaderAndIsr
  4. UpdateMetadata

To ensure that these exempt requests cannot be used by clients to launch a DoS attack, these requests will be throttled on quota violation if ClusterAction authorization fails. SaslHandshake request will not be throttled when used for authentication, but will be throttled on quota violation if used at any other time.

All other requests may be throttled if the rate exceeds the configured quota. All requests that may be throttled will have an additional field request_throttle_time_ms to indicate to the client that the request was throttled. The versions of these requests will be incremented. The existing field throttle_time_ms in produce/fetch responses will be renamed to bandwidth_throttle_time_ms to return byte-rate quota throttling times, while the new field returns request throttling time based on request processing time.

...

Two new metrics and corresponding sensors will be added to the broker for tracking request-time and throttle-time of each quota entity for the new quota type IOThread. These will be handled similarly to the metrics and sensors for Produce/Fetch. A delay queue sensor with queue-size for the new quota type IOThread will also be added, similar to the delay queue sensor for Produce/Fetch.

All the metrics and sensors for request time throttling will be of similar format to the existing produce/fetch metrics and sensors for consistency, but with new group/name indicating the new quota type IOThread, keeping these separate from existing metrics/sensors.

Clients will expose average and maximum request throttle time as JMX metrics similar to the current produce/fetch throttle time metrics. Two new metrics request-throttle-time-max and request-throttle-time-avg will reflect request processing time based throttling while the existing metrics will continue to reflect bandwidth throttling.

Tools

kafka-configs.sh will be extended to support request quotas. A new quota property will be added, which can be applied to <client-id>, <user> or <user, client-id>:

  • io_thread_units : The fractional units of time per quota window (out of a total of num.io.threads units) for requests from the user or client, above which the request may be throttled.

...

bin/kafka-configs --zookeeper localhost:2181 --alter --add-config 'io_thread_units=0.1' --entity-name user1 --entity-type users

...

bin/kafka-configs --zookeeper localhost:2181 --alter --add-config 'io_thread_units=2.0' --entity-type users

...

Protocol changes

All client requests which are not exempt from request throttling will have a new field containing the time that the request was throttled for.

Produce Response (Version: 3) => request_throttle_time_ms bandwidth_throttle_time_ms [responses]
  request_throttle_time_ms => INT32 (new)
  bandwidth_throttle_time_ms => INT32 (moved and renamed)
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition error_code base_offset log_append_time 
      partition => INT32
      error_code => INT16
      base_offset => INT64
      log_append_time => INT64

Fetch Response (Version: 4) => request_throttle_time_ms bandwidth_throttle_time_ms [responses]
  request_throttle_time_ms => INT32 (new)
  bandwidth_throttle_time_ms => INT32 (renamed)
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition_header record_set 
      partition_header => partition error_code high_watermark 
        partition => INT32
        error_code => INT16
        high_watermark => INT64
      record_set => RECORDS

Offsets Response (Version: 2) => request_throttle_time_ms [responses]
  request_throttle_time_ms => INT32 (new)
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition error_code timestamp offset 
      partition => INT32
      error_code => INT16
      timestamp => INT64
      offset => INT64

Metadata Response (Version: 3) => request_throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]
  request_throttle_time_ms => INT32 (new)
  brokers => node_id host port rack 
    node_id => INT32
    host => STRING
    port => INT32
    rack => NULLABLE_STRING
  cluster_id => NULLABLE_STRING
  controller_id => INT32
  topic_metadata => topic_error_code topic is_internal [partition_metadata] 
    topic_error_code => INT16
    topic => STRING
    is_internal => BOOLEAN
    partition_metadata => partition_error_code partition_id leader [replicas] [isr] 
      partition_error_code => INT16
      partition_id => INT32
      leader => INT32
      replicas => INT32
      isr => INT32

OffsetCommit Response (Version: 3) => request_throttle_time_ms [responses]
  request_throttle_time_ms => INT32 (new)
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition error_code 
      partition => INT32
      error_code => INT16

OffsetFetch Response (Version: 3) => request_throttle_time_ms [responses] error_code
  request_throttle_time_ms => INT32 (new)
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition offset metadata error_code 
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING
      error_code => INT16
  error_code => INT16

GroupCoordinator Response (Version: 1) => request_throttle_time_ms error_code coordinator
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  coordinator => node_id host port 
    node_id => INT32
    host => STRING
    port => INT32

JoinGroup Response (Version: 2) => request_throttle_time_ms error_code generation_id group_protocol leader_id member_id [members]
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  generation_id => INT32
  group_protocol => STRING
  leader_id => STRING
  member_id => STRING
  members => member_id member_metadata 
    member_id => STRING
    member_metadata => BYTES

Heartbeat Response (Version: 1) => request_throttle_time_ms error_code
  request_throttle_time_ms => INT32 (new)
  error_code => INT16

LeaveGroup Response (Version: 1) => request_throttle_time_ms error_code
  request_throttle_time_ms => INT32 (new)
  error_code => INT16

SyncGroup Response (Version: 1) => request_throttle_time_ms error_code member_assignment
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  member_assignment => BYTES

DescribeGroups Response (Version: 1) => request_throttle_time_ms [groups]
  request_throttle_time_ms => INT32 (new)
  groups => error_code group_id state protocol_type protocol [members] 
    error_code => INT16
    group_id => STRING
    state => STRING
    protocol_type => STRING
    protocol => STRING
    members => member_id client_id client_host member_metadata member_assignment 
      member_id => STRING
      client_id => STRING
      client_host => STRING
      member_metadata => BYTES
      member_assignment => BYTES

ListGroups Response (Version: 1) => request_throttle_time_ms error_code [groups]
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  groups => group_id protocol_type 
    group_id => STRING
    protocol_type => STRING

ApiVersions Response (Version: 1) => request_throttle_time_ms error_code [api_versions]
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  api_versions => api_key min_version max_version 
    api_key => INT16
    min_version => INT16
    max_version => INT16

CreateTopics Response (Version: 2) => request_throttle_time_ms [topic_errors]
  request_throttle_time_ms => INT32 (new)
  topic_errors => topic error_code error_message 
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

DeleteTopics Response (Version: 1) => request_throttle_time_ms [topic_error_codes]
  request_throttle_time_ms => INT32 (new)
  topic_error_codes => topic error_code 
    topic => STRING
    error_code => INT16
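
To make the proposed field layout concrete, here is a minimal Python sketch that decodes the body of a Heartbeat Response (Version: 1) as listed above: an INT32 request_throttle_time_ms followed by an INT16 error_code. It assumes the bytes passed in are the response body only (after the response header) and relies on Kafka's big-endian wire encoding. The function name is hypothetical and this is not taken from any client library.

```python
import struct

# Proposed Heartbeat Response v1 body: INT32 throttle time, then INT16 error code.
# ">" selects big-endian with no padding, so the body is exactly 6 bytes.
_HEARTBEAT_V1 = struct.Struct(">ih")


def decode_heartbeat_response_v1(body: bytes) -> dict:
    """Decode the two fields of the proposed Heartbeat Response v1 body."""
    if len(body) < _HEARTBEAT_V1.size:
        raise ValueError("response body too short")
    request_throttle_time_ms, error_code = _HEARTBEAT_V1.unpack_from(body, 0)
    return {
        "request_throttle_time_ms": request_throttle_time_ms,
        "error_code": error_code,
    }


if __name__ == "__main__":
    # Hypothetical body: throttled for 250 ms, no error.
    sample = struct.pack(">ih", 250, 0)
    print(decode_heartbeat_response_v1(sample))
```

A client could use a non-zero request_throttle_time_ms to surface throttling in its own metrics, as described for the request throttle time metrics earlier.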

Proposed Changes

Quota entity

Request quotas will be supported for <client-id>, <user> and <user, client-id> similar to the existing produce/fetch byte rate quotas.  In addition to produce and fetch rates, an additional quota property will be added for throttling based on I/O thread utilization. As with produce/fetch quotas, request quotas will be per-broker. Defaults can be configured using the dynamic default properties at <client-id>, <user> and <user, client-id> levels.

Request quotas

Quotas for requests will be configured as a fraction of time within a quota window that a client is allowed to use across all of the I/O threads. The total I/O thread capacity of num.io.threads units can be distributed between clients/users. For example, with the default configuration of 1 second quota window size, if user alice has an I/O thread quota of 0.1, the total time all clients of alice can spend in the request handler threads in any one second window is 100 milliseconds. When this time is exceeded, a delay is added to the response to bring alice’s usage within the configured quota. The maximum delay added to any response will be the window size. The calculation of delay will be the same as the current calculation used for throttling produce/fetch requests:

  • If O is the observed usage and T is the target usage over a window of W, to bring O down to T, we need to add a delay of X to W such that: O * W / (W + X) = T.
  • Solving for X, we get X = (O - T)/T * W.
  • The response will be throttled by min(X, W).
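
The calculation in the list above can be sketched in Python as follows. This is a minimal illustration of the formula only, not the broker implementation; the function name is hypothetical, and returning zero when usage is within the target is an assumption based on the statement that only clients exceeding their limit are delayed.

```python
def throttle_delay(observed: float, target: float, window: float) -> float:
    """Return the delay min(X, W), where X = (O - T) / T * W.

    observed (O) and target (T) must be in the same units, for example
    request handler units; window (W) and the result share a time unit.
    """
    if target <= 0 or window <= 0:
        raise ValueError("target and window must be positive")
    if observed <= target:
        # Assumption: usage within quota is not throttled.
        return 0.0
    delay = (observed - target) / target * window
    return min(delay, window)


if __name__ == "__main__":
    # Hypothetical values: usage of 0.15 units against a quota of 0.1 units.
    print(throttle_delay(observed=0.15, target=0.1, window=1.0))
```

With these example values the delay is about half a window, and any usage of at least twice the target hits the cap of one full window.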

The maximum throttle time for any single request will be the quota window size (one second by default). This ensures that timing-sensitive requests like heartbeats are not delayed for extended durations. For example, if a user has a quota of 0.001 and a stop-the-world GC pause takes 100ms during the processing of the user's request, we don't want all the requests from the user to be delayed by 100 seconds. By limiting the maximum delay, we reduce the impact of GC pauses and single large requests. To exploit this limit to bypass quota limits, clients would need to generate requests that take significantly longer than the quota limit. If R is the amount of time taken to process one request, the maximum amount of time a user/client can use in the long term per quota window is max(quota, num.io.threads * R). In practice, quotas are expected to be much larger than the time taken to process individual requests and hence this limit should be sufficient. Byte rate quotas will additionally help to increase throttling in the case where large produce/fetch requests result in larger per-request time.
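
The GC pause example can be checked with a short Python sketch that applies the delay formula X = (O - T)/T * W with and without the cap at the window size. The function name is hypothetical, and it assumes the pause is the only time charged to the user in that window.

```python
def gc_pause_delays(quota_units: float, pause_seconds: float, window_seconds: float = 1.0):
    """Return (uncapped, capped) delays in seconds for a single pause
    charged to a user within one quota window."""
    if quota_units <= 0 or window_seconds <= 0:
        raise ValueError("quota and window must be positive")
    # Observed usage in handler units: time used divided by window length.
    observed_units = pause_seconds / window_seconds
    if observed_units <= quota_units:
        return 0.0, 0.0
    uncapped = (observed_units - quota_units) / quota_units * window_seconds
    capped = min(uncapped, window_seconds)
    return uncapped, capped


if __name__ == "__main__":
    # Values from the example in the text: quota 0.001, 100 ms pause, 1 s window.
    uncapped, capped = gc_pause_delays(0.001, 0.1)
    print(f"uncapped: {uncapped:.1f}s, capped: {capped:.1f}s")
```

Under these assumptions the uncapped delay comes to roughly 99 seconds, in line with the figure of about 100 seconds cited above, while the capped delay is one window.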

...

Sample configuration in Zookeeper

...