
...

We need a mechanism to distribute a client's quota across all the brokers in a cluster. Our proposal is to define the bandwidth on a per-broker basis: each client can publish a maximum of X MBps (configurable) per broker before it gets throttled. This approach assumes that client requests are evenly distributed among all the broker instances, which may not always be the case. However, it is much better than a fixed cluster-wide bandwidth per client, because that would require a mechanism to share current per-client quota usage among all the brokers. Sharing usage information can be very tricky to implement and is outside the scope of this proposal.

 

Quota Actions

 

How does the Kafka server react when it detects a quota violation? In our proposal, the broker does not return an error; rather, it attempts to slow down a client exceeding its quota. The behavior is subtly different for producers and consumers.

In the case of a producer, the broker will append the request batch to the log but will not return a response immediately. Instead, the response is inserted into a DelayedOperationPurgatory for a certain period of time before being returned to the client. It is important that the append be done before delaying the client, to avoid holding large batches in the broker heap and running out of memory.

In the case of a consumer, the action is very similar to a delayed fetch request. The request needs to be delayed before reading from the log, to avoid running out of memory.
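The ordering difference above can be sketched as follows. This is a minimal illustration, not the actual broker code; the `Log`-like and purgatory-like collections and all method names are hypothetical stand-ins:

```java
import java.util.ArrayList;
import java.util.List;

public class QuotaActionSketch {
    // Illustrative stand-ins for the partition log and DelayedOperationPurgatory.
    static List<String> log = new ArrayList<>();
    static List<String> purgatory = new ArrayList<>();

    // Producer path: append FIRST, then park only the small response object.
    // Parking before appending would pin the full message batch in broker heap.
    static void handleThrottledProduce(String batch) {
        log.add(batch);                    // append the data immediately
        purgatory.add("produce-response"); // delay only the lightweight ack
    }

    // Consumer path: park the request BEFORE reading, so no fetched data
    // sits in memory while the client is being throttled.
    static void handleThrottledFetch(String fetchRequest) {
        purgatory.add(fetchRequest);       // delay first
        // ...read from the log only when the delayed operation completes
    }

    public static void main(String[] args) {
        handleThrottledProduce("batch-1");
        handleThrottledFetch("fetch-1");
        System.out.println(log);       // [batch-1]
        System.out.println(purgatory); // [produce-response, fetch-1]
    }
}
```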

Here's how the code might look inside the broker (in KafkaApis or a helper class).

Code Block
 

 

Metrics

The Kafka server has decided to migrate its metrics to Kafka Metrics (KM). For more details, read this. Kafka Metrics maintains a Sensor for each measured metric. A sensor can be configured with a Quota, which is a bound on the minimum and maximum value of a metric. If recording a value causes a bound to be exceeded, a QuotaViolationException is thrown.

 

We will add a bytes-in/bytes-out rate sensor per clientId, configured with N one-second windows. This is already supported in MetricConfig. We can make N configurable and use a good default (10). This lets us precisely track the per-client rate per second, and having small windows makes the quota system very responsive. A quota will also be configured when creating these metrics. For default clients, the quota used will simply be the default quota value; for clients with overridden quotas, we can use the custom value.
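The real implementation would use Kafka Metrics' Sensor, MetricConfig, and Quota classes; the stdlib-only sketch below just illustrates the mechanism of N one-second windows with a bound, and its class and method names are illustrative, not the KM API:

```java
public class WindowedRate {
    private final long windowMs = 1000;     // 1-second windows, as proposed
    private final long[] windows;           // bytes recorded per window
    private long lastWindowIndex = -1;
    private final double quotaBytesPerSec;

    public WindowedRate(int numWindows, double quotaBytesPerSec) {
        this.windows = new long[numWindows]; // N = 10 is the proposed default
        this.quotaBytesPerSec = quotaBytesPerSec;
    }

    /** Records bytes observed at time nowMs; returns false on quota violation. */
    public boolean record(long bytes, long nowMs) {
        long idx = nowMs / windowMs;
        if (idx != lastWindowIndex) {
            // zero out any windows skipped since the last record
            for (long i = Math.max(lastWindowIndex + 1, idx - windows.length + 1); i <= idx; i++)
                windows[(int) (i % windows.length)] = 0;
            lastWindowIndex = idx;
        }
        windows[(int) (idx % windows.length)] += bytes;
        long total = 0;
        for (long w : windows) total += w;
        // compare total over the N-second span against the quota bound
        return total <= quotaBytesPerSec * windows.length;
    }
}
```

With N = 10 windows and a quota of 5 bytes/sec, the bound over the span is 50 bytes; recording a total of 60 bytes reports a violation.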

...

Code Block
Delay Time = (overall produced in window - quota bound) / quota limit per second
 
In the example above, we will delay for 2 seconds:
Delay Time = (60 - 50) / 5 = 2 second delay.
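The delay formula is simple enough to express as a helper; this is a sketch with an illustrative name, not broker code:

```java
public class ThrottleDelay {
    /**
     * Seconds to delay a client that produced producedInWindow bytes
     * against a window bound of quotaBound, at quotaPerSec bytes/sec.
     */
    static double delaySeconds(double producedInWindow, double quotaBound, double quotaPerSec) {
        // no delay if the client is under its bound
        return Math.max(0, producedInWindow - quotaBound) / quotaPerSec;
    }

    public static void main(String[] args) {
        // Worked example from the text: (60 - 50) / 5 = 2 seconds
        System.out.println(delaySeconds(60, 50, 5)); // 2.0
    }
}
```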

Allow Quota Values to Change

 

...

 

The code in KafkaApis or helper class can be:

...

Broker Interface

Quota enforcement in the broker is done by the QuotaManager. check() is expected to be called before processing a request. If check() returns false, the broker is expected to take some quota action; otherwise the request is processed as usual.

Code Block
/**
 * This is the basic interface we need to implement in the Kafka broker. This class will record all events processed by the broker.
 */
public interface QuotaManager {
    /**
     * check() should be called before processing any request. If it returns false, some quota action has to be taken.
     */
    <T extends RequestOrResponse> boolean check(T request);

    /**
     * Called after any client request has been processed; lets the implementation track information about the response.
     * This is useful for consumer-side quotas, where we may not know the response size and number of messages before processing the request entirely.
     *
     * T : any type of request (typically FetchRequest or ProducerRequest)
     * C : the corresponding response object (typically FetchResponse or ProducerResponse)
     */
    <T extends RequestOrResponse, C extends RequestOrResponse> void onResponse(T request, C response);

    void shutdown();
}

One alternative (proposed by Jun) is to simply use the "fetchSize" parameter on the FetchRequest (per partition) when calling check() to determine if the request is violating the quota or not. This removes the need to have the onResponse method. 

Example usage in KafkaApis:

Code Block
// Example for consumers
def handleFetchRequest() {
  val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
  if(!quotaManager.check(fetchRequest))
    // take quota actions

  // Notify the quota manager after the request has been processed
  def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
      val fetchResponse = FetchResponse(fetchRequest.correlationId, responsePartitionData)
      quotaManager.onResponse(fetchRequest, fetchResponse);
  }
}

// Example for producers
def handleProducerRequest(request: RequestChannel.Request) {
  val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]

  if(!quotaManager.check(produceRequest)) {
    // take quota actions after emptying the request buffer
    produceRequest.emptyData();
  }

  // the callback for sending a produce response
  def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
    val response = ProducerResponse(produceRequest.correlationId, responseStatus)
    quotaManager.onResponse(produceRequest, response);
  }
}

Quota Actions

What do we do when check() returns false? We have a few options that we can pick from. Any opinions?

A) Immediately return an error: This is the simplest possible option. However, this requires clients to implement some sort of a back off mechanism since retrying immediately will likely make things worse.

B) Delay the request and return error: If a client is currently exceeding its quota, we can park the request in purgatory for 'n' milliseconds. After the request expires, the broker can return an error to the client. This is a nice feature because it effectively throttles the client up to the request timeout and makes it less critical for them to implement backoff logic. After the error is caught by the client, they can retry immediately. Note that in case of producers, we should be careful about retaining references to large Message bodies because we could easily exhaust broker memory if parking hundreds of requests. The downside of this approach is that it requires the clients to catch quota violation errors and retry their requests.

C) Delay the response but don't return an error: In this alternative (proposed by Jay), no error is returned to the client. Produce requests will get appended to the log immediately and will be kept in purgatory until this client is no longer throttled, after which the producer returns successfully. Fetch requests will also be deposited into the purgatory and serviced only after that client is no longer violating quota. Ideally, this approach requires an idempotent producer because it can lead to duplicate messages if the client disconnects and retries because the data has already been appended to the log. For example: if the client socket timeout is reached, it will disconnect, reestablish a connection and retry.

Our recommendation is that we choose either B or C.


Tooling/Monitoring Changes

Along with this feature, we are proposing to add the following tools/admin operations:

  • Dynamically disable/enable quota enforcement for the entire cluster. Such a feature is very useful while rolling out this feature to production environments.
  • Ability to disable quotas on a per-client basis. For example: we may not want to throttle mirror makers.
  • Dynamically change quotas for any client id.


We also plan to expose the fraction of quota used on a per-client basis via JMX (0-100%, where 100 means throttled).
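The proposed JMX value is a straightforward derived metric; this sketch uses illustrative names and simply caps the ratio at 100:

```java
public class QuotaFraction {
    /** Fraction of quota used, 0-100, where 100 means the client is throttled. */
    static double percentUsed(double observedRate, double quotaRate) {
        return Math.min(100.0, (observedRate / quotaRate) * 100.0);
    }

    public static void main(String[] args) {
        System.out.println(percentUsed(2.5, 5.0)); // 50.0
        System.out.println(percentUsed(9.0, 5.0)); // 100.0 -> throttled
    }
}
```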

Custom Implementations

...

B) If we instead model quotas on a per-topic basis, the provisioned quota can be split equally among all the partitions of a topic. For example: if a topic T has 8 partitions and a total configured write throughput of 8 MBps, each partition gets 1 MBps. If a broker hosts 3 leader partitions for T, then that topic is allowed 3 MBps on that broker, regardless of which partitions the traffic is directed to. In terms of quota distribution, this is the most elegant model.
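The per-topic arithmetic above can be captured in one line; the class and method names here are illustrative only:

```java
public class PerTopicQuota {
    /** Per-broker allowance = (topic quota / partition count) * leaders hosted on the broker. */
    static double brokerAllowanceMBps(double topicQuotaMBps, int partitions, int leadersOnBroker) {
        return (topicQuotaMBps / partitions) * leadersOnBroker;
    }

    public static void main(String[] args) {
        // Example from the text: 8 MBps topic quota, 8 partitions, broker hosts 3 leaders
        System.out.println(brokerAllowanceMBps(8, 8, 3)); // 3.0
    }
}
```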

 

Quota Actions


We have decided against (A) because it relies on clients to correctly implement back off behavior. A badly behaved client can simply retry in a loop and cause network saturation. In case of (B), clients have to handle an additional error code for quota and also build retry logic. In addition, there is a possibility of producer data loss if the request does not succeed after a certain number of retries.

There is more context in the email thread on this particular issue.