Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

WIP - Not ready to review

Status

Current stateUnder Discussion

...

Configuration Management

How do we manage the quota overrides and the default topic configs? Manually configuring brokers with these is painful. In this case, the ability to dynamically change configs without bouncing brokers is very useful. Is it sufficient to use the topic level configs in Zookeeper? If they are not sufficient, then there There is already a proposal/patch for dynamic configuration management by Joe Stein which we plan to leverage for distributing these quota configs. In the future, we also need to think about restricting access to these configs (so that customers cannot modify their own quotas) but that is a separate discussion.

Broker Interface

Quota enforcement in the broker is done by the QuotaManager. check() is expected to be called before processing a request. If check() returns false, the broker is expected to take some quota action; otherwise the request is processed as usual.

Code Block
/**
 * This is the basic interface we need to implement in the Kafka broker. This class will record all events processed by the broker. 
 */
public interface QuotaManager {
    /** 
     * The function check should be called before processing any request. If this returns false, then some Quota action has to be taken.
     */
    <T extends RequestOrResponse> boolean check(T request);

    /**
     * This function is called after any client request has been processed and lets the implementation track information about the response. 
     * This is useful for consumer side quotas where we may not know the response sizes and number of messages before processing the request entirely.
     *
     * T : T can be any type of request or response. Typically (FetchRequest or ProducerRequest)
     * C : C is basically the response object. Typically (FetchResponse or ProducerResponse).
     *
     */
	<T extends RequestOrResponse, C extends RequestOrResponse> void onResponse(T request, C response);

	void shutdown();
}

One alternative (proposed by Jun) is to simply use the "fetchSize" parameter on the FetchRequest (per partition) when calling check() to determine if the request is violating the quota or not. This removes the need to have the onResponse method.

However, fetchSize is the maximum amount of data that can be returned for that partition and may not accurately reflect the actual response size which IMO is important. For example: if the fetchSize is set to Integer.MAX_VALUE, every request will get throttled immediately.

Example usage in KafkaApis:

Code Block
// Example for consumers
def handleFetchRequest() {
  val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
  if(!quotaManager.check(fetchRequest))
    // take quota actions

  // Notify the quota manager after the request has been processed
  def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
      val fetchResponse = FetchResponse(fetchRequest.correlationId, responsePartitionData)
     quotaManager.notify(fetchRequest, fetchResponse);
  }
}

Quota Actions

Custom Implementations

The interface has been kept very generic to allow multiple implementations of the quota policy. The default policy that we have proposed should suffice for the majority of use cases. Since we pass in the actual request and response objects to the QuotaManager, these implementations should have enough information to build complex policies if required.

Compatibility, Deprecation, and Migration Plan

...