Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: add/set/delete operations

...

  • 0: ClientInUser
  • 1: DefaultClientInUser
  • 2: User
  • 3: ClientInDefaultUser
  • 4: DefaultClientInDefaultUser
  • 5: DefaultUser
  • 6: Client
  • 7: DefaultClient

AdminOperation (new)

  • 0: Add
  • 1: Set
  • 2: Delete

Describe Quotas

The justification for a new protocol is that a quota is quite different from a broker or topic config because a quota can sometimes be identified a simple user, client or even a (user,client) tuple while a topic or a broker config can be identified only by the topic's name or the broker's ID. Moreover quotas have their own well defined types.

Code Block
languagejs
titleDescribeQuotas Request
DescribeQuotas Request (Version: 10) => [resource]
  resource => [quota_resource] [quota_type]
    quota_resource => type name
      type => INT8
      name => STRING
    quota_type => INT8

...

Code Block
languagejs
titleDescribeQuotas Response
DescribeQuotas Response (Version: 10) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => [quota_resource] [quota]
    quota_resource => type name
      type => INT8
      name => STRING
    quota_collection => error_code error_message [quota_entry]
      error_code => INT16
      error_message => NULLABLE_STRING
      quota_entry => quota_type quota_value quota_source
        quota_type => INT8
        quota_value => DOUBLE
        quota_source => INT8

...

Code Block
languagejs
titleAlterQuotas Request
AlterQuota Request (Version: 0) => validate_only [resource]
  validate_only => BOOLEAN
  resource => [quota_resource] [quota]
    quota_resource => type name
      type => INT8
      name => STRING
    quota => quota_type quota_value quota_operation
      quota_type => INT8
      quota_value => DOUBLE
      quota_operation => INT8

Request Semantics

  1. Can be sent to any broker
  2. If name is empty it means that altering a default quota is asked.
  3. Authorization:  "AlterQuotas" can only be interpreted on the "Cluster" resource and represented by the AlterConfigs ACL due to the similarity in use cases. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
  4. For tools that allow users to alter quota configs, a validation/dry-run mode where validation errors are reported but no creation is attempted is available via the validate_only parameter.

...

To enable describing and altering SCRAM credentials we will use the DescribeConfigs and AlterConfigs protocols. There are no changes in the protocol's structure but we will allow the USER resource type to be passed in the protocol. When this happens, the server will know that SCRAM configs are asked and will send them in the response.  In case of AlterConfigs if a USER resource type is passed it will validate if there are only SCRAM credentials are changed. If not, then will fail with InvalidRequestException.

...

AlterConfigs

 

Code Block
languagejava
titleorg.apache.kafka.clients.adminAlterConfigs Request
AlterConfigs Request (Version: 1) => [resources] validate_only
  validate_only => BOOLEAN
  resources => resource_type resource_name [configs]
    resource_type => INT8
    resource_name => STRING
    configs => config_name config_value config_operation
      config_name => STRING
      config_value => STRING
      config_operation => INT8                                           <= new addition

 

AdminClient APIs

Code Block
languagejava
titleorg.apache.kafka.clients.admin
public static class Quota {
    public QuotaType public static class Quota {
    public QuotaType type();
    public double value();
    public QuotaSource source();
}

public enum QuotaType {
    PRODUCER_BYTE_RATE((byte) 0, "producer_byte_rate"),
    CONSUMER_BYTE_RATE((byte) 1, "consumer_byte_rate"),
    REQUEST_PERCENTAGE((byte) 2, "request_percentage");

    QuotaType(byte id, String name);
    public byte id();
    public String quotaName();
}
 
public enum AdminOperation {
    ADD((byte) 0),
    SET((byte) 1),
    DELETE((byte) 2);

    QuotaType(byte id);
    public byte id();
}

public enum QuotaSource {
    CLIENT_OF_USER((byte) 0, "Client of user"),
    DEFAULT_CLIENT_OF_USER((byte) 1, "Default client of user"),
    USER((byte) 2, "User"),
    CLIENT_OF_DEFAULT_USER((byte) 3, "Client of default user"),
    DEFAULT_CLIENT_OF_DEFAULT_USER((byte) 4, "Default client of default user"),
    DEFAULT_USER((byte) 5, "Default user"), CLIENT((byte) 6, "Client"),
    DEFAULT_CLIENT((byte) 7, "Default client");

    QuotaSource(byte id, String description);
    public byte id();
    public String description();
}
 
public class AdminClient {
    public DescribeQuotasResult describeQuotas(Map<List<ConfigResource>, Collection<QuotaType>>, DescribeQuotasOptions options);
    public AlterQuotasResult alterQuotas(Map<List<ConfigResource>, Collection<Quota>> configs, AlterQuotasOptions options);
}
public class DescribeQuotasOptions { 
    public DescribeQuotasOptions timeoutMs(Integer timeout);
}

public class DescribeQuotasResult {
    public Map<List<Resource>, KafkaFuture<Collection<Quota>>> values();
}
 
public class AlterQuotasOptions { 
    public AlterQuotasOptions timeoutMs(Integer timeout);
	public AlterQuotasOptions validateOnly(boolean validateOnly);
}

public class AlterQuotasResult {
    public Map<List<Resource>, KafkaFuture<Void>> results();
}

...