Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
/**
 * Describes a fully-qualified entityclient quota entity, which is a mapping of entity types to their names.
 */
public class QuotaEntityClientQuotaEntity {

    /**
     * TypeThe type of an entity entry.
     */
    public static final String USER = "user";
    public static final String CLIENT_ID = "client-id";

    /**
     * RepresentsConstructs thea defaultquota nameentity for the an entity, i.e. the entity that's matchedgiven types and names. If a name is null,
     * when an exact match isn't foundthen it is mapped to the built-in default entity name.
     */
     * public@param staticentries finalmaps String QUOTA_ENTITY_NAME_DEFAULT = // implementation defined

entity type to its name
     */**
    public * Constructs a fully-qualified entity.ClientQuotaEntity(Map<String, String> entries);

     /**
     * @param@return entriesmap mapsof entity type to its name
     */
    public QuotaEntity(Map<String, String> entries();
}

/**
 * Describes a component quota entityfor applying a client quota filter.
 */
public class QuotaFilterClientQuotaFilterComponent {

    /**
     * AConstructs and returns a filter to be appliedcomponent that exactly matches the provided entity
     * name for the entity type.
     *
     * @param entityType the entity type the filter component applies to
     * @param matchentityName the entity stringname that's matched exactly, or null if the type should be omitted
     */
    public static ClientQuotaFilterComponent QuotaFilterofEntity(String entityType, String matchentityName);
}

DescribeClientQuotas

Code Block
languagejava
public class DescribeClientQuotasOptions extends AbstractOptions<DescribeClientQuotasOptions> { /**
    // Empty.
}

/**
 *Constructs Theand resultreturns ofa thefilter {@link Admin#describeClientQuotas(Collection, DescribeClientQuotasOptions)} call.
 */
public class DescribeClientQuotasResult {

component that matches the built-in default entity name
     /**
     * Maps anfor the entity totype.
 its configured quota value(s). Note if no value is defined for a quota
     * type for that entity's config, then it is not included in the resulting value map. *
     * @param entityType the entity type the filter component applies to
     */
    public static ClientQuotaFilterComponent ofDefaultEntity(String entityType);

     /**
     * @paramConstructs entitiesand thereturns collectiona offilter entitiescomponent that matchedmatches theany filter
specified name for the
  */
   * public DescribeClientQuotasResult(KafkaFuture<Map<QuotaEntity, Map<String, Double>>> entities);

    /**entity type.
     *
     * Returns@param aentityType map fromthe quota entity totype athe futurefilter whichcomponent can beapplies used to check the status of the operation.
     */
    public KafkaFuture<Map<QuotaEntity,static Map<String, Double>>> entities(ClientQuotaFilterComponent ofEntityType(String entityType);
}

public  interface Admin extends/**
 AutoCloseable {
   * ...

@return the component's entity /**type
     */
 Describes all entities matchingpublic all provided filters (logical AND) that have at least oneString entityType();

    /**
     * quota value defined. @return the optional match string, where:
     *
       * @param filtersif filterspresent, tothe applyname tothat's matchingmatched entitiesexactly
     * @param options         if empty, matches the optionsdefault to usename
     * @return result containing all matching entities         if null, matches any specified name
     */
    DescribeClientQuotasResultpublic describeClientQuotas(Collection<QuotaFilter> filters, DescribeClientQuotasOptions options);
}

ResolveClientQuotas:

Code Block
languagejava
public class ResolveClientQuotasOptions extends AbstractOptions<ResolveClientQuotasOptions> {
    // Empty.
Optional<String> match();
}

/**
 * TheDescribes resulta ofclient thequota {@link Admin#resolveClientQuotas(Collection, ResolveClientQuotasOptions)} callentity filter.
 */
public class ResolveClientQuotasResultClientQuotaFilter {

    /**
     * A Informationfilter to aboutbe aapplied specificto quotamatching configurationclient entryquotas.
     */
    public class* Entry@param {
components the components to filter on
   /**
         * * @param sourcestrict the entity source for whether the value
filter only includes       * @param value the non-null value
 specified components
        */
        public Entry(QuotaEntity source, Double value);
    }

private ClientQuotaFilter(Collection<ClientQuotaFilterComponent> components, boolean strict);

    /**
     * InformationConstructs aboutand thereturns valuea forquota afilter quotathat type.
matches all provided components. Matching *entities
     * NOTE:with Weentity maintaintypes athat `Value`are class because additional information may be added, e.g.,not specified by a component will also be included in the result.
     *
     * @param components athe listcomponents offor overriddenthe entries.filter
     */
    public class Value {static ClientQuotaFilter contains(Collection<ClientQuotaFilterComponent> components);

        /**
     * Constructs and returns *a @paramquota entryfilter thethat quotamatches entry
all provided components. Matching entities
     */
 with entity types that are not specified publicby Value(Entry entry);
    }

    /**
     * Maps a collection of entities to their resolved quota valuesa component will *not* be included in the result.
     *
     * @param configcomponents the quotacomponents configuration for the requested entitiesfilter
     */
    public ResolveClientQuotasResult(Map<QuotaEntity, KafkaFuture<Map<String, Value>>> configstatic ClientQuotaFilter containsOnly(Collection<ClientQuotaFilterComponent> components);

    /**
     * ReturnsConstructs aand mapreturns froma quota entityfilter tothat amatches futureall which can be used to check the status of the operationconfigured entities.
     */
    public Map<QuotaEntity,static KafkaFuture<Map<String, Value>>> config(ClientQuotaFilter all();

    /**
     * Returns@return athe future which succeeds only if all quota descriptions succeed.filter's components
     */
    public KafkaFuture<Void>Collection<ClientQuotaFilterComponent> allcomponents();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * @return Resolveswhether the filter effectiveis quota values for the provided entities.strict, i.e. only includes specified components
     */
    public boolean strict();
}

/**
 * @paramDescribes entitiesa theconfiguration entitiesalteration to describe the resolved quotas for
     * @param options the options to use
     * @return the resolved quotas for the entities
     */
    ResolveClientQuotasResult resolveClientQuotas(Collection<QuotaEntity> entities, ResolveClientQuotasOptions options);
}

AlterClientQuotas

Code Block
languagejava
titleAlterQuotas
public class AlterClientQuotasEntry {
be made to a client quota entity.
 */
public class ClientQuotaAlteration {

    public static class Op {

        /**
         * @param key the quota type to alter
         * @param value if set then the existing value is updated,
         *              otherwise if null, the existing value is cleared
         */
        public Op(String key, Double value);

    }

    /**
     * @param entity the entity* whose@return configthe will be modified
quota type to alter
         */
        public String key();

        /**
 @param  ops the alteration to perform - if* value@return isif set, then the existing value is updated,
     *    *         otherwise if null, the existing value is cleared
         */
     public  AlterClientQuotasEntry(QuotaEntity entity,public Collection<Op>Double opsvalue();
    }

public   class AlterClientQuotasOptionsprivate extendsfinal AbstractOptions<AlterClientQuotasOptions>ClientQuotaEntity {entity;
    /**
private final Collection<Op> ops;

  * Sets whether the request should be validated without altering the configs. /**
     * @param entity the entity whose config will be modified
     * @param ops the alteration to perform
     */
    public AlterClientQuotasOptions validateOnly(boolean validateOnlyClientQuotaAlteration(ClientQuotaEntity entity, Collection<Op> ops);
}

    /**
   * The result* of@return the entity {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
public class AlterClientQuotasResult {
    public AlterClientQuotasResult(Map<QuotaEntity, KafkaFuture<Void>> futureswhose config will be modified
     */
    public ClientQuotaEntity entity();

    /**
     * Returns@return athe map from quota entity to a future which can be used to check the status of the operation.alteration to perform
     */
    public Map<QuotaEntity, KafkaFuture<Void>> valuesCollection<Op> ops();
}

DescribeClientQuotas

Code Block
languagejava
public class DescribeClientQuotasOptions extends AbstractOptions<DescribeClientQuotasOptions> /**{
     * Returns a future which succeeds only if all quota alterations succeed.
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...
// Empty.
}

/**
 * The result of the {@link Admin#describeClientQuotas(Collection, DescribeClientQuotasOptions)} call.
 */
public class DescribeClientQuotasResult {

    /**
     * AltersMaps thean quotasentity asto specifiedits forconfigured the entries.
     *quota value(s). Note if no value is defined for a quota
     * @param alterations the alterations to performtype for that entity's config, then it is not included in the resulting value map.
     *
     * @return @param entities future for the resultcollection of entities that matched the alterationsfilter
     */
    AlterClientQuotasResultpublic alterClientQuotasDescribeClientQuotasResult(Collection<AlterClientQuotasEntry>KafkaFuture<Map<ClientQuotaEntity, entriesMap<String, AlterClientQuotasOptionsDouble>>> optionsentities);
}

kafka-client-quotas.sh/ClientQuotasCommand


    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     */
    public KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Describes all entities matching the provided filter that have at least one client quota configuration
     * value defined.
     * <p>
     * The following exceptions can be anticipated when calling {@code get()} on the future from the
     * returned {@link DescribeClientQuotasResult}:
     * <ul>
     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
     *   If the authenticated user didn't have describe access to the cluster.</li>
     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
     *   If the request details are invalid, e.g., an invalid entity type was specified.</li>
     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *   If the request timed out before the describe could finish.</li>
     * </ul>
     * <p>
     * This operation is supported by brokers with version 2.6.0 or higher.
     *
     * @param filter the filter to apply to match entities
     * @param options the options to use
     * @return the {@link DescribeClientQuotasResult} whose future yields the matching entities
     */
    DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options);
}

ResolveClientQuotas:

Code Block
languagejava
/**
 * Options for {@link Admin#resolveClientQuotas(Collection, ResolveClientQuotasOptions)}.
 */
public class ResolveClientQuotasOptions extends AbstractOptions<ResolveClientQuotasOptions> {
    // Empty: no options beyond those inherited from AbstractOptions are declared.
}

/**
 * The result of the {@link Admin#resolveClientQuotas(Collection, ResolveClientQuotasOptions)} call.
 *
 * NOTE(review): this class refers to {@code QuotaEntity}, while the describe/alter APIs in this
 * document use {@code ClientQuotaEntity} - confirm whether the name here is stale.
 */
public class ResolveClientQuotasResult {
    /**
     * Information about a specific quota configuration entry.
     */
    public class Entry {
        /**
         * Constructs an entry pairing a quota value with the entity it came from.
         *
         * @param source the entity source for the value
         * @param value the non-null value
         */
        public Entry(QuotaEntity source, Double value);
    }

    /**
     * Information about the value for a quota type.
     *
     * NOTE: We maintain a `Value` class because additional information may be added, e.g.,
     *       a list of overridden entries.
     */
    public class Value {
        /**
         * Constructs a value from its quota entry.
         *
         * @param entry the quota entry
         */
        public Value(Entry entry);
    }

    /**
     * Maps a collection of entities to their resolved quota values.
     *
     * @param config the quota configuration for the requested entities: one future per entity,
     *               each yielding a map keyed by String to {@link Value}
     */
    public ResolveClientQuotasResult(Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     *
     * @return the per-entity futures
     */
    public Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config();

    /**
     * Returns a future which succeeds only if all quota descriptions succeed.
     *
     * @return a future covering every requested entity
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Resolves the effective quota values for the provided entities.
     *
     * NOTE(review): the parameter type is {@code QuotaEntity}, while describeClientQuotas and
     * alterClientQuotas in this document use {@code ClientQuotaEntity} - confirm the intended type name.
     *
     * @param entities the entities to describe the resolved quotas for
     * @param options the options to use
     * @return the resolved quotas for the entities
     */
    ResolveClientQuotasResult resolveClientQuotas(Collection<QuotaEntity> entities, ResolveClientQuotasOptions options);
}

AlterClientQuotas

Code Block
languagejava
titleAlterQuotas
/**
 * Options for {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)}.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
public class AlterClientQuotasOptions extends AbstractOptions<AlterClientQuotasOptions> {

    /**
     * Returns whether the request should be validated without altering the configs.
     *
     * @return whether the request should be validated without altering the configs
     */
    public boolean validateOnly();

    /**
     * Sets whether the request should be validated without altering the configs.
     *
     * @param validateOnly whether the request should be validated without altering the configs
     * @return an {@code AlterClientQuotasOptions} instance (the sketch does not show which one)
     */
    public AlterClientQuotasOptions validateOnly(boolean validateOnly);
}

/**
 * The result of the {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
@InterfaceStability.Evolving
public class AlterClientQuotasResult {

    /**
     * Maps an entity to its alteration result.
     *
     * @param futures maps entity to its alteration result
     */
    public AlterClientQuotasResult(Map<ClientQuotaEntity, KafkaFuture<Void>> futures);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     *
     * @return the per-entity alteration futures
     */
    public Map<ClientQuotaEntity, KafkaFuture<Void>> values();

    /**
     * Returns a future which succeeds only if all quota alterations succeed.
     *
     * @return a future covering every alteration
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Alters client quota configurations with the specified alterations.
     * <p>
     * Alterations for a single entity are atomic, but atomicity across entities is not guaranteed. The
     * resulting per-entity error code should be evaluated to resolve the success or failure of all updates.
     * <p>
     * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
     * the returned {@link AlterClientQuotasResult}:
     * <ul>
     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
     *   If the authenticated user didn't have alter access to the cluster.</li>
     *   <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
     *   If the request details are invalid, e.g., a configuration key was specified more than once for an entity.</li>
     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *   If the request timed out before the alterations could finish. It cannot be guaranteed whether the update
     *   succeeded or not.</li>
     * </ul>
     * <p>
     * This operation is supported by brokers with version 2.6.0 or higher.
     *
     * @param entries the alterations to perform
     * @param options the options to use
     * @return the AlterClientQuotasResult containing the result
     */
    AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
}

kafka-client-quotas.sh/ClientQuotasCommand

A ClientQuotasCommand would be constructed with an associated bin/kafka-client-quotas.sh script for managing quotas via command line, and would have three modes of operation, roughly correlating to each of the API calls:

...

Code Block
$./bin/kafka-client-quotas.sh --bootstrap-server localhost:9092 --alter   \
                              --names=client-id=my-client --defaults=user \
                              --add=consumer_byte_rate=2000000            \
                              --delete=producer_byte_rate

<no output on success>

$./bin/kafka-client-quotas.sh --bootstrap-server localhost:9092 --describe \
                              --names=client-id=my-client --defaults=user

{user=<default>, client-id=my-client}
consumer_byte_rate=2000000

Proposed Changes

In addition to the API changes above, the following wire protocol would be implemented:

DescribeClientQuotas

Code Block
{
  "apiKey": 48,
  "type": "request",
  "name": "DescribeClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Components", "type": "[]ComponentData", "versions": "0+",
      "about": "Filter  --names=client-id=my-client --defaults=user

{user=<default>, client-id=my-client}
consumer_byte_rate=2000000

Proposed Changes

In addition to the API changes above, the following write protocol would be implemented:

DescribeClientQuotas

Code Block
{
  "apiKey": 48,
  "type": "request",
  "name": "DescribeClientQuotasRequest",
  "validVersionscomponents to apply to quota entities.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "flexibleVersionsabout": "none"The entity type that the filter component applies to." },
  "fields": [
    { "name": "FilterMatchType", "type": "[]QuotaFilterDataint8", "versions": "0+",
       "about": "Filters to apply to quota entities.", "fields": [ "about": "How to match the entity {0 = exact name, 1 = default name, 2 = any specified name}." },
      { "name": "EntityTypeMatch", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The entity type thatstring to match against, or null if unused for the filtermatch applies totype." },
    ]},
    { "name": "MatchStrict", "type": "stringbool", "versions": "0+", "nullableVersions": "0+",
        "about": "TheWhether stringthe tomatch matchis against, or null if the typestrict, i.e. should beexclude omitted." }
    ]entities with unspecified entity types." }
  ]
}

{
  "apiKey": 48,
  "type": "response",
  "name": "DescribeClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "EntryErrorCode", "type": "[]EntryDataint16", "versions": "0+",
      "about": "AThe resulterror entry."code, "fields": [
   or `0` if the quota description succeeded." },
    { "name": "ErrorCodeErrorMessage", "type": "int16string", "versions": "0+",
 "nullableVersions": "0+",
      "about": "The error codemessage, or `0``null` if the quota description succeeded." },
      { "name": "ErrorMessageEntries", "type": "string[]EntryData", "versions": "0+", "nullableVersions": "0+",
        "about": "TheA errorresult message, or `null` if the quota description succeeded." },entry.", "fields": [
      { "name": "Entity", "type": "[]QuotaEntityDataEntityData", "versions": "0+",
        "about": "The quota entity description.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
          "about": "The entity name, or null if the default." }
      ]},
      ]},
     { "name": "Values", "type": "[]ValueData", "versions": "0+",
	"about": "The quota values for the entity.", "fields": [
        { "name": "TypeKey", "type": "string", "versions": "0+",
          "about": "The quota typeconfiguration key." },
        { "name": "Value", "type": "doublefloat64", "versions": "0+",
          "about": "The quota configuration value." }
      ]}
    ]}
  ]
}

ResolveClientQuotas

Code Block
{
  "apiKey": 50,
  "type": "request",
  "name": "ResolveClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+",
      "about": "The quota entity description.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type." },
      { "name": "EntityName", "type": "string", "versions": "0+",
        "about": "The entity name." }
    ]}
  ]
}

{
  "apiKey": 50,
  "type": "response",
  "name": "ResolveClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]QuotaEntryData", "versions": "0+",
      "about": "Resolved quota entries.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the resolved quota description succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the resolved quota description succeeded." },
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "Resolved quota entries.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The entity name." }
      ]},
      { "name": "QuotaValues", "type": "[]QuotaValueData", "versions": "0+",
        "about": "Quota configuration values.", "fields": [
        { "name": "Type", "type": "string", "versions": "0+",
          "about": "The quota type." },
        { "name": "Entry", "type": "[]ValueEntryData", "versions": "0+",
          "about": "Quota value entries.", "fields": [
          { "name": "QuotaEntity", "type": "[]ValueQuotaEntity", "versions": "0+",
            "about": "Resolved quota entries.", "fields": [
            { "name": "EntityType", "type": "string", "versions": "0+",
              "about": "The entity type." },
            { "name": "EntityName", "type": "string", "versions": "0+",
              "about": "The entity name." }
          ]},
          { "name": "Value", "type": "double", "versions": "0+",
            "about": "The quota configuration value." }
        ]}
      ]}
    ]}
  ]
}

AlterClientQuotas

Code Block
{
  "apiKey": 49,
  "type": "request",
  "name": "AlterClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Entries", "type": "[]EntryData", "versions": "0+",
      "about": "The quota configuration entries to alter.", "fields": [
      { "name": "Entity", "type": "[]EntityData", "versions": "0+",
        "about": "The quota entity to alter.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
          "about": "The name of the entity, or null if the default." }
      ]},
      { "name": "Ops", "type": "[]OpData", "versions": "0+",
        "about": "An individual quota configuration entry to alter.", "fields": [
        { "name": "Key", "type": "string", "versions": "0+",
          "about": "The quota configuration key." },
        { "name": "Value", "type": "float64", "versions": "0+",
          "about": "The value to set, otherwise ignored if the value is to be removed." },
        { "name": "Remove", "type": "bool", "versions": "0+",
          "about": "Whether the quota configuration value should be removed, otherwise set." }
      ]}
    ]},
    { "name": "ValidateOnly", "type": "bool", "versions": "0+",
      "about": "Whether the alteration should be validated, but not performed." }
  ]
}

{
  "apiKey": 49,
  "type": "response",
  "name": "AlterClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entries", "type": "[]EntryData", "versions": "0+",
      "about": "The quota configuration entries to alter.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the quota alteration succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the quota alteration succeeded." },
      { "name": "Entity", "type": "[]EntityData", "versions": "0+",
        "about": "The quota entity to alter.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+",
          "about": "The name of the entity, or null if the default." }
      ]}
    ]}
  ]
}

Kafka RPC 'double' support

...