...
Admin client calls will be added to correspond to DescribeClientQuotas, ResolveClientQuotas, and AlterClientQuotas, with supporting types defined in the common.quota package.
Common types in package org.apache.kafka.common.quota
...
```java
/**
 * Describes a fully-qualified entity.
 */
public class QuotaEntity {
    /**
     * Type of an entity entry.
     */
    public static final String USER = "user";
    public static final String CLIENT_ID = "client-id";

    /**
     * Represents the default name for an entity, i.e. the entity that's matched
     * when an exact match isn't found.
     */
    public static final String QUOTA_ENTITY_NAME_DEFAULT = // implementation defined

    /**
     * `entries` describes the fully-qualified entity. The key is a {@code Type} string, however
     * there may also exist keys that are not enumerated by {@code Type} that still apply, e.g.
     * the server may internally associate another type. When querying entities, it's necessary
     * to return all quota types because quota values for these types may influence the resolved
     * quota value. However, when altering a quota, any types that aren't specified must be able
     * to be inferred by the server, otherwise an error is returned.
     *
     * For example, {("CLIENT_ID" -> "test-client"),
     *               ("USER" -> "test-user"),
     *               ("GROUP" -> "internal-group")}.
     */
    public QuotaEntity(Map<String, String> entries);
}

/**
 * Describes a quota entity filter.
 */
public class QuotaFilter {
    /**
     * A filter to be applied.
     *
     * @param entityType the entity type the filter applies to
     * @param match the non-null string that's matched exactly
     */
    public QuotaFilter(String entityType, String match);
}
```
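The result types in this proposal key maps by `QuotaEntity`, which suggests the class needs value semantics (equality and hashing based on its entries). The following is a minimal sketch of that idea only. `QuotaEntitySketch` is a hypothetical stand-in, not the proposed class, and the defensive copy is an assumption rather than something the proposal specifies.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed QuotaEntity; name and behavior are assumptions.
public final class QuotaEntitySketch {
    private final Map<String, String> entries;

    public QuotaEntitySketch(Map<String, String> entries) {
        // Defensive copy so later changes to the caller's map do not alter this entity.
        this.entries = Collections.unmodifiableMap(new HashMap<>(entries));
    }

    public Map<String, String> entries() {
        return entries;
    }

    // Two entities are equal when they carry the same (type -> name) entries.
    @Override
    public boolean equals(Object o) {
        return o instanceof QuotaEntitySketch
            && entries.equals(((QuotaEntitySketch) o).entries);
    }

    @Override
    public int hashCode() {
        return entries.hashCode();
    }

    public static void main(String[] args) {
        Map<String, String> entries = new HashMap<>();
        entries.put("user", "test-user");
        entries.put("client-id", "test-client");

        // Entities built from the same entries can be used interchangeably as map keys.
        Map<QuotaEntitySketch, Double> quotas = new HashMap<>();
        quotas.put(new QuotaEntitySketch(entries), 1024.0);
        System.out.println(quotas.containsKey(new QuotaEntitySketch(entries)));
    }
}
```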
DescribeClientQuotas
...
```java
public class DescribeClientQuotasOptions extends AbstractOptions<DescribeClientQuotasOptions> {
    // Empty.
}

/**
 * The result of the {@link Admin#describeClientQuotas(Collection, DescribeClientQuotasOptions)} call.
 */
public class DescribeClientQuotasResult {
    /**
     * Maps an entity to its configured quota value(s). Note if no value is defined for a quota
     * type for that entity's config, then it is not included in the resulting value map.
     *
     * @param entities the collection of entities that matched the filter
     */
    public DescribeClientQuotasResult(KafkaFuture<Map<QuotaEntity, Map<String, Double>>> entities);

    /**
     * Returns a future that yields a map from each matching quota entity to its configured quota value(s).
     */
    public KafkaFuture<Map<QuotaEntity, Map<String, Double>>> entities();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Describes all entities matching all provided filters (logical AND) that have at least one
     * quota value defined.
     *
     * @param filters filters to apply to matching entities
     * @param options the options to use
     * @return result containing all matching entities
     */
    DescribeClientQuotasResult describeClientQuotas(Collection<QuotaFilter> filters, DescribeClientQuotasOptions options);
}
```
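The "logical AND" filter semantics described for `describeClientQuotas` can be illustrated with a small in-memory sketch. Everything here is hypothetical: `QuotaFilterMatching` and its nested `Filter` are stand-ins for illustration, and the sketch assumes a filter matches when the entity has an entry of the filter's type whose name equals the match string exactly.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating logical-AND filter matching; not part of the proposal.
public final class QuotaFilterMatching {
    /** Stand-in for the proposed QuotaFilter: an entity type plus an exact-match string. */
    public static final class Filter {
        final String entityType;
        final String match;

        public Filter(String entityType, String match) {
            this.entityType = entityType;
            this.match = match;
        }
    }

    /**
     * Returns true only if every filter matches, i.e. the entity has an entry for the
     * filter's type and that entry's name equals the filter's match string.
     * An empty filter collection matches every entity.
     */
    public static boolean matchesAll(Map<String, String> entity, Collection<Filter> filters) {
        for (Filter filter : filters) {
            if (!filter.match.equals(entity.get(filter.entityType))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> entity = new HashMap<>();
        entity.put("user", "test-user");
        entity.put("client-id", "test-client");

        // Both filters must match for the entity to be returned.
        System.out.println(matchesAll(entity, Arrays.asList(
            new Filter("user", "test-user"),
            new Filter("client-id", "test-client"))));
        // One mismatching filter excludes the entity.
        System.out.println(matchesAll(entity, Arrays.asList(
            new Filter("user", "test-user"),
            new Filter("client-id", "other-client"))));
        System.out.println(matchesAll(entity, Collections.<Filter>emptyList()));
    }
}
```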
...
```java
public class AlterClientQuotasEntry {
    public static class Op {
        /**
         * @param key the quota type to alter
         * @param value if set then the existing value is updated,
         *              otherwise if null, the existing value is cleared
         */
        public Op(String key, Double value);
    }

    /**
     * @param entity the entity whose config will be modified
     * @param ops the alteration to perform - if value is set, then the existing value is updated,
     *            otherwise if null, the existing value is cleared
     */
    public AlterClientQuotasEntry(QuotaEntity entity, Collection<Op> ops);
}

public class AlterClientQuotasOptions extends AbstractOptions<AlterClientQuotasOptions> {
    /**
     * Sets whether the request should be validated without altering the configs.
     */
    public AlterClientQuotasOptions validateOnly(boolean validateOnly);
}

/**
 * The result of the {@link Admin#alterClientQuotas(Collection, AlterClientQuotasOptions)} call.
 *
 * The API of this class is evolving, see {@link Admin} for details.
 */
public class AlterClientQuotasResult {
    public AlterClientQuotasResult(Map<QuotaEntity, KafkaFuture<Void>> futures);

    /**
     * Returns a map from quota entity to a future which can be used to check the status of the operation.
     */
    public Map<QuotaEntity, KafkaFuture<Void>> values();

    /**
     * Returns a future which succeeds only if all quota alterations succeed.
     */
    public KafkaFuture<Void> all();
}

public interface Admin extends AutoCloseable {
    ...

    /**
     * Alters the quotas as specified for the entries.
     *
     * @param entries the alterations to perform
     * @param options the options to use
     * @return the result of the alterations
     */
    AlterClientQuotasResult alterClientQuotas(Collection<AlterClientQuotasEntry> entries, AlterClientQuotasOptions options);
}
```
kafka-client-quotas.sh/ClientQuotasCommand
...
A ClientQuotasCommand would be constructed with an associated bin/kafka-client-quotas.sh script for managing quotas via command line, and would have three modes of operation, roughly correlating to each of the API calls:
...
In addition to the API changes above, the following wire protocol would be implemented:
DescribeClientQuotas
...
```json
{
  "apiKey": 48,
  "type": "request",
  "name": "DescribeClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Filter", "type": "[]QuotaFilterData", "versions": "0+",
      "about": "Filters to apply to quota entities.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type that the filter applies to." },
      { "name": "Match", "type": "string", "versions": "0+",
        "about": "The string to match against." }
    ]}
  ]
}

{
  "apiKey": 48,
  "type": "response",
  "name": "DescribeClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]EntryData", "versions": "0+",
      "about": "A result entry.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the quota description succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the quota description succeeded." },
      { "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+",
        "about": "The quota entity description.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The entity name." }
      ]},
      { "name": "Type", "type": "string", "versions": "0+",
        "about": "The quota type." },
      { "name": "Value", "type": "double", "versions": "0+",
        "about": "The quota value." }
    ]}
  ]
}
```
ResolveClientQuotas
...
```json
{
  "apiKey": 49,
  "type": "request",
  "name": "ResolveClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Entity", "type": "[]QuotaEntityData", "versions": "0+",
      "about": "The quota entity description.", "fields": [
      { "name": "EntityType", "type": "string", "versions": "0+",
        "about": "The entity type." },
      { "name": "EntityName", "type": "string", "versions": "0+",
        "about": "The entity name." }
    ]}
  ]
}

{
  "apiKey": 49,
  "type": "response",
  "name": "ResolveClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]QuotaEntryData", "versions": "0+",
      "about": "Resolved quota entries.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the resolved quota description succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the resolved quota description succeeded." },
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "Resolved quota entries.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The entity name." }
      ]},
      { "name": "QuotaValues", "type": "[]QuotaValueData", "versions": "0+",
        "about": "Quota configuration values.", "fields": [
        { "name": "Type", "type": "string", "versions": "0+",
          "about": "The quota type." },
        { "name": "Entry", "type": "[]ValueEntryData", "versions": "0+",
          "about": "Quota value entries.", "fields": [
          { "name": "QuotaEntity", "type": "[]ValueQuotaEntity", "versions": "0+",
            "about": "Resolved quota entries.", "fields": [
            { "name": "EntityType", "type": "string", "versions": "0+",
              "about": "The entity type." },
            { "name": "EntityName", "type": "string", "versions": "0+",
              "about": "The entity name." }
          ]},
          { "name": "Value", "type": "double", "versions": "0+",
            "about": "The quota configuration value." }
        ]}
      ]}
    ]}
  ]
}
```
AlterClientQuotas
...
```json
{
  "apiKey": 50,
  "type": "request",
  "name": "AlterClientQuotasRequest",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "Entry", "type": "[]EntryData", "versions": "0+",
      "about": "The quota configuration entries to alter.", "fields": [
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "The quota entity to alter.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The name of the entity." }
      ]},
      { "name": "Op", "type": "[]OpData", "versions": "0+",
        "about": "An individual quota configuration entry to alter.", "fields": [
        { "name": "Type", "type": "string", "versions": "0+",
          "about": "The quota type." },
        { "name": "Value", "type": "double", "versions": "0+",
          "about": "The value to set, otherwise ignored if the value is to be removed." },
        { "name": "Remove", "type": "bool", "versions": "0+",
          "about": "Whether the quota configuration value should be removed, otherwise set." }
      ]}
    ]},
    { "name": "ValidateOnly", "type": "bool", "versions": "0+",
      "about": "Whether the alteration should be validated, but not performed." }
  ]
}

{
  "apiKey": 50,
  "type": "response",
  "name": "AlterClientQuotasResponse",
  "validVersions": "0",
  "flexibleVersions": "none",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Entry", "type": "[]EntryData", "versions": "0+",
      "about": "The quota configuration entries to alter.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code, or `0` if the quota alteration succeeded." },
      { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error message, or `null` if the quota alteration succeeded." },
      { "name": "QuotaEntity", "type": "[]QuotaEntity", "versions": "0+",
        "about": "The quota entity to alter.", "fields": [
        { "name": "EntityType", "type": "string", "versions": "0+",
          "about": "The entity type." },
        { "name": "EntityName", "type": "string", "versions": "0+",
          "about": "The name of the entity." }
      ]}
    ]}
  ]
}
```
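The admin API expresses removal as a null `Double`, while the AlterClientQuotas request carries a non-nullable `Value` alongside a `Remove` flag. A minimal sketch of how the two could be bridged follows. `AlterOpEncoding` and `WireOp` are hypothetical names for illustration, and the `0.0` placeholder written when `Remove` is set is an assumption (the schema only says the value is ignored in that case).

```java
// Hypothetical mapping from an API-level Op to the (Type, Value, Remove) wire triple.
public final class AlterOpEncoding {
    /** Stand-in for one serialized Op entry; field names mirror the request schema. */
    public static final class WireOp {
        public final String type;
        public final double value;
        public final boolean remove;

        WireOp(String type, double value, boolean remove) {
            this.type = type;
            this.value = value;
            this.remove = remove;
        }
    }

    /**
     * A null value means "clear the quota", which becomes Remove=true on the wire.
     * The Value field is ignored by the receiver in that case, so an arbitrary
     * placeholder (0.0 here, an assumption) is written.
     */
    public static WireOp toWire(String key, Double value) {
        if (value == null) {
            return new WireOp(key, 0.0, true);
        }
        return new WireOp(key, value, false);
    }

    public static void main(String[] args) {
        WireOp set = toWire("producer_byte_rate", 1048576.0);
        WireOp clear = toWire("producer_byte_rate", null);
        System.out.println(set.type + " value=" + set.value + " remove=" + set.remove);
        System.out.println(clear.type + " remove=" + clear.remove);
    }
}
```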
Kafka RPC 'double' support
...
Note that, while the ByteBuffer natively supports serializing a Double, the format in which the value is serialized is not strongly specified, so the preference is to explicitly ensure a standard representation of double-precision 64-bit IEEE 754 format. This is achieved in Java using Double.doubleToRawLongBits() and Double.longBitsToDouble() and should be easily portable to other languages.
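As a rough illustration of this approach, the sketch below writes a double as the 64-bit integer holding its raw IEEE 754 bits and reads it back. `DoubleWireFormat` and its method names are hypothetical, and the sketch assumes the buffer uses `ByteBuffer`'s default big-endian byte order.

```java
import java.nio.ByteBuffer;

// Hypothetical helper; class and method names are illustrative, not from the proposal.
public final class DoubleWireFormat {
    private DoubleWireFormat() {}

    /** Writes the raw IEEE 754 bit pattern of the value as a 64-bit integer. */
    public static void writeDouble(ByteBuffer buffer, double value) {
        buffer.putLong(Double.doubleToRawLongBits(value));
    }

    /** Reads a 64-bit integer and reinterprets its bits as a double. */
    public static double readDouble(ByteBuffer buffer) {
        return Double.longBitsToDouble(buffer.getLong());
    }

    public static void main(String[] args) {
        // ByteBuffer defaults to big-endian order, which this sketch relies on.
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        writeDouble(buffer, 1024.5);
        buffer.flip();
        System.out.println(readDouble(buffer));
    }
}
```

Using the raw bit conversion (rather than `doubleToLongBits`) means NaN payloads are passed through unchanged instead of being collapsed to a canonical NaN.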
...