You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

WIP - Not ready to review

Status

Current stateUnder Discussion

Discussion thread

JIRA

Motivation

Currently, the Kafka cluster does not have the ability to throttle/rate limit producers and consumers. It is possible for a consumer to consume extremely fast and thus monopolize broker resources as well as cause network saturation. It is also possible for a producer to push extremely large amounts to data thus causing memory pressure on broker instances. We need a mechanism to enforce quotas on a per-user basis.

In this KIP, we will discuss a proposal to implement quotas in Kafka. We are proposing a generic framework that can used for both producer and consumer side quotas.


Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clientsConfiguration, especially client configuration

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

Quota Policy

The proposal is to throttle based on client IDs. Any client using the system presents a client id (producer or consumer). Each client will receive a default quota which can be overridden on a per-client. We will refer to these clients as "users" in the rest of the document. In addition, there will be a quota reserved for clients not presenting a client id (simple consumers not setting the id). This will default to an empty client id ("") and all such clients will share the quota for that empty id.

In the initial implementation, a quota will defined terms of read/write bytes per second. Any user that has just joined the cluster will receive a default quota (for e.g. 10MBps read, 5MBps write). We do expect that there will be some high volume clients that require more than the default quota. For such clients, we need to provide a mechanism to override their quotas. In short, we are proposing fixed quotas for everyone but the top k outliers which can justify custom quotas. At any time, we provision such that we can tolerate the top k customers exceeding their quota significantly. If any user violates it's quota, we will throttle fetch/produce requests for that user.

Producer side quotas are defined in terms of bytes written per second per client id. Consumer quotas as defined in terms of bytes read per second per client id. These 2 quotas will be enforced separately. For example: if a client deletes their consumer offsets and "bootstraps" quickly, this will cause a "read bytes per second" violation and will throttle that consumer group. Producers should not be affected 

Quota Overrides

  • It is very useful for cluster operators to dynamically disable/enable quota enforcement for the entire cluster. Such a feature is very useful while rolling out this feature to production environments. 
  • Replication traffic will be exempt from quotas. 
  • Ability to disable quotas on a per-client basis. For example: we may not want to quota mirror makers.

Quota Distribution

Provisioned quota will be split equally among all the partitions of a topic. For example: If a topic T has 8 partitions and total configured write throughput of 8MBps, each partition gets 1Mbps. If a broker hosts 3 leader partitions for T, then that topic is allowed 3MBps on that broker regardless of the partitions that traffic is directed to.

 

Configuration Management

How do we manage the quota overrides and the default topic configs? Manually configuring brokers with these is painful. In this case, the ability to dynamically change configs without bouncing brokers is very useful. There is already a proposal/patch for dynamic configuration management by Joe Stein which we plan to leverage for distributing these quota configs. In the future, we also need to think about restricting access to these configs (so that customers cannot modify their own quotas) but that is a separate discussion.

Broker Interface

Quota enforcement in the broker is done by the QuotaManager. check() is expected to be called before processing a request. If check() returns false, the broker is expected to take some quota action; otherwise the request is processed as usual.

/**
 * This is the basic interface we need to implement in the Kafka broker. This class will record all events processed by the broker. 
 */
public interface QuotaManager {
    /** 
     * The function check should be called before processing any request. If this returns false, then some Quota action has to be taken.
     */
    <T extends RequestOrResponse> boolean check(T request);

    /**
     * This function is called after any client request has been processed and lets the implementation track information about the response. 
     * This is useful for consumer side quotas where we may not know the response sizes and number of messages before processing the request entirely.
     *
     * T : T can be any type of request or response. Typically (FetchRequest or ProducerRequest)
     * C : C is basically the response object. Typically (FetchResponse or ProducerResponse).
     *
     */
	<T extends RequestOrResponse, C extends RequestOrResponse> void onResponse(T request, C response);

	void shutdown();
}

One alternative (proposed by Jun) is to simply use the "fetchSize" parameter on the FetchRequest (per partition) when calling check() to determine if the request is violating the quota or not. This removes the need to have the onResponse method.

However, fetchSize is the maximum amount of data that can be returned for that partition and may not accurately reflect the actual response size which IMO is important. For example: if the fetchSize is set to Integer.MAX_VALUE, every request will get throttled immediately.

Example usage in KafkaApis:

// Example for consumers
def handleFetchRequest() {
  val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
  if(!quotaManager.check(fetchRequest))
    // take quota actions

  // Notify the quota manager after the request has been processed
  def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
      val fetchResponse = FetchResponse(fetchRequest.correlationId, responsePartitionData)
     quotaManager.notify(fetchRequest, fetchResponse);
  }
}

Quota Actions

Custom Implementations

The interface has been kept very generic to allow multiple implementations of the quota policy. The default policy that we have proposed should suffice for the majority of use cases. Since we pass in the actual request and response objects to the QuotaManager, these implementations should have enough information to build complex policies if required.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

  • No labels