...

  1. Kafka brokers currently group clients based on user principal and/or client-id for quota enforcement. If quotas are configured at <user, client-id> level, all requests that share the user principal and client-id share the quota. If quotas are configured at <user> level, all requests that share the user principal but don't have a matching <user, client-id> quota configuration share the <user> quota (and similarly for <client-id> quotas). In some scenarios, it is useful to define a quota group that combines multiple user principals and/or client-ids. All the requests from the group may then share a single quota.
  2. Some clients may have access only to a few topics, hosted on a subset of brokers. The load from these clients falls mostly on the brokers that are leaders of those topic partitions. Rather than allocating a fixed quota for these clients on each broker, it will be useful to have quotas that are proportional to the number of the client's partitions hosted on the broker. Since partition leaders may change dynamically, it will be better to compute such quotas at runtime rather than updating ZooKeeper with new quotas whenever partition leaders change.

...

  • Name: client.quota.callback
  • Type: CLASS
  • Mode: Dynamically configurable as cluster-default for all brokers in the cluster
  • Description: The fully qualified name of a class that implements the ClientQuotaCallback interface, which is used to determine quota limits applied to client requests. By default, <user, client-id>, <user> or <client-id> quotas stored in ZooKeeper are applied. For any given request, the most specific quota that matches the user principal of the session and the client-id of the request is enforced by every broker.
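
Since this is a dynamic broker config, it can also be updated at runtime through the AdminClient. The sketch below is illustrative only: com.example.GroupQuotaCallback is a hypothetical implementation class that must be available on the broker classpath, and note that alterConfigs replaces the full set of dynamic configs for the resource.

Code Block
languagescala
titleSetting the callback dynamically (illustrative sketch)
import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object QuotaCallbackConfigExample extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  val admin = AdminClient.create(props)

  // An empty resource name selects the cluster-wide default applied to all brokers
  val clusterDefault = new ConfigResource(ConfigResource.Type.BROKER, "")
  // com.example.GroupQuotaCallback is a hypothetical ClientQuotaCallback implementation
  val callbackConfig = new ConfigEntry("client.quota.callback", "com.example.GroupQuotaCallback")

  admin.alterConfigs(Collections.singletonMap(clusterDefault,
    new Config(Collections.singletonList(callbackConfig)))).all().get()
  admin.close()
}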

New Interfaces

...

The following new public classes/traits will be introduced in the package kafka.quota (in the Kafka core project).

The quota types supported for the callback will be Fetch, Produce and Request.

Code Block
languagescala
titleQuota types
sealed trait ClientQuotaType
object ClientQuotaType {
  case object Fetch extends ClientQuotaType
  case object Produce extends ClientQuotaType
  case object Request extends ClientQuotaType
}

...

ClientQuotaCallback must be implemented by custom callbacks and will also be implemented by the default quota callback. Since ClientQuotaCallback.quota() will be invoked on every request, callback implementations should cache persisted configs where necessary so that quotas can be determined quickly.

Code Block
languagescala
titleClient Quota Callback
trait ClientQuotaCallback extends Configurable {

  /**
    * Quota callback invoked to determine the quota limit to be applied for a request.
    * 
    * @param session The session for which quota is requested
    * @param clientId The client id associated with the request
    * @param quotaType Type of quota requested
    *                  
    * @return the quota including the limit and metric tags that indicate which other entities share this quota
    */
  def quota(session: Session, clientId: String, quotaType: ClientQuotaType): ClientQuota

  /**
    * Metadata update callback that is invoked whenever an UpdateMetadata request is received from
    * the controller. This is useful if quota computation takes partitions into account.
    * 
    * @param partitions Partitions and their metadata including partition leader
    */
  def updatePartitionMetadata(partitions: Map[TopicPartition, PartitionMetadata]): Unit

  /**
    * Quota configuration update callback that is invoked whenever quota configuration in ZooKeeper
    * is updated. This is useful to track configured quotas if the built-in quota configuration tools
    * are used.
    * 
    * @param quotaEntity The quota entity for which quota is being updated.
    * @param quotaType Type of quota being updated.
    * @param newValue The new quota value. If None, the quota configuration for `quotaEntity` is deleted.
    */
  def updateQuota(quotaEntity: ClientQuotaEntity, quotaType: ClientQuotaType, newValue: Option[Double]): Unit

  /**
    * Closes this instance.
    */
  def close(): Unit
}
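
To illustrate the quota-group scenario from the motivation, the sketch below shows what a custom callback might look like. It is not part of this KIP: the group mapping and limits are hypothetical placeholders, Session is assumed to be kafka.network.RequestChannel.Session, PartitionMetadata and ClientQuotaEntity are the classes introduced by this KIP (elided above), and a real implementation would load and cache its configuration in configure() since quota() is invoked on every request.

Code Block
languagescala
titleExample custom callback (illustrative sketch)
import java.util.{Map => JMap}

import kafka.network.RequestChannel.Session
import org.apache.kafka.common.TopicPartition

class GroupQuotaCallback extends ClientQuotaCallback {

  // Hypothetical static mapping; a real callback would load and cache this in configure()
  private val userToGroup = Map("alice" -> "analytics", "bob" -> "analytics")
  private val groupLimits = Map("analytics" -> 2 * 1024 * 1024.0) // 2 MB/s shared by the group

  override def configure(configs: JMap[String, _]): Unit = {}

  override def quota(session: Session, clientId: String, quotaType: ClientQuotaType): ClientQuota = {
    val group = userToGroup.getOrElse(session.principal.getName, "default")
    // All requests that resolve to the same "group" tag share one quota sensor, and hence one limit
    ClientQuota(groupLimits.getOrElse(group, Double.MaxValue), Map("group" -> group))
  }

  // A partition-aware callback would recompute per-broker limits here when leaders change
  override def updatePartitionMetadata(partitions: Map[TopicPartition, PartitionMetadata]): Unit = {}

  // Unused here since quotas are not managed through the built-in quota configuration tools
  override def updateQuota(quotaEntity: ClientQuotaEntity, quotaType: ClientQuotaType, newValue: Option[Double]): Unit = {}

  override def close(): Unit = {}
}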

...

By default, the tags "user" and "client-id" will be used for all quota metrics. When a <user, client-id> quota config is used, the user tag is set to the user principal of the session and the client-id tag is set to the client-id of the request. If a <user> quota config is used, the user tag is set to the user principal of the session and the client-id tag is set to the empty string. Similarly, if a <client-id> quota config is used, the user tag is set to the empty string. This ensures that the same quota sensors and metrics are shared by all requests that match each quota config.

Code Block
languagescala
titleClientQuota
/**
  * Client quota returned by `ClientQuotaCallback`.
  *
  * @param quotaLimit The quota bound to be applied
  * @param metricTags The tags to be added to the quota metric for this request. All entities
  *                   which have the same `metricTags` share the `quotaLimit`
  */
case class ClientQuota(quotaLimit: Double, metricTags: Map[String, String])
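
For example, under the default tagging rules described above, the quotas returned by the default callback would look along these lines (the limits shown are illustrative):

Code Block
languagescala
titleDefault metric tags (illustrative)
// <user, client-id> quota match: both tags are populated
val userClientQuota = ClientQuota(10 * 1024 * 1024.0, Map("user" -> "alice", "client-id" -> "consumer-1"))

// <user> quota match: client-id tag is empty, so all of alice's clients share one quota
val userQuota = ClientQuota(10 * 1024 * 1024.0, Map("user" -> "alice", "client-id" -> ""))

// <client-id> quota match: user tag is empty, so all users with this client-id share one quota
val clientIdQuota = ClientQuota(5 * 1024 * 1024.0, Map("user" -> "", "client-id" -> "consumer-1"))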

...

ClientQuotaManager and ClientRequestQuotaManager will be updated to move quota configuration management into a new class DefaultQuotaCallback that implements ClientQuotaCallback. If a custom callback is not configured, DefaultQuotaCallback will be used.

...

We could implement different quota algorithms in Kafka and support quota groups, partition-based quotas, etc. But this would require Kafka to manage these groups, mappings of users to partitions, etc., increasing the complexity of the code. Since it would be hard to build support for all possible scenarios into the broker code, it is simpler to make quota computation configurable. This also enables the computation to be altered dynamically without restarting the broker, since the new option will be a dynamic broker config.

...

The configuration and management of replication quotas are completely separate from client quota management in the broker. Since the configuration entities are different, it will be simpler to keep them separate. It is not clear if there are scenarios that require custom replication quotas, so this KIP only addresses client quotas.

...