...

In this KIP, we discuss a proposal to implement quotas in Kafka. We propose a generic framework that can be used for both producer-side and consumer-side quotas.


Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

...

  • Binary log format

...

  • The network protocol and api behavior

...

  • Any class in the public packages under clients

      • org/apache/kafka/common/serialization

      • org/apache/kafka/common

      • org/apache/kafka/common/errors

      • org/apache/kafka/clients/producer

      • org/apache/kafka/clients/consumer (eventually, once stable)

  • Configuration, especially client configuration

...

  • Monitoring

...

  • Command line tools and arguments

...

The only expected change is that clients may be required to handle quota violation errors and retry their requests. See the "Quota Actions" section.
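A client that receives a quota violation error gains nothing by retrying immediately. The following is a minimal sketch of a client-side retry loop with exponential backoff. It assumes the error surfaces to the caller as an exception. QuotaViolationException here is a local stand-in for the error named under "Quota Actions". The helper sendWithBackoff() and its parameters are hypothetical and are not part of the Kafka client API.

```java
import java.util.concurrent.Callable;

/** Hypothetical local stand-in for the broker's quota error; not a Kafka client class. */
class QuotaViolationException extends RuntimeException {
    QuotaViolationException(String message) {
        super(message);
    }
}

class QuotaBackoff {
    /**
     * Calls request until it succeeds. On a quota violation, sleeps and retries,
     * doubling the wait each time up to maxBackoffMs. Gives up after maxAttempts
     * calls by rethrowing the last quota error; other exceptions propagate at once.
     */
    static <T> T sendWithBackoff(Callable<T> request, long initialBackoffMs,
                                 long maxBackoffMs, int maxAttempts) throws Exception {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return request.call();
            } catch (QuotaViolationException e) {
                if (attempt >= maxAttempts) {
                    throw e; // out of attempts: surface the quota error to the caller
                }
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
            }
        }
    }
}
```

Capping the wait keeps a long-throttled client from backing off indefinitely. If the broker itself delays throttled requests before returning the error, initialBackoffMs can be small or zero.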

Proposed Changes

Quota Policy

...

Broker Interface

Quota enforcement in the broker is done by the QuotaManager. The broker calls check() before processing a request. If check() returns false, the broker takes a quota action (see "Quota Actions"). Otherwise, the request is processed as usual.

```java
// Base class of the broker's request and response objects (Scala, kafka.api)
import kafka.api.RequestOrResponse;

/**
 * The basic interface we need to implement in the Kafka broker. Implementations
 * record every event processed by the broker.
 */
public interface QuotaManager {
    /**
     * Called before processing any request. If this returns false, the client has
     * violated its quota and the broker must take a quota action.
     */
    <T extends RequestOrResponse> boolean check(T request);

    /**
     * Called after a client request has been processed, so the implementation can
     * track information about the response. This is useful for consumer-side quotas,
     * where the response size and number of messages are not known until the request
     * has been processed entirely.
     *
     * @param <T> the request type, typically FetchRequest or ProducerRequest
     * @param <C> the response type, typically FetchResponse or ProducerResponse
     */
    <T extends RequestOrResponse, C extends RequestOrResponse> void onResponse(T request, C response);

    void shutdown();
}
```
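The following is a hypothetical sketch of how an implementation might divide work between the two methods. It is not the policy proposed in this KIP. It assumes a simple per-client byte-rate limit over fixed time windows. check() only compares recorded usage against the window's allowance. record() plays the role of onResponse() and charges the bytes actually returned. FixedWindowQuota, record(), and the clientId and byte-count arguments are illustrative stand-ins. A real implementation would extract these values from the RequestOrResponse objects.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical fixed-window byte-rate quota, keyed by client id. */
class FixedWindowQuota {
    private final long bytesPerSecond;
    private final long windowMs;
    // clientId -> {windowStartMs, bytesRecordedInWindow}
    private final Map<String, long[]> windows = new HashMap<>();

    FixedWindowQuota(long bytesPerSecond, long windowMs) {
        this.bytesPerSecond = bytesPerSecond;
        this.windowMs = windowMs;
    }

    /** Analogue of check(): true while the client's usage in this window is within quota. */
    synchronized boolean check(String clientId, long nowMs) {
        long allowedBytes = bytesPerSecond * windowMs / 1000;
        return current(clientId, nowMs)[1] <= allowedBytes;
    }

    /** Analogue of onResponse(): charge the bytes actually sent back to the client. */
    synchronized void record(String clientId, long responseBytes, long nowMs) {
        current(clientId, nowMs)[1] += responseBytes;
    }

    private long[] current(String clientId, long nowMs) {
        long[] w = windows.computeIfAbsent(clientId, k -> new long[] {nowMs, 0L});
        if (nowMs - w[0] >= windowMs) { // window elapsed: start a fresh one
            w[0] = nowMs;
            w[1] = 0L;
        }
        return w;
    }
}
```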

One alternative (proposed by Jun) is to use the per-partition "fetchSize" parameter of the FetchRequest in check() to decide whether the request violates the quota. This removes the need for the onResponse method.

However, fetchSize is only the maximum amount of data that can be returned for a partition. It may not reflect the actual response size, which IMO is what matters. For example, if fetchSize is set to Integer.MAX_VALUE, every request will be throttled immediately.
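To make the Integer.MAX_VALUE example concrete, the following hypothetical sketch charges a fetch by summing its per-partition fetchSize values. It then compares that estimate with charging the bytes actually returned. The 10 MB allowance and the names FetchChargeExample, chargeByFetchSize(), and violates() are illustrative assumptions.

```java
/** Hypothetical comparison of charging a fetch by fetchSize versus by actual response size. */
class FetchChargeExample {
    // Assumed per-window allowance for one client: 10 MB.
    static final long QUOTA_BYTES = 10L * 1024 * 1024;

    /** Jun's alternative: charge the sum of per-partition fetchSize values up front. */
    static long chargeByFetchSize(int[] partitionFetchSizes) {
        long total = 0; // long, so several Integer.MAX_VALUE sizes cannot overflow
        for (int size : partitionFetchSizes) {
            total += size;
        }
        return total;
    }

    /** True if charging chargedBytes would put the client over its allowance. */
    static boolean violates(long chargedBytes) {
        return chargedBytes > QUOTA_BYTES;
    }
}
```

Take a single partition with fetchSize = Integer.MAX_VALUE. chargeByFetchSize() returns 2147483647, which violates the quota on the very first request. Charging the real response instead (say 4096 bytes, when little data is available) stays well within the allowance.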

Example usage in KafkaApis:

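```scala
// Example for consumers (sketch of KafkaApis.handleFetchRequest)
def handleFetchRequest(request: RequestChannel.Request) {
  val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
  if (!quotaManager.check(fetchRequest)) {
    // take quota actions (see "Quota Actions") instead of processing as usual
  }

  // Invoked once the fetch has been served: notify the quota manager of the
  // actual response so it can account for the bytes returned
  def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
    val fetchResponse = FetchResponse(fetchRequest.correlationId, responsePartitionData)
    quotaManager.onResponse(fetchRequest, fetchResponse)
    // ... send fetchResponse to the client as usual
  }

  // ... serve the fetch, invoking sendResponseCallback when the data is ready
}
```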

Quota Actions

What should the broker do when check() returns false? There are a few options to pick from. Any opinions?

A) Immediately return an error: This is the simplest option. However, it requires clients to implement some form of backoff, since retrying immediately is pointless.

B) Delay the request and return an error: If a client is currently exceeding its quota, we can park the request in the purgatory for 'n' milliseconds. When the request expires, the broker returns a QuotaViolationException error to the client. This is attractive because it effectively throttles the client, with a delay of up to the client request timeout, and it makes backoff logic less critical on the client side. Once the client catches the error, it can retry immediately. Note that for producers, we must be careful about retaining references to large message bodies, because parking hundreds of requests could easily exhaust broker memory. The downside of this approach is that it requires clients to catch quota violation errors and retry their requests.

C) Delay the response but don't return an error: In this alternative (proposed by Jay), no error is returned to the client. Produce requests are appended to the log immediately and held in purgatory until the client is no longer throttled. After that, the broker returns a successful response to the producer. Fetch requests are also placed in purgatory and served only once the client is no longer violating its quota. This approach does require an idempotent producer. Because the data has already been appended to the log, a client that disconnects and retries will produce duplicate messages. For example, if the client socket timeout is reached, the client disconnects, re-establishes a connection, and retries.
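Option C's duplicate problem arises because a retried produce request is appended to the log a second time. The following hypothetical sketch shows how an idempotent broker could avoid this. It assumes each producer tags its batches with a producer id and a strictly increasing sequence number; neither field is part of the protocol discussed here. The broker remembers the last sequence appended per producer. It acknowledges any batch it has already seen without appending it again. DedupLog and append() are illustrative names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical log that drops retried batches using (producerId, sequence). */
class DedupLog {
    private final List<String> log = new ArrayList<>();
    private final Map<Long, Long> lastSequence = new HashMap<>();

    /** Appends the batch unless it was already appended; returns true if appended. */
    synchronized boolean append(long producerId, long sequence, String batch) {
        Long last = lastSequence.get(producerId);
        if (last != null && sequence <= last) {
            return false; // retry after a disconnect: acknowledge without re-appending
        }
        log.add(batch);
        lastSequence.put(producerId, sequence);
        return true;
    }

    synchronized int size() {
        return log.size();
    }
}
```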

We recommend choosing either B or C.
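Both B and C park work in the purgatory until the client is back under its quota. The following is a sketch of one way to pick the delay. It assumes a byte-rate quota Q (bytes/second) and a client that sent X bytes over the last W milliseconds. Solving X / ((W + d) / 1000) = Q for d gives d = 1000 * X / Q - W, clamped at zero. The parked entries are modeled with java.util.concurrent.DelayQueue rather than Kafka's own purgatory. ThrottleDelay and ThrottledRequest are hypothetical names. Each entry keeps only a client id and correlation id, not the message set. This follows the memory concern for producers raised in option B.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class ThrottleDelay {
    /** Delay (ms) after which observedBytes over (windowMs + delay) is back at the quota rate. */
    static long delayMs(long observedBytes, long windowMs, long quotaBytesPerSec) {
        long d = observedBytes * 1000 / quotaBytesPerSec - windowMs;
        return Math.max(0L, d); // already within quota: no delay
    }
}

/** A parked request; only lightweight identifiers are retained, never message bodies. */
class ThrottledRequest implements Delayed {
    final String clientId;
    final int correlationId;
    private final long expireAtNanos;

    ThrottledRequest(String clientId, int correlationId, long delayMs) {
        this.clientId = clientId;
        this.correlationId = correlationId;
        this.expireAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}
```

For example, a client that sent 15,000,000 bytes in a 10,000 ms window against a 1,000,000 bytes/s quota gets a delay of 15,000 - 10,000 = 5,000 ms. A broker thread would take expired entries from the DelayQueue. Depending on the option chosen, it would then send either a QuotaViolationException error (B) or the held response (C).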

Custom Implementations

The interface has been kept very generic to allow multiple implementations of the quota policy. The default policy that we have proposed should suffice for the majority of use cases. Since we pass the actual request and response objects to the QuotaManager, custom implementations should have enough information to build more complex policies if required.
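The following hypothetical sketch illustrates one kind of custom implementation: a policy composed of several others. A request passes only if every policy allows it, and every policy sees each response. To stay self-contained, it uses a simplified SimpleQuotaPolicy interface keyed by client id and response bytes instead of the RequestOrResponse types. SimpleQuotaPolicy, ByteBudgetPolicy, and AllOfPolicy are assumed names. The budget in ByteBudgetPolicy never resets; a real policy would reset it per time window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for QuotaManager: requests reduced to a client id and byte counts. */
interface SimpleQuotaPolicy {
    boolean check(String clientId);
    void onResponse(String clientId, long responseBytes);
}

/** Total-bytes budget per client, with optional per-client overrides (no time window). */
class ByteBudgetPolicy implements SimpleQuotaPolicy {
    private final long defaultBudget;
    private final Map<String, Long> overrides;
    private final Map<String, Long> used = new HashMap<>();

    ByteBudgetPolicy(long defaultBudget, Map<String, Long> overrides) {
        this.defaultBudget = defaultBudget;
        this.overrides = new HashMap<>(overrides);
    }

    @Override
    public synchronized boolean check(String clientId) {
        long budget = overrides.getOrDefault(clientId, defaultBudget);
        return used.getOrDefault(clientId, 0L) <= budget;
    }

    @Override
    public synchronized void onResponse(String clientId, long responseBytes) {
        used.merge(clientId, responseBytes, Long::sum);
    }
}

/** Passes a request only if every wrapped policy passes it; forwards responses to all. */
class AllOfPolicy implements SimpleQuotaPolicy {
    private final List<SimpleQuotaPolicy> policies;

    AllOfPolicy(List<SimpleQuotaPolicy> policies) {
        this.policies = new ArrayList<>(policies);
    }

    @Override
    public boolean check(String clientId) {
        for (SimpleQuotaPolicy p : policies) {
            if (!p.check(clientId)) {
                return false;
            }
        }
        return true;
    }

    @Override
    public void onResponse(String clientId, long responseBytes) {
        for (SimpleQuotaPolicy p : policies) {
            p.onResponse(clientId, responseBytes);
        }
    }
}
```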

Compatibility, Deprecation, and Migration Plan

...

Rejected Alternatives

Topic Based Quotas - We initially considered topic-based quotas. This was rejected in favor of client-based quotas, since it seemed more natural to throttle clients than topics. Multiple producers and consumers can publish to or fetch from a single topic, which makes it hard to stop badly behaved clients from interfering with well-behaved ones. Topic-based quotas are also harder to reason about when clients access multiple topics (e.g. a wildcard consumer). If the quota for one topic is violated, how do we handle requests for the other topics?

...