Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state:  Under Discussion Accepted

Discussion threadhttp://markmail.org/thread/hpp4ucysifucwmgd

JIRA:   KAFKA-2083

Motivation

Currently, the Kafka cluster does not have the ability to throttle/rate limit producers and consumers. It is possible for a consumer to consume extremely fast and thus monopolize broker resources as well as cause network saturation. It is also possible for a producer to push extremely large amounts to data thus causing memory pressure and large IO on broker instances. We need a mechanism to enforce quotas on a per-user basisclient basis.

In this KIP, we will discuss a proposal to implement quotas in Kafka. We are proposing an approach that can used for both producer and consumer side quotas.

...

  1. Metrics - The Quota Metrics which will be captured on a per-clientId basis, will be exposed to JMX. These are new metrics and do not use codahale. Instead, they use KM (Kafka Metrics) which is a new metrics library written for Kafka. More details in the Metrics section below.
  2. Response format - The Fetch and Produce response formats will evolve to include a new field called 'throttleTimeMs' which will convey the amount of time a request was throttled.

Proposed Changes

The changes are mainly focussed around Quota Policy, Distribution, Metrics, Quota Actions and config management.

...

This section basically explains what dimensions we throttle on. The proposal is to throttle based on client IDs. Any client using the system presents a client id or consumer group (producer or consumer). Each client will receive a default quota (for e.g. 10MBps read, 5MBps write) which can be overridden on a per-client basis dynamically. In addition, there will be a quota reserved for clients not presenting a client id (for e.g. simple consumers not setting the id). This will default to an empty client id ("") and all such clients will share the quota for that empty id (which should be the default quota). Producer side quotas are defined in terms of bytes written per second per client id. Consumer quotas as defined in terms of bytes read per second per client id. We do expect that there will be some high volume clients that require more than the default quota. In short, we are proposing fixed quotas for everyone but the top k outliers which can justify custom quotas. If users clients violate their quota, we will throttle fetch/produce requests for them.

...

How do the Kafka Server react when it detects a quota violation? In our proposal, the broker does not return an error rather it attempts to slow down a client exceeding it's quota. The slowdown will not be return an error to the client but will be visible through metrics. The behavior is subtly different for producer and consumer.

In case of producer, the broker will append the request batch to the log but will not return a response immediately. Instead, the response is inserted into a DelayedOperationPurgatory for a certain period of time before returning to the client. It is important that the append be done before delaying the client to avoid holding large batches in the broker heap and running out of memory.

In case of consumer, the action is very similar to a delayed fetch request. The request needs  the request needs to be delayed before reading from performing the log read to avoid running out of memory pressure.

Here's how the code might look inside the Broker (in KafkaApis, ReplicaManager or helper class). This is just for illustration purposes.

...

Kafka server has decided to migrate it's metrics to Kafka Metrics (KM). For more details read this. Kafka metrics maintains a Sensor for each measured metric. This can be configured with a "Quota" which is a bound of the min and max value of a metric. If recording a value causes a bound to get exceeded, a QuotaViolationException is thrown. We will add a bytes-in/bytes-out rate sensor per clientId configured with 'N' 1 second windows. This is already supported in MetricConfig. We can make 'N' configurable and use a good default (10). This lets us precisely track the rate per-client-broker rate per second. Having small windows makes the quota system very responsive. A quota will also be configured when creating these metrics. For default clients, the quota used will simply be the default quota value. For clients with overridden quotas, we can use the custom value. 

In order to fully support quotas, we need to make some changes to the metrics package as defined below. 

...

Configuration Management

We need a mechanism to configure the default quotas and the per-user overridesclient overrides. There is general agreement that we eventually need dynamic configs per-user client to fully operationalize quotas but for the purposes of this proposal, we will proceed with static configs.

Code Block
// Default bytes-out per consumer.
quota.consumer.default=2M
quota.producer.default=2M
 
// Overrides
quota.producer.override="clientA:4M;,clientB:10M"
quota.consumer.override="clientC:3M;,clientD:5M"

There is a separate discussion for dynamic configs per-user client that isn't fully fleshed out. If it makes sense and people agree on a final design, we can model quotas using it.

 

Tooling/Monitoring Changes

Upon implementation, we will start exposing per-client byte rate metrics metrics the following metrics to JMX. Since  Since these are new metrics there are no backward compatibility issues. These metrics will be documented prior to the release. 

  • per-client byte rate metrics
  • per-client metric to indicate throttle times

Client Status Code

In the current response protocol, there is no way to return the quota status back to the client. How do clients know if they are being throttled or not? One Our proposed solution is to add a new field in the response that indicates the quota status called 'throttleTimeMs'. This will require us to increment the protocol version for both producer and consumer. Clients that send V0 Only clients sending version (1) of those requests will not receive the quota status flag in the response. Version 1 of the request has the same format as version 0. On the client side (producer and consumer), we can have a metric that exposes whether the client was throttled during the last measured window or not.

 

This work item does not block the core implementation of quotas but IMO is very useful to have. 

derive and expose the following metrics over a time window:

  • Max request throttle time - The largest throttle time across requests in the time window. If the client is not throttled, it will simply expose 0.
  • Avg request throttle time -  The average throttle time across requests in the time window.

 The response protocol can define a top-level field called "isThrottledthrottleTime". Example:

Code Block
// Current fetch response
public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));


// Proposed fetch response
public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0)), new Field("isThrottledthrottle_time_ms", INT8INT32, "WasAmount of time in milliseconds the request was throttled orif at notall"));
 
// Current produce response
public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
                                                                new ArrayOf(new Schema(new Field("topic", STRING),
                                                                                       new Field("partition_responses",
                                                                                                 new ArrayOf(new Schema(new Field("partition",
                                                                                                                                  INT32),
                                                                                                                        new Field("error_code",
                                                                                                                                  INT16),
                                                                                                                        new Field("base_offset",
                                                                                                                                  INT64))))))));


// Proposed produce response
public static final Schema PRODUCE_RESPONSE_V0V1 = new Schema(new Field("responses",
                                                                new ArrayOf(new Schema(new Field("topic", STRING),
                                                                                       new Field("partition_responses",
                                                                                                 new ArrayOf(new Schema(new Field("partition",
                                                                                                                                  INT32),
                                                                                                                        new Field("error_code",
                                                                                                                                  INT16),
                                                                                                                        new Field("base_offset",
                                                                                                                                  INT64))))))),
                                                            new Field("isThrottledthrottle_time_ms", INT8INT32, "WasAmount of time in milliseconds the request was throttled if orat notall"));

NOTE: This was briefly discussed on the mailing list and was never concluded. One thing that we did rule out was to overload the error code field because being throttled isn't really an error and it can get really confusing.

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

Topic Based QuotasWe initially considered doing topic based quotas. This was rejected in favor or client based quotas since it seemed more natural to throttle clients than topics. Multiple producers/consumers can publish/fetch from a single topic. This makes it hard to eliminate interference of badly behaved clients. Topic based quotas also are harder to reason about when clients are accessing multiple topics (e.g. wildcard consumer). If quota for 1 topic is violated, how do we handle requests for the other topics?

Quota Distribution 

A) Instead of throttling by client id, we can consider throttling by a combination of "clientId, topic". The default quota is also defined on this tuple i.e. 10Mbps for each "clientId, topic". The obvious downside here is that a wildcard consumer can allocate almost infinite quota for itself if it subscribes to all topics. Producers can also be allocated very large quotas based on the number of topics they publish to. In practice, this approach needs to be paired with a cap on the maximum amount of data a client can publish/consume. But this cluster-wide cap again requires global co-ordination which is we mentioned is out of scope.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing clientss?
    Once the brokers are upgraded, existing clients will receive a default quota. A client's produce and fetch latency may increase if he is throttled. Clients that require more than the default quota will need to have overrides in advance to avoid getting throttled.
  • If we are changing behavior how will we phase out the older behavior?
    1. Set "inter.broker.protocol.version" to that of the previous release (0.8.2)
    2. Perform a rolling upgrade on the brokers so that they can all decipher the latest version of the Fetch response as described above.
    3. Change to "inter.broker.protocol.version" config to that of current release (0.9?).
    4. Rolling upgrade of the brokers
 

Rejected Alternatives

Topic Based QuotasWe initially considered doing topic based quotas. This was rejected in favor of the client based quotas since it seemed more natural to throttle clients than topics. Multiple producers/consumers can publish/fetch from a single topic. This makes it hard to eliminate interference of badly behaved clients. Topic based quotas also are harder to reason about when clients are accessing multiple topics (e.g. wildcard consumer). If quota for 1 topic is violated, how do we handle requests for the other topics?

Quota Distribution 

A) Instead of throttling by client id, we can consider throttling by a combination of "clientId, topic". The default quota is also defined on this tuple i.e. 10Mbps for each "clientId, topic". The obvious downside here is that a wildcard consumer can allocate almost infinite quota for itself if it subscribes to all topics. Producers can also be allocated very large quotas based on the number of topics they publish to. In practice, this approach needs to be paired with a cap on the maximum amount of data a client can publish/consume. But this cluster-wide cap again requires global co-ordination which is we mentioned is out of scope.

B) If we instead model the quotas on a per-topic basis, provisioned quota can be split equally among all the partitions of a topic. For example: If a topic T has 8 partitions and total configured write throughput of 8MBps, each partition gets 1Mbps. If a broker hosts 3 leader partitions for T, then that topic is allowed 3MBps on that broker regardless of the partitions that traffic is directed to. In terms of quota distribution, this is the most elegant model. However, since there can be multiple producers/consumers of a topic, a single misbehaving client can cause multiple clients to get throttled which is undesirable behavior. This is why we have chosen to model quotas on a per-client basis. Since a client can consume from/produce to any number of topic-partitions, it is difficult to track accurately a client's quota usage across the entire cluster. This would require some sort of a gossip mechanism we don't currently have. This is why we choose to model quota distribution a per-broker basisB) If we instead model the quotas on a per-topic basis, provisioned quota can be split equally among all the partitions of a topic. For example: If a topic T has 8 partitions and total configured write throughput of 8MBps, each partition gets 1Mbps. If a broker hosts 3 leader partitions for T, then that topic is allowed 3MBps on that broker regardless of the partitions that traffic is directed to. In terms of quota distribution, this is the most elegant model.

Quota Actions

A) Immediately return an error: This is the simplest possible option. However, this requires clients to implement some sort of a back off mechanism since retrying immediately will likely make things worse.

...