
Status

Current state: Under Discussion

Discussion thread

JIRA

Motivation

Currently, the Kafka cluster does not have the ability to throttle/rate limit producers and consumers. It is possible for a consumer to consume extremely fast and thus monopolize broker resources as well as cause network saturation. It is also possible for a producer to push extremely large amounts of data, thus causing memory pressure on broker instances. We need a mechanism to enforce quotas on a per-user basis.

In this KIP, we will discuss a proposal to implement quotas in Kafka. We are proposing a generic framework that can be used for both producer and consumer side quotas.


Public Interfaces

The only expected public change is that clients may be required to handle quota violation errors and retry their requests. See the "Quota Actions" section. In addition, we expect to build tooling to change quota configuration dynamically.

Proposed Changes

Quota Policy

The proposal is to throttle based on client IDs. Any client using the system presents a client id (producer or consumer). Each client will receive a default quota, which can be overridden on a per-client basis. We will use "clients" and "users" interchangeably in the rest of the document. In addition, there will be a quota reserved for clients not presenting a client id (simple consumers not setting the id). This will default to an empty client id ("") and all such clients will share the quota for that empty id (which should be the default quota).

A quota will be defined in terms of read/write bytes per second. Any user that has just joined the cluster will receive a default quota (e.g. 10MBps read, 5MBps write). We do expect that there will be some high-volume clients that require more than the default quota. For such clients, we need to provide a mechanism to override their quotas. In short, we are proposing fixed quotas for everyone but the top k outliers, which can justify custom quotas. If users violate their quota, we will throttle fetch/produce requests for them.
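
To make the override lookup concrete, here is a minimal sketch of resolving the effective quota for a client id, with anonymous clients falling back to the empty client id. The class name, default values and override map are illustrative assumptions, not part of this proposal:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Illustrative only: resolves the effective read/write quota for a client id.
 * Clients that do not present an id are mapped to "" and share the default quota.
 */
public class QuotaConfig {
    // Hypothetical defaults, e.g. 10 MBps read and 5 MBps write.
    private final long defaultReadBytesPerSec  = 10L * 1024 * 1024;
    private final long defaultWriteBytesPerSec = 5L  * 1024 * 1024;

    // Per-client overrides for the "top k" outliers; populated dynamically (see Configuration Management).
    private final Map<String, Long> readOverrides  = new ConcurrentHashMap<>();
    private final Map<String, Long> writeOverrides = new ConcurrentHashMap<>();

    public long readQuotaFor(String clientId) {
        String key = (clientId == null) ? "" : clientId;   // anonymous clients share the "" quota
        return readOverrides.getOrDefault(key, defaultReadBytesPerSec);
    }

    public long writeQuotaFor(String clientId) {
        String key = (clientId == null) ? "" : clientId;
        return writeOverrides.getOrDefault(key, defaultWriteBytesPerSec);
    }
}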

Producer side quotas are defined in terms of bytes written per second per client id. Consumer quotas are defined in terms of bytes read per second per client id. For example: if a client deletes their consumer offsets and "bootstraps" quickly, this should cause a "read bytes per second" violation and will throttle that consumer group. It should have no effect on any producer.

These metrics should be aggregated over a short period of time (5-10 seconds) before we declare a quota violation. This reduces the likelihood that short bursts of traffic trigger a violation. In addition, replication traffic will be exempt from quotas.
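
As an illustration of this aggregation, the sketch below tracks bytes over a short rolling window and only reports a violation once the windowed total exceeds the quota. The class name, sample structure and window length are assumptions made for the example:

import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Illustrative only: records (timestamp, bytes) samples for one client and checks the
 * rate over the last windowMs milliseconds against a bytes-per-second quota.
 */
public class WindowedByteRate {
    private static class Sample { final long timeMs; final long bytes; Sample(long t, long b) { timeMs = t; bytes = b; } }

    private final long windowMs;                      // e.g. 5000-10000 ms
    private final Deque<Sample> samples = new ArrayDeque<>();
    private long windowBytes = 0;

    public WindowedByteRate(long windowMs) { this.windowMs = windowMs; }

    public synchronized void record(long bytes, long nowMs) {
        samples.addLast(new Sample(nowMs, bytes));
        windowBytes += bytes;
        purge(nowMs);
    }

    /** True if the rate over the window exceeds quotaBytesPerSec. */
    public synchronized boolean isViolated(long quotaBytesPerSec, long nowMs) {
        purge(nowMs);
        return windowBytes > quotaBytesPerSec * (windowMs / 1000);
    }

    private void purge(long nowMs) {
        while (!samples.isEmpty() && samples.peekFirst().timeMs < nowMs - windowMs) {
            windowBytes -= samples.removeFirst().bytes;
        }
    }
}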

Quota Distribution

Provisioned quota will be split equally among all the partitions of a topic. For example: if a topic T has 8 partitions and a total configured write throughput of 8MBps, each partition gets 1MBps. If a broker hosts 3 leader partitions for T, then that topic is allowed 3MBps on that broker regardless of which partitions that traffic is directed to.
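
The arithmetic above can be sketched as follows; the class and method names are purely illustrative:

/**
 * Illustrative only: per-broker allowance for a topic is
 * (topic quota / partition count) * number of leader partitions hosted on this broker.
 */
public class QuotaDistribution {
    public static long brokerAllowanceBytesPerSec(long topicQuotaBytesPerSec,
                                                  int totalPartitions,
                                                  int leaderPartitionsOnBroker) {
        long perPartition = topicQuotaBytesPerSec / totalPartitions;
        return perPartition * leaderPartitionsOnBroker;
    }

    public static void main(String[] args) {
        // Example from the text: 8MBps topic quota, 8 partitions, 3 leaders on this broker -> 3MBps.
        long mb = 1024 * 1024;
        System.out.println(brokerAllowanceBytesPerSec(8 * mb, 8, 3) / mb + " MBps");
    }
}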

 

Configuration Management

How do we manage the quota overrides and the default topic configs? Manually configuring brokers with these is painful. In this case, the ability to dynamically change configs without bouncing brokers is very useful. There is already a proposal/patch for dynamic configuration management by Joe Stein which we plan to leverage for distributing these quota configs. In the future, we also need to think about restricting access to these configs (so that customers cannot modify their own quotas) but that is a separate discussion.
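
Purely as an illustration of applying overrides without bouncing brokers, the sketch below holds the current overrides behind an atomic reference that is swapped whenever a dynamic config change notification arrives. The actual distribution mechanism (the dynamic configuration proposal mentioned above) is out of scope here and all names are hypothetical:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Illustrative only: quota overrides that can be replaced at runtime when a
 * dynamic config change is received, without restarting the broker.
 */
public class DynamicQuotaOverrides {
    private final AtomicReference<Map<String, Long>> overrides =
            new AtomicReference<>(Collections.<String, Long>emptyMap());

    /** Called by whatever mechanism distributes dynamic config changes. */
    public void onConfigChange(Map<String, Long> newOverrides) {
        overrides.set(Collections.unmodifiableMap(new HashMap<>(newOverrides)));
    }

    public Long overrideFor(String clientId) {
        return overrides.get().get(clientId);   // null means "use the default quota"
    }
}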

Broker Interface

Quota enforcement in the broker is done by the QuotaManager. check() is expected to be called before processing a request. If check() returns false, the broker is expected to take some quota action; otherwise the request is processed as usual.

/**
 * This is the basic interface we need to implement in the Kafka broker. This class will record all events processed by the broker. 
 */
public interface QuotaManager {
    /** 
     * The function check should be called before processing any request. If this returns false, then some Quota action has to be taken.
     */
    <T extends RequestOrResponse> boolean check(T request);

    /**
     * This function is called after any client request has been processed and lets the implementation track information about the response. 
     * This is useful for consumer side quotas where we may not know the response sizes and number of messages before processing the request entirely.
     *
     * T : T can be any type of request or response. Typically (FetchRequest or ProducerRequest)
     * C : C is basically the response object. Typically (FetchResponse or ProducerResponse).
     *
     */
    <T extends RequestOrResponse, C extends RequestOrResponse> void onResponse(T request, C response);

    void shutdown();
}

One alternative (proposed by Jun) is to simply use the "fetchSize" parameter on the FetchRequest (per partition) when calling check() to determine if the request is violating the quota or not. This removes the need to have the onResponse method. However, fetchSize is the maximum amount of data that can be returned per-partition within a request and may not accurately reflect the actual response size which IMO is important. For example: if the fetchSize is set to Integer.MAX_VALUE, every request will get throttled immediately which may not be the behavior we want. 

Example usage in KafkaApis:

// Example for consumers
def handleFetchRequest() {
  val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
  if(!quotaManager.check(fetchRequest)) {
    // take quota actions
  }

  // Notify the quota manager after the request has been processed
  def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
      val fetchResponse = FetchResponse(fetchRequest.correlationId, responsePartitionData)
      quotaManager.onResponse(fetchRequest, fetchResponse);
  }
}

// Example for producers
def handleProducerRequest(request: RequestChannel.Request) {
  val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]

  if(!quotaManager.check(produceRequest)) {
    // take quota actions after emptying the request buffer
    produceRequest.emptyData();
  }

  // the callback for sending a produce response
  def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
    val response = ProducerResponse(produceRequest.correlationId, responseStatus)
    quotaManager.onResponse(produceRequest, response);
  }
}

Quota Actions

What do we do when check() returns false? We have a few options that we can pick from. Any opinions?

A) Immediately return an error: This is the simplest possible option. However, it requires clients to implement some sort of backoff mechanism, since retrying immediately will likely make things worse.
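
For illustration, a client handling option A might wrap its requests in backoff logic along these lines; the exception name and backoff parameters are assumptions, not part of any existing client API:

/**
 * Illustrative only: exponential backoff a client could use when the broker
 * immediately rejects a request with a quota violation error (option A).
 */
public class QuotaBackoffExample {
    /** Hypothetical error a client library might surface on quota violation. */
    public static class QuotaViolationException extends RuntimeException {}

    interface Call<T> { T execute() throws QuotaViolationException; }

    public static <T> T withBackoff(Call<T> call, int maxRetries) throws InterruptedException {
        long backoffMs = 100;
        for (int attempt = 0; ; attempt++) {
            try {
                return call.execute();
            } catch (QuotaViolationException e) {
                if (attempt >= maxRetries) throw e;
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, 10000);   // cap the backoff
            }
        }
    }
}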

B) Delay the request and return error: If a client is currently exceeding its quota, we can park the request in purgatory for 'n' milliseconds. After the request expires, the broker can return an error to the client. This is a nice feature because it effectively throttles the client up to the request timeout and makes it less critical for them to implement backoff logic. After the error is caught by the client, they can retry immediately. Note that in the case of producers, we should be careful about retaining references to large Message bodies because we could easily exhaust broker memory if parking hundreds of requests. The downside of this approach is that it requires the clients to catch quota violation errors and retry their requests.

C) Delay the response but don't return an error: In this alternative (proposed by Jay), no error is returned to the client. Produce requests will get appended to the log immediately and will be kept in purgatory until the client is no longer throttled, after which the produce request returns successfully. Fetch requests will also be deposited into the purgatory and serviced only after that client is no longer violating its quota. Ideally, this approach requires an idempotent producer, because the data has already been appended to the log, so a client that disconnects and retries can produce duplicate messages. For example: if the client socket timeout is reached, it will disconnect, reestablish a connection and retry.
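
To make the difference between B and C concrete, here is a rough sketch that parks the reply for a fixed delay and then completes it with either an error (B) or the normal response (C). It uses a plain scheduler purely for illustration; a real implementation would use the broker's request purgatory, and the response/error representations are stand-ins:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Illustrative only: park the reply for delayMs, then complete it with either
 * an error (option B) or the already-computed successful response (option C).
 */
public class ThrottledReplySketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void delayThenRespond(long delayMs,
                                 boolean returnError,       // true = option B, false = option C
                                 String successResponse,    // stand-in for the real response object
                                 Consumer<String> sendToClient) {
        scheduler.schedule(() -> {
            if (returnError) {
                sendToClient.accept("QUOTA_VIOLATION");     // option B: client catches the error and retries
            } else {
                sendToClient.accept(successResponse);       // option C: no error, just a delayed response
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }
}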

Our recommendation is that we choose either B or C.

Tooling/Monitoring Changes

Along with this feature, we are proposing to add the following tools/admin operations:

  • Dynamically disable/enable quota enforcement for the entire cluster. This is very useful while rolling out quotas to production environments.
  • Ability to disable quotas on a per-client basis. For example: we may not want to throttle mirror makers.
  • Dynamically change quotas for any client id.


We also plan to expose the fraction of quota used on a per-client basis via JMX (0-100%, where 100 means throttled).
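
A minimal sketch of what such a metric could look like using the standard platform MBean server; the MBean name and attribute are assumptions for illustration only, and the real metric would follow Kafka's naming conventions:

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

/**
 * Illustrative only: publish the fraction of quota used (0-100) for one client id as a JMX attribute.
 */
public class QuotaUsageMBeanExample {
    public interface QuotaUsageMXBean {
        double getQuotaUsedPercent();
    }

    public static class QuotaUsage implements QuotaUsageMXBean {
        private volatile double quotaUsedPercent = 0.0;
        public void update(double percent) { quotaUsedPercent = Math.min(percent, 100.0); }
        @Override public double getQuotaUsedPercent() { return quotaUsedPercent; }
    }

    public static QuotaUsage register(String clientId) throws Exception {
        QuotaUsage usage = new QuotaUsage();
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Hypothetical JMX object name for this sketch.
        server.registerMBean(usage, new ObjectName("kafka.server:type=QuotaUsage,clientId=" + clientId));
        return usage;
    }
}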

Custom Implementations

The interface has been kept very generic to allow multiple implementations of the quota policy. The default policy that we have proposed should suffice for the majority of use cases. Since we pass in the actual request and response objects to the QuotaManager, these implementations should have enough information to build complex policies if required.
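
For example, a custom implementation could exempt a set of client ids (such as mirror makers) and delegate everything else to the default policy. The sketch below assumes the QuotaManager and RequestOrResponse types shown earlier are on the classpath; the client id accessor is a stand-in, since this KIP does not define how the id is read from the request:

import java.util.Set;

/**
 * Illustrative only: a QuotaManager that never throttles an allow-listed set of
 * client ids and delegates everything else to another policy.
 */
public class ExemptingQuotaManager implements QuotaManager {
    private final QuotaManager delegate;
    private final Set<String> exemptClientIds;

    public ExemptingQuotaManager(QuotaManager delegate, Set<String> exemptClientIds) {
        this.delegate = delegate;
        this.exemptClientIds = exemptClientIds;
    }

    // Stand-in: this KIP does not define how to extract the client id from a request.
    private String clientIdOf(RequestOrResponse request) {
        return "";
    }

    @Override
    public <T extends RequestOrResponse> boolean check(T request) {
        if (exemptClientIds.contains(clientIdOf(request))) return true;   // never throttle exempt clients
        return delegate.check(request);
    }

    @Override
    public <T extends RequestOrResponse, C extends RequestOrResponse> void onResponse(T request, C response) {
        delegate.onResponse(request, response);
    }

    @Override
    public void shutdown() {
        delegate.shutdown();
    }
}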

Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

Topic Based Quotas - We initially considered doing topic based quotas. This was rejected in favor of client based quotas since it seemed more natural to throttle clients than topics. Multiple producers/consumers can publish/fetch from a single topic, which makes it hard to eliminate interference from badly behaved clients. Topic based quotas are also harder to reason about when clients are accessing multiple topics (e.g. a wildcard consumer). If the quota for one topic is violated, how do we handle requests for the other topics?

Static Configs - Initially, our discussions assumed that we would use the current configuration scheme to manage quota overrides. However, changing overrides would require bouncing the cluster, which is a big operational penalty.

