
Broker Interface

Quota enforcement in the broker is done by the QuotaManager. The broker calls check() before processing a request. If check() returns false, the broker takes some quota action; otherwise the request is processed as usual. After a request has been processed, the broker calls onResponse() so that the QuotaManager can account for the actual response.

Code Block
import kafka.api.RequestOrResponse;

/**
 * This is the basic interface we need to implement in the Kafka broker. Implementations record all events processed by the broker.
 */
public interface QuotaManager {
    /**
     * check() should be called before processing any request. If it returns false, some quota action has to be taken.
     */
    <T extends RequestOrResponse> boolean check(T request);

    /**
     * Called after a client request has been processed; lets the implementation track information about the response.
     * This is useful for consumer-side quotas, where the response size and number of messages are not known until the request has been fully processed.
     *
     * T : any type of request or response, typically FetchRequest or ProducerRequest.
     * C : the response object, typically FetchResponse or ProducerResponse.
     */
    <T extends RequestOrResponse, C extends RequestOrResponse> void onResponse(T request, C response);

    void shutdown();
}
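To make the check()/onResponse() contract concrete, here is a minimal sketch of one possible implementation. It is an assumption, not part of the proposal: usage is keyed by client id, the actual response size is charged in onResponse(), and a fixed time window is used. The class name WindowedByteQuotaManager and its byte-based method signatures are hypothetical stand-ins for the RequestOrResponse-typed methods above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a fixed-window, per-client byte quota. Methods take a client id
// and byte counts instead of Kafka request/response objects to stay self-contained.
final class WindowedByteQuotaManager {
    private final long quotaBytesPerWindow;
    private final long windowMs;
    // clientId -> {windowStartMs, bytesUsedInWindow}
    private final Map<String, long[]> usage = new HashMap<>();

    WindowedByteQuotaManager(long quotaBytesPerWindow, long windowMs) {
        this.quotaBytesPerWindow = quotaBytesPerWindow;
        this.windowMs = windowMs;
    }

    // Counterpart of check(): false means the client has used up its quota for the
    // current window and the broker should take a quota action.
    synchronized boolean check(String clientId, long nowMs) {
        return currentWindow(clientId, nowMs)[1] < quotaBytesPerWindow;
    }

    // Counterpart of onResponse(): charge the actual response size once it is known.
    synchronized void onResponse(String clientId, long responseBytes, long nowMs) {
        currentWindow(clientId, nowMs)[1] += responseBytes;
    }

    // Returns the client's usage record, starting a fresh window if the old one has expired.
    private long[] currentWindow(String clientId, long nowMs) {
        long[] u = usage.computeIfAbsent(clientId, k -> new long[] {nowMs, 0L});
        if (nowMs - u[0] >= windowMs) {
            u[0] = nowMs;
            u[1] = 0L;
        }
        return u;
    }
}
```

A fixed window is the simplest choice; a sliding window or token bucket would smooth out bursts at window boundaries.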

One alternative (proposed by Jun) is to simply pass the per-partition "fetchSize" parameter of the FetchRequest to check() to determine whether the request violates the quota. This removes the need for the onResponse method. However, fetchSize is the maximum amount of data that can be returned per partition within a request, so it may not accurately reflect the actual response size, which IMO is important. For example, if fetchSize is set to Integer.MAX_VALUE, every request would be charged the full fetchSize and throttled immediately, even when the actual response is small, which is probably not the behavior we want.
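The difference between the two accounting choices can be shown with a small calculation. In this sketch the 10 MB-per-window quota and the method names are hypothetical; it only compares charging the requested fetchSize up front against charging the bytes actually returned.

```java
// Hypothetical comparison of two ways check() could account for a fetch.
final class FetchQuotaAccounting {
    // Assumed per-client quota for illustration: 10 MB per window.
    static final long QUOTA_BYTES_PER_WINDOW = 10L * 1024 * 1024;

    // Charge the requested maximum (fetchSize) before the fetch is served.
    // The sum is computed as a long, so Integer.MAX_VALUE cannot overflow it.
    static boolean allowedByFetchSize(long bytesUsedInWindow, int fetchSize) {
        return bytesUsedInWindow + fetchSize <= QUOTA_BYTES_PER_WINDOW;
    }

    // Charge only the bytes actually returned (recorded later via onResponse),
    // so check() just compares what has already been used.
    static boolean allowedByActualUsage(long bytesUsedInWindow) {
        return bytesUsedInWindow <= QUOTA_BYTES_PER_WINDOW;
    }
}
```

With fetchSize = Integer.MAX_VALUE (roughly 2 GB), allowedByFetchSize(0, Integer.MAX_VALUE) is false on the very first request, whereas a client that has actually received 1 KB still passes allowedByActualUsage(1024).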

Example usage in KafkaApis:

Code Block
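// Example for consumers
def handleFetchRequest(request: RequestChannel.Request) {
  val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
  if (!quotaManager.check(fetchRequest)) {
    // take quota actions
  }

  // the callback for sending a fetch response; it notifies the quota manager after
  // the request has been processed, when the actual response size is known
  def sendResponseCallback(responsePartitionData: Map[TopicAndPartition, FetchResponsePartitionData]) {
    val fetchResponse = FetchResponse(fetchRequest.correlationId, responsePartitionData)
    quotaManager.onResponse(fetchRequest, fetchResponse)
    // ... send fetchResponse back to the client, as in the existing handler
  }

  // ... call the replica manager to fetch messages, passing sendResponseCallback
}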

// Example for producers
  def handleProducerRequest(request: RequestChannel.Request) {
    val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]

    if (!quotaManager.check(produceRequest)) {
      // take quota actions
    }

    // the callback for sending a produce response
    def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
      var errorInResponse = false
      responseStatus.foreach { case (topicAndPartition, status) =>
        // we only print warnings for known errors here; if it is unknown, it will cause
        // an error message in the replica manager
        if (status.error != ErrorMapping.NoError && status.error != ErrorMapping.UnknownCode) {
          debug("Produce request with correlation id %d from client %s on partition %s failed due to %s"
            .format(produceRequest.correlationId, produceRequest.clientId,
            topicAndPartition, ErrorMapping.exceptionNameFor(status.error)))
          errorInResponse = true
        }
      }
      if (produceRequest.requiredAcks == 0) {
        // no operation needed if producer request.required.acks = 0; however, if there is any error in handling
        // the request, since no response is expected by the producer, the server will close the socket so that
        // the producer client will know that some error has happened and will refresh its metadata
        if (errorInResponse) {
          info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0"
                  .format(produceRequest.correlationId, produceRequest.clientId))
          requestChannel.closeConnection(request.processor, request)
        } else {
          requestChannel.noOperation(request.processor, request)
        }
      } else {
        val response = ProducerResponse(produceRequest.correlationId, responseStatus)
        requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
      }
    }
    // only allow appending to internal topic partitions
    // if the client is the admin client
    val internalTopicsAllowed = produceRequest.clientId == AdminUtils.AdminClientId
    // call the replica manager to append messages to the replicas
    replicaManager.appendMessages(
      produceRequest.ackTimeoutMs.toLong,
      produceRequest.requiredAcks,
      internalTopicsAllowed,
      produceRequest.data,
      sendResponseCallback)
    // if the request is put into the purgatory, it will have a held reference
    // and hence cannot be garbage collected; hence we clear its data here in
    // order to let GC reclaim its memory since it is already appended to the log
    produceRequest.emptyData()
  }

Quota Actions

What do we do when check() returns false? We have a few options to pick from. Any opinions?

A) Immediately return an error: This is the simplest possible option. However, it requires clients to implement some sort of backoff mechanism, since retrying immediately would likely make things worse.
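Option A leaves backoff to the client. A minimal sketch of capped exponential backoff, assuming the client waits delayMs(attempt) before each retry; the class name and parameters are hypothetical, and the proposal does not prescribe a particular policy.

```java
// Hypothetical client-side helper: capped exponential backoff for quota errors.
final class QuotaBackoff {
    private final long initialMs;
    private final long maxMs;

    QuotaBackoff(long initialMs, long maxMs) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
    }

    // Delay before retry number `attempt` (0-based): initialMs * 2^attempt, capped at maxMs.
    // Doubling stops once the cap is reached, so large attempt counts cannot overflow.
    long delayMs(int attempt) {
        long delay = initialMs;
        for (int i = 0; i < attempt && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }
}
```

With initialMs = 100 and maxMs = 1000 the delays are 100, 200, 400 and 800 ms, then 1000 ms for every later attempt. Adding random jitter would keep many throttled clients from retrying in lockstep.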

B) Delay the request and return an error: If a client is currently exceeding its quota, we can park the request in purgatory for n milliseconds. After the request expires, the broker returns an error to the client. This is a nice feature because it effectively throttles the client up to the request timeout and makes it less critical for clients to implement backoff logic: after catching the error, a client can retry immediately. Note that for producers we should be careful about retaining references to large message bodies, because parking hundreds of requests could easily exhaust broker memory. The downside of this approach is that it requires clients to catch quota violation errors and retry their requests.
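Option B needs somewhere to hold throttled requests until their delay expires. The sketch below is a simplified, single-threaded model, not Kafka's request purgatory: ThrottlePurgatory, ParkedRequest and the caller-driven expire() are hypothetical. It stores only the client id and correlation id, reflecting the concern about retaining large message bodies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of option B: a throttled request is parked until a deadline,
// then handed back so the broker can answer it with a quota-violation error.
final class ThrottlePurgatory {
    static final class ParkedRequest {
        final String clientId;
        final long correlationId;
        final long expiresAtMs;

        ParkedRequest(String clientId, long correlationId, long expiresAtMs) {
            this.clientId = clientId;
            this.correlationId = correlationId;
            this.expiresAtMs = expiresAtMs;
        }
    }

    // Ordered by deadline so the earliest-expiring request is always at the head.
    private final PriorityQueue<ParkedRequest> parked =
        new PriorityQueue<>((a, b) -> Long.compare(a.expiresAtMs, b.expiresAtMs));
    private final long delayMs;

    ThrottlePurgatory(long delayMs) {
        this.delayMs = delayMs;
    }

    // Park a request that failed check(); the message payload is not stored.
    void park(String clientId, long correlationId, long nowMs) {
        parked.add(new ParkedRequest(clientId, correlationId, nowMs + delayMs));
    }

    // Remove and return every request whose delay has elapsed; the caller would
    // send each of them an error response.
    List<ParkedRequest> expire(long nowMs) {
        List<ParkedRequest> expired = new ArrayList<>();
        while (!parked.isEmpty() && parked.peek().expiresAtMs <= nowMs) {
            expired.add(parked.poll());
        }
        return expired;
    }

    int size() {
        return parked.size();
    }
}
```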

C) Delay the response but don't return an error: In this alternative (proposed by Jay), no error is returned to the client. Produce requests are appended to the log immediately and kept in purgatory until the client is no longer throttled, after which the produce request completes successfully. Fetch requests are also deposited into purgatory and serviced only after the client is no longer violating its quota. Ideally, this approach requires an idempotent producer: because the data has already been appended to the log, a client that disconnects and retries can create duplicate messages. For example, if the client's socket timeout is reached, it will disconnect, re-establish a connection and retry.
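The duplication risk in option C can be modelled with two toy logs. This is a hypothetical illustration: the per-producer sequence-number check in IdempotentLog is one assumed way to make appends idempotent, not a mechanism defined in this proposal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of why option C favours an idempotent producer.
final class RetryDuplication {
    // Naive log: every append is stored, so resending an already-appended batch duplicates it.
    static final class NaiveLog {
        final List<String> entries = new ArrayList<>();

        void append(String producerId, long sequence, String payload) {
            entries.add(payload);
        }
    }

    // Assumed idempotent scheme: remember the last sequence number per producer
    // and drop any append that is not newer.
    static final class IdempotentLog {
        final List<String> entries = new ArrayList<>();
        private final Map<String, Long> lastSequence = new HashMap<>();

        void append(String producerId, long sequence, String payload) {
            Long last = lastSequence.get(producerId);
            if (last != null && sequence <= last) {
                return; // a retry of data that is already in the log
            }
            lastSequence.put(producerId, sequence);
            entries.add(payload);
        }
    }
}
```

If a produce request is appended and held in purgatory, and the client times out and resends it, NaiveLog stores the batch twice, while IdempotentLog drops the resend because its sequence number is not newer than the last one seen.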

Our recommendation is that we choose either B or C.

Tooling/Monitoring Changes

Along with this feature, we are proposing to add the following tools/admin operations:

  • Dynamically disable/enable quota enforcement for the entire cluster. This is very useful while rolling out the feature to production environments.
  • Ability to disable quotas on a per-client basis. For example: we may not want to throttle mirror makers.
  • Dynamically change quotas for any client id.
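The three admin operations map onto a small amount of mutable broker state. A minimal sketch, assuming the values are updated in memory by some admin path (how changes propagate across the cluster is not shown); DynamicQuotaConfig and its methods are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical holder for dynamically changeable quota settings.
final class DynamicQuotaConfig {
    private volatile boolean enforcementEnabled = true;                          // cluster-wide switch
    private final Set<String> exemptClients = ConcurrentHashMap.newKeySet();     // e.g. mirror makers
    private final Map<String, Long> quotaOverrides = new ConcurrentHashMap<>();  // per-client quotas
    private final long defaultQuotaBytesPerSec;

    DynamicQuotaConfig(long defaultQuotaBytesPerSec) {
        this.defaultQuotaBytesPerSec = defaultQuotaBytesPerSec;
    }

    void setEnforcementEnabled(boolean enabled) {
        enforcementEnabled = enabled;
    }

    void exempt(String clientId) {
        exemptClients.add(clientId);
    }

    void setQuota(String clientId, long bytesPerSec) {
        quotaOverrides.put(clientId, bytesPerSec);
    }

    // Whether the broker should enforce quotas for this client at all.
    boolean isEnforced(String clientId) {
        return enforcementEnabled && !exemptClients.contains(clientId);
    }

    // The client's override if one was set, otherwise the default quota.
    long quotaFor(String clientId) {
        return quotaOverrides.getOrDefault(clientId, defaultQuotaBytesPerSec);
    }
}
```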


We also plan to expose the fraction of quota used on a per-client basis via JMX (0-100%, where 100 means throttled).
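The per-client gauge is a clamped ratio. A sketch of the computation, assuming usage and quota are measured over the same window; the class and method names are hypothetical, and registering the value as a JMX attribute is omitted.

```java
// Hypothetical computation behind the per-client "fraction of quota used" gauge.
final class QuotaUsageGauge {
    // Percentage of quota used, clamped to [0, 100]; 100 means the client is throttled.
    static double quotaUsedPercent(long bytesUsedInWindow, long quotaBytesPerWindow) {
        if (quotaBytesPerWindow <= 0) {
            throw new IllegalArgumentException("quota must be positive");
        }
        double percent = 100.0 * bytesUsedInWindow / quotaBytesPerWindow;
        return Math.max(0.0, Math.min(100.0, percent));
    }
}
```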

Custom Implementations

...