You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 8 Next »

Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: KAFKA-4195

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka currently supports quotas by data volume.  Clients that produce or fetch messages at a byte rate that exceeds their quota are throttled by delaying the response by an amount that brings the byte rate within the configured quota. However, if a client sends requests too quickly (e.g., a consumer with fetch.max.wait.ms=0), it can still overwhelm the broker even though individual request/response size may be small. It will be useful to additionally support throttling by request rate to ensure that broker resources are not monopolized by some users/clients.

Scenarios:

  1. Clients send requests too quickly - eg. a consumer with fetch.max.wait.ms=0 that polls continuously. Fetch byte rate quotas are not sufficient here, either request count quotas or request processing time quotas are required.
  2. DoS attack from clients that overload brokers with continuous authorized or unauthorized requests. Either request count quotas or request processing time quotas that limit all unauthorized requests and all non-broker (client) requests is required.
  3. Client sends produce requests with compressed messages where decompression takes a long time, blocking the request handler thread. Request processing time quotas are required since neither produce byte rate quotas nor request count quotas will be sufficient to limit the broker resources allocated to users/clients in this case.
  4. Consumer group starts with 10 instances and then increases to 20 instances. Number of requests may double, so request counts increase, even though the load on the broker doesn't double since the number of partitions per fetch request has halved. Quotas based on request count per second may not be easy to configure in this case.
  5. Some requests may use more of their quota on the network threads rather than the request handler threads (eg. disk read for fetches happen on the network threads). While quotas of processing time on the request handler thread limit the request rate in many cases above, for a complete request rate quota solution, network thread utilization also needs to be taken into account.

This KIP proposes to control request handler (I/O) thread utilization using request processing time quotas that limit the amount of time within each quota window that can be used by users/clients. Only I/O thread utilization will be taken into account in this KIP. Network thread utilization (Scenario 5) will be addressed separately in a future KIP since that is a lot more complex.

Public Interfaces

Request quotas

Request quotas will be configured as a fraction of time a client can spend on request handler (I/O) threads within each quota window. Each request handler thread is represented as one request handler unit, giving a total capacity of num.io.threads units. Each request quota will be the units allocated to a user/client.  The limits will be applied to the same quota window configuration (quota.window.size.seconds with 1 second default)  as existing produce/fetch quotas. This approach keeps the code consistent with the existing quota implementation, while making it easy for administrators to allocate a slice of each quota window to users/clients to control request handler thread utilization on the broker. If a client/user exceeds the request processing time limit, responses will be delayed by an amount that brings the rate within the limit. The maximum delay applied will be the quota window size.

Default quotas

By default, clients will not be throttled based on I/O thread utilization, but defaults can be configured using the dynamic default properties at <client-id>, <user> and <user, client-id> levels. Defaults as well as overrides are stored as dynamic configuration properties in Zookeeper alongside the other rate limits.

Requests that may be throttled

Requests that update cluster state will be throttled only if authorization for ClusterAction fails. These are infrequent requests for cluster management, typically not used by clients:

  1. StopReplica
  2. ControlledShutdown
  3. LeaderAndIsr
  4. UpdateMetadata

All other requests may be throttled if the rate exceeds the configured quota. All requests that may be throttled will have an additional field request_throttle_time_ms to indicate to the client that the request was throttled.The versions of these requests will be incremented.

Fetch and produce requests will continue to be throttled based on byte rates and may also be throttled based on request handler thread utilization. Fetch requests used for replication will not be throttled based on request times since it is possible to configure replica.fetch.wait.max.ms and use the existing replication byte rate quotas to limit replication rate.

Metrics and sensors

Two new metrics and corresponding sensors will be added to the broker for tracking request-time and throttle-time of each quota entity for the new quota type IOThread. These will be handled similar to the metrics and sensors for Produce/Fetch.

Clients will expose average and maximum request throttle time as JMX metrics similar to the current produce/fetch throttle time metrics.

Tools

kafka-configs.sh will be extended to support request quotas.  A new quota property will be added, which can be applied to <client-id>, <user> or <user, client-id>:

  • io_thread_units : The fractional units of time per quota window (out of a total of num.io.threads units) for requests from the user or client, above which the request may be throttled.

For example:

bin/kafka-configs  --zookeeper localhost:2181 --alter --add-config 'io_thread_units=0.1--entity-name user1 --entity-type users

Default quotas for <client-id>, <user> or <user, client-id> can be configured by omitting entity name. For example:

bin/kafka-configs  --zookeeper localhost:2181 --alter --add-config 'io_thread_units=2.0--entity-type users

Proposed Changes

Quota entity

Request quotas will be supported for <client-id>, <user> and <user, client-id> similar to the existing produce/fetch byte rate quotas.  In addition to produce and fetch rates, an additional quota property will be added for throttling based on I/O thread utilization. As with produce/fetch quotas, request quotas will be per-broker. Defaults can be configured using the dynamic default properties at <client-id>, <user> and <user, client-id> levels.

Request quotas

Quotas for requests will be configured as a fraction of time within a quota window that a client is allowed to use across all of the I/O threads. The total I/O thread capacity of num.io.threads units can be distributed between clients/users. For example, with the default configuration of 1 second quota window size, if user alice has a I/O thread quota of 0.1, the total time all clients of alice can spend in the request handler threads in any one second window is 10 milliseconds.  When this time is exceeded, a delay is added to the response to bring alice’s usage within the configured quota. The maximum delay added to any response will be the window size.  The calculation of delay will be the same as the current calculation used for throttling produce/fetch requests:

  • If O is the observed usage and T is the target usage over a window of W, to bring O down to T, we need to add a delay of X to W such that: O * W / (W + X) = T.
  • Solving for X, we get X = (O - T)/T * W.
  • The response will be throttled by min((X, W)

Sample configuration in Zookeeper

 

Sample quota configuration
// Quotas for user1
// Zookeeper persistence path /config/users/<encoded-user1>
{
    "version":1,
    "config": {
        "producer_byte_rate":"1024",
        "consumer_byte_rate":"2048",
		"io_thread_units" : "0.1"
    }
}

 

Co-existence of multiple quotas

Produce and fetch byte rate quotas will continue to be applied as they are today. Request processing time throttling will be applied on top if necessary. For example, if a large number of small produce requests are sent followed by a very large one, both request time quota and produce byte rate quota may be violated by the same request. The produce byte rate delay is applied first. Request time quota is checked only after the produce delay is applied. The request quota is perhaps no longer violated (or the delay may be lower due to the first delay already applied). The remaining delay if any is applied to the response.

Metrics and sensors

Two new metrics and corresponding sensors will be added to track request-time and throttle-time of each quota entity for quota type IOThread.  The request-time sensor will be configured with the quota for the user/client so that quota violations can be used to add delays to the response. Quota window configuration (quota.window.size.seconds) will be the same as the existing configuration for produce/fetch quotas: 1 second window with 11 samples retained in memory by default.

Metrics and sensors will be expired as they are today for Produce/Fetch quotas.

Future Work

As described in Scenario 5, to control broker resource utilization allocated to users/clients, both network thread utilization and I/O thread utilization should be limited by quotas. This KIP only addresses quotas for I/O thread utilization. Controlling network thread utilization is more complex and will be addressed in another KIP. The current quota implementation throttles requests by delaying the responses using Purgatory. This works for request handler thread utilization quotas, but we need to think through how this can be integrated into the network layer. Also, while request handlers have access to both user and client-id and can support quotas at <client-id>, <user> and <user, client-id> levels, the network layer does not have access to client-id.

Compatibility, Deprecation, and Migration Plan

What impact (if any) will there be on existing users?

  • None, since by default clients will not be throttled on request processing time.

If we are changing behavior how will we phase out the older behavior?

  • Quota limits for request processing time can be configured dynamically if required. Older versions of brokers will ignore request time quotas.
  • If request quotas are configured on the broker, throttle time will be returned in the response to clients only if the client supports the new version of requests being throttled.

Test Plan

One set of integration and system tests will be added for request throttling. Since most of the code can be reused from existing producer/consumer quota implementation and since quota tests take a significant amount of time to run, one test for testing the full path should be sufficient.

Rejected Alternatives

Use request rate instead of request processing time for quota bound

Produce and fetch quotas are configured as byte rates (e.g. 10 MB/sec) and enable throttling based on data volume. Requests could be throttled based on request rate (e.g. 10 requests/sec), making request quotas consistent with produce/fetch quotas. But the time taken for processing different requests can vary significantly and since the goal of the KIP is to enable fair allocation of broker resources between users/clients, request processing time is a better metric suited to this quota.

Use request time percentage instead of absolute fractional units for quota bound

The KIP proposes to use a quota that specifies the absolute fraction of time within each quota window allocated to a client/user. This is out of a total capacity of num.io.threads units since the time is measured across all I/O threads. An alternative would be to configure relative percentage out of a fixed total capacity of 100. Absolute quota was chosen to avoid automatic changes to client quota values when num.io.threads is modified.

Allocate percentage of request handler pool as quota bound

An alternative to monitoring request rate will be to model the request handler pool as a shared resource and allocate a percentage of the pool capacity to each user/client. But since only one request is read into the pool from each connection, this would be a measure of the number of concurrent connections per user/client rather than the rate of usage (a single or small number of connections can still overload the broker with a continuous sequence of requests). And it will be harder to compute the amount of time to delay a request when the bound is violated.

Exempt timing-sensitive requests such as consumer heartbeat

Consumer requests such as heartbeat are timing sensitive and it is possible that throttling these requests could result in the consumer falling out of the consumer group. However, if we don't throttle some requests, a misbehaving application or a misconfigured application with low heartbeat interval or a client library with a bug can cause broker overload. To protect the cluster from DoS attacks and avoid these overload scenarios, all client requests will be throttled. In most deployments, quotas will be set to a large enough value to provide a safety net rather than very small values that throttle well-behaved clients, so this shouldn't be an issue.

 

 


  • No labels