Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
/**
 * Describes a fully-qualified entity.
 */
public class QuotaEntity {
    /**
     * Type of an entity entry.
     */
    public static final String USER = "user";
    public static final String CLIENT_ID = "client-id";

    /**
     * Represents the default name for an entity, i.e. the entity that's matched
     * when an exact match isn't found.
     */
    public static final String QUOTA_ENTITY_NAME_DEFAULT = // implementation defined

    /**
     * `entries` describes the fully-qualified entity. The key is a {@code Type} string, however
     * there may also exist keys that are not enumerated by {@code Type} that still apply, e.g.
     * the server may internally associate another type. When querying entities, it's necessary
     * to return all quota types because quota values for these types may influence the effectiveresolved
     * quota value. However, when altering a quota, any types that aren't specified must be able
     * to be inferred by the server, otherwise an error is returned.
     *
     * For example, {("CLIENT_ID" -> "test-client"),
     *               ("USER" -> "test-user"),
     *               ("GROUP" -> "internal-group")}.
     */
    public QuotaEntity(Map<String, String> entries);
}

/**
 * Describes a quota entity filter.
 */
public class QuotaFilter {
    /**
     * A filter to be applied.
     *
     * @param entityType the entity type the filter applies to
     * @param match the non-null string that's matched exactly
     */
    public QuotaFilter(String entityType, String match);
}

...

Code Block
languagejava
public class ResolveClientQuotasOptions extends AbstractOptions<ResolveClientQuotasOptions> {

    /**
     * Whether to exclude the list of overridden values for every quota type in the result.
     */
    public ResolveClientQuotasOptions setOmitOverriddenValues(boolean omitOverriddenValues);/ Empty.
}

/**
 * The result of the {@link Admin#ResolveClientQuotas(Collection<QuotaEntity>, ResolveClientQuotasOptions)} call.
 */
public class ResolveClientQuotasResult {
    /**
     * Information about a specific quota configuration entry.
     */
    public class Entry {
        /**
         * @param source the entity source for the value
         * @param value the non-null value
         */
        public Entry(QuotaEntity source, Long value);
    }

    /**
     * Information about the value for a quota type.
     *
     * NOTE: We maintain a `Value` class because additional information may be added, e.g.,
     *       a list of overridden entries.
     */
    public class Value {
        /**
         * @param entry the quota entry
         */
        public Value(Entry entry);
    }

    /**
     * Maps a collection of entities to their effectiveresolved quota values.
     *
     * @param config the quota configuration for the requested entities
     */
    public ResolveClientQuotasResult(Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     */
    public Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config();

    /**
     * Returns a future which succeeds only if all quota descriptions succeed.
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Describes the effectiveresolved quotas for the provided entities.
     *
     * @param entities the entities to describe the effectiveresolved quotas for
     * @param options the options to use
     * @return the effectiveresolved quotas for the entities
     */
    ResolveClientQuotasResult resolveClientQuotas(Collection<QuotaEntity> entities, ResolveClientQuotasOptions options);
}

...

A ClientQuotasCommand would be constructed with an associated bin/kafka-client-quotas.sh script for managing quotas via command line, and would have three modes of operation, roughly correlating to each of the API calls:

  1. ListDescribe: Lists Describes the quota entities for the given entity specification and their corresponding quota values, as explicitly specified in the configuration. The user may provide explicit entity types+names, or a pattern to apply to an entity type find matching entity names. If an entity type is omitted from the input, it is treated as a wildcard.
  2. DescribeResolve: Describes Resolves the effective quotas for an entity, including contextual information about how those quotas were derived. This includes what configuration entries matched to the entity and, if requested, the overridden, less-specific matches for the entity.
  3. Alter: Modifies a quota configuration entry in an incremental manner, i.e. specify which entries to add, update, and/or remove.

...

Operations (mutually exclusive):
--listdescribe: Lists Describes the entities that match the given specification, and prints out their configuration values.
--describeresolve: Describes  Resolves the effective quota values for an entity.
--alter: Alters the configuration for the given specification.

...

In general, the output of the entities will be of the form: {entity-type=entity-name, ...}, where entity-name is sanitized for output since it is an opaque string. When displaying configuration values, the form: quota-name=quota-value.When describing overridden entries in the ResolveClientQuotas, the entry is prefixed by a * to denote it's not effective. 

Describe:

Code Block
$./bin/kafka-client-quotas.sh --bootstrap-server localhost:9092 --describe \
                              --names=client-id=my-client

{user=user-one, client-id=my-client}
consumer_byte_rate=4000000
producer_byte_rate=1000000

{user=user-two, client-id=my-client}
producer_byte_rate=2000000

{user=<default>, client-id=my-client}
consumer_byte_rate=1000000
producer_byte_rate=500000

...

Code Block
{
  "apiKey": 49,
  "type": "request",
  "name": "ResolveClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+",
      "about": "The quota entity description.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type." },
      { "name": "EntityName", "type": "string", "versions": "0+",
        "about": "The entity name." }
    ]},
  ]
}

{
  "apiKey": 49,
  "type": "response",
  "name": "ResolveClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]QuotaEntryData", "versions": "0+",
      "about": "EffectiveResolved quota entries.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the effectiveresolved quota description succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the effectiveresolved quota description succeeded." },
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "EffectiveResolved quota entries.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The entity name." }
      ]},
      { "name": "QuotaValues", "type": "[]QuotaValueData", "versions": "0+",
        "about": "Quota configuration values.", "fields": [
        { "name": "Type", "type": "string", "versions": "0+",
          "about": "The quota type." },
        { "name": "Entry", "type": "[]ValueEntryData", "versions": "0+",
          "about": "Quota value entries.", "fields": [
          { "name": "QuotaEntity", "type": "[]ValueQuotaEntity", "versions": "0+",
            "about": "EffectiveResolved quota entries.", "fields": [
            { "name": "EntityType", "type": "string", "versions": "0+",
              "about": "The entity type." },
            { "name": "EntityName", "type": "string", "versions": "0+",
              "about": "The entity name." }
          ]},
          { "name": "Value", "type": "int64", "versions": "0+",
            "about": "The quota configuration value." }
        ]}
      ]}
    ]}
  ]
}

...

  • Use existing describeConfigs/incrementalAlterConfigs for quota functionality. This falls short for a couple reasons. First, quotas entity names are more dynamic than brokers and tasks which makes them awkward to fit into generic tools which expect a single unique, distinct key, e.g. ConfigCommand. Second, there's no tool that expresses a way to get the effective quota resolved quota for an entity without some heavy engineering on the client side, which lacks extensibility and is more expensive to perform, especially over large collection of entities. Therefore, it makes sense to approach quotas as a standalone set of APIs that provide more targeted information and can properly support future extensibility.

...