Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • 0: Self
  • 1: Default
  • 2: Parent

Describe Quotas

To be able to implement the use cases of kafka-configs.sh where a quota is modified, like user, client or (user,client) we have to create a protocol to handle quota listings. The justification for a new protocol is that a quota is quite different from a broker or topic config because a quota can sometimes be identified a simple user, client or even a (user,client) tuple while a topic or a broker config can be identified only by the topic's name or the broker's ID. Moreover quotas have their own well defined types.

Code Block
languagejs
titleDescribeQuotas Request
DescribeQuotas Request (Version: 1) => [resource]
  resource => [quota_resource] [quota_type]
    quota_resource => type name
      type => INT8
      name => STRING
    quota_type => INT8

...

Code Block
languagejs
titleAlterQuotas Response
AlterQuotas Response (Version: 0) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => [quota_resource] [quota]
    quota_resource => type name
      type => INT8
      name => STRING
    quota => error_code error_message quota_type
      error_code => INT16
      error_message => NULLABLE_STRING
      quota_type => INT8

DescribeConfigs and AlterConfigs

To enable describing and altering SCRAM credentials we will use the DescribeConfigs and AlterConfigs protocols. There are no changes in the protocol's structure but we will allow the USER resource type to be passed in the protocol. When this happens, the server will know that SCRAM configs are asked and will send them in the response.  In case of AlterConfigs if a USER resource type is passed it will validate if there are only SCRAM credentials are changed. If not, then will fail with InvalidRequestException.

AdminClient APIs

Code Block
languagejava
titleorg.apache.kafka.clients.admin
public static class Quota {
    public QuotaType type();
    public double value();
    public QuotaSource source();
}

public static enum QuotaType {
    PRDOUCER_BYTE_RATE(0), CONSUMER_BYTE_RATE(1), REQUEST_PERCENTAGE(2);

    QuotaType(byte id);

    public byte id();
}

public static enum QuotaSource {
    SELF(0), DEFAULT(1), PARENT(2);
 
	QuotaSource(byte id);
 
    public byte id();
}
 
/**
 * Represents a list of Resource objects that have a hierarchical relationship.
 * For instance one could represent relationship like "clientA of user1".
 */
public class ResourceList {
    public ResourceList(String user, String client);
    public ResourceList(ResourceType type, String resourceName);
 
    /**
     * Returns the list of resources in a top to bottom (first to last) order.
     */
    public List<Resource> resources();
}
 
/**
 * A helper class that makes sure that a given collection of Quota objects
 * are immutably stored and can be accessed.
 */
public class QuotaCollection {
    public QuotaCollection(Collection<Quota> entries);
 
    public Collection<Quota> entries();
}
 
/**
 * This class acts as an alias for a HashMap that maps a list of Resource
 * objects which represents hierarchical
 */
public class QuotaResourceMap extends HashMap<ResourceList, T> {
    public QuotaResourceMap(ResourceList key, T value);
    public QuotaResourceMap(Map<ResourceList, T>);
}
 
public class AdminClient {
    public DescribeQuotasResult describeQuotas(QuotaResourceMap<Collection<QuotaType>>, DescribeQuotasOptions options);
    public AlterQuotasResult alterQuotas(QuotaResourceMap<QuotaCollection> configs, AlterQuotasOptions options);
}
public class DescribeQuotasOptions { 
    public DescribeQuotasOptions timeoutMs(Integer timeout);
}

public class DescribeQuotasResult {
    public QuotaResourceMap<KafkaFuture<QuotaCollection>> values();
}
 
public class AlterQuotasOptions { 
    public AlterQuotasOptions timeoutMs(Integer timeout);
	public AlterQuotasOptions validateOnly(boolean validateOnly);
}

public class AlterQuotasResult {
    public QuotaResourceMap<KafkaFuture<Void>> results();
}

...