Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • 0: Unknown
  • 1: Any
  • 2: Topic
  • 3: Group
  • 4: Broker
  • 5: User (new)
  • 6: Client (new)

QuotaType (new)

  • 0: ProducerByteRate
  • 1: ConsumerByteRate
  • 2: RequestPercentage

QuotaSource (new)

  • 0: Self
  • 1: Default
  • 2: Parent

Describe Quotas

To be able to implement the use cases of kafka-configs.sh where a quota is modified, like user, client or (user,client) we have to create a protocol to handle quota listings. The justification for a new protocol is that a quota is quite different from a broker or topic config because a quota can sometimes be identified a simple user, client or even a (user,client) tuple while a topic or a broker config can be identified only by the topic's name or the broker's ID.

Code Block
languagejs
titleDescribeQuotas Request
DescribeQuotas Request (Version: 1) => [resource]
  resource => [quota_config_resource] [configquota_nametype]
    quota_config_resource => type name
      type => INT8
      name => STRING
    configquota_nametype => STRINGINT8

Request semantics:

  1. Can be sent to any broker
  2. If the name is <default> empty it means that listing the default quota is asked. Responses will be returned the same way for defaults.
  3. If the configthe quota_nametype array is nullis empty, all configs quotas are returned. Otherwise, configs quotas with the provided names types are returned.
  4. Authorization:  "DescribeQuotas" can only be interpreted on the "Cluster" resource and represented by the DescribeConfigs ACL due to the similarity in use cases. Unauthorized requests will receive an appropriate AuthorizationFailed error code.

...

Code Block
languagejs
titleDescribeQuotas Response
DescribeQuotas Response (Version: 1) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => [quota_config_resource] [configquota]
    quota_config_resource => type name
      type => INT8
      name => STRING
    configquota => error_code error_message [configquota_entry]
      error_code => INT16
      error_message => NULLABLE_STRING
      configquota_entry =>
        config_name => STRING
        config_value => STRING quota_type quota_value quota_source
        readquota_onlytype => BOOLEANINT8
        isquota_defaultvalue => BOOLEANDOUBLE
        isquota_sensitivesource => BOOLEANINT8

Alter Quotas

Code Block
languagejs
titleAlterQuotas Request
AlterQuota Request (Version: 0) => validate_only [resource]
  validate_only => BOOLEAN
  resource => [quota_config_resource] [configquota]
    quota_config_resource => type name
      type => INT8
      name => STRING
    configquota => configquota_nametype configquota_value
      configquota_nametype => STRINGINT8
      configquota_value => STRINGDOUBLE

Request Semantics

  1. Can be sent to any broker
  2. If name is <default> it empty it means that altering a default quota is asked.
  3. If an Alter operation is attempted on a read-only config, an InvalidRequestException error will be returned for the relevant resource.
  4. AuthorizationAuthorization:  "AlterQuotas" can only be interpreted on the "Cluster" resource and represented by the AlterConfigs ACL due to the similarity in use cases. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
  5. For tools that allow users to alter quota configs, a validation/dry-run mode where validation errors are reported but no creation is attempted is available via the validate_only parameter.

...

Code Block
languagejs
titleAlterQuotas Response
AlterQuotas Response (Version: 0) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => [quota_config_resource] [configquota]
    quota_config_resource => type name
      type => INT8
      name => STRING
    configquota => error_code error_message quota_type
      error_code => INT16
      error_message => NULLABLE_STRING
      configquota_nametype => STRINGINT8

AdminClient APIs

Code Block
languagejava
titleorg.apache.kafka.clients.admin
public class AdminClient {
    public DescribeQuotasResult describeQuotas(String userId, String clientId, Collection<String> configs, final DescribeQuotasOptions options);
    public AlterQuotasResult alterQuotas(Map<QuotaEntityTuple, Config> configs, AlterQuotasOptions options);
}
public class DescribeQuotasOptions { /**
 * Represents a list of Resource objects that have a hierarchical relationship.
 * For instance one could represent relationship like "clientA of user1".
 */
public class ResourceList {
    public DescribeQuotasOptions timeoutMs(Integer timeout);
}

public class DescribeQuotasResult {
    public Map<QuotaConfigResourceTuple, KafkaFuture<Config>> values();
}
 
public class AlterQuotasOptions { 
    public AlterQuotasOptions timeoutMs(Integer timeout);
	public AlterQuotasOptions validateOnly(boolean validateOnly);
}

public class AlterQuotasResult { ResourceList(String user, String client);
    public ResourceList(ResourceType type, String resourceName);
 
    /**
     * Returns the list of resources in a top to bottom (first to last) order.
     */
    public Map<QuotaConfigResourceTuple, KafkaFuture<Void>> results();
}

Request API

List<Resource> resources();
}
 
/**
 * This class acts as an alias for a HashMap that maps a list of Resource
 * objects which represents hierarchical
 *
public class QuotaResourceMap extends HashMap<ResourceList, T> {
    public QuotaResourceMap(ResourceList key, T value);
    public QuotaResourceMap(Map<ResourceList, T>);
}
 
public class AdminClient {
    public DescribeQuotasResult describeQuotas(QuotaResourceMap<Collection<QuotaType>>, DescribeQuotasOptions options);
    public AlterQuotasResult alterQuotas(QuotaResourceMap<Collection<Quota>> configs, AlterQuotasOptions options);
}
public class DescribeQuotasOptions { 
    public DescribeQuotasOptions timeoutMs(Integer timeout);
}

public class DescribeQuotasResult {
    public QuotaResourceMap<KafkaFuture<Collection<Quota>>> values();
}
 
public class AlterQuotasOptions { 
    public AlterQuotasOptions timeoutMs(Integer timeout);
	public AlterQuotasOptions validateOnly(boolean validateOnly);
}

public class AlterQuotasResult {
    public QuotaResourceMap<KafkaFuture<Void>> results();
}

Request API

Code Block
languagejava
titleorg.apache.kafka.common.requests
public class QuotaList {
	public QuotaList(ApiError error, Collection<Quota> entries);

    public QuotaList(Collection<Quota> entries);

    public ApiError error();
    public Collection<Quota> entries();
}
 
public static class Quota {
    public QuotaType type();
    public double value();
    public QuotaSource source();
}

public static enum QuotaType {
    PRDOUCER_BYTE_RATE(0), CONSUMER_BYTE_RATE(1), REQUEST_PERCENTAGE(2);
	
    QuotaType(byte id);
    
    public byte id();
}

public static enum QuotaSource {
    SELF(0), DEFAULT(1), PARENT(2);
 
	QuotaSource(byte id);
 
    public byte id();
}
 
public class DescribeQuotasRequest extends AbstractRequest {
	
	public static Schema[] schemaVersions();
	public static DescribeQuotasRequest parse(ByteBuffer buffer, short version);
	public static class Builder extends AbstractRequest.Builder {
		public Builder(Map<List<Resource>>, Collection<QuotaType>> quotaSettings);
		public DescribeQuotasRequest build(short version);
	}
 
	public DescribeQuotasRequest(short version, Map<List<Resource>>, Collection<QuotaType>> quotaSettings);
	public DescribeQuotasRequest(Struct struct
Code Block
languagejava
titleorg.apache.kafka.common.requests
public class DescribeQuotasRequest extends AbstractRequest {
	
	public static Schema[] schemaVersions();
	public static DescribeQuotasRequest parse(ByteBuffer buffer, short version);
 
	public static Map<List<Resource>>, Collection<QuotaType>> quotaSettings();
}

public class BuilderDescribeQuotasResponse extends AbstractRequest.Builder {
		public Builder(Map<QuotaConfigResourceTuple, Collection<String>> quotaConfigSettings);
	AbstractResponse {
	public DescribeQuotasRequest build(short version);
	}
 static Schema[] schemaVersions();

	public DescribeQuotasRequestDescribeQuotasResponse(shortint versionthrottleTimeMs, Map<QuotaConfigResourceTupleMap<List<Resource>>, Collection<String>>QuotaType> quotaConfigSettingsconfigs);
	public DescribeQuotasRequestDescribeQuotasResponse(Struct struct, short version);
 
	public Map<QuotaConfigResourceTupleMap<List<Resource>>, Collection<String>>DescribeConfigsResponse.Config> quotaConfigSettings();
}
 
public class DescribeQuotasResponseAlterQuotasRequest extends AbstractResponseAbstractRequest {
	public static Schema[] schemaVersions();
 
	public DescribeQuotasResponseAlterQuotasRequest(intshort throttleTimeMsversion, Map<QuotaConfigResourceTupleMap<List<Resource>>, DescribeConfigsResponseAlterConfigsRequest.Config> configs, boolean validateOnly);
	public DescribeQuotasResponseAlterQuotasRequest(Struct struct, short version);
 
	public Map<QuotaConfigResourceTupleMap<List<Resource>>, DescribeConfigsResponse.Config> quotaConfigSettingsconfigs();
}

 

New Command Line Interface

...