Status

Current state: Accepted - 2.6.0 contains describe and alter functionality; resolve is pending for a future release.

Discussion thread: here

JIRA: KAFKA-7740

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Quota management via Admin Client has gone through a couple of drafts of proposals (KIP-248, KIP-422). While improvements have been made to the Admin interface for configuration handling, fitting quotas into the API is awkward as they don't fit the natural key-value pairing, nor is the configuration output expressive enough to return all useful information. Therefore, it'd be beneficial to have a quota-native API for managing quotas, which would offer an intuitive and less error-prone interface, convey additional information beyond what the configuration APIs provide, and allow for future extensibility as quota types are added or evolved.

Background

By default, quotas are defined in terms of a user and client ID, where the user acts as an opaque principal name, and the client ID as a generic group identifier. When setting quotas, an administrator has flexibility in how it specifies the user and client ID for which the quota applies, where the user and client ID may be specifically named, indicated as a default, or omitted entirely. Since quotas have flexible configurations, there is a method for resolving the quotas that apply to a request: a hierarchy structure is used, where the most specific defined quota will be matched to a request's user and client ID.

...

As such, reasoning around quotas can be complex, as it's not immediately obvious which quotas may apply to a given user and/or client ID. Providing descriptive information about how quotas are matched is the first goal of this KIP. Likewise, retrieving and modifying quota values can be done in a more expressive and robust way, which is the second goal of the KIP.
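To make the "most specific match wins" idea concrete, here is a minimal sketch of such a resolution. It assumes the precedence order given in Kafka's quota documentation (user and client ID together first, then user-only entries, then client-ID-only entries); the class, the `<default>` marker string, and the list-based key are hypothetical names chosen for illustration, not part of this KIP.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class QuotaResolutionSketch {
    // Hypothetical marker for the default name of an entity type.
    public static final String DEFAULT = "<default>";

    // Builds a lookup key; a null component means that entity type is omitted from the entry.
    public static List<String> key(String user, String clientId) {
        return Arrays.asList(user, clientId);
    }

    // Returns the value of the most specific configured entry that applies to (user, clientId).
    // The candidate order is an assumption based on Kafka's documented quota precedence.
    public static Optional<Double> resolve(Map<List<String>, Double> configured,
                                           String user, String clientId) {
        List<List<String>> candidates = Arrays.asList(
            key(user, clientId),
            key(user, DEFAULT),
            key(user, null),
            key(DEFAULT, clientId),
            key(DEFAULT, DEFAULT),
            key(DEFAULT, null),
            key(null, clientId),
            key(null, DEFAULT));
        for (List<String> candidate : candidates) {
            Double value = configured.get(candidate);
            if (value != null) {
                return Optional.of(value);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<List<String>, Double> configured = new HashMap<>();
        configured.put(key(DEFAULT, null), 1000.0);      // entry for {user=<default>}
        configured.put(key(null, "my-client"), 500.0);   // entry for {client-id=my-client}
        // The user-default entry is more specific than the client-id entry, so it wins.
        System.out.println(resolve(configured, "alice", "my-client"));
    }
}
```

Even in this small form, two unrelated configuration entries compete for the same request, which is the kind of ambiguity the descriptive APIs are meant to expose.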

APIs

In order to clearly specify the APIs, let's first disambiguate some terminology: Every client request that is processed is associated with a quota entity, which is a map of entity types to their corresponding entity names for the request. Using the current entity types, an entity is of the form {user=test-user, client-id=test-client}, where user and client-id are the types, and test-user and test-client are the names. However, when specifying a quota configuration entry, only a subset of the entity types need to be provided, which is referred to as an entity match, for example, {user=<default>}.

...

The quota values are of type Double, which presents a complication in that the RPC protocol doesn't support floating point values. To accommodate this, RPC protocol message type 'double' will be added, which will serialize doubles according to the IEEE 754 floating-point "double format" bit layout.

Types Rationale

While there are two defined entity types in AK, a server-side plugin mechanism allows for further expansion. Likewise, as use cases evolve, finer-grained quota control may be necessary. Therefore, entity types should not be statically bound to publicly defined constants, and instead the API should support flexible entity types by interpreting them as a String identifier. Any entity type that the broker doesn't understand should result in an IllegalArgumentException being returned to the client.

...

Since a fixed set of entity types isn't defined, an entity should be represented by a Map<String, String>, which maps an entity type to the entity name.
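As a small illustration of this representation, the sketch below holds an entity and an entity match as plain maps and renders them in the `{entity-type=entity-name, ...}` form used elsewhere in this document. The class and method names are hypothetical, and treating a null name as the default entity is an assumption borrowed from the description of the entity constructor.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class QuotaEntitySketch {
    // Renders an entity as {type=name, ...}; a null name is shown as <default>.
    public static String format(Map<String, String> entity) {
        StringJoiner joiner = new StringJoiner(", ", "{", "}");
        for (Map.Entry<String, String> e : entity.entrySet()) {
            String name = e.getValue() == null ? "<default>" : e.getValue();
            joiner.add(e.getKey() + "=" + name);
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // A fully-qualified entity: every known type has a name.
        Map<String, String> entity = new LinkedHashMap<>();
        entity.put("user", "test-user");
        entity.put("client-id", "test-client");
        System.out.println(format(entity));

        // An entity match: only a subset of the types is provided.
        Map<String, String> match = new LinkedHashMap<>();
        match.put("user", null);
        System.out.println(format(match));
    }
}
```

Because the keys are plain strings, an entity type contributed by a server-side plugin needs no new constant on the client.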

Public Interfaces

Admin client calls will be added to correspond to DescribeClientQuotas, ResolveClientQuotas, and AlterClientQuotas, with supporting types defined in the common.quota package.

Common types in package org.apache.kafka.common.quota (2.6.0)

Code Block
languagejava
/**
 * Describes a client quota entity, which is a mapping of entity types to their names.
 */
public class ClientQuotaEntity {

    /**
     * The type of an entity entry.
     */
    public static final String USER = "user";
    public static final String CLIENT_ID = "client-id";

    /**
     * Constructs a quota entity for the given types and names. If a name is null,
     * then it is mapped to the built-in default entity name.
     *
     * @param entries maps entity type to its name
     */
    public ClientQuotaEntity(Map<String, String> entries);

    /**
     * @return map of entity type to its name
     */
    public Map<String, String> entries();
}

/**
 * Describes a component for applying a client quota filter.
 */
public class ClientQuotaFilterComponent {

    /**
     * Constructs and returns a filter component that exactly matches the provided entity
     * name for the entity type.
     *
     * @param entityType the entity type the filter component applies to
     * @param entityName the entity name that's matched exactly
     */
    public static ClientQuotaFilterComponent ofEntity(String entityType, String entityName);

    /**
     * Constructs and returns a filter component that matches the built-in default entity name
     * for the entity type.
     *
     * @param entityType the entity type the filter component applies to
     */
    public static ClientQuotaFilterComponent ofDefaultEntity(String entityType);

    /**
     * Constructs and returns a filter component that matches any specified name for the
     * entity type.
     *
     * @param entityType the entity type the filter component applies to
     */
    public static ClientQuotaFilterComponent ofEntityType(String entityType);

    /**
     * @return the component's entity type
     */
    public String entityType();

    /**
     * @return the optional match string, where:
     *         if present, the name that's matched exactly
     *         if empty, matches the default name
     *         if null, matches any specified name
     */
    public Optional<String> match();
}

/**
 * Describes a client quota entity filter.
 */
public class ClientQuotaFilter {

    /**
     * A filter to be applied to matching client quotas.
     *
     * @param components the components to filter on
     * @param strict whether the filter only includes specified components
     */
    private ClientQuotaFilter(Collection<ClientQuotaFilterComponent> components, boolean strict);

    /**
     * Constructs and returns a quota filter that matches all provided components. Matching entities
     * with entity types that are not specified by a component will also be included in the result.
     *
     * @param components the components for the filter
     */
    public static ClientQuotaFilter contains(Collection<ClientQuotaFilterComponent> components);

    /**
     * Constructs and returns a quota filter that matches all provided components. Matching entities
     * with entity types that are not specified by a component will *not* be included in the result.
     *
     * @param components the components for the filter
     */
    public static ClientQuotaFilter containsOnly(Collection<ClientQuotaFilterComponent> components);

    /**
     * Constructs and returns a quota filter that matches all configured entities.
     */
    public static ClientQuotaFilter all();

    /**
     * @return the filter's components
     */
    public Collection<ClientQuotaFilterComponent> components();

    /**
     * @return whether the filter is strict, i.e. only includes specified components
     */
    public boolean strict();
}

/**
 * Describes a configuration alteration to be made to a client quota entity.
 */
public class ClientQuotaAlteration {

    public static class Op {

        /**
         * @param key the quota type to alter
         * @param value if set then the existing value is updated,
         *              otherwise if null, the existing value is cleared
         */
        public Op(String key, Double value);

        /**
         * @return the quota type to alter
         */
        public String key();

        /**
         * @return if set then the existing value is updated,
         *         otherwise if null, the existing value is cleared
         */
        public Double value();
    }

    private final ClientQuotaEntity entity;
    private final Collection<Op> ops;

    /**
     * @param entity the entity whose config will be modified
     * @param ops the alteration to perform
     */
    public ClientQuotaAlteration(ClientQuotaEntity entity, Collection<Op> ops);

    /**
     * @return the entity whose config will be modified
     */
    public ClientQuotaEntity entity();

    /**
     * @return the alteration to perform
     */
    public Collection<Op> ops();
}
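The difference between a filter that merely contains its components and a strict one is easiest to see on a concrete entity. The following is a self-contained sketch of one plausible reading of those semantics; it deliberately does not use the Kafka classes, and the `Component` type, `MatchType` enum, and `matches` method are hypothetical names. It assumes an entity is a map from type to name in which a null name stands for the default entity.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class QuotaFilterSketch {
    // The three ways a component can match: exact name, default name, any specified name.
    public enum MatchType { EXACT, DEFAULT, ANY }

    public static final class Component {
        final String entityType;
        final MatchType matchType;
        final String name; // only consulted for EXACT

        public Component(String entityType, MatchType matchType, String name) {
            this.entityType = entityType;
            this.matchType = matchType;
            this.name = name;
        }
    }

    // Returns true if the entity satisfies every component; when strict, the entity
    // must also have no entity types beyond those named by the components.
    public static boolean matches(Map<String, String> entity,
                                  Collection<Component> components, boolean strict) {
        Set<String> filteredTypes = new HashSet<>();
        for (Component c : components) {
            filteredTypes.add(c.entityType);
            if (!entity.containsKey(c.entityType)) {
                return false;
            }
            String name = entity.get(c.entityType);
            if (c.matchType == MatchType.EXACT && !c.name.equals(name)) {
                return false;
            }
            if (c.matchType == MatchType.DEFAULT && name != null) {
                return false;
            }
        }
        return !strict || filteredTypes.containsAll(entity.keySet());
    }

    public static void main(String[] args) {
        Map<String, String> entity = new HashMap<>();
        entity.put("user", "alice");
        entity.put("client-id", "my-client");
        Collection<Component> byUser =
            Arrays.asList(new Component("user", MatchType.EXACT, "alice"));
        System.out.println(matches(entity, byUser, false)); // non-strict: extra types allowed
        System.out.println(matches(entity, byUser, true));  // strict: client-id is unspecified
    }
}
```

In this reading, the non-strict call corresponds to a filter built with `contains` and the strict call to one built with `containsOnly`.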

DescribeClientQuotas (2.6.0)

Code Block
languagejava
public class DescribeClientQuotasOptions extends AbstractOptions<DescribeClientQuotasOptions> {
    // Empty.
}

/**
 * The result of the {@link Admin#describeClientQuotas(ClientQuotaFilter, DescribeClientQuotasOptions)} call.
 */
public class DescribeClientQuotasResult {

    /**
     * Maps an entity to its configured quota value(s). Note if no value is defined for a quota
     * type for that entity's config, then it is not included in the resulting value map.
     *
     * @param entities future for the collection of entities that matched the filter
     */
    public DescribeClientQuotasResult(KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities);

    /**
     * Returns a future which yields a map from quota entity to its configured quota values.
     */
    public KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Describes all entities matching the provided filter that have at least one client quota configuration
     * value defined.
     * <p>
     * The following exceptions can be anticipated when calling {@code get()} on the future from the
     * returned {@link DescribeClientQuotasResult}:
     * <ul>
     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
     *   If the authenticated user didn't have describe access to the cluster.</li>
     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
     *   If the request details are invalid. e.g., an invalid entity type was specified.</li>
     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *   If the request timed out before the describe could finish.</li>
     * </ul>
     * <p>
     * This operation is supported by brokers with version 2.6.0 or higher.
     *
     * @param filter the filter to apply to match entities
     * @param options the options to use
     * @return the DescribeClientQuotasResult containing the result
     */
    DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options);
}

ResolveClientQuotas (pending future release)

Code Block
languagejava
public class ResolveClientQuotasOptions extends AbstractOptions<ResolveClientQuotasOptions> {
    // Empty.
}

/**
 * The result of the {@link Admin#resolveClientQuotas(Collection, ResolveClientQuotasOptions)} call.
 */
public class ResolveClientQuotasResult {
    /**
     * Information about a specific quota configuration entry.
     */
    public class Entry {
        /**
         * @param source the entity source for the value
         * @param value the non-null value
         */
        public Entry(QuotaEntity source, Double value);
    }

    /**
     * Information about the value for a quota type.
     *
     * NOTE: We maintain a `Value` class because additional information may be added, e.g.,
     *       a list of overridden entries.
     */
    public class Value {
        /**
         * @param entry the quota entry
         */
        public Value(Entry entry);
    }

    /**
     * Maps a collection of entities to their resolved quota values.
     *
     * @param config the quota configuration for the requested entities
     */
    public ResolveClientQuotasResult(Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     */
    public Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config();

    /**
     * Returns a future which succeeds only if all quota descriptions succeed.
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Resolves the effective quota values for the provided entities.
     *
     * @param entities the entities to describe the resolved quotas for
     * @param options the options to use
     * @return the resolved quotas for the entities
     */
    ResolveClientQuotasResult resolveClientQuotas(Collection<QuotaEntity> entities, ResolveClientQuotasOptions options);
}

AlterClientQuotas (2.6.0)

Code Block
languagejava
titleAlterQuotas
/**
 * Options for {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
public class AlterClientQuotasOptions extends AbstractOptions<AlterClientQuotasOptions> {

    /**
     * Returns whether the request should be validated without altering the configs.
     */
    public boolean validateOnly();

    /**
     * Sets whether the request should be validated without altering the configs.
     */
    public AlterClientQuotasOptions validateOnly(boolean validateOnly);
}

/**
 * The result of the {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterClientQuotasResult {

    /**
     * Maps an entity to its alteration result.
     *
     * @param futures maps entity to its alteration result
     */
    public AlterClientQuotasResult(Map<ClientQuotaEntity, KafkaFuture<Void>> futures);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     */
    public Map<ClientQuotaEntity, KafkaFuture<Void>> values();

    /**
     * Returns a future which succeeds only if all quota alterations succeed.
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Alters client quota configurations with the specified alterations.
     * <p>
     * Alterations for a single entity are atomic, but atomicity across entities is not guaranteed. The resulting
     * per-entity error code should be evaluated to resolve the success or failure of all updates.
     * <p>
     * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
     * the returned {@link AlterClientQuotasResult}:
     * <ul>
     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
     *   If the authenticated user didn't have alter access to the cluster.</li>
     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
     *   If the request details are invalid. e.g., a configuration key was specified more than once for an entity.</li>
     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *   If the request timed out before the alterations could finish. It cannot be guaranteed whether the update
     *   succeeded or not.</li>
     * </ul>
     * <p>
     * This operation is supported by brokers with version 2.6.0 or higher.
     *
     * @param entries the alterations to perform
     * @param options the options to use
     * @return the AlterClientQuotasResult containing the result
     */
    AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
}
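The per-entity behaviour described above (a set value updates the key, a null value clears it, and a key given twice is rejected) can be sketched without a broker. The code below is an illustrative model only: the `Op` class mirrors the shape of the alteration op but is a local, hypothetical type, and using `IllegalArgumentException` for the duplicate-key case is a stand-in for the error the API would return.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class QuotaAlterationSketch {
    // Local stand-in for an alteration op: a null value means "clear this key".
    public static final class Op {
        final String key;
        final Double value;

        public Op(String key, Double value) {
            this.key = key;
            this.value = value;
        }
    }

    // Applies all ops to a copy of the entity's current config. Validation happens
    // before any change is made, so a bad request leaves the config untouched.
    public static Map<String, Double> apply(Map<String, Double> current, List<Op> ops) {
        Set<String> seen = new HashSet<>();
        for (Op op : ops) {
            if (!seen.add(op.key)) {
                throw new IllegalArgumentException("Duplicate quota key: " + op.key);
            }
        }
        Map<String, Double> updated = new HashMap<>(current);
        for (Op op : ops) {
            if (op.value == null) {
                updated.remove(op.key);
            } else {
                updated.put(op.key, op.value);
            }
        }
        return updated;
    }

    public static void main(String[] args) {
        Map<String, Double> current = new HashMap<>();
        current.put("producer_byte_rate", 1000.0);
        List<Op> ops = Arrays.asList(
            new Op("consumer_byte_rate", 2000000.0), // add or update
            new Op("producer_byte_rate", null));     // clear
        System.out.println(apply(current, ops));
    }
}
```

Validating the whole op list first is one simple way to keep an alteration for a single entity all-or-nothing.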

kafka-configs.sh/ConfigCommand (2.6.0)

As a result of introducing the APIs, the ConfigCommand will be updated to support the users and clients entity types when using the --bootstrap-server option. The modification to ConfigCommand was adopted in KIP-543, and usage will remain unchanged from the original --zookeeper functionality.

kafka-client-quotas.sh/ClientQuotasCommand (pending future release)

A ClientQuotasCommand would be constructed with an associated bin/kafka-client-quotas.sh script for managing quotas via command line, and would have three modes of operation, roughly correlating to each of the API calls:

  1. Describe: Describes the quota entities for the given entity specification and their corresponding quota values, as explicitly specified in the configuration. The user may provide explicit entity types+names, or a pattern to apply to an entity type to find matching entity names. If an entity type is omitted from the input, it is treated as a wildcard.
  2. Resolve: Resolves the effective quotas for an entity, including contextual information about how those quotas were derived. This includes what configuration entries matched to the entity.
  3. Alter: Modifies a quota configuration entry in an incremental manner, i.e. specify which entries to add, update, and/or remove.

Flags

Various flags will be used to accomplish these operations.

...

Exclusive to --alter:
--add: Comma-separated list of entries to add or update to the configuration, in format "name=value".
--delete: Comma-separated list of entries to remove from the configuration, in format "name".
--validate-only: If set, validates the alteration but doesn't perform it.

Input

When specifying configuration entries, the form: quota-name[=quota-value] is used.
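As an illustration of the quota-name[=quota-value] form, the sketch below parses a comma-separated list of such entries, as would be passed to --add or --delete. The parser class and method are hypothetical and are not taken from any proposed ClientQuotasCommand implementation; an entry without a value is recorded with a null value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class QuotaEntryParserSketch {
    // Parses "name=value,name,..." into an ordered map; a missing value becomes null.
    public static Map<String, Double> parse(String input) {
        Map<String, Double> entries = new LinkedHashMap<>();
        for (String entry : input.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int eq = trimmed.indexOf('=');
            if (eq < 0) {
                entries.put(trimmed, null);
            } else {
                String name = trimmed.substring(0, eq);
                double value = Double.parseDouble(trimmed.substring(eq + 1));
                entries.put(name, value);
            }
        }
        return entries;
    }

    public static void main(String[] args) {
        // An --add style list carries values; a --delete style list carries names only.
        System.out.println(parse("consumer_byte_rate=2000000"));
        System.out.println(parse("producer_byte_rate"));
    }
}
```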

Output

In general, the output of the entities will be of the form: {entity-type=entity-name, ...}, where entity-name is sanitized for output since it is an opaque string. When displaying configuration values, the form quota-name=quota-value is used.

...

Code Block
$./bin/kafka-client-quotas.sh --bootstrap-server localhost:9092 --alter \
                              --names=client-id=my-client --defaults=user \
                              --add=consumer_byte_rate=2000000 \
                              --delete=producer_byte_rate

<no output on success>

$./bin/kafka-client-quotas.sh --bootstrap-server localhost:9092 --describe \
                              --names=client-id=my-client --defaults=user

{user=<default>, client-id=my-client}
consumer_byte_rate=2000000

Proposed Changes

In addition to the API changes above, the following wire protocol would be implemented:

DescribeClientQuotas (2.6.0)

Code Block
{
  "apiKey": 48,
  "type": "request",
  "name": "DescribeClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Components", "type": "[]ComponentData", "versions": "0+",
      "about": "Filter components to apply to quota entities.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type that the filter component applies to." },
      { "name": "MatchType", "type": "int8", "versions": "0+",
        "about": "How to match the entity {0 = exact name, 1 = default name, 2 = any specified name}." },
      { "name": "Match", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The string to match against, or null if unused for the match type." }
    ]},
    { "name": "Strict", "type": "bool", "versions": "0+",
      "about": "Whether the match is strict, i.e. should exclude entities with unspecified entity types." }
  ]
}

{
  "apiKey": 48,
  "type": "response",
  "name": "DescribeClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or `0` if the quota description succeeded." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The error message, or `null` if the quota description succeeded." },
    { "name": "Entries", "type": "[]EntryData", "versions": "0+", "nullableVersions": "0+",
      "about": "A result entry.", "fields": [
      { "name": "Entity", "type": "[]EntityData", "versions": "0+",
        "about": "The quota entity description.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
          "about": "The entity name, or null if the default." }
      ]},
      { "name": "Values", "type": "[]ValueData", "versions": "0+",
        "about": "The quota values for the entity.", "fields": [
        { "name": "Key", "type": "string", "versions": "0+",
          "about": "The quota configuration key." },
        { "name": "Value", "type": "float64", "versions": "0+",
          "about": "The quota configuration value." }
      ]}
    ]}
  ]
}

ResolveClientQuotas (pending future release)

Code Block
{
  "apiKey": 50,
  "type": "request",
  "name": "ResolveClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+",
      "about": "The quota entity description.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type." },
      { "name": "EntityName", "type": "string", "versions": "0+",
        "about": "The entity name." }
    ]}
  ]
}

{
  "apiKey": 50,
  "type": "response",
  "name": "ResolveClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]QuotaEntryData", "versions": "0+",
      "about": "Resolved quota entries.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the resolved quota description succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the resolved quota description succeeded." },
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "Resolved quota entries.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The entity name." }
      ]},
      { "name": "QuotaValues", "type": "[]QuotaValueData", "versions": "0+",
        "about": "Quota configuration values.", "fields": [
        { "name": "Type", "type": "string", "versions": "0+",
          "about": "The quota type." },
        { "name": "Entry", "type": "[]ValueEntryData", "versions": "0+",
          "about": "Quota value entries.", "fields": [
          { "name": "QuotaEntity", "type": "[]ValueQuotaEntity", "versions": "0+",
            "about": "Resolved quota entries.", "fields": [
            { "name": "EntityType", "type": "string", "versions": "0+",
              "about": "The entity type." },
            { "name": "EntityName", "type": "string", "versions": "0+",
              "about": "The entity name." }
          ]},
          { "name": "Value", "type": "double", "versions": "0+",
            "about": "The quota configuration value." }
        ]}
      ]}
    ]}
  ]
}

AlterClientQuotas (2.6.0)

Code Block
{
  "apiKey": 49,
  "type": "request",
  "name": "AlterClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Entries", "type": "[]EntryData", "versions": "0+",
      "about": "The quota configuration entries to alter.", "fields": [
      { "name": "Entity", "type": "[]EntityData", "versions": "0+",
        "about": "The quota entity to alter.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
          "about": "The name of the entity, or null if the default." }
      ]},
      { "name": "Ops", "type": "[]OpData", "versions": "0+",
        "about": "An individual quota configuration entry to alter.", "fields": [
        { "name": "Key", "type": "string", "versions": "0+",
          "about": "The quota configuration key." },
        { "name": "Value", "type": "float64", "versions": "0+",
          "about": "The value to set, otherwise ignored if the value is to be removed." },
        { "name": "Remove", "type": "bool", "versions": "0+",
          "about": "Whether the quota configuration value should be removed, otherwise set." }
      ]}
    ]},
    { "name": "ValidateOnly", "type": "bool", "versions": "0+",
      "about": "Whether the alteration should be validated, but not performed." }
  ]
}

{
  "apiKey": 49,
  "type": "response",
  "name": "AlterClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entries", "type": "[]EntryData", "versions": "0+",
      "about": "The quota configuration entries to alter.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the quota alteration succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the quota alteration succeeded." },
      { "name": "Entity", "type": "[]EntityData", "versions": "0+",
        "about": "The quota entity to alter.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
          "about": "The name of the entity, or null if the default." }
      ]}
    ]}
  ]
}

Kafka RPC 'double' support (2.6.0)

Note that, while the ByteBuffer natively supports serializing a Double, the format in which the value is serialized is not strongly specified, so the preference is to explicitly ensure a standard representation of double-precision 64-bit IEEE 754 format. This is achieved in Java using Double.doubleToRawLongBits() and Double.longBitsToDouble() and should be easily portable to other languages.
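A minimal sketch of that approach is shown below: the double is converted to its raw IEEE 754 bit pattern and written as a 64-bit integer, then reversed on read. The class and method names are hypothetical, and the sketch relies on ByteBuffer's default big-endian byte order rather than on any particular Kafka serialization class.

```java
import java.nio.ByteBuffer;

public class DoubleWireSketch {
    // Writes the raw IEEE 754 bits of the value as a 64-bit integer (8 bytes).
    public static void writeDouble(ByteBuffer buffer, double value) {
        buffer.putLong(Double.doubleToRawLongBits(value));
    }

    // Reads 8 bytes and reinterprets them as a double.
    public static double readDouble(ByteBuffer buffer) {
        return Double.longBitsToDouble(buffer.getLong());
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES);
        writeDouble(buffer, 2000000.0);
        buffer.flip(); // switch from writing to reading
        System.out.println(readDouble(buffer));
    }
}
```

Any language that can reinterpret a 64-bit integer as a double can decode the same bytes, which is what makes the format portable.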

...

Code Block
titlegenerator/src/main/java/org/apache/kafka/message/MessageGenerator.java
Hash code: Double.hashCode(value)

Empty value: (double) 0

Parsing a default value string: Double.parseDouble(defaultValue)

Compatibility, Deprecation, and Migration Plan

All changes are forward-compatible, and no migration plan is necessary. It's outside the scope of this KIP to deprecate any functionality.

Rejected Alternatives

  • Use existing describeConfigs/incrementalAlterConfigs for quota functionality. This falls short for a couple of reasons. First, quota entity names are more dynamic than brokers and tasks, which makes them awkward to fit into generic tools that expect a single unique, distinct key, e.g. ConfigCommand. Second, there's no tool that expresses a way to get the resolved quota for an entity without some heavy engineering on the client side, which lacks extensibility and is more expensive to perform, especially over a large collection of entities. Therefore, it makes sense to approach quotas as a standalone set of APIs that provide more targeted information and can properly support future extensibility.

...