THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
/** * Describes a fully-qualified entityclient quota entity, which is a mapping of entity types to their names. */ public class QuotaEntityClientQuotaEntity { /** * TypeThe type of an entity entry. */ public static final String USER = "user"; public static final String CLIENT_ID = "client-id"; /** * RepresentsConstructs thea defaultquota nameentity for the an entity, i.e. the entity that's matchedgiven types and names. If a name is null, * when an exact match isn't foundthen it is mapped to the built-in default entity name. */ * public@param staticentries finalmaps String QUOTA_ENTITY_NAME_DEFAULT = // implementation defined entity type to its name */** public * Constructs a fully-qualified entity.ClientQuotaEntity(Map<String, String> entries); /** * @param@return entriesmap mapsof entity type to its name */ public QuotaEntity(Map<String, String> entries(); } /** * Describes a component quota entityfor applying a client quota filter. */ public class QuotaFilterClientQuotaFilterComponent { /** * AConstructs and returns a filter to be appliedcomponent that exactly matches the provided entity * name for the entity type. * * @param entityType the entity type the filter component applies to * @param matchentityName the entity stringname that's matched exactly, or null if the type should be omitted */ public static ClientQuotaFilterComponent QuotaFilterofEntity(String entityType, String matchentityName); } |
DescribeClientQuotas
Code Block | ||
---|---|---|
| ||
public class DescribeClientQuotasOptions extends AbstractOptions<DescribeClientQuotasOptions> { /** // Empty. } /** *Constructs Theand resultreturns ofa thefilter {@link Admin#describeClientQuotas(Collection, DescribeClientQuotasOptions)} call. */ public class DescribeClientQuotasResult { component that matches the built-in default entity name /** * Maps anfor the entity totype. its configured quota value(s). Note if no value is defined for a quota * type for that entity's config, then it is not included in the resulting value map. * * @param entityType the entity type the filter component applies to */ public static ClientQuotaFilterComponent ofDefaultEntity(String entityType); /** * @paramConstructs entitiesand thereturns collectiona offilter entitiescomponent that matchedmatches theany filter specified name for the */ * public DescribeClientQuotasResult(KafkaFuture<Map<QuotaEntity, Map<String, Double>>> entities); /**entity type. * * Returns@param aentityType map fromthe quota entity totype athe futurefilter whichcomponent can beapplies used to check the status of the operation. */ public KafkaFuture<Map<QuotaEntity,static Map<String, Double>>> entities(ClientQuotaFilterComponent ofEntityType(String entityType); } public interface Admin extends/** AutoCloseable { * ... @return the component's entity /**type */ Describes all entities matchingpublic all provided filters (logical AND) that have at least oneString entityType(); /** * quota value defined. @return the optional match string, where: * * @param filtersif filterspresent, tothe applyname tothat's matchingmatched entitiesexactly * @param options if empty, matches the optionsdefault to usename * @return result containing all matching entities if null, matches any specified name */ DescribeClientQuotasResultpublic describeClientQuotas(Collection<QuotaFilter> filters, DescribeClientQuotasOptions options); } |
ResolveClientQuotas:
Code Block | ||
---|---|---|
| ||
public class ResolveClientQuotasOptions extends AbstractOptions<ResolveClientQuotasOptions> { // Empty. Optional<String> match(); } /** * TheDescribes resulta ofclient thequota {@link Admin#resolveClientQuotas(Collection, ResolveClientQuotasOptions)} callentity filter. */ public class ResolveClientQuotasResultClientQuotaFilter { /** * A Informationfilter to aboutbe aapplied specificto quotamatching configurationclient entryquotas. */ public class* Entry@param { components the components to filter on /** * * @param sourcestrict the entity source for whether the value filter only includes * @param value the non-null value specified components */ public Entry(QuotaEntity source, Double value); } private ClientQuotaFilter(Collection<ClientQuotaFilterComponent> components, boolean strict); /** * InformationConstructs aboutand thereturns valuea forquota afilter quotathat type. matches all provided components. Matching *entities * NOTE:with Weentity maintaintypes athat `Value`are class because additional information may be added, e.g.,not specified by a component will also be included in the result. * * @param components athe listcomponents offor overriddenthe entries.filter */ public class Value {static ClientQuotaFilter contains(Collection<ClientQuotaFilterComponent> components); /** * Constructs and returns *a @paramquota entryfilter thethat quotamatches entry all provided components. Matching entities */ with entity types that are not specified publicby Value(Entry entry); } /** * Maps a collection of entities to their resolved quota valuesa component will *not* be included in the result. * * @param configcomponents the quotacomponents configuration for the requested entitiesfilter */ public ResolveClientQuotasResult(Map<QuotaEntity, KafkaFuture<Map<String, Value>>> configstatic ClientQuotaFilter containsOnly(Collection<ClientQuotaFilterComponent> components); /** * ReturnsConstructs aand mapreturns froma quota entityfilter tothat amatches futureall which can be used to check the status of the operationconfigured entities. */ public Map<QuotaEntity,static KafkaFuture<Map<String, Value>>> config(ClientQuotaFilter all(); /** * Returns@return athe future which succeeds only if all quota descriptions succeed.filter's components */ public KafkaFuture<Void>Collection<ClientQuotaFilterComponent> allcomponents(); } public interface Admin extends AutoCloseable { ... /** * @return Resolveswhether the filter effectiveis quota values for the provided entities.strict, i.e. only includes specified components */ public boolean strict(); } /** * @paramDescribes entitiesa theconfiguration entitiesalteration to describe the resolved quotas for * @param options the options to use * @return the resolved quotas for the entities */ ResolveClientQuotasResult resolveClientQuotas(Collection<QuotaEntity> entities, ResolveClientQuotasOptions options); } |
AlterClientQuotas
Code Block | ||||
---|---|---|---|---|
| ||||
public class AlterClientQuotasEntry { be made to a client quota entity. */ public class ClientQuotaAlteration { public static class Op { /** * @param key the quota type to alter * @param value if set then the existing value is updated, * otherwise if null, the existing value is cleared */ public Op(String key, Double value); } /** * @param entity the entity* whose@return configthe will be modified quota type to alter */ public String key(); /** @param ops the alteration to perform - if* value@return isif set, then the existing value is updated, * * otherwise if null, the existing value is cleared */ public AlterClientQuotasEntry(QuotaEntity entity,public Collection<Op>Double opsvalue(); } public class AlterClientQuotasOptionsprivate extendsfinal AbstractOptions<AlterClientQuotasOptions>ClientQuotaEntity {entity; /** private final Collection<Op> ops; * Sets whether the request should be validated without altering the configs. /** * @param entity the entity whose config will be modified * @param ops the alteration to perform */ public AlterClientQuotasOptions validateOnly(boolean validateOnlyClientQuotaAlteration(ClientQuotaEntity entity, Collection<Op> ops); } /** * The result* of@return the entity {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)} call. * * The API of this class is evolving, see {@link Admin} for details. */ public class AlterClientQuotasResult { public AlterClientQuotasResult(Map<QuotaEntity, KafkaFuture<Void>> futureswhose config will be modified */ public ClientQuotaEntity entity(); /** * Returns@return athe map from quota entity to a future which can be used to check the status of the operation.alteration to perform */ public Map<QuotaEntity, KafkaFuture<Void>> valuesCollection<Op> ops(); } |
DescribeClientQuotas
Code Block | ||
---|---|---|
| ||
public class DescribeClientQuotasOptions extends AbstractOptions<DescribeClientQuotasOptions> /**{ * Returns a future which succeeds only if all quota alterations succeed. */ public KafkaFuture<Void> all(); } public interface Admin extends AutoCloseable { ... // Empty. } /** * The result of the {@link Admin#describeClientQuotas(Collection, DescribeClientQuotasOptions)} call. */ public class DescribeClientQuotasResult { /** * AltersMaps thean quotasentity asto specifiedits forconfigured the entries. *quota value(s). Note if no value is defined for a quota * @param alterations the alterations to performtype for that entity's config, then it is not included in the resulting value map. * * @return @param entities future for the resultcollection of entities that matched the alterationsfilter */ AlterClientQuotasResultpublic alterClientQuotasDescribeClientQuotasResult(Collection<AlterClientQuotasEntry>KafkaFuture<Map<ClientQuotaEntity, entriesMap<String, AlterClientQuotasOptionsDouble>>> optionsentities); } |
kafka-client-quotas.sh/ClientQuotasCommand
/**
* Returns a map from quota entity to a future which can be used to check the status of the operation.
*/
public KafkaFuture<Map<ClientQuotaEntity, Map<String, Double>>> entities();
}
public interface Admin extends AutoCloseable {
...
/**
* Describes all entities matching the provided filter that have at least one client quota configuration
* value defined.
* <p>
* The following exceptions can be anticipated when calling {@code get()} on the future from the
* returned {@link DescribeClientQuotasResult}:
* <ul>
* <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
* If the authenticated user didn't have describe access to the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
* If the request details are invalid. e.g., an invalid entity type was specified.</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* If the request timed out before the describe could finish.</li>
* </ul>
* <p>
* This operation is supported by brokers with version 2.6.0 or higher.
*
* @param filter the filter to apply to match entities
* @param options the options to use
* @return the DescribeClientQuotasResult containing the result
*/
DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options);
}
|
ResolveClientQuotas:
Code Block | ||
---|---|---|
| ||
public class ResolveClientQuotasOptions extends AbstractOptions<ResolveClientQuotasOptions> {
// Empty.
}
/**
* The result of the {@link Admin#resolveClientQuotas(Collection, ResolveClientQuotasOptions)} call.
*/
public class ResolveClientQuotasResult {
/**
* Information about a specific quota configuration entry.
*/
public class Entry {
/**
* @param source the entity source for the value
* @param value the non-null value
*/
public Entry(QuotaEntity source, Double value);
}
/**
* Information about the value for a quota type.
*
* NOTE: We maintain a `Value` class because additional information may be added, e.g.,
* a list of overridden entries.
*/
public class Value {
/**
* @param entry the quota entry
*/
public Value(Entry entry);
}
/**
* Maps a collection of entities to their resolved quota values.
*
* @param config the quota configuration for the requested entities
*/
public ResolveClientQuotasResult(Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config);
/**
* Returns a map from quota entity to a future which can be used to check the status of the operation.
*/
public Map<QuotaEntity, KafkaFuture<Map<String, Value>>> config();
/**
* Returns a future which succeeds only if all quota descriptions succeed.
*/
public KafkaFuture<Void> all();
}
public interface Admin extends AutoCloseable {
...
/**
* Resolves the effective quota values for the provided entities.
*
* @param entities the entities to describe the resolved quotas for
* @param options the options to use
* @return the resolved quotas for the entities
*/
ResolveClientQuotasResult resolveClientQuotas(Collection<QuotaEntity> entities, ResolveClientQuotasOptions options);
} |
AlterClientQuotas
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Options for {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)}.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
public class AlterClientQuotasOptions extends AbstractOptions<AlterClientQuotasOptions> {
/**
* Returns whether the request should be validated without altering the configs.
*/
public boolean validateOnly();
/**
* Sets whether the request should be validated without altering the configs.
*/
public AlterClientQuotasOptions validateOnly(boolean validateOnly);
}
/**
* The result of the {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class AlterClientQuotasResult {
/**
* Maps an entity to its alteration result.
*
* @param futures maps entity to its alteration result
*/
public AlterClientQuotasResult(Map<ClientQuotaEntity, KafkaFuture<Void>> futures);
/**
* Returns a map from quota entity to a future which can be used to check the status of the operation.
*/
public Map<ClientQuotaEntity, KafkaFuture<Void>> values();
/**
* Returns a future which succeeds only if all quota alterations succeed.
*/
public KafkaFuture<Void> all();
}
public interface Admin extends AutoCloseable {
...
/**
* Alters client quota configurations with the specified alterations.
* <p>
* Alterations for a single entity are atomic, but across entities is not guaranteed. The resulting
* per-entity error code should be evaluated to resolve the success or failure of all updates.
* <p>
* The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
* the returned {@link AlterClientQuotasResult}:
* <ul>
* <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
* If the authenticated user didn't have alter access to the cluster.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidRequestException}
* If the request details are invalid. e.g., a configuration key was specified more than once for an entity.</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* If the request timed out before the alterations could finish. It cannot be guaranteed whether the update
* succeed or not.</li>
* </ul>
* <p>
* This operation is supported by brokers with version 2.6.0 or higher.
*
* @param entries the alterations to perform
* @return the AlterClientQuotasResult containing the result
*/
AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options);
} |
kafka-client-quotas.sh/ClientQuotasCommand
A ClientQuotasCommand
would be constructed with an associated bin/kafka
A ClientQuotasCommand
would be constructed with an associated bin/kafka-client-quotas.sh
script for managing quotas via command line, and would have three modes of operation, roughly correlating to each of the API calls:
...
Code Block |
---|
$./bin/kafka-client-quotas.sh --bootstrap-server localhost:9092 --alter \ --names=client-id=my-client --defaults=user \ --add=consumer_byte_rate=2000000 \ --delete=producer_byte_rate <no output on success> $./bin/kafka-client-quotas.sh --bootstrap-server localhost:9092 ---delete=producer_byte_rate <no output on success> $./bin/kafka-client-quotas.sh --bootstrap-server localhost:9092 --describe \ describe \ --names=client-id=my-client --defaults=user {user=<default>, client-id=my-client} consumer_byte_rate=2000000 |
Proposed Changes
In addition to the API changes above, the following write protocol would be implemented:
DescribeClientQuotas
Code Block |
---|
{ "apiKey": 48, "type": "request", "name": "DescribeClientQuotasRequest", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "Components", "type": "[]ComponentData", "versions": "0+", "about": "Filter --names=client-id=my-client --defaults=user {user=<default>, client-id=my-client} consumer_byte_rate=2000000 |
Proposed Changes
In addition to the API changes above, the following write protocol would be implemented:
DescribeClientQuotas
Code Block |
---|
{ "apiKey": 48, "type": "request", "name": "DescribeClientQuotasRequest", "validVersionscomponents to apply to quota entities.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "flexibleVersionsabout": "none"The entity type that the filter component applies to." }, "fields": [ { "name": "FilterMatchType", "type": "[]QuotaFilterDataint8", "versions": "0+", "about": "Filters to apply to quota entities.", "fields": [ "about": "How to match the entity {0 = exact name, 1 = default name, 2 = any specified name}." }, { "name": "EntityTypeMatch", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The entity type thatstring to match against, or null if unused for the filtermatch applies totype." }, ]}, { "name": "MatchStrict", "type": "stringbool", "versions": "0+", "nullableVersions": "0+", "about": "TheWhether stringthe tomatch matchis against, or null if the typestrict, i.e. should beexclude omitted." } ]entities with unspecified entity types." } ] } { "apiKey": 48, "type": "response", "name": "DescribeClientQuotasResponse", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "EntryErrorCode", "type": "[]EntryDataint16", "versions": "0+", "about": "AThe resulterror entry."code, "fields": [ or `0` if the quota description succeeded." }, { "name": "ErrorCodeErrorMessage", "type": "int16string", "versions": "0+", "nullableVersions": "0+", "about": "The error codemessage, or `0``null` if the quota description succeeded." }, { "name": "ErrorMessageEntries", "type": "string[]EntryData", "versions": "0+", "nullableVersions": "0+", "about": "TheA errorresult message, or `null` if the quota description succeeded." },entry.", "fields": [ { "name": "Entity", "type": "[]QuotaEntityDataEntityData", "versions": "0+", "about": "The quota entity description.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The entity name, or null if the default." } ]}, ]}, { "name": "Values", "type": "[]ValueData", "versions": "0+", "about": "The quota values for the entity.", "fields": [ { "name": "TypeKey", "type": "string", "versions": "0+", "about": "The quota typeconfiguration key." }, { "name": "Value", "type": "doublefloat64", "versions": "0+", "about": "The quota configuration value." } ]} ]} ] } |
ResolveClientQuotas
Code Block |
---|
{ "apiKey": 4950, "type": "request", "name": "ResolveClientQuotasRequest", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+", "about": "The quota entity description.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "about": "The entity name." } ]} ] } { "apiKey": 4950, "type": "response", "name": "ResolveClientQuotasResponse", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Entry", "type": "[]QuotaEntryData", "versions": "0+", "about": "Resolved quota entries.", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or `0` if the resolved quota description succeeded." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The error message, or `null` if the resolved quota description succeeded." }, { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+", "about": "Resolved quota entries.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "about": "The entity name." } ]}, { "name": "QuotaValues", "type": "[]QuotaValueData", "versions": "0+", "about": "Quota configuration values.", "fields": [ { "name": "Type", "type": "string", "versions": "0+", "about": "The quota type." }, { "name": "Entry", "type": "[]ValueEntryData", "versions": "0+", "about": "Quota value entries.", "fields": [ { "name": "QuotaEntity", "type": "[]ValueQuotaEntity", "versions": "0+", "about": "Resolved quota entries.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "about": "The entity name." } ]}, { "name": "Value", "type": "double", "versions": "0+", "about": "The quota configuration value." } ]} ]} ]} ] } |
AlterClientQuotas
Code Block |
---|
{ "apiKey": 5049, "type": "request", "name": "AlterClientQuotasRequest", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "EntryEntries", "type": "[]EntryData", "versions": "0+", "about": "The quota configuration entries to alter.", "fields": [ { "name": "QuotaEntityEntity", "type": "[]QuotaEntityEntityData", "versions": "0+", "about": "The quota entity to alter.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The name of the entity, or null if the default." } ]}, { "name": "OpOps", "type": "[]OpData", "versions": "0+", "about": "An individual quota configuration entry to alter.", "fields": [ { "name": "TypeKey", "type": "string", "versions": "0+", "about": "The quota typeconfiguration key." }, { "name": "Value", "type": "doublefloat64", "versions": "0+", "about": "The value to set, otherwise ignored if the value is to be removed." }, { "name": "Remove", "type": "bool", "versions": "0+", "about": "Whether the quota configuration value should be removed, otherwise set." } ]} ]}, { "name": "ValidateOnly", "type": "bool", "versions": "0+", "about": "Whether the alteration should be validated, but not performed." } ] } { "apiKey": 5049, "type": "response", "name": "AlterClientQuotasResponse", "validVersions": "0", "flexibleVersions": "none", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "EntryEntries", "type": "[]EntryData", "versions": "0+", "about": "The quota configuration entries to alter.", "fields": [ { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or `0` if the quota alteration succeeded." }, { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The error message, or `null` if the quota alteration succeeded." }, { "name": "QuotaEntityEntity", "type": "[]QuotaEntityEntityData", "versions": "0+", "about": "The quota entity to alter.", "fields": [ { "name": "EntityType", "type": "string", "versions": "0+", "about": "The entity type." }, { "name": "EntityName", "type": "string", "versions": "0+", "nullableVersions": "0+", "about": "The name of the entity, or null if the default." } ]} ]} ] } |
Kafka RPC 'double' support
...