...

As represented by the current ZK node structure, quotas are matched in the following order, from highest priority to lowest (<user> is a specified user principal, <client-id> is a specified client ID, and <default> is a special default user/client ID that matches all users or client IDs):

        /config/users/<user>/clients/<client-id>
        /config/users/<user>/clients/<default>
        /config/users/<user>
        /config/users/<default>/clients/<client-id>
        /config/users/<default>/clients/<default>
        /config/users/<default>
        /config/clients/<client-id>
        /config/clients/<default>
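
The matching order above can be sketched as a first-match lookup over candidate paths. This is only an illustration: the class and method names are hypothetical, and the literal "<default>" stands in for whatever node name the broker actually uses for the default entity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class QuotaPrecedence {
    // Placeholder for the default entity name (assumption, not the real ZK node name).
    static final String DEFAULT = "<default>";

    /** Candidate config paths for a (user, clientId) pair, highest priority first. */
    static List<String> candidatePaths(String user, String clientId) {
        List<String> paths = new ArrayList<>();
        paths.add("/config/users/" + user + "/clients/" + clientId);
        paths.add("/config/users/" + user + "/clients/" + DEFAULT);
        paths.add("/config/users/" + user);
        paths.add("/config/users/" + DEFAULT + "/clients/" + clientId);
        paths.add("/config/users/" + DEFAULT + "/clients/" + DEFAULT);
        paths.add("/config/users/" + DEFAULT);
        paths.add("/config/clients/" + clientId);
        paths.add("/config/clients/" + DEFAULT);
        return paths;
    }

    /** Returns the highest-priority candidate path that has a quota configured, if any. */
    static Optional<String> firstMatch(Set<String> configuredPaths, String user, String clientId) {
        return candidatePaths(user, clientId).stream()
                .filter(configuredPaths::contains)
                .findFirst();
    }
}
```

With only /config/users/<default> and /config/clients/my-client configured, a request from any user with client ID my-client resolves to the user default, since user-level entries rank above client-only entries.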

...

Altering quotas only works in a config-centric manner, and therefore doesn't need distinguishing. For a given entity match, the administrator should be able to specify which quotas apply, or alternatively remove existing quotas so they no longer match. This is AlterQuotas.

Units

This KIP introduces the concept of a quota unit to be applied to a quota value. Currently, only a single unit is used for quotas, bytes-per-second, which limits effective quota management. For example, since it's a global throughput value, it doesn't scale well as brokers are added or removed, so a broker-bytes-per-second unit could be added to better manage this behavior. As units are added, the possible quota configuration entries become the cross product of the quota types and the quota units. It would then be possible to specify bytes-per-second on both a global and a per-broker basis, with quota enforcement occurring at whichever limit is hit first. A fair-share system could also be considered, where units of shares are configured for quotas and, when bandwidth is contested, the share counts of the active entities determine their restricted throughput.
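
To make the fair-share idea concrete, here is a minimal sketch that splits contested bandwidth in proportion to each active entity's share count. The proportional rule, the class name, and the method name are all assumptions for illustration; the KIP does not define how shares would be enforced.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FairShareSketch {
    /**
     * Splits contested bandwidth among active entities in proportion to their shares.
     * Proportional allocation is an assumed policy, not one specified by the KIP.
     *
     * @param activeShares share count per active entity
     * @param contestedBytesPerSec total bandwidth being contested
     */
    static Map<String, Double> allocate(Map<String, Long> activeShares, double contestedBytesPerSec) {
        long totalShares = activeShares.values().stream().mapToLong(Long::longValue).sum();
        Map<String, Double> allocation = new LinkedHashMap<>();
        if (totalShares <= 0) {
            return allocation; // nothing to divide among
        }
        for (Map.Entry<String, Long> e : activeShares.entrySet()) {
            allocation.put(e.getKey(), contestedBytesPerSec * e.getValue() / totalShares);
        }
        return allocation;
    }
}
```

Under this rule, an entity holding 200 shares would receive twice the throughput of one holding 100 shares whenever both are active.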

It's beyond the scope of this KIP to add new units and implement their corresponding functionality in the broker; however, units must be accounted for to keep the APIs extensible in the future.

Types Rationale

While there are two defined entity types in AK, a server-side plugin mechanism allows for further expansion. Likewise, as use cases evolve, finer-grained quota control may be necessary. Therefore, entity types should not be statically bound to publicly defined constants; instead, the API should support flexible entity types by interpreting them as a String identifier. Any entity type that the broker doesn't understand should result in an IllegalArgumentException being returned to the client.

The quota types (producer byte rate, consumer byte rate, etc.) and units should be given the same consideration. The possible quota applications may expand in the future, and the API shouldn't lock in which quota types are accessible. Modification of unknown quota types/units should also fail with an error.

Since a fixed set of entity types isn't defined, an entity should be represented by a Map<String, String>, which maps an entity type to the entity name.
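
As a rough illustration of the map-based representation, the sketch below validates an entity against the set of types a broker understands and rejects anything else. The class name, the type strings "user" and "client-id", and the validation placement are assumptions made for this example only.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class EntityMapSketch {
    // Entity types this hypothetical broker understands (assumed string values).
    static final Set<String> KNOWN_TYPES = Set.of("user", "client-id");

    /**
     * Returns an immutable copy of the entity (entity type -> entity name),
     * or throws if any entity type is not understood.
     */
    static Map<String, String> validate(Map<String, String> entity) {
        for (String type : entity.keySet()) {
            if (!KNOWN_TYPES.contains(type)) {
                throw new IllegalArgumentException("Unknown entity type: " + type);
            }
        }
        return Collections.unmodifiableMap(new LinkedHashMap<>(entity));
    }
}
```

Because the keys are plain strings, a plugin that introduces a new entity type would only need to extend the set of known types rather than change a public enum.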

Public Interfaces

Admin client calls will be added to correspond to describeQuotas, describeEffectiveQuotas, and alterQuotas, with supporting types defined in the common.quota package.

Common types in package org.apache.kafka.common.quota:

Code Block
languagejava
/**
 * Describes a fully-qualified entity.
 */
public class QuotaEntity {
    /**
     * Type of an entity entry. Note there may be additional types, these are defined for convenience.
     */
    public static final String USER = ...;
    public static final String CLIENT_ID = ...;

    /**
     * Represents the default name for an entity, i.e. the entity that's matched
     * when an exact match isn't found.
     */
    public static final String QUOTA_ENTITY_NAME_DEFAULT = ...;

    /**
     * `entries` describes the fully-qualified entity. The key is a {@code Type} string, however
     * there may also exist keys that are not enumerated by {@code Type} that still apply, e.g.
     * the server may internally associate another type. When querying entities, it's necessary
     * to return all quota types because quota values for these types may influence the effective
     * quota value. However, when altering a quota, any types that aren't specified must be able
     * to be inferred by the server, otherwise an error is returned.
     *
     * For example, {("CLIENT_ID" -> "test-client"),
     *               ("USER" -> "test-user"),
     *               ("GROUP" -> "internal-group")}.
     */
    public QuotaEntity(Map<String, String> entries);
}

/**
 * Describes quota types.
 */
public class QuotaType {
    /**
     * The type of quota to apply. Note there may be additional types, these are defined for convenience.
     */
    public static final String CONSUMER_BYTE_RATE = ...;
    public static final String PRODUCER_BYTE_RATE = ...;
    public static final String REQUEST_PERCENTAGE = ...;
}

/**
 * Describes a quota entity filter.
 */
public class QuotaFilter {
    public static final String RULE_EXACT = ...;   // exact name match

    /**
     * A filtering rule to be applied.
     *
     * @param entityType the entity type the rule applies to
     * @param rule the rule to apply
     * @param match the non-null string that's applied by the rule
     */
    public QuotaFilter(String entityType, String rule, String match);
}


DescribeQuotas:

Code Block
languagejava
public class DescribeQuotasOptions extends AbstractOptions<DescribeQuotasOptions> {
    // Empty.
}

/**
 * The result of the {@link Admin#describeQuotas(Collection<QuotaFilter>, DescribeQuotasOptions)} call.
 */
public class DescribeQuotasResult {

    /**
     * Maps an entity to its configured quota value(s). Note if no value is defined for a quota
     * type for that entity's config, then it is not included in the resulting value map.
     *
     * @param entities the collection of entities that matched the filter
     */
    public DescribeQuotasResult(KafkaFuture<Map<QuotaEntity, Map<String, Long>>> entities);

    /**
     * Returns a future that yields a map from quota entity to its configured quota values.
     */
    public KafkaFuture<Map<QuotaEntity, Map<String, Long>>> entities();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Describes all entities matching all provided filters (logical AND) that have at least one
     * quota value defined.
     *
     * @param filters filtering rules to apply to matching entities
     * @param options the options to use
     * @return result containing all matching entities
     */
    DescribeQuotasResult describeQuotas(Collection<QuotaFilter> filters, DescribeQuotasOptions options);
}
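
The "logical AND" filter semantics described for describeQuotas can be sketched as follows. The Filter class is a simplified stand-in for the proposed QuotaFilter that models only an exact-match rule, and entities are plain maps; none of these names are part of the proposed API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

final class FilterSketch {
    /** Simplified stand-in for the proposed QuotaFilter; only exact matching is modelled. */
    static final class Filter {
        final String entityType;
        final String match;

        Filter(String entityType, String match) {
            this.entityType = entityType;
            this.match = match;
        }

        /** True if the entity has this filter's type and its name equals the match string. */
        boolean matches(Map<String, String> entity) {
            return match.equals(entity.get(entityType));
        }
    }

    /** Keeps only the entities that satisfy every filter (logical AND). */
    static List<Map<String, String>> select(Collection<Map<String, String>> entities,
                                            Collection<Filter> filters) {
        List<Map<String, String>> selected = new ArrayList<>();
        for (Map<String, String> entity : entities) {
            if (filters.stream().allMatch(f -> f.matches(entity))) {
                selected.add(entity);
            }
        }
        return selected;
    }
}
```

An empty filter collection matches every entity in this sketch, because allMatch over an empty stream is true; whether the real API should behave that way is not stated in the text.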

...

Code Block
languagejava
public class DescribeEffectiveQuotasOptions extends AbstractOptions<DescribeEffectiveQuotasOptions> {

    /**
     * Whether to exclude the list of overridden values for every quota type in the result.
     */
    public DescribeEffectiveQuotasOptions setOmitOverriddenValues(boolean omitOverriddenValues);
}

/**
 * The result of the {@link Admin#describeEffectiveQuotas(Collection<QuotaEntity>, DescribeEffectiveQuotasOptions)} call.
 */
public class DescribeEffectiveQuotasResult {
    /**
     * Information about a specific quota configuration entry.
     */
    public class Entry {
        /**
         * @param source the entity source for the value
         * @param value the non-null value
         */
        public Entry(QuotaEntity source, Long value);
    }

    /**
     * Information about the value for a quota type.
     */
    public class Value {
        /**
         * @param entry the quota entry
         * @param overriddenEntries all values that are overridden due to being lower in
         *                          specificity, or null if not requested
         */
        public Value(Entry entry, List<Entry> overriddenEntries);
    }

    /**
     * Maps a collection of entities to their effective quota values.
     *
     * @param config the quota configuration for the requested entities
     */
    public DescribeEffectiveQuotasResult(Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     */
    public Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config();

    /**
     * Returns a future which succeeds only if all quota descriptions succeed.
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Describes the effective quotas for the provided entities.
     *
     * @param entities the entities to describe the effective quotas for
     * @param options the options to use
     * @return the effective quotas for the entities
     */
    DescribeEffectiveQuotasResult describeEffectiveQuotas(Collection<QuotaEntity> entities, DescribeEffectiveQuotasOptions options);
}
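
One way to picture how an effective value and its overridden entries relate is sketched below. The Entry and Value classes are simplified stand-ins for the nested result types above (the source entity is reduced to a label), and the resolution rule, first entry wins per quota type when sources are ordered most specific first, is an assumption consistent with the described semantics rather than the broker's actual algorithm.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EffectiveQuotaSketch {
    static final class Entry {
        final String source; // label of the entity the value came from
        final long value;
        Entry(String source, long value) { this.source = source; this.value = value; }
    }

    static final class Value {
        final Entry entry;                   // the effective entry
        final List<Entry> overriddenEntries; // null when not requested
        Value(Entry entry, List<Entry> overriddenEntries) {
            this.entry = entry;
            this.overriddenEntries = overriddenEntries;
        }
    }

    /**
     * @param configsBySpecificity source label -> (quota type -> value); iteration order
     *                             must be most specific first (e.g. a LinkedHashMap)
     * @param omitOverriddenValues whether to skip collecting overridden entries
     */
    static Map<String, Value> resolve(Map<String, Map<String, Long>> configsBySpecificity,
                                      boolean omitOverriddenValues) {
        Map<String, Value> result = new LinkedHashMap<>();
        for (Map.Entry<String, Map<String, Long>> source : configsBySpecificity.entrySet()) {
            for (Map.Entry<String, Long> quota : source.getValue().entrySet()) {
                Entry entry = new Entry(source.getKey(), quota.getValue());
                Value existing = result.get(quota.getKey());
                if (existing == null) {
                    // First (most specific) source to define this quota type wins.
                    result.put(quota.getKey(),
                            new Value(entry, omitOverriddenValues ? null : new ArrayList<>()));
                } else if (!omitOverriddenValues) {
                    existing.overriddenEntries.add(entry);
                }
            }
        }
        return result;
    }
}
```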

...

Code Block
languagejava
titleAlterQuotas
public class AlterQuotasEntry {
    public class Op {
        /**
         * @param type the quota type to alter
         * @param value if set then the existing value is updated,
         *              otherwise if null, the existing value is cleared
         */
        public Op(String type, Long value);
    }

    /**
     * @param entity the entity whose config will be modified
     * @param ops the alteration to perform - if value is set, then the existing value is updated,
     *            otherwise if null, the existing value is cleared
     */
    public AlterQuotasEntry(QuotaEntity entity, Collection<Op> ops);
}

public class AlterQuotasOptions extends AbstractOptions<AlterQuotasOptions> {
    /**
     * Sets whether the request should be validated without altering the configs.
     */
    public AlterQuotasOptions validateOnly(boolean validateOnly);
}

/**
 * The result of the {@link Admin#alterQuotas(Collection<AlterQuotasEntry>, AlterQuotasOptions)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
public class AlterQuotasResult {
    public AlterQuotasResult(Map<QuotaEntity, KafkaFuture<Void>> futures);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     */
    public Map<QuotaEntity, KafkaFuture<Void>> values();

    /**
     * Returns a future which succeeds only if all quota alterations succeed.
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Alters the quotas as specified for the entries.
     *
     * @param entries the alterations to perform
     * @param options the options to use
     * @return the result of the alterations
     */
    AlterQuotasResult alterQuotas(Collection<AlterQuotasEntry> entries, AlterQuotasOptions options);
}
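
The set-or-clear semantics of an alteration, together with validateOnly, might be modelled as below. Op here is a stand-in for AlterQuotasEntry.Op, and the in-memory map stands in for an entity's stored quota config; both are assumptions for illustration, and real validation (such as rejecting unknown quota types) is left out.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

final class AlterQuotasSketch {
    /** Stand-in for the proposed AlterQuotasEntry.Op. */
    static final class Op {
        final String type;
        final Long value; // null means "clear the existing value"
        Op(String type, Long value) { this.type = type; this.value = value; }
    }

    /**
     * Computes the entity's quota config after applying the ops.
     * The stored config is only mutated when validateOnly is false.
     */
    static Map<String, Long> apply(Map<String, Long> stored, Collection<Op> ops, boolean validateOnly) {
        Map<String, Long> updated = new HashMap<>(stored);
        for (Op op : ops) {
            if (op.value == null) {
                updated.remove(op.type);
            } else {
                updated.put(op.type, op.value);
            }
        }
        if (!validateOnly) {
            stored.clear();
            stored.putAll(updated);
        }
        return updated;
    }
}
```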

...

Exclusive to --describe:
--show-overridden: Whether to show overridden config entries.

...

Code Block
$./bin/kafka-quotas.sh --bootstrap-server localhost:9092 --list \
                       --names=client-id=my-client

{user=user-two, client-id=my-client}
consumer_byte_rate:_shares=200
producer_byte_rate:bps=10000000

{user=user-one, client-id=my-client}
producer_byte_rate:_broker_bps=2000000

{user=<default>, client-id=my-client}
consumer_byte_rate:_shares=100
producer_byte_rate:_broker_bps=500000

$./bin/kafka-quotas.sh --bootstrap-server localhost:9092 --list \
                       --prefix=user=user-

{user=user-two, client-id=my-client}
consumer_byte_rate:_shares=200
producer_byte_rate:bps=10000000

{user=user-one, client-id=my-client}
producer_byte_rate:_broker_bps=2000000
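
For illustration, a --names argument in the form used above (comma-separated <type>=<name> pairs) could be turned into an entity map roughly as follows. The class and method names are hypothetical, and the tool's actual parsing rules, such as how names containing commas are escaped, are not specified here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class NamesArgSketch {
    /** Parses "user=user-one,client-id=my-client" into an ordered entity map. */
    static Map<String, String> parseNames(String arg) {
        Map<String, String> entity = new LinkedHashMap<>();
        for (String pair : arg.split(",")) {
            int eq = pair.indexOf('=');
            // Require a non-empty type before '=' and a non-empty name after it.
            if (eq <= 0 || eq == pair.length() - 1) {
                throw new IllegalArgumentException("Expected <type>=<name> but got: " + pair);
            }
            entity.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return entity;
    }
}
```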

Describe:

Code Block
$./bin/kafka-quotas.sh --bootstrap-server localhost:9092 --describe \
                       --names=user=user-one,client-id=my-client

consumer_byte_rate:_shares=200 {user=user-one, client-id=my-client}
producer_byte_rate:bps=10000000 {user=user-one, client-id=my-client}
producer_byte_rate:_broker_bps=500000 {user=<default>, client-id=my-client}

$./bin/kafka-quotas.sh --bootstrap-server localhost:9092 --describe \
                       --names=user=user-two,client-id=my-client    \
                       --show-overridden

consumer_byte_rate:_shares=100 {user=<default>, client-id=my-client}
producer_byte_rate:_broker_bps=2000000 {user=user-two, client-id=my-client}
*producer_byte_rate:_broker_bps=500000 {user=<default>, client-id=my-client}

...

Code Block
$./bin/kafka-quotas.sh --bootstrap-server localhost:9092 --describe \
                       --names=client-id=my-client --defaults=user  \
                       --add=producer_byte_rate:_shares=100          \
                       --delete=producer_byte_rate:_broker_bps

<no output on success>

$./bin/kafka-quotas.sh --bootstrap-server localhost:9092 --list     \
                       --names=client-id=my-client --defaults=user

{user=<default>, client-id=my-client}
consumer_byte_rate:_shares=100
producer_byte_rate:_shares=100


Proposed Changes

...

Code Block
{
  "apiKey": 48,
  "type": "request",
  "name": "DescribeQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Filter", "type": "[]QuotaFilterData", "versions": "0+",
      "about": "Filters to apply to quota entities.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type that the filter applies to." },
      { "name": "Rule", "type": "string", "versions": "0+",
        "about": "The rule the filter performs." },
      { "name": "Match", "type": "string", "versions": "0+",
        "about": "The string to apply the rule against." }
    ]}
  ]
}

{
  "apiKey": 48,
  "type": "response",
  "name": "DescribeQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]EntryData", "versions": "0+",
      "about": "A result entry.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the quota description succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the quota description succeeded." },
      { "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+",
        "about": "The quota entity description.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The entity name." }
      ]},
      { "name": "Type", "type": "string", "versions": "0+",
        "about": "The quota type." },
      { "name": "Value", "type": "int64", "versions": "0+",
        "about": "The quota value." }
    ]}
  ]
}

...

Code Block
{
  "apiKey": 49,
  "type": "request",
  "name": "DescribeEffectiveQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+",
      "about": "The quota entity description.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type." },
      { "name": "EntityName", "type": "string", "versions": "0+",
        "about": "The entity name." }
    ]},
    { "name": "OmitOverriddenValues", "type": "bool", "versions": "0+",
      "about": "Whether to exclude the list of overridden values for every quota type." }
  ]
}

{
  "apiKey": 49,
  "type": "response",
  "name": "DescribeEffectiveQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]QuotaEntryData", "versions": "0+",
      "about": "Effective quota entries.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the effective quota description succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the effective quota description succeeded." },
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "Effective quota entries.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The entity name." }
      ]},
      { "name": "QuotaValues", "type": "[]QuotaValueData", "versions": "0+",
        "about": "Quota configuration values.", "fields": [
        { "name": "Type", "type": "string", "versions": "0+",
          "about": "The quota type." },
        { "name": "Entry", "type": "[]ValueEntryData", "versions": "0+",
          "about": "Quota value entries.", "fields": [
          { "name": "QuotaEntity", "type": "[]ValueQuotaEntity", "versions": "0+",
            "about": "Effective quota entries.", "fields": [
            { "name": "EntityType", "type": "string", "versions": "0+",
              "about": "The entity type." },
            { "name": "EntityName", "type": "string", "versions": "0+",
              "about": "The entity name." }
          ]},
          { "name": "Value", "type": "int64", "versions": "0+",
            "about": "The quota configuration value." }
        ]}
      ]}
    ]}
  ]
}

...

Code Block
{
  "apiKey": 50,
  "type": "request",
  "name": "AlterQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Entry", "type": "[]EntryData", "versions": "0+",
      "about": "The quota configuration entries to alter.", "fields": [
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "The quota entity to alter.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The name of the entity." }
      ]},
      { "name": "Op", "type": "[]OpData", "versions": "0+",
        "about": "An individual quota configuration entry to alter.", "fields": [
        { "name": "Type", "type": "string", "versions": "0+",
          "about": "The quota type." },
        { "name": "Units", "type": "string", "versions": "0+",
          "about": "The units for the quota type." },
        { "name": "Value", "type": "int64", "versions": "0+",
          "about": "The value to set, otherwise ignored if the value is to be removed." },
        { "name": "Remove", "type": "bool", "versions": "0+",
          "about": "Whether the quota configuration value should be removed, otherwise set." }
      ]}
    ]},
    { "name": "ValidateOnly", "type": "bool", "versions": "0+",
      "about": "Whether the alteration should be validated, but not performed." }
  ]
}

{
  "apiKey": 50,
  "type": "response",
  "name": "AlterQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]EntryData", "versions": "0+",
      "about": "The quota configuration entries to alter.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the quota alteration succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the quota alteration succeeded." },
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "The quota entity to alter.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The name of the entity." }
      ]}
    ]}
  ]
}

...