Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state:  DraftAccepted - 2.6.0 contains describe and alter functionality, resolve is pending for a future release.

Discussion thread: TODO here

JIRA: TODO KAFKA-7740

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Quota management via Admin Client has gone through a couple drafts of proposals (KIP-248, KIP-422). While improvements have been made to the Admin interface for configuration handling, fitting quotas into the API is awkward as they don't fit the natural key-value pairing, nor is the configuration output expressive enough to return all useful information. Therefore, it'd be beneficial to have a quota-native API for managing quotas, which would offer an intuitive and less error-prone interface, convey additional information beyond what the configuration APIs provide, and enable for future extensibility as quotas types are added or evolved.

Background

Quotas By default, quotas are currently defined in terms of a user and client ID, where the user acts as a specific an opaque principal name, and the client ID as a more generic group identifier. When setting quotas, an administrator has flexibility in how it specifies the user and client ID for which the quota applies to, where the user and client ID may be specifically named, indicated as the a default, or omitted entirely. Since there's multiple ways to specify quotas, a hierarchy structure for which quotas apply must be appliedquotas have flexible configurations, there is a method for resolving the quotas that apply to a request: a hierarchy structure is used, where the most specific , defined quota will be matched to a request's user and client ID.

As represented by the current ZK node structure, the order in which quotas are matched are as follows. Note , from highest priority to lowest (note <user> is a specified user principal, <client-id> is a specified client ID, and <default> is a special default user/client ID that matches to all users or clients IDs.):

        /config/users/<user>/clients/<client-id>
        /config/users/<user>/clients/<default>
        /config/users/<user>
        /config/users/<default>/clients/<client-id>
        /config/users/<default>/clients/<default>
        /config/users/<default>
        /config/clients/<client-id>
        /config/clients/<default>

As such, reasoning around quotas can be complex, as it's not immediately obvious which quotas may apply to a given user and/or client ID. Providing descriptive information about how quotas are matched is a the first goal of this KIP. Likewise, retrieving and modifying quota values can be done in a more expressive and robust way, which is the second goal of the KIP.

APIs

In order to clearly specify the APIs, let's first disambiguate some terminology: Every client request that is processed is associated with a quota entity, Note: Every client request that is processed is associated with a quota entity, which is a map of entity types to their corresponding entity names for the request. Using the current entity types in the previous examples, an entity may be is of the form {user=test-user, clientId client-id=test-client}. When , where user and client-id are the types, and test-user and test-client are the names. However, when specifying a quota configuration entry, only a subset of the entity types need to be provided, which is referred to as an entity match, for example, {user=<default>}.

APIs

For describing quotas, there's two modes of operation that are necessary for administration: config-centric, and entity-centric.

  1. The config-centric mode describes what is exactly specified for the configuration for the given entity match. However, it would also be useful to also be able to determine which matches have configuration values defined, so the presence of a filter is used for gathering information about the entity matches that the administrator is interested in. This is DescribeQuotas DescribeClientQuotas.
  2. The entity-centric mode describes what quotas are applied apply to an entity. Note that an entity may match to various configuration entries depending on how the quotas are specified, e.g. the producer byte rate may be specified for the user, but the consumer byte rate for the client ID. Since upon querying this information, it may not be clear how quotas were matched for an entity , from the configuration, additional information should be returned to give provide more context. This is DescribeEffectiveQuotas ResolveClientQuotas.

Altering quotas only works on a config-centric manner, and therefore doesn't need distinguishing. For a given entity match, the administrator should be able to specify which quotas apply, or alternatively remove existing quotas so they don't no longer match. This is AlterQuotas AlterClientQuotas.

Units

This KIP introduces the concept of a quota unit to be applied to a quota value. Currently, only a single unit is used for quotas: bytes-per-second, however this has limitations to effective quota management. For example, since it's a global value, it doesn't scale well as brokers are added or removed, and so a broker-bytes-per-second unit could be added to better manage this behavior. As units are added, the possible quota configuration entries becomes the cross product of the quota types by the quota units, which means that it'd be possible to specify bytes-per-second both on a global and per-broker basis, and quota enforcement would occur at whichever limit was hit first. Additional considerations could be made for a fair-share system. Units of shares could be configured to entities, and when bandwidth is contested, the share count of the various entities could be used to determine their restricted throughput.

While it's beyond the scope of this KIP to implement such functionality in the broker, it must be noted for future extensibility of the APIs.

Types Rationale

While there's two defined entity types in AK, a server-side plugin mechanism allows for further expansion. Likewise, as use cases evolve, finer-grained control may be necessary. Therefore, entity types should not be statically bound to what's publicly defined, and instead the API should support flexible entity types by interpreting them as a String identifier. Any entity types that the broker doesn't understand should return an error back to the client.

The quota types (producer byte rate, etc.) and units should also be given the same consideration. The possible quota applications may expand in the future, and the API shouldn't lock in which quota types are accessible. Modification of quota types/units that are unknown should fail with error.

For forming entities, since a fixed set of entity types aren't defined, a Map<String, String> should be used to map entity type to entity name.

Public Interfaces

Admin client calls will be added to correspond to describeQuotas, describeEffectiveQuotas, and alterQuotas, with supporting types defined in the common.quotas package.

Common types in package org.apache.kafka.common.quota:

The quota values are of type Double, which presents a complication in that the RPC protocol doesn't support floating point values. To accommodate this, RPC protocol message type 'double' will be added, which will serialize doubles according to the IEEE 754 floating-point "double format" bit layout.

Types Rationale

While there's two defined entity types in AK, a server-side plugin mechanism allows for further expansion. Likewise, as use cases evolve, finer-grained quota control may be necessary. Therefore, entity types should not be statically bound to publicly defined constants, and instead the API should support flexible entity types by interpreting them as a String identifier. Any entity types that the broker doesn't understand should throw an IllegalArgumentException back to the client.

The quota types (producer byte rate, consumer byte rate, etc.) should also be given the same consideration. The possible quota applications may expand in the future, and the API shouldn't lock in which quota types are accessible. Modification of quota types that are unknown should also fail with error.

Since a fixed set of entity types aren't defined, an entity should be represented by a Map<String, String>, which maps an entity type to the entity name.

Public Interfaces

Admin client calls will be added to correspond to DescribeClientQuotas, ResolveClientQuotas, and AlterClientQuotas, with supporting types defined in the common.quotas package.

Common types in package org.apache.kafka.common.quota (2.6.0)

Code Block
languagejava
/**
 * Describes a client quota entity, which is a mapping of entity types to their names.
 */
public class ClientQuotaEntity {

Code Block
languagejava
/**
 * Describes a fully-qualified entity.
 */
public class QuotaEntity {
    /**
     * The Typetype of an entity entry.
     */
    public enumstatic Typefinal {
String USER       USER,= "user";
    public static final String CLIENT_ID;
    }= "client-id";

    /**
     * RepresentsConstructs thea defaultquota nameentity for the an entity, i.e. the entity that's matchedgiven types and names. If a name is null,
     * when an exact match isn't found then it is mapped to the built-in default entity name.
     */
    public * final@param staticentries String QUOTA_ENTITY_NAME_DEFAULT = // implementation defined

maps entity type to its name
     */**
    public * `entries` describes the fully-qualified entity. The key is a {@code Type} string, however
     * there may also exist keys that are not enumerated by {@code Type} that still apply, e.g.
     * the server may internally associate another type. It's necessary to return all quota types
     * because quota values for these types may influence the effective quota value. However,ClientQuotaEntity(Map<String, String> entries);

    /**
     * @return map of entity type to its name
     */
    public Map<String, String> entries();
}

/**
 * Describes a component for applying a client quota filter.
 */
public class ClientQuotaFilterComponent {

    /**
     * Constructs and returns a filter component that exactly matches the provided entity
     * name for the entity type.
     *
 altering a quota, any types* that@param aren'tentityType specifiedthe shouldentity betype ablethe tofilter becomponent inferredapplies by theto
     * server, otherwise an error is returned. @param entityName the entity name that's matched exactly
     */
    public * For example, {("CLIENT_ID" -> "test-client"),static ClientQuotaFilterComponent ofEntity(String entityType, String entityName);

     /**
     * Constructs and returns a filter component that matches  ("USER" -> "test-user"),the built-in default entity name
     * for the entity type.
     *
     * @param entityType the entity type the filter ("GROUP" -> "internal-group")}.component applies to
     */
    public QuotaEntity(Map<String, String> entriesstatic ClientQuotaFilterComponent ofDefaultEntity(String entityType);
}

    /**
     * DescribesConstructs and returns a quotafilter key.
 */
public class QuotaKey {
    /**component that matches any specified name for the
     * Theentity quota typestype.
     */
    public enum* Type@param {
entityType the entity type the filter component  CONSUMER_BYTE_RATE,applies to
        PRODUCER_BYTE_RATE,*/
    public static ClientQuotaFilterComponent  REQUEST_PERCENTAGE;
    }
ofEntityType(String entityType);

    /**
     * The units for a quota value. Note there may be multiple units for a given quota type@return the component's entity type
     */
    public String entityType();

    /**
     * that influences quota behavior.@return the optional match string, where:
     */
    public enum Units {
  if present, the name that's matched RATE_BPS;exactly
     }

*    /**
     *if @paramempty, typematches the quotadefault typename
     * @param units the units for the quota type
 if null, matches any specified name
     */
    public QuotaKey(Type type, Units unitsOptional<String> match();
}

/**
 * Describes a client quota entity filter.
 */
public class QuotaFilterClientQuotaFilter {

    public/**
 enum Rule {
  * A filter to be applied EXACT,to matching client quotas.
  // exact name match*
     * @param components PREFIX;the components to //filter matcheson
 all names with the given* prefix
@param strict whether the }

filter only includes specified /**components
     */
 A filtering rule toprivate be applied.ClientQuotaFilter(Collection<ClientQuotaFilterComponent> components, boolean strict);

     /**
     * @param entityType the entity type the rule applies toConstructs and returns a quota filter that matches all provided components. Matching entities
     * @param rule the rule to apply with entity types that are not specified by a component will also be included in the result.
     *
     * @param matchcomponents the non-null string that's applied by the rulecomponents for the filter
     */
    public QuotaFilter(QuotaEntity.Type entityType, Rule rule, String matchstatic ClientQuotaFilter contains(Collection<ClientQuotaFilterComponent> components);
}

DescribeQuotas:

Code Block
languagejava
public class DescribeQuotasOptions extends AbstractOptions<DescribeQuotasOptions> { /**
    // Empty.
}

/**
 *Constructs Theand resultreturns ofa thequota {@link Admin#describeQuotas(Collection<QuotaFilter>, DescribeQuotasOptions)} call.
 */
public class DescribeQuotasResult {

filter that matches all provided components. Matching entities
     /**
 with entity types that *are Mapsnot anspecified entityby toa itscomponent configuredwill quota value(s). Note if no value is defined for a quota*not* be included in the result.
     *
     * type @param components the components for the filter
 that entity's config, then it*/
 is not included inpublic thestatic resultingClientQuotaFilter value map.containsOnly(Collection<ClientQuotaFilterComponent> components);

     /**
     * @paramConstructs entitiesand thereturns collectiona ofquota entitiesfilter that matches matchedall theconfigured filterentities.
     */
    public DescribeQuotasResult(KafkaFuture<Map<QuotaEntity, Map<QuotaKey, Long>>> entitiesstatic ClientQuotaFilter all();

    /**
     * Returns@return athe map from quota entity to a future which can be used to check the status of the operation.filter's components
     */
    public Collection<ClientQuotaFilterComponent> components();

    /**
     */
 @return whether the publicfilter is KafkaFuture<Map<QuotaEntity, Map<QuotaKey, Long>>> entities();
}

public interface Admin extends AutoCloseable {strict, i.e. only includes specified components
     */
    ...

    public boolean strict();
}

/**
 * Describes a configuration *alteration Describesto allbe entitiesmade matchingto alla provided filters (logical AND) that have at least one
     * quota value defined.
client quota entity.
 */
public class ClientQuotaAlteration {

    public static class Op {

        /**
         * @param filterskey the filteringquota rulestype to alter
 apply to matching entities
     * @param options value if set then the existing optionsvalue tois use
updated,
         * @return result containing all matching entities
     */
   otherwise DescribeQuotasResult describeQuotas(Collection<QuotaFilter> filters, DescribeQuotasOptions options);
}

DescribeEffectiveQuotas:

Code Block
languagejava
public class DescribeEffectiveQuotasOptions extends AbstractOptions<DescribeEffectiveQuotasOptions> {

if null, the existing value is cleared
         */
        public Op(String key, Double value);

        /**
     * Whether to exclude the* list@return ofthe overriddenquota valuestype forto everyalter
 quota type in the result.
     */
        public DescribeEffectiveQuotasOptionsString setOmitOverriddenValueskey(boolean omitOverriddenValues);
}


        /**
 * The result of the {@link Admin#describeEffectiveQuotas(Collection<QuotaEntity>, DescribeEffectiveQuotasOptions)} call.
 */
public class DescribeEffectiveQuotasResult {
    /**
   * @return if set then the existing value is updated,
      * Information about a* specific quota configuration entry.
     */
otherwise if null, the publicexisting classvalue Entryis {cleared
         */**
        public * @param source the entity source for the valueDouble value();
    }

    private final ClientQuotaEntity entity;
  * @param valueprivate thefinal non-null valueCollection<Op> ops;

    /**
     * @param entity the entity whose config will be modified
     */ @param ops the alteration to perform
     */
    public EntryClientQuotaAlteration(QuotaEntityClientQuotaEntity sourceentity, LongCollection<Op> valueops);
    }

    /**
     * Information@return aboutthe theentity valuewhose forconfig awill quotabe type.modified
     */
    public class Value {
  ClientQuotaEntity entity();

      /**
         * @param@return entrythe thealteration quotato entryperform
     */
    *public Collection<Op> ops();
}

DescribeClientQuotas (2.6.0)

Code Block
languagejava
public class DescribeClientQuotasOptions extends AbstractOptions<DescribeClientQuotasOptions> {
    // Empty.
}

/**
 * The result of the {@link Admin#describeClientQuotas(Collection, DescribeClientQuotasOptions)} call.
 */
public class DescribeClientQuotasResult {

    /**
@param overriddenEntries all values that are overridden due to being lower in
         *            * Maps an entity to its configured quota value(s). Note if no value is specificity,defined orfor null if not requesteda quota
     * type for  */
        public Value(Entry entry, List<Entry> overriddenEntries);
that entity's config, then it is not included in the resulting value map.
     }*

     /**
 @param entities future for * Maps athe collection of entities tothat theirmatched effective quota values.the filter
     */
    public * @param config the quota configuration for the requested entitiesDescribeClientQuotasResult(KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities);

    /**
     */
 Returns a  public DescribeEffectiveQuotasResult(Map<QuotaEntity, KafkaFuture<Map<QuotaKey, Value>>> config);

    /**
     * Returns a map map from quota entity to a future which can be used to check the status of the operation.
     */
    public Map<QuotaEntityKafkaFuture<Map<ClientQuotaEntity, KafkaFuture<Map<QuotaKeyMap<String, Value>>> config();

    /**
     * Returns a future which succeeds only if all quota descriptions succeed.
     */
    public KafkaFuture<Void> allDouble>>> entities();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Describes theall effectiveentities quotasmatching for the provided entities. filter that have at least one client quota configuration
     * value defined.
     * @param<p>
 entities the entities to describe* theThe effectivefollowing quotasexceptions for
can be anticipated when calling * @param options{@code get()} on the optionsfuture tofrom usethe
     * @return the effective quotas for the entitiesreturned {@link DescribeClientQuotasResult}:
     * <ul>
     */   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
    DescribeEffectiveQuotasResult describeEffectiveQuotas(Collection<QuotaEntity>* entities, DescribeEffectiveQuotasOptions options);
}

AlterQuotas

Code Block
languagejava
titleAlterQuotas
public class AlterQuotasEntry {
    public class Op {
   If the authenticated user didn't have describe access to the cluster.</li>
      /**
   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
     * @param  keyIf the request quotadetails typeare and units to alter
    invalid. e.g., an invalid entity type was specified.</li>
     * @param value if set then the existing value is updated,<li>{@link org.apache.kafka.common.errors.TimeoutException}
     *   If *the request timed out before the describe could finish.</li>
      otherwise if null, the existing value is cleared
 * </ul>
     * <p>
        */
 This operation is supported by brokers with public Op(QuotaKey key, Long value);version 2.6.0 or higher.
    }

    /**
     * @param entityfilter the entityfilter whoseto configapply willto bematch modifiedentities
     * @param opsoptions the alterationoptions to perform - if value is set, then the existing value is updated,use
     * @return the          otherwise if null,DescribeClientQuotasResult containing the existing value is clearedresult
     */
    publicDescribeClientQuotasResult AlterQuotasEntrydescribeClientQuotas(QuotaEntityClientQuotaFilter entityfilter, Collection<Op>DescribeClientQuotasOptions opsoptions);
}

ResolveClientQuotas (pending future release)

Code Block
languagejava

public class AlterQuotasOptionsResolveClientQuotasOptions extends AbstractOptions<AlterQuotasOptions>AbstractOptions<ResolveClientQuotasOptions> {
    // Empty.
}

/**
 * The   * Sets whetherresult of the request should be validated without altering the configs.
    {@link Admin#resolveClientQuotas(Collection, ResolveClientQuotasOptions)} call.
 */
public class ResolveClientQuotasResult {
 public AlterQuotasOptions validateOnly(boolean validateOnly);
}

/**
     * TheInformation resultabout ofa thespecific {@link Admin#alterQuotas(Collection<AlterQuotasEntry>, AlterQuotasOptions)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
public class AlterQuotasResult {
    public AlterQuotasResult(Map<QuotaEntity, KafkaFuture<Void>> futures);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.quota configuration entry.
     */
    public class Entry {
        /**
         * @param source the entity source for the value
         * @param value the non-null value
         */
     */
   public publicEntry(QuotaEntity Map<QuotaEntitysource, KafkaFuture<Void>>Double values(value);
    }

    /**
     * ReturnsInformation aabout futurethe whichvalue succeedsfor onlya if all quota alterations succeedtype.
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

 * NOTE: We maintain a `Value` class because additional information may be added, e.g.,
     /**
     * Alters thea quotaslist as specified for theof overridden entries.
     */
    public class Value {
 *  @param alterations the alterations to perform
/**
         * @param @returnentry the quota entry
 result of the alterations
     */
    AlterQuotasResult  alterQuotas(Collection<AlterQuotasEntry> entries, AlterQuotasOptions options);
}

kafka-quotas.sh/QuotasCommand:

A QuotasCommand would be constructed with an associated bin/kafka-quotas.sh script for managing quotas via command line, and would have three modes of operation, roughly correlating to each of the API calls:

  1. List: Lists the quota entities for the given entity specification and their corresponding quota values, as explicitly specified in the configuration. The user may provide explicit entity types+names, or a pattern to apply to an entity type find matching entity names. If an entity type is omitted from the input, it is treated as a wildcard.
  2. Describe: Describes the effective quotas for an entity, including contextual information about how those quotas were derived. This includes what configuration entries matched to the entity and, if requested, the overridden, less-specific matches for the entity.
  3. Alter: Modifies a quota configuration entry in an incremental manner, i.e. specify which entries to add, update, and/or remove.
Flags

Various flags will be used to accomplish these operations.

Common flags:
--bootstrap-server: The standard bootstrap server.
--command-config: Property file for the Admin client.

Operations (mutually exclusive):
--list: Lists the entities that match the given specification, and prints out their configuration values.
--describe: Describes the effective quota values for an entity.
--alter: Alters the configuration for the given specification.

Entity specification flags (common to all):
--names: Comma-separated list of type=name pairs, e.g. "user=some-user,client-id=some-client-id"
--defaults: Comma-separated list of entity types with the default name, e.g. "defaults=user,client-id" (Note a separate flag is necessary since names are opaque.)

Exclusive to --list:
--prefix: Comma-separated prefix=name pairs, e.g. "user=test-".

Exclusive to --describe:
--include-overrides: Whether to include overridden config entries.

Exclusive to --alter:
--add: Comma-separated list of entries to add or update to the configuration, in format "name:unit=value".
--delete: Comma-separated list of entries to remove from the configuration, in format "name:unit".
--validate-only: If set, validates the alteration but doesn't perform it.

Output

In general, the output of the entities will be of the form: {entity-type=entity-name, ...}, where entity-name is sanitized for output since it is an opaque string. When displaying configuration values, the form: quota-name:quota-unit=quota-value.

List:

Code Block
$./bin/kafka-quotas.sh --bootstrap-server localhost:9092 --list --names=client-id=my-client

{user=user-two, client-id=my-client}
consumer_byte_rate:shares=200
producer_byte_rate:bps=10000000

{user=user-one, client-id=my-client}
producer_byte_rate:broker_bps=2000000

{user=<default>, client-id=my-client}
consumer_byte_rate:shares=100
producer_byte_rate:broker_bps=500000

$./bin/kafka-quotas.sh --bootstrap-server localhost:9092 --list --prefix=user=user-

{user=user-two, client-id=my-client}
consumer_byte_rate:shares=200
producer_byte_rate:bps=10000000

{user=user-one, client-id=my-client}
producer_byte_rate:broker_bps=2000000

Describe:

Code Block
$./bin/kafka-quotas.sh --bootstrap-server localhost:9092 --describe --names=user=user-one,client-id=my-client

consumer_byte_rate:shares=200 {user=user-one, client-id=my-client}
producer_byte_rate:bps=10000000 {user=user-one, client-id=my-client}
producer_byte_rate:broker_bps=500000 {user=<default>, client-id=my-client}

$./bin/kafka-quotas.sh --bootstrap-server localhost:9092 --describe --names=user=user-two,client-id=my-client --include-overrides

consumer_byte_rate:shares=100 {user=<default>, client-id=my-client}
producer_byte_rate:broker_bps=2000000 {user=user-two, client-id=my-client}
*producer_byte_rate:broker_bps=500000 {user=<default>, client-id=my-client}

Alter:

Code Block
$./bin/kafka-quotas.sh --bootstrap-server localhost:9092 --describe --names=client-id=my-client --defaults=user --add=producer_byte_rate:shares=100 --delete=producer_byte_rate:broker_bps

<no output on success>

$./bin/kafka-quotas.sh --bootstrap-server localhost:9092 --list --names=client-id=my-client --defaults=user

{user=<default>, client-id=my-client}
consumer_byte_rate:shares=100
producer_byte_rate:shares=100

...

public Value(Entry entry);
    }

    /**
     * Maps a collection of entities to their resolved quota values.
     *
     * @param config the quota configuration for the requested entities
     */
    public ResolveClientQuotasResult(Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     */
    public Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config();

    /**
     * Returns a future which succeeds only if all quota descriptions succeed.
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Resolves the effective quota values for the provided entities.
     *
     * @param entities the entities to describe the resolved quotas for
     * @param options the options to use
     * @return the resolved quotas for the entities
     */
    ResolveClientQuotasResult resolveClientQuotas(Collection<QuotaEntity> entities, ResolveClientQuotasOptions options);
}

AlterClientQuotas (2.6.0)

Code Block
languagejava
titleAlterQuotas
/**
 * Options for {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
public class AlterClientQuotasOptions extends AbstractOptions<AlterClientQuotasOptions> {

    /**
     * Returns whether the request should be validated without altering the configs.
     */
    public boolean validateOnly();

    /**
     * Sets whether the request should be validated without altering the configs.
     */
    public AlterClientQuotasOptions validateOnly(boolean validateOnly);
}

/**
 * The result of the {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterClientQuotasResult {

    /**
     * Maps an entity to its alteration result.
     *
     * @param futures maps entity to its alteration result
     */
    public AlterClientQuotasResult(Map<ClientQuotaEntity, KafkaFuture<Void>> futures);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     */
    public Map<ClientQuotaEntity, KafkaFuture<Void>> values();

    /**
     * Returns a future which succeeds only if all quota alterations succeed.
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Alters client quota configurations with the specified alterations.
     * <p>
     * Alterations for a single entity are atomic, but across entities is not guaranteed. The resulting
     * per-entity error code should be evaluated to resolve the success or failure of all updates.
     * <p>
     * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
     * the returned {@link AlterClientQuotasResult}:
     * <ul>
     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
     *   If the authenticated user didn't have alter access to the cluster.</li>
     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
     *   If the request details are invalid. e.g., a configuration key was specified more than once for an entity.</li>
     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *   If the request timed out before the alterations could finish. It cannot be guaranteed whether the update
     *   succeed or not.</li>
     * </ul>
     * <p>
     * This operation is supported by brokers with version 2.6.0 or higher.
     *
     * @param entries the alterations to perform
     * @return the AlterClientQuotasResult containing the result
     */
    AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
}

kafka-configs.sh/ConfigCommand (2.6.0)

As a result of introducing the APIs, the ConfigCommand will be updated to support the users and clients entity types when using the --bootstrap-server option.  The modification to ConfigCommand was adopted in KIP-543, and usage will remain unchanged from the original --zookeeper functionality.

kafka-client-quotas.sh/ClientQuotasCommand (pending future release)

A ClientQuotasCommand would be constructed with an associated bin/kafka-client-quotas.sh script for managing quotas via command line, and would have three modes of operation, roughly correlating to each of the API calls:

  1. Describe: Describes the quota entities for the given entity specification and their corresponding quota values, as explicitly specified in the configuration. The user may provide explicit entity types+names, or a pattern to apply to an entity type find matching entity names. If an entity type is omitted from the input, it is treated as a wildcard.
  2. Resolve: Resolves the effective quotas for an entity, including contextual information about how those quotas were derived. This includes what configuration entries matched to the entity.
  3. Alter: Modifies a quota configuration entry in an incremental manner, i.e. specify which entries to add, update, and/or remove.
Flags

Various flags will be used to accomplish these operations.

Common flags:
--bootstrap-server: The standard bootstrap server.
--command-config: Property file for the Admin client.

Operations (mutually exclusive):
--describe: Describes the entities that match the given specification, and prints out their configuration values.
--resolve: Resolves the effective quota values for an entity.
--alter: Alters the configuration for the given specification.

Entity specification flags (common to all):
--names: Comma-separated list of type=name pairs, e.g. "user=some-user,client-id=some-client-id"
--defaults: Comma-separated list of entity types with the default name, e.g. "defaults=user,client-id" (Note a separate flag is necessary since names are opaque.)

Exclusive to --describe: None.

Exclusive to --resolve: None.

Exclusive to --alter:
--add: Comma-separated list of entries to add or update to the configuration, in format "name=value".
--delete: Comma-separated list of entries to remove from the configuration, in format "name".
--validate-only: If set, validates the alteration but doesn't perform it.

Input

When specifying configuration entries, the form: quota-name[=quota-value] is used.

Output

In general, the output of the entities will be of the form: {entity-type=entity-name, ...}, where entity-name is sanitized for output since it is an opaque string. When displaying configuration values, the form: quota-name=quota-value.

Describe:

Code Block
$./bin/kafka-client-quotas.sh --bootstrap-server localhost:9092 --describe \
                              --names=client-id=my-client

{user=user-one, client-id=my-client}
consumer_byte_rate=4000000
producer_byte_rate=1000000

{user=user-two, client-id=my-client}
producer_byte_rate=2000000

{user=<default>, client-id=my-client}
consumer_byte_rate=1000000
producer_byte_rate=500000

Resolve:

Code Block
$./bin/kafka-client-quotas.sh --bootstrap-server localhost:9092 --resolve \
                              --names=user=user-two,client-id=my-client

consumer_byte_rate=2000000 {user=user-two, client-id=my-client}
producer_byte_rate=500000 {user=<default>, client-id=my-client}

Alter:

Code Block
$./bin/kafka-client-quotas.sh --bootstrap-server localhost:9092 --alter   \
                              --names=client-id=my-client --defaults=user \
                              --add=consumer_byte_rate=2000000            \
                              --delete=producer_byte_rate

<no output on success>

$./bin/kafka-client-quotas.sh --bootstrap-server localhost:9092 --describe \
                              --names=client-id=my-client --defaults=user

{user=<default>, client-id=my-client}
consumer_byte_rate=2000000

Proposed Changes

In addition to the API changes above, the following write protocol would be implemented:

DescribeClientQuotas (2.6.0)

Code Block
{
  "apiKey": 48,
  "type": "request",
  "name": "DescribeClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Components", "type": "[]ComponentData", "versions": "0+",
      "about": "Filter components to apply to quota entities.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type that the filter component applies to." },
      { "name": "MatchType", "type": "int8", "versions": "0+",
        "about": "How to match the entity {0 = exact name, 1 = default name, 2 = any specified name}." },
      { "name": "Match", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The string to match against, or null if unused for the match type." }
    ]},
    { "name": "Strict", "type": "bool", "versions": "0+",
      "about": "Whether the match is strict, i.e. should exclude entities with unspecified entity types." }
  ]
}

{
  "apiKey": 48,
  "type": "response",
  "name": "DescribeClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or `0` if the quota description succeeded." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The error message, or `null` if the quota description succeeded." },
    { "name": "Entries", "type": "[]EntryData", "versions": "0+", "nullableVersions": "0+",
      "about": "A result entry.", "fields": [
      { "name": "Entity", "type": "[]EntityData", "versions": "0+",
        "about": "The quota entity description.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
          "about": "The entity name, or null if the default." }
      ]},
      { "name": "Values", "type": "[]ValueData", "versions": "0+",
	"about": "The quota values for the entity.", "fields": [
        { "name": "Key", "type": "string", "versions": "0+",
          "about": "The quota configuration key." },
        { "name": "Value", "type": "float64", "versions": "0+",
          "about": "The quota configuration value." }
      ]}
    ]}
  ]
}

ResolveClientQuotas (pending future release)

In addition to the API changes above, the following write protocol would be implemented:

...

Code Block
{
  "apiKey": 4850,
  "type": "request",
  "name": "DescribeQuotasRequestResolveClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "FilterEntity", "type": "[]QuotaFilterDataQuotaEntityData", "versions": "0+",
      "about": "FiltersThe toquota apply to quota entitiesentity description.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type that the filter applies to." },
      { "name": "Rule", "type": "string", "versions": "0+",
        "about": "The rule the filterentity performstype." },
      { "name": "MatchEntityName", "type": "string", "versions": "0+",
        "about": "The string to apply the rule againstentity name." }
    ]}
  ]
}

{
  "apiKey": 4850,
  "type": "response",
  "name": "DescribeQuotasResponseResolveClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]EntryDataQuotaEntryData", "versions": "0+",
      "about": "AResolved resultquota entryentries.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the resolved quota description succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the resolved quota description succeeded." },
      { "name": "EntityQuotaEntity", "type": "[]QuotaEntityDataQuotaEntity", "versions": "0+",
        "about": "TheResolved quota entity description.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },entries.", "fields": [
        { "name": "EntityNameEntityType", "type": "string", "versions": "0+",
          "about": "The entity nametype." },
      ]},
      { "name": "TypeEntityName", "type": "string", "versions": "0+",
          "about": "The quotaentity typename." }
      ]},
      { "name": "UnitsQuotaValues", "type": "string[]QuotaValueData", "versions": "0+",
        "about": "TheQuota units for the value." },
configuration values.", "fields": [
        { "name": "ValueType", "type": "int64string", "versions": "0+",
        "about": "The quota value." }
    ]}
  ]
}

DescribeEffectiveQuotas:

Code Block
{
  "apiKeyabout": 49 "The quota type." },
  "type      { "name": "requestEntry",
  "nametype": "DescribeEffectiveQuotasRequest[]ValueEntryData",
  "validVersionsversions": "0+",
    "flexibleVersions      "about": "noneQuota value entries.",
  "fields": [
          { "name": "EntityQuotaEntity", "type": "[]QuotaEntityDataValueQuotaEntity", "versions": "0+",
            "about": "TheResolved quota entity descriptionentries.", "fields": [
            { "name": "EntityType", "type": "string", "versions": "0+",
              "about": "The entity type." },
            { "name": "EntityName", "type": "string", "versions": "0+",

              "about": "The entity name." }
          ]},
          { "name": "OmitOverriddenValuesValue", "type": "booldouble", "versions": "0+",
            "about": "The quota configuration "Whether to exclude the list of overridden values for every quota type." value." }
        ]}
      ]}
    ]}
  ]
}

AlterClientQuotas (2.6.0)

Code Block
{
  "apiKey": 49,
  "type": "request",
  "name": "AlterClientQuotasRequest",


{
  "apiKeyvalidVersions": 49"0",
  "typeflexibleVersions": "responsenone",
  "fields": [
    { "name": "DescribeEffectiveQuotasResponseEntries",
 "type": "validVersions[]EntryData", "versions": "0+",
      "flexibleVersionsabout": "noneThe quota configuration entries to alter.",
  "fields": [
      { "name": "ThrottleTimeMsEntity", "type": "int32[]EntityData", "versions": "0+",
        "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota quota entity to alter.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntryEntityName", "type": "[]QuotaEntryDatastring", "versions": "0+", "nullableVersions": "0+",
          "about": "Effective quota entries.", "fields": [The name of the entity, or null if the default." }
      ]},
      { "name": "ErrorCodeOps", "type": "int16[]OpData", "versions": "0+",
        "about": "TheAn errorindividual code,quota orconfiguration `0`entry if the effective quota description succeeded." },
 to alter.", "fields": [
        { "name": "ErrorMessageKey", "type": "string", "versions": "0+",
 "nullableVersions": "0+",
        "about": "The error message, or `null` if the effective quota descriptionconfiguration succeededkey." },
        { "name": "QuotaEntityValue", "type": "[]QuotaEntityfloat64", "versions": "0+",
          "about": "Effective quota entries.", "fields": [ "The value to set, otherwise ignored if the value is to be removed." },
        { "name": "EntityTypeRemove", "type": "stringbool", "versions": "0+",
          "about": "The entity typeWhether the quota configuration value should be removed, otherwise set." },
      ]}
    ]},
    { "name": "EntityNameValidateOnly", "type": "stringbool", "versions": "0+",
          "about": "TheWhether entitythe name." }
      ]},
      { "namealteration should be validated, but not performed." }
  ]
}

{
  "apiKey": 49,
  "type": "QuotaValuesresponse",
  "typename": "[]QuotaValueDataAlterClientQuotasResponse",
  "versionsvalidVersions": "0+",
        "aboutflexibleVersions": "Quota configuration values.none",
  "fields": [
        { "name": "TypeThrottleTimeMs", "type": "stringint32", "versions": "0+",
          "about": "The quotaduration type." },
        { "name": "Units", "type": "string", "versions": "0+",
          "about": "The units for the quota typein milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
        { "name": "EntryEntries", "type": "[]ValueEntryDataEntryData", "versions": "0+",
          "about": "QuotaThe quota valueconfiguration entries to alter.", "fields": [
          { "name": "QuotaEntityErrorCode", "type": "[]ValueQuotaEntityint16", "versions": "0+",
            "about": "EffectiveThe quotaerror entries."code, "fields": [
      or `0` if the quota alteration succeeded." },
      { "name": "EntityTypeErrorMessage", "type": "string", "versions": "0+",
      "nullableVersions": "0+",
        "about": "The error entity type." },
      message, or `null` if the quota alteration succeeded." },
      { "name": "EntityNameEntity", "type": "string[]EntityData", "versions": "0+",
              "about": "The quota entity nameto alter." }
          ]},
 , "fields": [
         { "name": "ValueEntityType", "type": "int64string", "versions": "0+",
            "about": "The quotaentity configuration valuetype." }
        ]},
      ]}
    ]}
  ]
}

AlterQuotas:

Code Block
{
  "apiKeyname": 50,
  "typeEntityName": "request",
  "nametype": "AlterQuotasRequeststring",
  "validVersionsversions": "0+",
  "flexibleVersionsnullableVersions": "none0+",
    "fields": [
    { "nameabout": "Entry", "type": "[]EntryData", "versions": "0+",
      "about": "The quota configuration entries to alter.", "fields": [
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",The name of the entity, or null if the default." }
      ]}
    ]}
  ]
}

Kafka RPC 'double' support (2.6.0)

Note that, while the ByteBuffer natively supports serializing a Double, the format in which the value is serialized is not strongly specified, so the preference is to explicitly ensure a standard representation of double-precision 64-bit IEEE 754 format. This is achieved in Java using Double.doubleToRawLongBits() and Double.longBitsToDouble() and should be easily portable to other languages.

Code Block
titleclients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
    /**
     * Read  "about": "The quota entity to alter.", "fields": [a double-precision 64-bit format IEEE 754 value.
     *
   { "name": "EntityType", "type": "string", "versions": "0+",
     * @param buffer The buffer to read from
     * "about":@return "The entitylong type." },value read
     */
   { "name": "EntityName", "type": "string", "versions": "0+",public static double readDouble(ByteBuffer buffer) {
        return  "about": "The name of the entity." }Double.longBitsToDouble(buffer.getLong());
    }

    /**
      ]},
      { "name": "Op", "type": "[]OpData", "versions": "0+",
        "about": "An individual quota configuration entry to alter.", "fields": [* Write the given double following the double-precision 64-bit format IEEE 754 value into the buffer.
     *
     * @param value The value to write
     * @param buffer { "name": "Type", "type": "string", "versions": "0+",
The buffer to write to
     */
    public static  "about": "The quota type." },void writeDouble(double value, ByteBuffer buffer) {
        buffer.putLong(Double.doubleToRawLongBits(value));
    }

The protocol type definition:

Code Block
titleclients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
    public static final DocumentedType DOUBLE = new DocumentedType() {
        @Override{ "name": "Units", "type": "string", "versions": "0+",
          "about": "The units for the quota type." },
        public { "name": "Value", "type": "int64", "versions": "0+",void write(ByteBuffer buffer, Object o) {
          "about": "The value to set, otherwise ignored if the value is to be removed." }, ByteUtils.writeDouble((Double) o, buffer);
        }

        { "name": "Remove", "type": "bool", "versions": "0+",
@Override
        public Object read(ByteBuffer buffer) {
   "about": "Whether the quota configuration value should be removed, otherwisereturn set." }
ByteUtils.readDouble(buffer);
        ]}

        ]},@Override
    { "name": "ValidateOnly", "type": "bool", "versions": "0+",
   public int sizeOf(Object o) {
    "about": "Whether the alteration should be validated, but not performed." }return 8;
  ]
}

{
  "apiKey": 50,
  "type": "response", }

  "name": "AlterQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      @Override
        public String typeName() {
            return "aboutDOUBLE":;
 "The duration in milliseconds for which the request}

 was throttled due to a quota violation, or zero@Override
 if the request did not violate any quota." },
    { "name": "Entry", "type": "[]EntryData", "versions": "0+",
public Double validate(Object item) {
             "about": "The quota configuration entries to alter.", "fields": [
if (item instanceof Double)
            { "name": "ErrorCode", "type": "int16", "versions": "0+", return (Double) item;
        "about": "The error code, orelse
 `0` if the quota alteration succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", throw new SchemaException(item + " is not a Double.");
        }

        "about": "The error message, or `null` if the quota alteration succeeded." },
@Override
        public String documentation() {
       { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "The quota entity to alter.", "fields": [
 return "Represents a double-precision 64-bit format IEEE 754 value. " +
               { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The name of the entity." }
      ]}
    ]}
  ]
}The values are encoded using eight bytes in network byte order (big-endian).";
        }
    };


In generator/src/main/java/org/apache/kafka/message/MessageGenerator.java, the following operations will be used (code omitted for brevity):

Code Block
titlegenerator/src/main/java/org/apache/kafka/message/MessageGenerator.java
Hash code: Double.hashCode(value)

Empty value: (double) 0

Parsing a default value string: Double.parseDouble(defaultValue)

Compatibility, Deprecation, and Migration Plan

All changes would be are forward-compatible, and no migration plan is necessary. It's outside the scope of this KIP to deprecate any functionality.

Rejected Alternatives

  • Use existing describeConfigs/incrementalAlterConfigs for quota functionality. This falls short for a couple reasons. First, quotas entity names are more dynamic than brokers and tasks which makes them awkward to fit into generic tools which expect a single unique, distinct key, e.g. ConfigCommand. Second, there's no tool that expresses a way to get the effective quota resolved quota for an entity without some heavy engineering on the client side, which lacks extensibility and is more expensive to perform, especially over large collection of entities. Therefore, it makes sense to approach quotas as a standalone set of APIs that provide more targeted information and can properly support future extensibility.

...