...
Code Block | ||||
---|---|---|---|---|
| ||||
public class AlterQuotasEntry { public class Op { /** * @param key the quota type and units to alter * @param value if set then the existing value is updated, * otherwise if null, the existing value is cleared */ public Op(QuotaKey key, Long value); } /** * @param entity the entity whose config will be modified * @param ops the alteration to perform - if value is set, then the existing value is updated, * otherwise if null, the existing value is cleared */ public AlterQuotasEntry(QuotaEntity entity, Collection<Op> ops); } public class AlterQuotasOptions extends AbstractOptions<AlterQuotasOptions> { /** * Sets whether the request should be validated without altering the configs. */ public AlterQuotasOptions validateOnly(boolean validateOnly); } /** * The result of the {@link Admin#alterQuotas(Collection<AlterQuotasEntry>, AlterQuotasOptions)} call. * * The API of this class is evolving, see {@link Admin} for details. */ public class AlterQuotasResult { public AlterQuotasResult(Map<QuotaEntity, KafkaFuture<Void>> futures); /** * Returns a map from quota entity to a future which can be used to check the status of the operation. */ public Map<QuotaEntity, KafkaFuture<Void>> values(); /** * Returns a future which succeeds only if all quota alterations succeed. */ public KafkaFuture<Void> all(); } public interface Admin extends AutoCloseable { ... /** * Alters the quotas as specified for the entries. * * @param alterations the alterations to perform * @return the result of the alterations */ AlterQuotasResult alterQuotas(Collection<AlterQuotasEntry> entries, AlterQuotasOptions options); } |
kafka-quotas.sh/QuotasCommand:
A QuotasCommand would be constructed with an associated script for managing quotas via command line.
Flags:
--bootstrap-server (required). The standard bootstrap server.
--describe
Proposed Changes
In addition to the API changes above, the following write protocol would be implemented:
DescribeQuotas:
Code Block | language | yml
---|
{ "apiKey": 48, "type": "request", "name": "DescribeQuotasRequest", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "Filter", "type": "[]QuotaFilterData", "versions": "0+", "about": "Filters to apply to quota entities.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type that the filter applies to." }, { "name": "Rule", "type": "string", "versions": "0+", "about": "The rule the filter performs." }, { "name": "Match", "type": "string", "versions": "0+", "about": "The string to apply the rule against." } ]} ] } { "apiKey": 48, "type": "response", "name": "DescribeQuotasResponse", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Entry", "type": "[]EntryData", "versions": "0+", "about": "A result entry.", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or `0` if the quota description succeeded." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The error message, or `null` if the quota description succeeded." }, { "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+", "about": "The quota entity description.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "about": "The entity name." } ]}, { "name": "Type", "type": "string", "versions": "0+", "about": "The quota type." }, { "name": "Units", "type": "string", "versions": "0+", "about": "The units for the value." }, { "name": "Value", "type": "int64", "versions": "0+", "about": "The quota value." } ]} ] } |
...
Code Block |
---|
{ "apiKey": 50, "type": "request", "name": "AlterQuotasRequest", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "Entry", "type": "[]EntryData", "versions": "0+", "about": "The quota configuration entries to alter.", "fields": [ { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+", "about": "The quota entity to alter.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "about": "The name of the entity." } ]}, { "name": "Op", "type": "[]OpData", "versions": "0+", "about": "An individual quota configuration entry to alter.", "fields": [ { "name": "Type", "type": "string", "versions": "0+", "about": "The quota type." }, { "name": "Units", "type": "string", "versions": "0+", "about": "The units for the quota type." }, { "name": "Value", "type": "int64", "versions": "0+", "about": "The value to set, otherwise ignored if the value is to be removed." }, { "name": "Remove", "type": "bool", "versions": "0+", "about": "Whether the quota configuration value should be removed, otherwise set." } ]} ]}, { "name": "ValidateOnly", "type": "bool", "versions": "0+", "about": "Whether the alteration should be validated, but not performed." } ] } { "apiKey": 50, "type": "response", "name": "AlterQuotasResponse", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Entry", "type": "[]EntryData", "versions": "0+", "about": "The quota configuration entries to alter.", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or `0` if the quota alteration succeeded." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The error message, or `null` if the quota alteration succeeded." }, { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+", "about": "The quota entity to alter.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "about": "The name of the entity." } ]} ]} ] } |
kafka-configs.sh/ConfigCommand:
The ConfigCommand would be modified to extend its current quotas functionality to support the --bootstrap-server flagTODO: ConfigCommand changes.
Compatibility, Deprecation, and Migration Plan
...