...

  • Name: client.quota.callback.class
  • Type: CLASS
  • Mode: Dynamically configurable as cluster-default for all brokers in the cluster
  • Description: The fully qualified name of a class that implements the ClientQuotaCallback interface, which is used to determine quota limits applied to client requests. By default, <user, client-id>, <user> or <client-id> quotas stored in ZooKeeper are applied. For any given request, the most specific quota that matches the user principal of the session and the client-id of the request is enforced by every broker.
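The "most specific quota wins" rule in the description can be illustrated with a small lookup. This is only a simplified sketch: the class `QuotaPrecedenceSketch`, its in-memory maps, and its method names are hypothetical stand-ins for the quota configs stored in ZooKeeper. It is not the broker's actual implementation, and it considers only the three levels named above.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified sketch of "most specific quota wins"; not the broker's actual code. */
final class QuotaPrecedenceSketch {
    // Hypothetical in-memory stores standing in for quota configs persisted in ZooKeeper.
    private final Map<String, Map<String, Double>> userClientQuotas = new HashMap<>();
    private final Map<String, Double> userQuotas = new HashMap<>();
    private final Map<String, Double> clientQuotas = new HashMap<>();

    void setUserClientQuota(String user, String clientId, double limit) {
        userClientQuotas.computeIfAbsent(user, u -> new HashMap<>()).put(clientId, limit);
    }

    void setUserQuota(String user, double limit) {
        userQuotas.put(user, limit);
    }

    void setClientQuota(String clientId, double limit) {
        clientQuotas.put(clientId, limit);
    }

    /** Returns the most specific matching limit, or null if no quota matches. */
    Double resolve(String user, String clientId) {
        Map<String, Double> perClient = userClientQuotas.get(user);
        if (perClient != null && perClient.containsKey(clientId)) {
            return perClient.get(clientId);      // <user, client-id> is the most specific
        }
        if (userQuotas.containsKey(user)) {
            return userQuotas.get(user);         // then <user>
        }
        return clientQuotas.get(clientId);       // then <client-id>, or null
    }

    public static void main(String[] args) {
        QuotaPrecedenceSketch quotas = new QuotaPrecedenceSketch();
        quotas.setUserClientQuota("alice", "app1", 100.0);
        quotas.setUserQuota("alice", 50.0);
        quotas.setClientQuota("app2", 10.0);
        System.out.println(quotas.resolve("alice", "app1"));
        System.out.println(quotas.resolve("alice", "app2"));
        System.out.println(quotas.resolve("bob", "app2"));
    }
}
```

A custom callback replaces this kind of fixed lookup with whatever logic the deployment needs.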

...

The following new public classes/interfaces will be introduced in the package org.apache.kafka.server.quota (in the Kafka clients project).

The quota types supported for the callback will be FETCH/PRODUCE/REQUEST.

Code Block
languagejava
titleQuota types
public enum ClientQuotaType {
    PRODUCE,
    FETCH,
    REQUEST
}

 

ClientQuotaCallback must be implemented by custom callbacks. It will also be implemented by the default quota callback. Callback implementations should cache persisted configs where necessary so that quotas can be determined quickly, since ClientQuotaCallback.quota() will be invoked on every request.

Code Block
languagejava
titleClient Quota Callback
/**
 * Quota callback interface for brokers that enables customization of client quota computation.
 */
public interface ClientQuotaCallback extends Configurable {

    /**
     * Quota callback invoked to determine the quota limit to be applied for a request.
     *
     * @param principal The user principal of the connection for which quota is requested
     * @param clientId  The client id associated with the request
     * @param quotaType Type of quota requested
     * @return the quota including the limit and metric tags that indicate which other clients share this quota
     */
    ClientQuota quota(KafkaPrincipal principal, String clientId, ClientQuotaType quotaType);

    /**
     * Returns the quota limit associated with the provided metric tags. These tags were returned from
     * a previous call to {@link #quota(KafkaPrincipal, String, ClientQuotaType)}. This method is invoked
     * by quota managers to obtain the current quota limit applied to a metric after a quota update or
     * cluster metadata change. If the tags are no longer in use after the update (e.g. this is a
     * {user, client-id} quota metric and the quota now in use is a {user} quota), null is returned.
     *
     * @param metricTags Metric tags for a quota metric of type `quotaType`
     * @param quotaType  Type of quota requested
     * @return the quota limit for the provided metric tags or null if the metric tags are no longer in use
     */
    Double quotaLimit(Map<String, String> metricTags, ClientQuotaType quotaType);

    /**
     * Metadata update callback that is invoked whenever UpdateMetadata request is received from
     * the controller. This is useful if quota computation takes partitions into account.
     * Topics that are being deleted will not be included in `cluster`.
     *
     * @param cluster Cluster metadata including partitions and their leaders if known
     * @return true if quotas have changed and metric configs may need to be updated
     */
    boolean updateClusterMetadata(Cluster cluster);

    /**
     * Quota configuration update callback that is invoked when quota configuration for an entity is
     * updated in ZooKeeper. This is useful to track configured quotas if built-in quota configuration
     * tools are used for quota management.
     *
     * @param quotaEntity The quota entity for which quota is being updated.
     * @param quotaType   Type of quota being updated.
     * @param newValue    The new quota value.
     * @return true if quotas have changed and metric configs may need to be updated
     */
    boolean updateQuota(ClientQuotaEntity quotaEntity, ClientQuotaType quotaType, double newValue);

    /**
     * Quota configuration removal callback that is invoked when quota configuration for an entity is
     * removed in ZooKeeper. This is useful to track configured quotas if built-in quota configuration
     * tools are used for quota management.
     *
     * @param quotaEntity The quota entity for which quota is being updated.
     * @param quotaType   Type of quota being updated.
     * @return true if quotas have changed and metric configs may need to be updated
     */
    boolean removeQuota(ClientQuotaEntity quotaEntity, ClientQuotaType quotaType);

    /**
     * Closes this instance.
     */
    void close();
}

 

The quota returned by the callback should include the quota limit as well as the metric tags to be used. These tags determine which entities share the quota.

By default, the tags "user" and "client-id" will be used for all quota metrics. When a <user, client-id> quota config is used, the user tag is set to the user principal of the session and the client-id tag is set to the client-id of the request. If a <user> quota config is used, the user tag is set to the user principal of the session and the client-id tag is set to an empty string. Similarly, if a <client-id> quota config is used, the user tag is set to an empty string. This ensures that the same quota sensors and metrics are shared by all requests that match each quota config.
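The default tagging rules can be sketched as a small helper. The class `MetricTagSketch`, the `Level` enum, and `tagsFor` are hypothetical names used only for illustration. The sketch shows how an empty tag value makes requests from different clients map to the same tag set, and therefore to the same quota metric.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the default "user"/"client-id" tagging described above; names are hypothetical. */
final class MetricTagSketch {
    /** The quota config level that matched the request. */
    enum Level { USER_CLIENT, USER, CLIENT }

    static Map<String, String> tagsFor(Level level, String user, String clientId) {
        Map<String, String> tags = new HashMap<>();
        // <client-id> quotas leave the user tag empty; <user> quotas leave the client-id tag empty.
        tags.put("user", level == Level.CLIENT ? "" : user);
        tags.put("client-id", level == Level.USER ? "" : clientId);
        return tags;
    }

    public static void main(String[] args) {
        // Under a <user> quota, two client-ids of the same user produce equal tags and share a quota.
        Map<String, String> first = tagsFor(Level.USER, "alice", "app1");
        Map<String, String> second = tagsFor(Level.USER, "alice", "app2");
        System.out.println(first.equals(second));
    }
}
```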

Code Block
languagejava
titleClientQuota
/**
 * Client quota returned by {@link ClientQuotaCallback} that includes the quota bound
 * as well as the metric tags that indicate which other clients share this quota.
 */
public class ClientQuota {
    private final double quotaLimit;
    private final Map<String, String> metricTags;

    /**
     * Constructs an instance of ClientQuota.
     *
     * @param quotaLimit The quota bound to be applied
     * @param metricTags The tags to be added to the quota metric for this request. All
     *                   entities which have the same `metricTags` share the `quotaLimit`
     */
    public ClientQuota(double quotaLimit, Map<String, String> metricTags) {
        this.quotaLimit = quotaLimit;
        this.metricTags = metricTags;
    }

    /**
     * Returns the quota bound.
     */
    public double quotaLimit() {
        return quotaLimit;
    }

    /**
     * Returns the tags to be added to the quota metric for this request. All
     * entities which have the same `metricTags` share a quota.
     */
    public Map<String, String> metricTags() {
        return metricTags;
    }
}
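As an illustration of custom metric tags, a callback could place several users in one quota group by returning the same tag for each of them. The following is a minimal sketch under stated assumptions: `GroupQuotaSketch`, its nested `Quota` class (a stand-in for `ClientQuota`), the "group" tag, and the `assign` method are all hypothetical, and a plain `String` is used in place of `KafkaPrincipal` so that the example compiles without Kafka on the classpath.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical group-based quota logic; mirrors quota() and quotaLimit() with stand-in types. */
final class GroupQuotaSketch {
    /** Stand-in for the ClientQuota class shown above. */
    static final class Quota {
        final double quotaLimit;
        final Map<String, String> metricTags;

        Quota(double quotaLimit, Map<String, String> metricTags) {
            this.quotaLimit = quotaLimit;
            this.metricTags = metricTags;
        }
    }

    private final Map<String, String> userToGroup = new HashMap<>();
    private final Map<String, Double> groupLimits = new HashMap<>();
    private final double defaultLimit;

    GroupQuotaSketch(double defaultLimit) {
        this.defaultLimit = defaultLimit;
    }

    /** Assigns a user to a group and sets that group's shared limit. */
    void assign(String user, String group, double groupLimit) {
        userToGroup.put(user, group);
        groupLimits.put(group, groupLimit);
    }

    /** Users in the same group get identical tags and therefore share one quota. */
    Quota quota(String user, String clientId) {
        String group = userToGroup.get(user);
        if (group == null) {
            // Ungrouped users fall back to a per-user quota with the default limit.
            return new Quota(defaultLimit, Collections.singletonMap("user", user));
        }
        return new Quota(groupLimits.get(group), Collections.singletonMap("group", group));
    }

    /** Returns the current limit for previously returned tags, or null if they are no longer in use. */
    Double quotaLimit(Map<String, String> metricTags) {
        String group = metricTags.get("group");
        if (group != null) {
            return groupLimits.get(group);
        }
        String user = metricTags.get("user");
        if (user != null && !userToGroup.containsKey(user)) {
            return defaultLimit;
        }
        return null;
    }

    public static void main(String[] args) {
        GroupQuotaSketch callback = new GroupQuotaSketch(10.0);
        callback.assign("alice", "analytics", 500.0);
        callback.assign("bob", "analytics", 500.0);
        System.out.println(callback.quota("alice", "app1").metricTags);
        System.out.println(callback.quota("carol", "app3").quotaLimit);
    }
}
```

Because the lookups are plain in-memory map reads, this style also fits the recommendation to keep `quota()` cheap, since it is invoked on every request.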

 

When quota configuration is updated in ZooKeeper, quota callbacks are notified of configuration changes. Quota configuration entities can be combined to define quotas at different levels.

Code Block
languagejava
titleClientQuotaEntity
/**
 * The metadata for an entity for which quota is configured. Quotas may be defined at
 * different levels and `configEntities` gives the list of config entities that define
 * the level of this quota entity.
 */
public interface ClientQuotaEntity {
    /**
     * Entity type of a {@link ConfigEntity}
     */
    public enum ConfigEntityType {
        USER,
        CLIENT_ID,
        DEFAULT_USER,
        DEFAULT_CLIENT_ID
    }

    /**
     * Interface representing a quota configuration entity. Quota may be
     * configured at levels that include one or more configuration entities.
     * For example, {user, client-id} quota is represented using two
     * instances of ConfigEntity with entity types USER and CLIENT_ID.
     */
    public interface ConfigEntity {
        /**
         * Returns the name of this entity. For default quotas, an empty string is returned.
         */
        String name();

        /**
         * Returns the type of this entity.
         */
        ConfigEntityType entityType();
    }

    /**
     * Returns the list of configuration entities that this quota entity is comprised of.
     * For {user} or {clientId} quota, this is a single entity and for {user, clientId}
     * quota, this is a list of two entities.
     */
    List<ConfigEntity> configEntities();
}
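One way a callback might track configured quotas is to key them by the list of config entities and report whether each notification changed anything. This is a sketch only: `QuotaConfigTrackerSketch`, its nested `Entity` and `EntityType` (stand-ins for `ConfigEntity` and `ConfigEntityType`), and the key format are assumptions made for illustration, and the quota type parameter is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical tracker mirroring updateQuota()/removeQuota() with stand-in entity types. */
final class QuotaConfigTrackerSketch {
    enum EntityType { USER, CLIENT_ID, DEFAULT_USER, DEFAULT_CLIENT_ID }

    /** Stand-in for ClientQuotaEntity.ConfigEntity. */
    static final class Entity {
        final EntityType type;
        final String name;

        Entity(EntityType type, String name) {
            this.type = type;
            this.name = name;
        }
    }

    // Keyed by one "TYPE:name" string per config entity, in the order given.
    private final Map<List<String>, Double> configured = new HashMap<>();

    private static List<String> keyOf(List<Entity> entities) {
        List<String> key = new ArrayList<>();
        for (Entity entity : entities) {
            key.add(entity.type.name() + ":" + entity.name);
        }
        return key;
    }

    /** Returns true if the stored quota changed, so metric configs may need updating. */
    boolean updateQuota(List<Entity> entities, double newValue) {
        Double previous = configured.put(keyOf(entities), newValue);
        return previous == null || previous.doubleValue() != newValue;
    }

    /** Returns true if a quota was actually removed. */
    boolean removeQuota(List<Entity> entities) {
        return configured.remove(keyOf(entities)) != null;
    }

    public static void main(String[] args) {
        QuotaConfigTrackerSketch tracker = new QuotaConfigTrackerSketch();
        List<Entity> userAClientB = Arrays.asList(
                new Entity(EntityType.USER, "userA"),
                new Entity(EntityType.CLIENT_ID, "clientB"));
        System.out.println(tracker.updateQuota(userAClientB, 100.0));
        System.out.println(tracker.updateQuota(userAClientB, 100.0));
        System.out.println(tracker.removeQuota(userAClientB));
    }
}
```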

 

When partition leaders change, the controller notifies brokers using an UpdateMetadata request. Quota callbacks are notified of metadata changes so that callbacks that base quota computation on partitions have access to the current metadata. The existing public interface org.apache.kafka.common.Cluster will be used for metadata change notification.
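To make the metadata notification concrete, the sketch below shows one hypothetical partition-based policy: a broker's share of a cluster-wide limit is proportional to the number of partitions it leads. The policy itself, the class `PartitionShareQuotaSketch`, and the use of a plain `Map` from partition name to leader broker id in place of `Cluster` are all assumptions for illustration and are not part of this proposal.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical partition-based policy; a Map stands in for org.apache.kafka.common.Cluster. */
final class PartitionShareQuotaSketch {
    private final int brokerId;
    private final double clusterWideLimit;
    private double brokerLimit = 0.0;

    PartitionShareQuotaSketch(int brokerId, double clusterWideLimit) {
        this.brokerId = brokerId;
        this.clusterWideLimit = clusterWideLimit;
    }

    /**
     * Recomputes this broker's limit from partition leadership.
     *
     * @param partitionLeaders partition name to leader broker id; null if the leader is unknown
     * @return true if the limit changed, so metric configs may need to be updated
     */
    boolean updateClusterMetadata(Map<String, Integer> partitionLeaders) {
        int led = 0;
        for (Integer leader : partitionLeaders.values()) {
            if (leader != null && leader == brokerId) {
                led++;
            }
        }
        double newLimit = partitionLeaders.isEmpty()
                ? 0.0
                : clusterWideLimit * led / partitionLeaders.size();
        boolean changed = newLimit != brokerLimit;
        brokerLimit = newLimit;
        return changed;
    }

    double brokerLimit() {
        return brokerLimit;
    }

    public static void main(String[] args) {
        Map<String, Integer> leaders = new HashMap<>();
        leaders.put("t-0", 1);
        leaders.put("t-1", 1);
        leaders.put("t-2", 2);
        leaders.put("t-3", 2);
        PartitionShareQuotaSketch sketch = new PartitionShareQuotaSketch(1, 1000.0);
        System.out.println(sketch.updateClusterMetadata(leaders));
        System.out.println(sketch.brokerLimit());
    }
}
```

Returning `true` only when the computed limit changes matches the contract of `updateClusterMetadata`, which lets the broker skip metric config updates when a metadata change does not affect quotas.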

Proposed Changes

ClientQuotaManager and ClientRequestQuotaManager will be updated to move quota configuration management into a new class DefaultQuotaCallback that implements ClientQuotaCallback. If a custom callback is not configured, DefaultQuotaCallback will be used.

If a custom callback is configured, it will be instantiated when the broker is started. DynamicBrokerConfig will be updated to handle changes to the callback. KafkaApis will invoke quotaCallback.updateClusterMetadata when an UpdateMetadata request is received from the controller. This will be ignored by the default quota callback. When ConfigHandler invokes ClientQuotaManager.updateQuota to process dynamic quota config updates, quotaCallback.updateQuota will be invoked. The existing logic to process quota updates will be moved to the default quota callback.

...

We could implement different quota algorithms in Kafka and support quota groups, partition-based quotas, etc. But this would require Kafka to manage these groups, the mapping of users to partitions, etc., increasing the complexity of the code. Since it will be hard to include support for all possible scenarios in the broker code, it will be simpler to make quota computation configurable. This also enables the computation to be altered dynamically without restarting the broker, since the new option will be a dynamic broker config.

Enable management of client quotas and replication quotas using a single callback interface

The configuration and management of replication quotas are completely separate from client quota management in the broker. Since the configuration entities are different, it will be simpler to keep them separate. It is not clear if there are scenarios that require custom replication quotas, so this KIP only addresses client quotas.

Use Scala traits for public interfaces similar to Authorizer

For compatibility reasons, we are now using Java rather than Scala for all pluggable interfaces including those on the broker. There is already a KIP to move Authorizer to Java as well. As we will be removing support for Java 7 in the next release, we can also use default methods in Java when we need to update pluggable Java interfaces. So the plan is to use Java for all new pluggable interfaces.