...

How does the Kafka server react when it detects a quota violation? In our proposal, the broker does not return an error; rather, it attempts to slow down a client that exceeds its quota. The behavior is subtly different for producers and consumers.

...

Here's how the code might look inside the broker (in KafkaApis, ReplicaManager, or a helper class). This is for illustration purposes only.

Code Block
 

...

def appendMessages(...) {
  val metric: Sensor = metrics.get(sensorName)
  try {
    metric.record(produceSize)
  } catch {
    case ex: QuotaViolationException =>
      // Quota exceeded: hold the produce request in purgatory for the computed delay
      val delayedProduce = new DelayedProduce(ex.getDelayTime(), ....)
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
  }
}

def fetchMessages(...) {
  val metric: Sensor = metrics.get(sensorName)
  try {
    metric.record(fetchSize)
  } catch {
    case ex: QuotaViolationException =>
      // Quota exceeded: hold the fetch request in purgatory for the computed delay
      val delayedFetch = new DelayedFetch(ex.getDelayTime(), ....)
      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, fetchRequestKeys)
  }
}

Metrics

Kafka server has decided to migrate its metrics to Kafka Metrics (KM). For more details read this. Kafka Metrics maintains a Sensor for each measured metric. A Sensor can be configured with a "Quota", which is a bound on the minimum or maximum value of a metric. If recording a value causes a bound to be exceeded, a QuotaViolationException is thrown.
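To make the record-then-throw behavior concrete, here is a minimal, self-contained sketch. It does not use the Kafka Metrics classes: BoundedSensor and QuotaExceeded are hypothetical stand-ins for a Sensor with an upper-bound Quota and for QuotaViolationException, and the single running total is a simplification of a windowed rate.

```java
// Hypothetical stand-in for a quota-bounded sensor; this is NOT the Kafka Metrics API.
final class BoundedSensor {

    /** Hypothetical exception that plays the role of QuotaViolationException. */
    static final class QuotaExceeded extends RuntimeException {
        final double observed;
        final double upperBound;

        QuotaExceeded(double observed, double upperBound) {
            super("observed " + observed + " exceeds bound " + upperBound);
            this.observed = observed;
            this.upperBound = upperBound;
        }
    }

    private final double upperBound; // maximum total allowed in the current window
    private double total;            // running total for the current window

    BoundedSensor(double upperBound) {
        this.upperBound = upperBound;
    }

    /**
     * Adds the value to the running total, then checks the bound.
     * The value stays recorded even when the exception is thrown.
     */
    void record(double value) {
        total += value;
        if (total > upperBound) {
            throw new QuotaExceeded(total, upperBound);
        }
    }

    double total() {
        return total;
    }
}
```

A caller would wrap record() in a try/catch and treat the exception as the signal to delay the request rather than as an error to return to the client.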

...

Code Block
Delay Time = (overall produced in window - quota bound) / quota limit per second

In the example above, we will delay for 2 seconds:
Delay Time = (60 - 50) / 5 = 2 seconds
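The delay formula can be written as a small helper. This is a sketch only: the class name QuotaDelay and its parameter names are assumptions made for illustration, and returning zero when the client is under its bound is an added convention, not something the proposal specifies.

```java
// Illustrative helper for the delay formula; names are hypothetical.
final class QuotaDelay {
    private QuotaDelay() {}

    /**
     * Delay = (amount observed in the window - quota bound for the window) / quota per second.
     * Returns 0 when the observed amount does not exceed the bound (assumed convention).
     */
    static double delaySeconds(double observedInWindow, double quotaBound, double quotaPerSecond) {
        if (quotaPerSecond <= 0) {
            throw new IllegalArgumentException("quotaPerSecond must be positive");
        }
        double excess = observedInWindow - quotaBound;
        return excess <= 0 ? 0.0 : excess / quotaPerSecond;
    }
}
```

With the numbers from the example, delaySeconds(60, 50, 5) evaluates to 2.0.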

 

The code in KafkaApis or helper class can be:

 

...

Configuration Management

How do we manage the quota overrides and the default topic configs? Manually configuring brokers with these is painful. In this case, the ability to dynamically change configs without bouncing brokers is very useful. There is already a proposal/patch for dynamic configuration management by Joe Stein which we plan to leverage for distributing these quota configs. In the future, we also need to think about restricting access to these configs (so that customers cannot modify their own quotas) but that is a separate discussion.

Broker Interface

Quota enforcement in the broker is done by the QuotaManager. check() is expected to be called before processing a request. If check() returns false, the broker is expected to take some quota action; otherwise the request is processed as usual.

Code Block
/**
 * This is the basic interface we need to implement in the Kafka broker. This class will record all events processed by the broker.
 */
public interface QuotaManager {
    /**
     * check() should be called before processing any request. If it returns false, some quota action has to be taken.
     */
    <T extends RequestOrResponse> boolean check(T request);

    /**
     * Called after any client request has been processed; lets the implementation track information about the response.
     * This is useful for consumer-side quotas, where we may not know the response sizes and number of messages
     * before the request has been processed entirely.
     *
     * T: any type of request or response, typically FetchRequest or ProducerRequest.
     * C: the response object, typically FetchResponse or ProducerResponse.
     */
    <T extends RequestOrResponse, C extends RequestOrResponse> void onResponse(T request, C response);

    void shutdown();
}

One alternative (proposed by Jun) is to simply use the "fetchSize" parameter on the FetchRequest (per partition) when calling check() to determine if the request is violating the quota or not. This removes the need to have the onResponse method. 
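A rough sketch of that alternative follows. Everything here is hypothetical: FetchSizeQuotaCheck, its byte budget per window, and the list of per-partition fetch sizes are stand-ins for whatever the broker would actually read from the FetchRequest. The point is only that the requested sizes are charged up front, so no response callback is needed.

```java
import java.util.List;

// Hypothetical sketch: charge the requested fetch sizes up front instead of
// waiting for the response. Not part of the Kafka code base.
final class FetchSizeQuotaCheck {
    private final long maxBytesPerWindow; // assumed byte budget for one quota window
    private long bytesChargedInWindow;    // bytes charged so far in the current window

    FetchSizeQuotaCheck(long maxBytesPerWindow) {
        this.maxBytesPerWindow = maxBytesPerWindow;
    }

    /**
     * Sums the per-partition fetch sizes of one request, charges them to the
     * window, and returns false once the budget has been exceeded.
     */
    synchronized boolean check(List<Integer> fetchSizesPerPartition) {
        long requested = 0;
        for (int size : fetchSizesPerPartition) {
            requested += size;
        }
        bytesChargedInWindow += requested;
        return bytesChargedInWindow <= maxBytesPerWindow;
    }

    /** Starts a new quota window. */
    synchronized void resetWindow() {
        bytesChargedInWindow = 0;
    }
}
```

One consequence of this design is that a client is charged for the bytes it asked for rather than the bytes it received, so a consumer that requests large fetch sizes on mostly idle partitions may be throttled earlier than its actual traffic would justify.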

Example usage in KafkaApis:

Code Block
// Example for consumers
def handleFetchRequest(request: RequestChannel.Request) {
  val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
  if (!quotaManager.check(fetchRequest)) {
    // take quota actions
  }

  // Notify the quota manager after the request has been processed
  def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
    val fetchResponse = FetchResponse(fetchRequest.correlationId, responsePartitionData)
    quotaManager.onResponse(fetchRequest, fetchResponse)
  }
}

// Example for producers
def handleProducerRequest(request: RequestChannel.Request) {
  val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]

  if (!quotaManager.check(produceRequest)) {
    // take quota actions after emptying the request buffer
    produceRequest.emptyData()
  }

  // the callback for sending a produce response
  def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
    val response = ProducerResponse(produceRequest.correlationId, responseStatus)
    quotaManager.onResponse(produceRequest, response)
  }
}

Tooling/Monitoring Changes

Along with this feature, we are proposing to add the following tools/admin operations:

  • Dynamically disable/enable quota enforcement for the entire cluster. This is very useful while rolling out the feature to production environments.
  • Ability to disable quotas on a per-client basis. For example: we may not want to throttle mirror makers.
  • Dynamically change quotas for any client id.
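The three operations above could be backed by a small piece of mutable state in the broker. The sketch below is an assumption about how that might look, not the proposed implementation: QuotaSettings, its method names, and the use of bytes per second as the unit are all hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical holder for dynamically changeable quota settings.
final class QuotaSettings {
    private final long defaultBytesPerSec;
    private final Map<String, Long> overrides = new HashMap<>();
    private final Set<String> exemptClients = new HashSet<>();
    private boolean enforcementEnabled = true;

    QuotaSettings(long defaultBytesPerSec) {
        this.defaultBytesPerSec = defaultBytesPerSec;
    }

    /** Cluster-wide switch, e.g. for a cautious production rollout. */
    synchronized void setEnforcementEnabled(boolean enabled) {
        enforcementEnabled = enabled;
    }

    /** Disables quotas for one client id, e.g. a mirror maker. */
    synchronized void exempt(String clientId) {
        exemptClients.add(clientId);
    }

    /** Changes the quota for one client id. */
    synchronized void setOverride(String clientId, long bytesPerSec) {
        overrides.put(clientId, bytesPerSec);
    }

    /** Returns the quota to enforce, or empty if the client must not be throttled. */
    synchronized OptionalLong effectiveQuota(String clientId) {
        if (!enforcementEnabled || exemptClients.contains(clientId)) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(overrides.getOrDefault(clientId, defaultBytesPerSec));
    }
}
```

In this sketch an exemption takes precedence over an override, and the cluster-wide switch takes precedence over both; the proposal does not state an ordering, so that choice is an assumption.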


We also plan to expose the fraction of quota used on a per-client basis via JMX (0-100%, where 100 means throttled).
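As a sketch of the value such a JMX attribute might report, the helper below converts an observed rate and a quota into a percentage clamped to the 0-100 range. The class and method names are hypothetical, and the JMX registration itself is left out.

```java
// Hypothetical helper for the "fraction of quota used" value; names are illustrative.
final class QuotaUsage {
    private QuotaUsage() {}

    /**
     * Percentage of the quota in use, clamped to [0, 100].
     * A result of 100 corresponds to a client that is being throttled.
     */
    static double percentUsed(double observedRate, double quotaRate) {
        if (quotaRate <= 0) {
            throw new IllegalArgumentException("quotaRate must be positive");
        }
        double percent = 100.0 * observedRate / quotaRate;
        return Math.max(0.0, Math.min(100.0, percent));
    }
}
```

For example, a client producing at 2.5 units per second against a quota of 5 units per second would be reported at 50 percent.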

Custom Implementations

...


Compatibility, Deprecation, and Migration Plan

Rejected Alternatives

Topic Based Quotas

We initially considered topic-based quotas. This was rejected in favor of client-based quotas, since it seemed more natural to throttle clients than topics. Multiple producers/consumers can publish to/fetch from a single topic, which makes it hard to eliminate interference from badly behaved clients. Topic-based quotas are also harder to reason about when clients access multiple topics (e.g. a wildcard consumer). If the quota for one topic is violated, how do we handle requests for the other topics?

...