...

AlterOperation (new)

  • 0: Add
  • 1: Set
  • 2: Delete
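As a minimal sketch of how the ids listed above could be mapped back to an operation when reading a request, the enum below mirrors the three values and adds a reverse lookup. The name AlterOperationSketch and the forId helper are hypothetical and are not part of the proposed API.

```java
/** Sketch of the AlterOperation ids listed above; the class name is hypothetical. */
enum AlterOperationSketch {
    ADD((byte) 0),
    SET((byte) 1),
    DELETE((byte) 2);

    private final byte id;

    AlterOperationSketch(byte id) {
        this.id = id;
    }

    /** The id that identifies this operation on the wire. */
    byte id() {
        return id;
    }

    /** Hypothetical helper: maps a wire id back to the matching operation. */
    static AlterOperationSketch forId(byte id) {
        for (AlterOperationSketch op : values()) {
            if (op.id == id) {
                return op;
            }
        }
        throw new IllegalArgumentException("Unknown AlterOperation id: " + id);
    }
}
```

Rejecting unknown ids with an exception is one possible design choice; an implementation could equally return an error code to the client.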

Field Types

Double

A new type needs to be added to transfer quota values. Since the protocol classes in Kafka already use ByteBuffers, it is logical to rely on their functionality for serializing doubles. The serialized form is the bit representation of the floating-point value according to the IEEE 754 floating-point "double format" bit layout.

The implementation will be defined in org.apache.kafka.common.protocol.types alongside the other protocol types. It will occupy 8 bytes (as in IEEE 754) and, like the other types available in the protocol, it will have no default value.
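The round trip through a ByteBuffer can be sketched as follows. This is only an illustration of the 8-byte IEEE 754 encoding; DoubleCodec is a hypothetical helper name, not the protocol type that would live in org.apache.kafka.common.protocol.types.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper illustrating the encoding; not the actual Kafka type. */
final class DoubleCodec {
    /** An IEEE 754 double always occupies 8 bytes. */
    static final int SIZE_IN_BYTES = Double.BYTES;

    private DoubleCodec() {
    }

    /** Writes the value at the buffer's current position (ByteBuffer defaults to big-endian). */
    static void write(ByteBuffer buffer, double value) {
        buffer.putDouble(value);
    }

    /** Reads 8 bytes from the buffer's current position and decodes them as a double. */
    static double read(ByteBuffer buffer) {
        return buffer.getDouble();
    }
}
```

A caller would allocate at least DoubleCodec.SIZE_IN_BYTES bytes, call write, flip the buffer, and then call read to get the original value back.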

Describe Quotas

The justification for a new protocol is that a quota is quite different from a broker or topic config: a quota can be identified by a single user, a single client, or a (user, client) tuple, whereas a topic or broker config is identified only by the topic's name or the broker's ID. Moreover, quotas have their own well-defined types.

...

Code Block (language: java, title: org.apache.kafka.clients.admin)
public static class Quota {
    public QuotaType type();
    public double value();
    public QuotaSource source();
}

public enum QuotaType {
    PRODUCER_BYTE_RATE((byte) 0, "producer_byte_rate"),
    CONSUMER_BYTE_RATE((byte) 1, "consumer_byte_rate"),
    REQUEST_PERCENTAGE((byte) 2, "request_percentage");

    QuotaType(byte id, String name);
    public byte id();
    public String quotaName();
}
 
public enum AlterOperation {
    ADD((byte) 0),
    SET((byte) 1),
    DELETE((byte) 2);

    AlterOperation(byte id);
    public byte id();
}

public enum QuotaSource {
    CLIENT_OF_USER((byte) 0, "Client of user"),
    DEFAULT_CLIENT_OF_USER((byte) 1, "Default client of user"),
    USER((byte) 2, "User"),
    CLIENT_OF_DEFAULT_USER((byte) 3, "Client of default user"),
    DEFAULT_CLIENT_OF_DEFAULT_USER((byte) 4, "Default client of default user"),
    DEFAULT_USER((byte) 5, "Default user"),
    CLIENT((byte) 6, "Client"),
    DEFAULT_CLIENT((byte) 7, "Default client");

    QuotaSource(byte id, String description);
    public byte id();
    public String description();
}
 
/**
 * Makes sure that the list of resources that is used as key in a hashmap is immutable and has a fixed implementation for the hashCode.
 */
public class ConfigResourceList {
    public List<ConfigResource> getResourceList();
}
 
public class AdminClient {
    public DescribeQuotasResult describeQuotas(Map<ConfigResourceList, Collection<QuotaType>>, DescribeQuotasOptions options);
    public AlterQuotasResult alterQuotas(Map<ConfigResourceList, Collection<Quota>> configs, AlterQuotasOptions options);
}
public class DescribeQuotasOptions { 
    public DescribeQuotasOptions timeoutMs(Integer timeout);
}

public class DescribeQuotasResult {
    public Map<List<Resource>, KafkaFuture<Collection<Quota>>> values();
}
 
public class AlterQuotasOptions { 
    public AlterQuotasOptions timeoutMs(Integer timeout);
    public AlterQuotasOptions validateOnly(boolean validateOnly);
}

public class AlterQuotasResult {
    public Map<List<Resource>, KafkaFuture<Void>> results();
}
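The comment on ConfigResourceList above asks for an immutable list with a fixed hashCode so that it can safely serve as a map key. A minimal sketch of that idea follows. It assumes the elements themselves are immutable, and it uses a generic type parameter in place of ConfigResource so that the example is self-contained. ImmutableResourceList is a hypothetical name, not the proposed class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for ConfigResourceList; T would be ConfigResource. */
final class ImmutableResourceList<T> {
    private final List<T> resources;
    // Computed once; stays valid because the list cannot change
    // (assuming the elements themselves are immutable).
    private final int hash;

    ImmutableResourceList(List<T> resources) {
        // Defensive copy: later changes to the caller's list do not affect the key.
        this.resources = Collections.unmodifiableList(new ArrayList<>(resources));
        this.hash = this.resources.hashCode();
    }

    List<T> getResourceList() {
        return resources;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof ImmutableResourceList)) {
            return false;
        }
        return resources.equals(((ImmutableResourceList<?>) other).resources);
    }

    @Override
    public int hashCode() {
        return hash;
    }
}
```

With a plain mutable List as the key, modifying the list after insertion would change its hashCode and make the map entry unreachable; the defensive copy avoids that.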

...

Code Block (language: java, title: org.apache.kafka.common.requests)
public class QuotaCollection {
    public QuotaCollection(ApiError error, Collection<Quota> entries);

    public QuotaCollection(Collection<Quota> entries);

    public ApiError error();
    public Collection<Quota> entries();
}
 
public class DescribeQuotasRequest extends AbstractRequest {
	
	public static Schema[] schemaVersions();
	public static DescribeQuotasRequest parse(ByteBuffer buffer, short version);
 
	public static class Builder extends AbstractRequest.Builder {
		public Builder(Map<List<Resource>, Collection<QuotaType>> quotaSettings);
		public DescribeQuotasRequest build(short version);
	}
 
	public DescribeQuotasRequest(short version, Map<List<Resource>, Collection<QuotaType>> quotaSettings);
	public DescribeQuotasRequest(Struct struct, short version);
 
	public Map<List<Resource>, Collection<QuotaType>> quotaTypes();
}

public class DescribeQuotasResponse extends AbstractResponse {
	public static Schema[] schemaVersions();

	public DescribeQuotasResponse(int throttleTimeMs, Map<ConfigResourceList, KafkaFuture<Collection<Quota>>>);
	public DescribeQuotasResponse(Struct struct);

	public Map<List<Resource>, QuotaCollection> quotas();
}
 
public class AlterQuotasRequest extends AbstractRequest {
	public static Schema[] schemaVersions();
 
	public static class Builder extends AbstractRequest.Builder {
		public Builder(Map<List<Resource>, QuotaCollection> quotaSettings);
		public AlterQuotasRequest build(short version);
	}
 
	public AlterQuotasRequest(short version, Map<List<Resource>, QuotaCollection> quotas, boolean validateOnly);
	public AlterQuotasRequest(Struct struct, short version);
 
	public Map<List<Resource>, QuotaCollection> quotas();
}
 
public class AlterQuotasResponse extends AbstractResponse {
	public static Schema[] schemaVersions();
 
	public AlterQuotasResponse(short version, Map<List<Resource>, ApiError> quotas, boolean validateOnly);
	public AlterQuotasResponse(Struct struct, short version);
 
	public Map<List<Resource>, ApiError> errors();
    public int throttleTimeMs();
}

...

Finally, backward compatibility (for instance, a 1.1 client administering a 1.0 server) will be impacted, as some of the protocols are newly created and don't exist in old servers. In these cases users should continue using the Scala version of the ConfigCommand by putting the core jar on their classpath and defining the USE_OLD_KAFKA_CONFIG_COMMAND=true environment variable. This variable sets the main class in the config to the old command and invokes it. This way, users who aren't yet able to use the new command need to make only minimal changes to continue using the old one.

...

From the compatibility point of view, the impact might be bigger, as mentioned above. Since the command now uses the wire protocols (including some newly introduced ones), backward compatibility will be impacted: a user can't use a 1.1 client to administer a 1.0 broker, because some of the wire protocols don't exist in the older broker. This should again be acceptable to most users, as most of the admin commands require the core jar on the classpath, which means that most of the time the commands are executed from an environment with the same version as the brokers. In the remaining cases users will have to switch to kafka-run-class or use the USE_OLD_KAFKA_CONFIG_COMMAND environment variable.

...

To ease the migration for users who are stuck with this command, the USE_OLD_KAFKA_CONFIG_COMMAND environment variable will be introduced.

Special Migration Tools

...