Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleAlterQuotas
public class AlterQuotasEntry {
    public class Op {
        /**
         * @param key the quota type and units to alter
         * @param value if set then the existing value is updated,
         *              otherwise if null, the existing value is cleared
         */
        public Op(QuotaKey key, Long value);
    }

    /**
     * @param entity the entity whose config will be modified
     * @param ops the alteration to perform - if value is set, then the existing value is updated,
     *            otherwise if null, the existing value is cleared
     */
    public AlterQuotasEntry(QuotaEntity entity, Collection<Op> ops);
}

public class AlterQuotasOptions extends AbstractOptions<AlterQuotasOptions> {
    /**
     * Sets whether the request should be validated without altering the configs.
     */
    public AlterQuotasOptions validateOnly(boolean validateOnly);
}

/**
 * The result of the {@link Admin#alterQuotas(Collection<AlterQuotasEntry>, AlterQuotasOptions)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
public class AlterQuotasResult {
    public AlterQuotasResult(Map<QuotaEntity, KafkaFuture<Void>> futures);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     */
    public Map<QuotaEntity, KafkaFuture<Void>> values();

    /**
     * Returns a future which succeeds only if all quota alterations succeed.
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Alters the quotas as specified for the entries.
     *
     * @param alterations the alterations to perform
     * @return the result of the alterations
     */
    AlterQuotasResult alterQuotas(Collection<AlterQuotasEntry> entries, AlterQuotasOptions options);
}


kafka-quotas.sh/QuotasCommand:

A QuotasCommand would be constructed with an associated script for managing quotas via command line.

Flags:
--bootstrap-server (required). The standard bootstrap server.
--describe 


Proposed Changes

In addition to the API changes above, the following write protocol would be implemented:

DescribeQuotas:

yml
Code Block
language
{
  "apiKey": 48,
  "type": "request",
  "name": "DescribeQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Filter", "type": "[]QuotaFilterData", "versions": "0+",
      "about": "Filters to apply to quota entities.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type that the filter applies to." },
      { "name": "Rule", "type": "string", "versions": "0+",
        "about": "The rule the filter performs." },
      { "name": "Match", "type": "string", "versions": "0+",
        "about": "The string to apply the rule against." }
    ]}
  ]
}

{
  "apiKey": 48,
  "type": "response",
  "name": "DescribeQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]EntryData", "versions": "0+",
      "about": "A result entry.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the quota description succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the quota description succeeded." },
      { "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+",
        "about": "The quota entity description.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The entity name." }
      ]},
      { "name": "Type", "type": "string", "versions": "0+",
        "about": "The quota type." },
      { "name": "Units", "type": "string", "versions": "0+",
        "about": "The units for the value." },
      { "name": "Value", "type": "int64", "versions": "0+",
        "about": "The quota value." }
    ]}
  ]
}

...

Code Block
{
  "apiKey": 50,
  "type": "request",
  "name": "AlterQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Entry", "type": "[]EntryData", "versions": "0+",
      "about": "The quota configuration entries to alter.", "fields": [
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "The quota entity to alter.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The name of the entity." }
      ]},
      { "name": "Op", "type": "[]OpData", "versions": "0+",
        "about": "An individual quota configuration entry to alter.", "fields": [
        { "name": "Type", "type": "string", "versions": "0+",
          "about": "The quota type." },
        { "name": "Units", "type": "string", "versions": "0+",
          "about": "The units for the quota type." },
        { "name": "Value", "type": "int64", "versions": "0+",
          "about": "The value to set, otherwise ignored if the value is to be removed." },
        { "name": "Remove", "type": "bool", "versions": "0+",
          "about": "Whether the quota configuration value should be removed, otherwise set." }
      ]}
    ]},
    { "name": "ValidateOnly", "type": "bool", "versions": "0+",
      "about": "Whether the alteration should be validated, but not performed." }
  ]
}

{
  "apiKey": 50,
  "type": "response",
  "name": "AlterQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]EntryData", "versions": "0+",
      "about": "The quota configuration entries to alter.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the quota alteration succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the quota alteration succeeded." },
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "The quota entity to alter.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The name of the entity." }
      ]}
    ]}
  ]
}


kafka-configs.sh/ConfigCommand:

The ConfigCommand would be modified to extend its current quotas functionality to support the --bootstrap-server flagTODO: ConfigCommand changes.


Compatibility, Deprecation, and Migration Plan

...