Current state: Under Discussion

Discussion thread



Currently, the Kafka cluster does not have the ability to throttle/rate limit producers and consumers. It is possible for a consumer to consume extremely fast and thus monopolize broker resources as well as cause network saturation. It is also possible for a producer to push extremely large amounts of data, causing memory pressure and heavy IO on broker instances. We need a mechanism to enforce quotas on a per-user basis.

In this KIP, we will discuss a proposal to implement quotas in Kafka. We are proposing a generic approach that can be used for both producer and consumer side quotas.

Public Interfaces

The only expected public change is that clients may be required to handle quota violation errors and retry their requests. See the "Quota Actions" section. In addition, we expect to build tooling to change quota configuration dynamically.

  1. Metrics - The quota metrics, which will be captured on a per-clientId basis, will be exposed through JMX. These are new metrics and do not use codahale. Instead, they use KM (Kafka Metrics), a new metrics library written for Kafka. More details are in the Metrics section below.
  2. Client Response - Clients could optionally handle a flag in the response indicating whether they were throttled. This serves to inform users about their quota status and is not an error code.

Proposed Changes

The changes are mainly focused on Quota Policy, Distribution, Metrics, Quota Actions and config management.

Quota Policy

This section explains which dimensions we throttle on. The proposal is to throttle based on client IDs. Any client using the system (producer or consumer) presents a client id or consumer group. Each client will receive a default quota which can be overridden dynamically on a per-client basis. In addition, there will be a quota reserved for clients not presenting a client id (e.g. simple consumers not setting the id). These will default to an empty client id ("") and all such clients will share the quota for that empty id (which should be the default quota).

A quota will be defined in terms of read/write bytes per second. Any client that has just joined the cluster will receive a default quota per broker (e.g. 10MBps read, 5MBps write). We do expect that there will be some high volume clients that require more than the default quota. For such clients, we need to provide a mechanism to override their quotas. In short, we are proposing fixed quotas for everyone except the top k outliers, which can justify custom quotas. If users violate their quota, we will throttle fetch/produce requests for them.

Producer side quotas are defined in terms of bytes written per second per client id. Consumer quotas are defined in terms of bytes read per second per client id. For example: if a client deletes their consumer offsets and "bootstraps" quickly, this should cause a "read bytes per second" violation and will throttle (slow down) that consumer group. It should have no effect on any other clients.

These metrics should be aggregated over a short period of time (5-10 seconds) before we declare a quota violation. This reduces the likelihood that a short burst in traffic triggers a violation. In addition, replication traffic will be exempt from quotas.
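The default-plus-override policy described in this section can be sketched as a small lookup. This is only an illustration under assumptions: the class `ClientQuotas` and its method names are hypothetical and are not part of Kafka, and one instance would be needed per direction (read or write).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical holder for per-client byte-rate quotas on one broker; not a Kafka class. */
final class ClientQuotas {
    private final long defaultBytesPerSec;
    // Overrides for the few high-volume clients that justify a custom quota.
    private final Map<String, Long> overrides = new ConcurrentHashMap<>();

    ClientQuotas(long defaultBytesPerSec) {
        this.defaultBytesPerSec = defaultBytesPerSec;
    }

    /** Sets or replaces the override for one client id at runtime. */
    void override(String clientId, long bytesPerSec) {
        overrides.put(normalize(clientId), bytesPerSec);
    }

    /** Returns the override if one exists, otherwise the default quota. */
    long quotaFor(String clientId) {
        return overrides.getOrDefault(normalize(clientId), defaultBytesPerSec);
    }

    // Clients that present no id are all mapped to the shared empty id "".
    private static String normalize(String clientId) {
        return clientId == null ? "" : clientId;
    }
}
```

With a 5MBps default, `quotaFor` returns 5,000,000 for any client without an override, including clients with no id, which all share the entry for "".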


We need a mechanism to distribute a client's quota across all the brokers in a cluster. There are several options available and they have been described in the rejected alternatives section at the bottom. Our proposal is to define the bandwidth on a per-broker basis. Each client can publish a maximum of X MBps (configurable) per broker before it gets throttled. This approach assumes that client requests are properly distributed among all the broker instances, which may not always be the case. However, this is much better than having a fixed cluster-wide bandwidth per client because that would require a mechanism to share current per-client quota usage among all the brokers. This can be very tricky to implement and is outside the scope of this proposal.


The Kafka server has decided to migrate its metrics to Kafka Metrics (KM). For more details read this. Kafka Metrics maintains a Sensor for each measured metric. A Sensor can be configured with a "Quota", which is a bound on the minimum or maximum value of a metric. If recording a value causes a bound to be exceeded, a QuotaViolationException is thrown.


We will add a bytes-in/bytes-out rate sensor per clientId configured with 'N' 1 second windows. This is already supported in MetricConfig. We can make 'N' configurable and use a good default (10). This lets us precisely track the rate per-client per second. Having small windows makes the quota system very responsive. A quota will also be configured when creating these metrics. For default clients, the quota used will simply be the default quota value. For clients with overridden quotas, we can use the custom value. 
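To make the idea of 'N' one-second windows concrete, here is a minimal sketch of a byte rate tracked over a ring of one-second slots. It is not the Kafka Metrics implementation. The class `WindowedByteRate` and its methods are hypothetical, and it assumes non-negative timestamps and that the rate is always averaged over the full N seconds.

```java
/** Hypothetical rate tracker over N one-second windows; not the Kafka Metrics code. */
final class WindowedByteRate {
    private final long[] bytesPerWindow;  // one slot per one-second window
    private final long[] windowSecond;    // the second each slot currently represents

    WindowedByteRate(int numWindows) {
        bytesPerWindow = new long[numWindows];
        windowSecond = new long[numWindows];
        java.util.Arrays.fill(windowSecond, -1L); // -1 marks a slot that was never used
    }

    /** Adds bytes to the one-second window that contains nowMs. */
    void record(long bytes, long nowMs) {
        long second = nowMs / 1000;
        int slot = (int) (second % bytesPerWindow.length);
        if (windowSecond[slot] != second) {
            // The slot still holds an expired window, so reset it before reuse.
            windowSecond[slot] = second;
            bytesPerWindow[slot] = 0;
        }
        bytesPerWindow[slot] += bytes;
    }

    /** Sums the bytes recorded in windows that fall within the last N seconds. */
    long bytesInWindow(long nowMs) {
        long second = nowMs / 1000;
        long total = 0;
        for (int i = 0; i < bytesPerWindow.length; i++) {
            boolean used = windowSecond[i] >= 0;
            if (used && second - windowSecond[i] < bytesPerWindow.length) {
                total += bytesPerWindow[i];
            }
        }
        return total;
    }

    /** Average rate over the full N-second span (an assumption of this sketch). */
    double bytesPerSecond(long nowMs) {
        return (double) bytesInWindow(nowMs) / bytesPerWindow.length;
    }
}
```

Small windows mean an expired second drops out of the total as soon as the clock moves past it, which is what makes the measured rate responsive.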

In order to fully support quotas, we need to make some changes to the metrics package as defined below. 

Delay time in QuotaViolationException

Upon Quota violation, we need to identify how long we should delay requests for the client. It seems natural to have the QuotaViolationException return this information. The great thing about this is that we can catch any QuotaViolationException in the Kafka server and it will have the delay time built into it. This gives us the ability to enforce quotas over any metric we track.

Here's an example of computing the delay time. Assume a quota of 5MBps with a 10 second window. There is a client producing at exactly 5MBps. If it suddenly produces a 15MB batch in the next second, this causes a quota violation because it will have produced 60MB over the last 10 seconds (5*9 + 15 = 60MB).

Delay Time = (bytes produced in window - quota bound) / quota limit per second

In the example above, the quota bound is 50MB (5MBps over a 10 second window), so we will delay for 2 seconds:

Delay Time = (60 - 50) / 5 = 2 second delay
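The delay formula above can be written as a small helper. This is a sketch under assumptions: `QuotaDelay` and `delaySeconds` are hypothetical names, and the quota bound is taken to be the per-second quota multiplied by the window length, as in the worked example.

```java
/** Hypothetical helper that applies the delay formula from the example above. */
final class QuotaDelay {
    private QuotaDelay() {}

    /**
     * Returns how long to delay, in seconds, given the bytes observed in the
     * window, the quota in bytes per second, and the window length in seconds.
     * Returns 0 when the observed bytes are within the quota bound.
     */
    static double delaySeconds(double bytesInWindow, double quotaBytesPerSec, int windowSeconds) {
        double quotaBound = quotaBytesPerSec * windowSeconds;
        if (bytesInWindow <= quotaBound) {
            return 0.0;
        }
        return (bytesInWindow - quotaBound) / quotaBytesPerSec;
    }
}
```

For the example in the text, `QuotaDelay.delaySeconds(60, 5, 10)` evaluates (60 - 50) / 5 with the values expressed in MB.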

Allow Quota Value to change




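The code in KafkaApis or a helper class can be:

    try {
        // Record the request size against the client's quota sensor.
        quota.record(numBytes, time.currentTimeMillis());
    } catch (QuotaViolationException ex) {
        // Quota exceeded: hold the response for the delay carried by the exception.
        delayedOperation = DelayedOperation(ex.getDelayTime(), ...)
        quotaPurgatory.tryCompleteElseWatch(delayedOperation);
    }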
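For readers who want something that compiles on its own, the record-then-catch flow can be sketched with stand-in types. Everything here is hypothetical: `ThrottleException`, `BoundedSensor` and `RequestHandler` are illustrative names rather than Kafka classes, and the sensor never expires old windows, which a real implementation would have to do.

```java
/** Hypothetical stand-in for an exception that carries the throttle delay. */
final class ThrottleException extends RuntimeException {
    private final long delayMs;

    ThrottleException(long delayMs) {
        super("quota violated, delay " + delayMs + " ms");
        this.delayMs = delayMs;
    }

    long delayMs() {
        return delayMs;
    }
}

/** Hypothetical sensor that checks a running byte total against a quota bound. */
final class BoundedSensor {
    private final long quotaBytesPerSec;
    private final int windowSeconds;
    private long bytesInWindow; // simplification: windows are never expired here

    BoundedSensor(long quotaBytesPerSec, int windowSeconds) {
        this.quotaBytesPerSec = quotaBytesPerSec;
        this.windowSeconds = windowSeconds;
    }

    /** Records bytes and throws with the computed delay if the bound is exceeded. */
    void record(long bytes) {
        bytesInWindow += bytes;
        long bound = quotaBytesPerSec * windowSeconds;
        if (bytesInWindow > bound) {
            throw new ThrottleException((bytesInWindow - bound) * 1000 / quotaBytesPerSec);
        }
    }
}

/** Hypothetical request path: returns how long the response should be held. */
final class RequestHandler {
    private RequestHandler() {}

    static long throttleTimeMs(BoundedSensor sensor, long numBytes) {
        try {
            sensor.record(numBytes);
            return 0L; // within quota, respond immediately
        } catch (ThrottleException ex) {
            return ex.delayMs(); // caller would park the response for this long
        }
    }
}
```

Because the delay travels inside the exception, the handler does not need to know which metric was violated or how the delay was derived.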



Configuration Management

How do we manage the quota overrides and the default topic configs? Manually configuring brokers with these is painful. In this case, the ability to dynamically change configs without bouncing brokers is very useful. There is already a proposal/patch for dynamic configuration management by Joe Stein which we plan to leverage for distributing these quota configs. In the future, we also need to think about restricting access to these configs (so that customers cannot modify their own quotas) but that is a separate discussion.
