Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-5722
Planned release: 1.1.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
- KIP-4 defines the high level motivation for using the admin client and KAFKA-3268 with its subtasks acts as the umbrella JIRA for this KIP.
- The current implementation of the ConfigCommand (kafka.admin.ConfigCommand), which is used by kafka-configs.sh, connects to Zookeeper directly. This prevents the tool from being used in deployments where only the brokers are exposed to clients (i.e. where the Zookeeper servers are intentionally not exposed).
- There is a general push to refactor/rewrite/replace tools which need Zookeeper access with equivalents which use the AdminClient API. Thus it is necessary to change the ConfigCommand so that it no longer talks to Zookeeper directly, but via an intermediating broker.
- This change makes the ConfigCommand transparent to the authorization mechanism and therefore enables a higher level of security. The broker will be able to authorize config admin requests.
- The AdminClient API currently lacks any functionality for changing broker, user and client configurations, which is possible with the current Zookeeper-based ConfigCommand implementation.
- Changing the ConfigCommand will improve the user experience for KIP-226 for the above-mentioned reasons.
Public Interfaces
Command Line Tools And Arguments
The options accepted by the kafka-configs.sh command will change:
- --zookeeper will be deprecated, which means it will display a warning message saying that it's ignored.
- --bootstrap-server option will be added: it has a parameter which is an endpoint to a broker (or a comma separated list of brokers)
- --adminclient.config option will be added: a file path to an admin client configuration properties file
- --adminclient-property option will be added: a comma separated list of configurations ('k1=v1,k2=v2') for the admin client
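The 'k1=v1,k2=v2' format accepted by --adminclient-property could be turned into admin client properties roughly as follows. This is a minimal sketch, not the KIP's implementation: the class name AdminClientPropertyParser and its parse method are hypothetical, and the sketch assumes that values contain no commas.

```java
import java.util.Properties;

/** Hypothetical helper (not part of the KIP) that parses 'k1=v1,k2=v2'. */
final class AdminClientPropertyParser {

    /** Splits on ',' and then on the first '=' of each pair. */
    static Properties parse(String arg) {
        Properties props = new Properties();
        if (arg == null || arg.trim().isEmpty()) {
            return props;
        }
        for (String pair : arg.split(",")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            }
            props.setProperty(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("client.id=config-command,request.timeout.ms=30000");
        System.out.println(props.getProperty("client.id"));
        System.out.println(props.getProperty("request.timeout.ms"));
    }
}
```

A value that itself contains commas (for example a multi-broker bootstrap.servers list) would not survive this simple split, which is one reason to pass such settings through --bootstrap-server or the --adminclient.config properties file instead.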
Protocol Changes
KIP-133 introduced the describe and alter admin protocols and KIP-140 a wire format representation for ResourceType. We will modify these to accommodate the new requirements.
Wire Format Types
ResourceType
0: Unknown
1: Any
2: Topic
3: Group
4: Broker
5: User (new)
6: Client (new)
QuotaType (new)
0: ProducerByteRate
1: ConsumerByteRate
2: RequestPercentage
QuotaSource (new)
0: ClientInUser
1: DefaultClientInUser
2: User
3: ClientInDefaultUser
4: DefaultClientInDefaultUser
5: DefaultUser
6: Client
7: DefaultClient
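As an illustration of how these INT8 identifiers could map to a Java enum, here is a minimal sketch for QuotaSource. The fromId lookup is an assumption added for the example; the KIP itself only lists the constants and an id() accessor.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of the QuotaSource wire type; fromId is a hypothetical addition. */
enum QuotaSource {
    CLIENT_IN_USER(0),
    DEFAULT_CLIENT_IN_USER(1),
    USER(2),
    CLIENT_IN_DEFAULT_USER(3),
    DEFAULT_CLIENT_IN_DEFAULT_USER(4),
    DEFAULT_USER(5),
    CLIENT(6),
    DEFAULT_CLIENT(7);

    // Reverse index from wire id to constant, built once at class load.
    private static final Map<Byte, QuotaSource> BY_ID = new HashMap<>();

    static {
        for (QuotaSource source : values()) {
            BY_ID.put(source.id, source);
        }
    }

    private final byte id;

    QuotaSource(int id) {
        this.id = (byte) id;
    }

    /** The INT8 value written on the wire. */
    byte id() {
        return id;
    }

    /** Resolves a wire id, rejecting values outside the table above. */
    static QuotaSource fromId(byte id) {
        QuotaSource source = BY_ID.get(id);
        if (source == null) {
            throw new IllegalArgumentException("Unknown quota source id: " + id);
        }
        return source;
    }
}
```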
Describe Quotas
The justification for a new protocol is that a quota is quite different from a broker or topic config: a quota can be identified by a single user, a single client or even a (user, client) tuple, while a topic or broker config can be identified only by the topic's name or the broker's ID. Moreover, quotas have their own well-defined types.
DescribeQuotas Request (Version: 1) => [resource]
  resource => [quota_resource] [quota_type]
    quota_resource => type name
      type => INT8
      name => STRING
    quota_type => INT8
Request semantics:
- Can be sent to any broker
- If the name is empty, it means that the default quota is requested. Responses will be returned the same way for defaults.
- If the quota_type array is empty, all quotas are returned. Otherwise, quotas with the provided types are returned.
- Authorization: "DescribeQuotas" can only be interpreted on the "Cluster" resource and is represented by the DescribeConfigs ACL due to the similarity in use cases. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
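To make the request layout above concrete, the following sketch encodes a DescribeQuotas request body with a plain ByteBuffer. It assumes the usual Kafka protocol primitives (arrays prefixed with an INT32 element count, STRING prefixed with an INT16 byte length, big-endian integers) and leaves out the request header. The class and field names are hypothetical and are not the request classes proposed in this KIP.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Hypothetical encoder for the DescribeQuotas request body (header omitted). */
final class DescribeQuotasEncodingSketch {

    /** quota_resource => type name */
    static final class QuotaResource {
        final byte type;
        final String name;

        QuotaResource(byte type, String name) {
            this.type = type;
            this.name = name;
        }
    }

    /** resource => [quota_resource] [quota_type] */
    static final class Entry {
        final List<QuotaResource> resources;
        final List<Byte> quotaTypes;

        Entry(List<QuotaResource> resources, List<Byte> quotaTypes) {
            this.resources = resources;
            this.quotaTypes = quotaTypes;
        }
    }

    static ByteBuffer encode(List<Entry> entries) {
        ByteBuffer buf = ByteBuffer.allocate(sizeOf(entries)); // big-endian by default
        buf.putInt(entries.size());                            // [resource]
        for (Entry entry : entries) {
            buf.putInt(entry.resources.size());                // [quota_resource]
            for (QuotaResource resource : entry.resources) {
                byte[] name = resource.name.getBytes(StandardCharsets.UTF_8);
                buf.put(resource.type);                        // type => INT8
                buf.putShort((short) name.length);             // name => STRING
                buf.put(name);
            }
            buf.putInt(entry.quotaTypes.size());               // [quota_type]
            for (byte quotaType : entry.quotaTypes) {
                buf.put(quotaType);                            // quota_type => INT8
            }
        }
        buf.flip();
        return buf;
    }

    private static int sizeOf(List<Entry> entries) {
        int size = 4;
        for (Entry entry : entries) {
            size += 4;
            for (QuotaResource resource : entry.resources) {
                size += 1 + 2 + resource.name.getBytes(StandardCharsets.UTF_8).length;
            }
            size += 4 + entry.quotaTypes.size();
        }
        return size;
    }

    public static void main(String[] args) {
        // A (user, client) tuple asking only for ProducerByteRate (quota type 0).
        Entry entry = new Entry(
                Arrays.asList(new QuotaResource((byte) 5, "alice"), new QuotaResource((byte) 6, "app")),
                Arrays.asList((byte) 0));
        System.out.println(encode(Arrays.asList(entry)).remaining() + " bytes");
    }
}
```

An empty quota_type list is written as an array of length zero, which matches the "all quotas are returned" semantics described above.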
DescribeQuotas Response (Version: 1) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => [quota_resource] [quota]
    quota_resource => type name
      type => INT8
      name => STRING
    quota => error_code error_message [quota_entry]
      error_code => INT16
      error_message => NULLABLE_STRING
      quota_entry => quota_type quota_value quota_source
        quota_type => INT8
        quota_value => DOUBLE
        quota_source => INT8
Alter Quotas
AlterQuotas Request (Version: 0) => validate_only [resource]
  validate_only => BOOLEAN
  resource => [quota_resource] [quota]
    quota_resource => type name
      type => INT8
      name => STRING
    quota => quota_type quota_value
      quota_type => INT8
      quota_value => DOUBLE
Request Semantics
- Can be sent to any broker
- If name is empty, it means that a default quota is to be altered.
- Authorization: "AlterQuotas" can only be interpreted on the "Cluster" resource and is represented by the AlterConfigs ACL due to the similarity in use cases. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
- For tools that allow users to alter quota configs, a validation/dry-run mode, where validation errors are reported but no change is attempted, is available via the validate_only parameter.
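The validate_only behavior described above can be pictured with a small in-memory sketch. It is not the broker-side logic of this KIP: the class ValidateOnlySketch, the string quota key and the rule that a quota value must be a non-negative number are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of an alter call with a dry-run flag. */
final class ValidateOnlySketch {

    private final Map<String, Double> quotas = new HashMap<>();

    /**
     * Validates the new value and stores it only when validateOnly is false.
     * Returns null on success, or an error message when validation fails.
     */
    String alter(String quotaKey, double value, boolean validateOnly) {
        // Assumed validation rule, chosen only for this example.
        if (Double.isNaN(value) || value < 0) {
            return "Quota value must be a non-negative number: " + value;
        }
        if (!validateOnly) {
            quotas.put(quotaKey, value);
        }
        return null;
    }

    /** The stored value, or null when the quota was never altered. */
    Double current(String quotaKey) {
        return quotas.get(quotaKey);
    }

    public static void main(String[] args) {
        ValidateOnlySketch store = new ValidateOnlySketch();
        System.out.println(store.alter("user=alice/ProducerByteRate", 1024.0, true));
        System.out.println(store.current("user=alice/ProducerByteRate"));
    }
}
```

In dry-run mode the caller still receives any validation error, but the stored state is left untouched.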
AlterQuotas Response (Version: 0) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => [quota_resource] [quota]
    quota_resource => type name
      type => INT8
      name => STRING
    quota => error_code error_message quota_type
      error_code => INT16
      error_message => NULLABLE_STRING
      quota_type => INT8
DescribeConfigs and AlterConfigs (SCRAM)
To enable describing and altering SCRAM credentials we will use the DescribeConfigs and AlterConfigs protocols. There are no changes in the protocols' structure, but we will allow the USER resource type to be passed in them. When this happens, the server will know that SCRAM configs are requested and will send them in the response. In the case of AlterConfigs, if a USER resource type is passed, the server will validate that only SCRAM credentials are changed. If not, the request will fail with InvalidRequestException.
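The AlterConfigs check described above could look roughly like the sketch below. It assumes that SCRAM credentials are stored under config names equal to the mechanism names SCRAM-SHA-256 and SCRAM-SHA-512, and it throws IllegalArgumentException as a stand-in for Kafka's InvalidRequestException so that the example stays self-contained. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical validation for AlterConfigs requests on the USER resource type. */
final class ScramAlterValidationSketch {

    // Assumption: credentials are keyed by SCRAM mechanism name.
    static final Set<String> SCRAM_CONFIG_NAMES = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList("SCRAM-SHA-256", "SCRAM-SHA-512")));

    /** Rejects the request when any altered config is not a SCRAM credential. */
    static void validateUserConfigs(Collection<String> configNames) {
        for (String name : configNames) {
            if (!SCRAM_CONFIG_NAMES.contains(name)) {
                // Stand-in for InvalidRequestException.
                throw new IllegalArgumentException(
                        "Only SCRAM credentials can be altered for USER resources, got: " + name);
            }
        }
    }

    public static void main(String[] args) {
        validateUserConfigs(Arrays.asList("SCRAM-SHA-256"));
        System.out.println("accepted");
    }
}
```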
AdminClient APIs
public static class Quota {
    public QuotaType type();
    public double value();
    public QuotaSource source();
}

public static enum QuotaType {
    PRODUCER_BYTE_RATE(0), CONSUMER_BYTE_RATE(1), REQUEST_PERCENTAGE(2);

    QuotaType(byte id);
    public byte id();
}

public static enum QuotaSource {
    CLIENT_IN_USER(0), DEFAULT_CLIENT_IN_USER(1), USER(2), CLIENT_IN_DEFAULT_USER(3),
    DEFAULT_CLIENT_IN_DEFAULT_USER(4), DEFAULT_USER(5), CLIENT(6), DEFAULT_CLIENT(7);

    QuotaSource(byte id);
    public byte id();
}

public class AdminClient {
    public DescribeQuotasResult describeQuotas(Map<List<Resource>, Collection<QuotaType>> quotas, DescribeQuotasOptions options);
    public AlterQuotasResult alterQuotas(Map<List<Resource>, Collection<Quota>> configs, AlterQuotasOptions options);
}

public class DescribeQuotasOptions {
    public DescribeQuotasOptions timeoutMs(Integer timeout);
}

public class DescribeQuotasResult {
    public Map<List<Resource>, KafkaFuture<Collection<Quota>>> values();
}

public class AlterQuotasOptions {
    public AlterQuotasOptions timeoutMs(Integer timeout);
    public AlterQuotasOptions validateOnly(boolean validateOnly);
}

public class AlterQuotasResult {
    public Map<List<Resource>, KafkaFuture<Void>> results();
}
Request API
public class QuotaCollection {
    public QuotaCollection(ApiError error, Collection<Quota> entries);
    public QuotaCollection(Collection<Quota> entries);
    public ApiError error();
    public Collection<Quota> entries();
}

public class DescribeQuotasRequest extends AbstractRequest {
    public static Schema[] schemaVersions();
    public static DescribeQuotasRequest parse(ByteBuffer buffer, short version);

    public static class Builder extends AbstractRequest.Builder {
        public Builder(Map<List<Resource>, Collection<QuotaType>> quotaSettings);
        public DescribeQuotasRequest build(short version);
    }

    public DescribeQuotasRequest(short version, Map<List<Resource>, Collection<QuotaType>> quotaSettings);
    public DescribeQuotasRequest(Struct struct, short version);
    public Map<List<Resource>, Collection<QuotaType>> quotaTypes();
}

public class DescribeQuotasResponse extends AbstractResponse {
    public static Schema[] schemaVersions();
    public DescribeQuotasResponse(int throttleTimeMs, Map<List<Resource>, QuotaCollection> quotas);
    public DescribeQuotasResponse(Struct struct);
    public Map<List<Resource>, QuotaCollection> quotas();
}

public class AlterQuotasRequest extends AbstractRequest {
    public static Schema[] schemaVersions();

    public static class Builder extends AbstractRequest.Builder {
        public Builder(Map<List<Resource>, QuotaCollection> quotaSettings);
        public AlterQuotasRequest build(short version);
    }

    public AlterQuotasRequest(short version, Map<List<Resource>, QuotaCollection> quotas, boolean validateOnly);
    public AlterQuotasRequest(Struct struct, short version);
    public Map<List<Resource>, QuotaCollection> quotas();
}

public class AlterQuotasResponse extends AbstractResponse {
    public static Schema[] schemaVersions();
    public AlterQuotasResponse(int throttleTimeMs, Map<List<Resource>, ApiError> errors);
    public AlterQuotasResponse(Struct struct);
    public Map<List<Resource>, ApiError> errors();
    public int throttleTimeMs();
}
New Command Line Interface
The kafka-configs.sh command line interface will change slightly in terms of help message and response format, as we will use argparse4j for parsing arguments.
Help Message
usage: config-command [-h] --entity-type {topics,clients,users,brokers}
                      [--force FORCE] [--add-config ADDCONFIG]
                      [--delete-config DELETECONFIG]
                      (--entity-name ENTITYNAME | --entity-default)
                      (--describe | --alter)
                      (--bootstrap-server BOOTSTRAPSERVERS |
                      --adminclient.config CONFIGPROPERTIES |
                      --adminclient-property ADMINCLIENTPROPERTY)

Change configs for topics, clients, users, brokers dynamically.

optional arguments:
  -h, --help             show this help message and exit
  --entity-type {topics,clients,users,brokers}
                         REQUIRED: the type of entity (topics/clients/users/brokers)
  --force FORCE          Suppresses console prompts
  --add-config ADDCONFIG
                         Key Value pairs of configs to add. Square brackets can be used
                         to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'.
  --delete-config DELETECONFIG
                         Config keys to remove in the following form: 'k1, k2'.

  You can specify only one in --entity-name and --entity-default

  --entity-name ENTITYNAME
                         Name of entity (client id/user principal name)
  --entity-default       Default entity name for clients/users (applies to corresponding
                         entity type in command line)

  You can specify only one in --alter, --describe

  --describe             List configs for the given entity. (default: false)
  --alter                Alter the configuration for the entity. (default: false)

  REQUIRED. You can specify only one in --bootstrap-server, --adminclient.config

  --bootstrap-server BOOTSTRAPSERVER
                         The broker list string in the form HOST1:PORT1,HOST2:PORT2.
  --adminclient.config ADMINCLIENTCONFIG
                         The config properties file for the Admin Client.
  --adminclient-property ADMINCLIENTPROPERTY
                         Comma separated list of properties in the form of 'k1=v1,k2=v2'
                         to initialize the admin client.
Output Format
CONFIGS FOR TOPIC topicA

Name                                      Value                Sensitive  Read-only  Default
compression.type                        = producer             false      false      true
message.format.version                  = 1.0-IV0              false      false      true
file.delete.delay.ms                    = 60000                false      false      true
leader.replication.throttled.replicas   =                      false      false      true
max.message.bytes                       = 1000012              false      false      true
min.compaction.lag.ms                   = 0                    false      false      true
message.timestamp.type                  = CreateTime           false      false      true
min.insync.replicas                     = 1                    false      false      true
segment.jitter.ms                       = 0                    false      false      true
preallocate                             = false                false      false      true
index.interval.bytes                    = 4096                 false      false      true
min.cleanable.dirty.ratio               = 0.5                  false      false      true
unclean.leader.election.enable          = false                false      false      true
retention.bytes                         = 10                   false      false      false
delete.retention.ms                     = 86400000             false      false      true
cleanup.policy                          = delete               false      false      true
flush.ms                                = 9223372036854775807  false      false      true
follower.replication.throttled.replicas =                      false      false      true
segment.bytes                           = 1073741824           false      false      true
retention.ms                            = 604800000            false      false      true
segment.ms                              = 604800000            false      false      true
message.timestamp.difference.max.ms     = 9223372036854775807  false      false      true
flush.messages                          = 9223372036854775807  false      false      true
segment.index.bytes                     = 10485760             false      false      true
As seen above, the describe output becomes more organized and will also return default properties (as the protocol currently supports that). In the case of alter, we will also run an extra describe after executing the alter and print the latest state.
Compatibility, Deprecation, And Migration Plan
Compatibility
Firstly, the behavior of the --zookeeper command line parameter will change. After this change it will print a warning message saying it is ignored. Therefore every user will need to change --zookeeper to --bootstrap-server, --adminclient-property or --adminclient.config.
Secondly, as of this KIP users will be able to describe all topics or all brokers in one step, but not all clients or users. Those who have this use case will still need to use the old command for a while (see below). The reason is that MetadataRequest currently provides enough information about topics and brokers to describe all of them in one step, but there is no such information about clients and users.
Finally, backward compatibility (for instance a 1.1 client administering a 1.0 server) will be impacted, as some of the protocols are newly created and don't exist in old servers. In these cases users should continue using the Scala version of the ConfigCommand by putting the core jar on their classpath and defining the USE_OLD_COMMAND=true environment variable. This variable sets the main class to the old command and invokes it. This way, users who aren't yet able to use the new command need to make only minimal changes in order to continue using the old one.
Alternatively, the old command can be launched through kafka-run-class.sh like this:
bin/kafka-run-class.sh kafka.admin.ConfigCommand --zookeeper localhost:2181 --describe --entity-type topics --entity-name topicA
Impact
Communicating through the broker instead of Zookeeper allows us to give more protection to Zookeeper, as it can be hidden behind a firewall that only the brokers are allowed through. This would also allow much finer-grained authorization and auditing of admin operations in the brokers.
From the CLI point of view the impact should be minimal, as only the --zookeeper option will change. Zookeeper is typically a more protected resource than the CLIENT ports of the brokers, so we can assume that users who have access to Zookeeper also know the broker endpoints and can switch with minimal effort.
From the compatibility point of view there might be a bigger impact, as mentioned above. Since the command now uses the wire protocols (including some newly introduced ones), backward compatibility will be impacted. That means that a user can't use a 1.1 client to administer a 1.0 broker, as some of the wire protocols don't exist in the older broker. This should again be acceptable to most users, as most of the admin commands require the core jar on the classpath, which means that most of the time the commands are executed from an environment with the same version as the brokers. In the remaining cases users will have to switch to kafka-run-class or the USE_OLD_COMMAND environment variable.
Deprecation
kafka.admin.ConfigCommand will print a warning message saying it is deprecated and will be removed in version 2.0.
To ease the migration for users who are stuck with this command, the USE_OLD_COMMAND environment variable will be introduced.
Special Migration Tools
There are no tools required.
Removal Of The Existing Behavior
The current --zookeeper option will be disabled with this change, as this has minimal impact on current users.
Test Plan
Most of the functionality can be covered with end-to-end tests. There will be unit tests also to verify the protocol and the broker side logic.
Future Considerations
At this moment this ConfigCommand can describe all the topics and all the brokers with one command but can't describe all clients or users. The reason for this is that we can gain the required information for topics and brokers by a MetadataRequest but we have no such protocols for getting a list of users or clients.