...

All other requests may be throttled if the rate exceeds the configured quota. Produce/Fetch requests will return the total throttling time in the response. All other requests that may be throttled will have an additional field request_throttle_time_ms to indicate to the client that the request was throttled. The versions of these requests will be incremented. The existing field throttle_time_ms in produce/fetch responses will be renamed to bandwidth_throttle_time_ms to return byte-rate quota throttling times, while the new field returns request throttling time based on request processing time.

Fetch and produce requests will continue to be throttled based on byte rates and may also be throttled based on request handler thread utilization. Fetch requests used for replication will not be throttled based on request times since it is possible to configure replica.fetch.wait.max.ms and use the existing replication byte rate quotas to limit replication rate.
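The rules in the two paragraphs above can be summarised in a small sketch. It is illustrative only: the `Throttle` enum and the `applicable_throttles` function are hypothetical names, not broker code, and the exemptions mentioned elsewhere in this proposal are not modelled.

```python
from enum import Enum


class Throttle(Enum):
    BYTE_RATE = "byte_rate"        # existing bandwidth-based quotas
    REQUEST_TIME = "request_time"  # new quota based on request handler time


def applicable_throttles(api: str, from_replica: bool = False) -> set:
    """Return the throttle types that may apply to a request (illustrative)."""
    if api in ("Produce", "Fetch"):
        if api == "Fetch" and from_replica:
            # Replication fetches are limited only by the existing
            # replication byte rate quotas, never by request time.
            return {Throttle.BYTE_RATE}
        return {Throttle.BYTE_RATE, Throttle.REQUEST_TIME}
    # Every other throttled request type is limited by request time only.
    return {Throttle.REQUEST_TIME}
```

For example, `applicable_throttles("Fetch", from_replica=True)` yields only the byte-rate entry, while a consumer fetch yields both.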

...

Clients will expose average and maximum request throttle time as JMX metrics similar to the current produce/fetch throttle time metrics. Two new metrics, request-throttle-time-max and request-throttle-time-avg, will be added to reflect request processing time based throttling for all requests. The existing produce/fetch throttle time metrics will reflect the total throttle time for produce and fetch requests, including both byte-rate throttling and processing time throttling.

Tools

kafka-configs.sh will be extended to support request quotas.  A new quota property will be added, which can be applied to <client-id>, <user> or <user, client-id>:

...

All client requests which are not exempt from request throttling will have a new field containing the time in milliseconds that the request was throttled for.

Produce Response:
Produce Response (Version: 3) => request_throttle_time_ms bandwidth_throttle_time_ms [responses]
  request_throttle_time_ms => INT32 (new)
  bandwidth_throttle_time_ms => INT32 (moved and renamed)
  responses => topic [partition_responses]
    topic => STRING
    partition_responses => partition error_code base_offset log_append_time 
      partition => INT32
      error_code => INT16
      base_offset => INT64
      log_append_time => INT64

...

Fetch Response:

...

in milliseconds that the request was throttled for.

 

Offsets Response:
Offsets Response (Version: 2) => request_throttle_time_ms [responses]
  request_throttle_time_ms => INT32 (new)
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition error_code timestamp offset 
      partition => INT32
      error_code => INT16
      timestamp => INT64
      offset => INT64

Metadata Response:
Metadata Response (Version: 3) => request_throttle_time_ms [brokers] cluster_id controller_id [topic_metadata]
  request_throttle_time_ms => INT32 (new)
  brokers => node_id host port rack 
    node_id => INT32
    host => STRING
    port => INT32
    rack => NULLABLE_STRING
  cluster_id => NULLABLE_STRING
  controller_id => INT32
  topic_metadata => topic_error_code topic is_internal [partition_metadata] 
    topic_error_code => INT16
    topic => STRING
    is_internal => BOOLEAN
    partition_metadata => partition_error_code partition_id leader [replicas] [isr] 
      partition_error_code => INT16
      partition_id => INT32
      leader => INT32
      replicas => INT32
      isr => INT32

OffsetCommit Response:
OffsetCommit Response (Version: 3) => request_throttle_time_ms [responses]
  request_throttle_time_ms => INT32 (new)
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition error_code 
      partition => INT32
      error_code => INT16

OffsetFetch Response:
OffsetFetch Response (Version: 3) => request_throttle_time_ms [responses] error_code
  request_throttle_time_ms => INT32 (new)
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition offset metadata error_code 
      partition => INT32
      offset => INT64
      metadata => NULLABLE_STRING
      error_code => INT16
  error_code => INT16

GroupCoordinator Response:
GroupCoordinator Response (Version: 1) => request_throttle_time_ms error_code coordinator 
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  coordinator => node_id host port 
    node_id => INT32
    host => STRING
    port => INT32

JoinGroup Response:
JoinGroup Response (Version: 2) => request_throttle_time_ms error_code generation_id group_protocol leader_id member_id [members]
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  generation_id => INT32
  group_protocol => STRING
  leader_id => STRING
  member_id => STRING
  members => member_id member_metadata 
    member_id => STRING
    member_metadata => BYTES

Heartbeat Response:
Heartbeat Response (Version: 1) => request_throttle_time_ms error_code
  request_throttle_time_ms => INT32 (new)
  error_code => INT16

LeaveGroup Response:
LeaveGroup Response (Version: 1) => request_throttle_time_ms error_code
  request_throttle_time_ms => INT32 (new)
  error_code => INT16

SyncGroup Response:
SyncGroup Response (Version: 1) => request_throttle_time_ms error_code member_assignment 
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  member_assignment => BYTES

DescribeGroups Response:
DescribeGroups Response (Version: 1) => request_throttle_time_ms [groups] 
  request_throttle_time_ms => INT32 (new)
  groups => error_code group_id state protocol_type protocol [members] 
    error_code => INT16
    group_id => STRING
    state => STRING
    protocol_type => STRING
    protocol => STRING
    members => member_id client_id client_host member_metadata member_assignment 
      member_id => STRING
      client_id => STRING
      client_host => STRING
      member_metadata => BYTES
      member_assignment => BYTES

ListGroups Response:
ListGroups Response (Version: 1) => request_throttle_time_ms error_code [groups] 
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  groups => group_id protocol_type 
    group_id => STRING
    protocol_type => STRING

ApiVersions Response:
ApiVersions Response (Version: 1) => request_throttle_time_ms error_code [api_versions] 
  request_throttle_time_ms => INT32 (new)
  error_code => INT16
  api_versions => api_key min_version max_version 
    api_key => INT16
    min_version => INT16
    max_version => INT16

CreateTopics Response:
CreateTopics Response (Version: 2) => request_throttle_time_ms [topic_errors] 
  request_throttle_time_ms => INT32 (new)
  topic_errors => topic error_code error_message 
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

DeleteTopics Response:
DeleteTopics Response (Version: 1) => request_throttle_time_ms [topic_error_codes] 
  request_throttle_time_ms => INT32 (new)
  topic_error_codes => topic error_code 
    topic => STRING
    error_code => INT16
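
As a rough illustration of how a client might read the new field, the sketch below decodes the body of a Heartbeat Response (Version: 1) as laid out above: an INT32 request_throttle_time_ms followed by an INT16 error_code, both big-endian as in the rest of the Kafka wire protocol. The class and function names are hypothetical, and the size prefix and response header that precede the body on the wire are assumed to have been consumed already.

```python
import struct
from typing import NamedTuple


class HeartbeatResponseV1(NamedTuple):
    request_throttle_time_ms: int
    error_code: int


def parse_heartbeat_response_v1(body: bytes) -> HeartbeatResponseV1:
    """Decode the response body only (no size prefix, no response header)."""
    # ">ih" = big-endian signed 32-bit integer, then signed 16-bit integer.
    throttle_ms, error_code = struct.unpack_from(">ih", body, 0)
    return HeartbeatResponseV1(throttle_ms, error_code)


# Hand-built sample body: throttled for 250 ms, error code 0.
sample = struct.pack(">ih", 250, 0)
parsed = parse_heartbeat_response_v1(sample)
```

The other responses follow the same pattern, with request_throttle_time_ms as the leading field of the body.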

...

On the client side, a new sensor named request-throttle-time will be added to track the total request throttle time returned in all responses. This is in addition to the similar sensor used to track produce/fetch throttle times, which will continue to be supported. Existing produce/fetch throttle times will include the total throttling time, for both bandwidth and utilization, of produce/fetch requests. Maximum and average throttle times for request time based throttling will be exposed as metrics, in addition to the throttle time metrics for produce/fetch byte rates.
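
A minimal sketch of what such a sensor tracks is shown below. It is a simplification, not the client metrics implementation: `ThrottleTimeSensor` is a hypothetical class that keeps cumulative values, whereas real client metrics are typically computed over time windows.

```python
class ThrottleTimeSensor:
    """Tracks average and maximum throttle time reported in responses."""

    def __init__(self) -> None:
        self._count = 0
        self._total_ms = 0
        self._max_ms = 0

    def record(self, throttle_time_ms: int) -> None:
        # Called once per response with the throttle time it carried.
        self._count += 1
        self._total_ms += throttle_time_ms
        self._max_ms = max(self._max_ms, throttle_time_ms)

    @property
    def avg_ms(self) -> float:
        # Average over all recorded responses; 0.0 before any are recorded.
        return self._total_ms / self._count if self._count else 0.0

    @property
    def max_ms(self) -> int:
        return self._max_ms


sensor = ThrottleTimeSensor()
for throttle_ms in (0, 100, 50):
    sensor.record(throttle_ms)
```

After the three recordings above, the sensor holds the values that would back the average and maximum metrics.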

Future Work

...

  • Quota limits for request processing time can be configured dynamically if required. Older versions of brokers will ignore request time quotas.
  • If request quotas are configured on the broker, throttle time will be returned in the response to clients only if the client supports the new version of requests being throttled.
  • If request quotas are configured, client produce/fetch throttle-time metrics will reflect total throttle time including bandwidth and utilization based throttling of these requests. The throttle time returned in produce/fetch responses will include this total throttle time.
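
The second point can be sketched as a version check. The table below is copied from the response versions listed in this proposal, and Fetch is omitted because its schema is not shown here. The names `FIRST_VERSION_WITH_THROTTLE_TIME` and `carries_request_throttle_time` are hypothetical and only illustrate the rule.

```python
# First response version that carries request_throttle_time_ms,
# taken from the schemas listed in this proposal.
FIRST_VERSION_WITH_THROTTLE_TIME = {
    "Produce": 3,
    "Offsets": 2,
    "Metadata": 3,
    "OffsetCommit": 3,
    "OffsetFetch": 3,
    "GroupCoordinator": 1,
    "JoinGroup": 2,
    "Heartbeat": 1,
    "LeaveGroup": 1,
    "SyncGroup": 1,
    "DescribeGroups": 1,
    "ListGroups": 1,
    "ApiVersions": 1,
    "CreateTopics": 2,
    "DeleteTopics": 1,
}


def carries_request_throttle_time(api: str, version: int) -> bool:
    """True if a response of this API and version includes the new field."""
    first = FIRST_VERSION_WITH_THROTTLE_TIME.get(api)
    # APIs missing from the table are treated as not carrying the field.
    return first is not None and version >= first
```

A client that sends Heartbeat version 0 would therefore receive no throttle time, while the same client sending version 1 would.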

Test Plan

One set of integration and system tests will be added for request throttling. Most of the code can be reused from the existing producer/consumer quota implementation, and quota tests take a significant amount of time to run, so a single test covering the full path should be sufficient.

...