Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-5722
Planned release: 1.1.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
- KIP-4 defines the high-level motivation for using the admin client, and KAFKA-3268 with its subtasks acts as the umbrella JIRA for this KIP.
- The current implementation of the ConfigCommand (kafka.admin.ConfigCommand), which is used by kafka-configs.sh, connects to Zookeeper directly. This prevents the tool from being used in deployments where only the brokers are exposed to clients (i.e. where the Zookeeper servers are intentionally not exposed).
- There is a general push to refactor/rewrite/replace tools which need Zookeeper access with equivalents which use the AdminClient API. Thus it is necessary to change the ConfigCommand so that it no longer talks to Zookeeper directly, but via an intermediating broker.
- It makes the ConfigCommand transparent to the authorization mechanism and therefore enables a higher level of security: the broker will be able to authorize config admin requests.
- The AdminClient API currently lacks any functionality for changing broker, user and client configurations, which is possible with the current Zookeeper-based ConfigCommand implementation.
- Changing the ConfigCommand will improve the user experience of KIP-226 for the above-mentioned reasons.
Public Interfaces
Command Line Tools And Arguments
The options accepted by the kafka-configs.sh command will change:
- --zookeeper will be deprecated, which means that it will display a warning message saying that it is ignored.
- a new --bootstrap-server option will be added
- a new --config.properties option will be added
Protocol Changes
KIP-133 introduced the describe and alter admin protocols and KIP-140 a wire format representation for ResourceType. We will modify these to accommodate the new requirements.
Wire Format Types
ResourceType
0: Unknown
1: Any
2: Topic
3: Group
4: Broker
5: User (new)
6: Client (new)
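As a rough sketch (following the id mapping in the table above; the actual class and package where these constants live is not prescribed here), the extended type could look like this:

public enum ResourceType {
    UNKNOWN((byte) 0),
    ANY((byte) 1),
    TOPIC((byte) 2),
    GROUP((byte) 3),
    BROKER((byte) 4),
    USER((byte) 5),    // new
    CLIENT((byte) 6);  // new

    private final byte id;

    ResourceType(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }
}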
List Quotas
To create a robust API for the admin client we should be able to list quotas for clients and users. This is needed to allow finer-grained authorization (for instance, listing could be made more restrictive than simply describing) and to provide a query functionality so that multiple quota configs can be described or altered in a batch.
ListQuotas Request (Version: 1) => quota_config_resource
  quota_config_resource => type name
    type => INT8
    name => NULLABLE_STRING
Request semantics:
- Can be sent to any broker
- The type of quota_config_resource can either be "Client" or "User".
- The name of quota_config_resource is nullable.
  - If name is not provided, we list the given type of resources.
  - If name is provided, we list the children of the given user or client resource.
- Authorization: "ListQuotas" can only be interpreted on the "Cluster" resource and is represented by the Describe ACL, due to the similarity with other listing protocols like ListOffsets or ListGroups. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
ListQuotas Response (Version: 1) => throttle_time_ms error_code error_message [quota_config_entity_name]
  throttle_time_ms => INT32
  error_code => INT16
  error_message => NULLABLE_STRING
  quota_config_entity_name => STRING
Response semantics:
- Only the quota config names will be returned, because the request explicitly specifies the configs' type information ("User" and/or "Client"), so there is no need to repeat it here.
- The error returned means that the requestor doesn't have access to the specified resource.
Describe Quotas
To implement the use cases of kafka-configs.sh where a quota is modified, such as a user, client or (user, client) quota, we have to create a protocol for describing quotas. The justification for a new protocol is that a quota is quite different from a broker or topic config: a quota can be identified by a simple user, a client or even a (user, client) tuple, while a topic or broker config can be identified only by the topic's name or the broker's ID.
DescribeQuotas Request (Version: 1) => [resource]
  resource => quota_config_resource child_quota_config_resource [config_name]
    quota_config_resource => type name
      type => INT8
      name => NULLABLE_STRING
    child_quota_config_resource => type name
      type => INT8
      name => NULLABLE_STRING
    config_name => STRING
Request semantics:
- Can be sent to any broker
- If the name is <default>, it means that describing the default quota is requested. Responses for defaults will be returned the same way.
- If the config_name array is null, all configs are returned. Otherwise, configs with the provided names are returned.
- Authorization: "DescribeQuotas" can only be interpreted on the "Cluster" resource and is represented by the DescribeConfigs ACL, due to the similarity in use cases. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
DescribeQuotas Response (Version: 1) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => quota_config_resource child_quota_config_resource [config]
    quota_config_resource => type name
      type => INT8
      name => NULLABLE_STRING
    child_quota_config_resource => type name
      type => INT8
      name => NULLABLE_STRING
    config => error_code error_message [config_entry]
      error_code => INT16
      error_message => NULLABLE_STRING
      config_entry => config_name config_value read_only is_default is_sensitive
        config_name => STRING
        config_value => STRING
        read_only => BOOLEAN
        is_default => BOOLEAN
        is_sensitive => BOOLEAN
Alter Quotas
AlterQuotas Request (Version: 0) => validate_only [resource]
  validate_only => BOOLEAN
  resource => quota_config_resource child_quota_config_resource [config]
    quota_config_resource => type name
      type => INT8
      name => NULLABLE_STRING
    child_quota_config_resource => type name
      type => INT8
      name => NULLABLE_STRING
    config => config_name config_value
      config_name => STRING
      config_value => STRING
Request semantics:
- Can be sent to any broker
- If name is <default>, it means that altering a default quota is requested.
- If an Alter operation is attempted on a read-only config, an InvalidRequestException error will be returned for the relevant resource.
- Authorization: "AlterQuotas" can only be interpreted on the "Cluster" resource and is represented by the AlterConfigs ACL, due to the similarity in use cases. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
- For tools that allow users to alter quota configs, a validation/dry-run mode where validation errors are reported but no change is attempted is available via the validate_only parameter.
AlterQuotas Response (Version: 0) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => quota_config_resource child_quota_config_resource [config]
    quota_config_resource => type name
      type => INT8
      name => NULLABLE_STRING
    child_quota_config_resource => type name
      type => INT8
      name => NULLABLE_STRING
    config => error_code error_message config_name
      error_code => INT16
      error_message => NULLABLE_STRING
      config_name => STRING
AdminClient APIs
public class AdminClient {
    public DescribeQuotasResult describeQuotas(String userId, String clientId, Collection<String> configs, final DescribeQuotasOptions options);
    public AlterQuotasResult alterQuotas(Map<QuotaEntityTuple, Config> configs, AlterQuotasOptions options);
}

public class DescribeQuotasOptions {
    public DescribeQuotasOptions timeoutMs(Integer timeout);
}

public class DescribeQuotasResult {
    public Map<QuotaConfigResourceTuple, KafkaFuture<Config>> values();
}

public class AlterQuotasOptions {
    public AlterQuotasOptions timeoutMs(Integer timeout);
    public AlterQuotasOptions validateOnly(boolean validateOnly);
}

public class AlterQuotasResult {
    public Map<QuotaConfigResourceTuple, KafkaFuture<Void>> results();
}
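The following is a minimal usage sketch based on the signatures above; the quota key "producer_byte_rate", the broker address and the entity names are illustrative, and the package names of the new classes are not fixed by this KIP:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.KafkaFuture;

public class DescribeQuotasExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Describe the producer byte rate quota of client "clientA" owned by user "user1"
            DescribeQuotasResult result = admin.describeQuotas(
                    "user1", "clientA",
                    Collections.singleton("producer_byte_rate"),
                    new DescribeQuotasOptions().timeoutMs(30000));
            for (Map.Entry<QuotaConfigResourceTuple, KafkaFuture<Config>> entry : result.values().entrySet()) {
                Config config = entry.getValue().get(); // blocks until the broker responds
                System.out.println(entry.getKey() + " -> " + config);
            }
        }
    }
}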
Request API
public class DescribeQuotasRequest extends AbstractRequest {
    public static Schema[] schemaVersions();
    public static DescribeQuotasRequest parse(ByteBuffer buffer, short version);

    public static class Builder extends AbstractRequest.Builder {
        public Builder(Map<QuotaConfigResourceTuple, Collection<String>> quotaConfigSettings);
        public DescribeQuotasRequest build(short version);
    }

    public DescribeQuotasRequest(short version, Map<QuotaConfigResourceTuple, Collection<String>> quotaConfigSettings);
    public DescribeQuotasRequest(Struct struct, short version);

    public Map<QuotaConfigResourceTuple, Collection<String>> quotaConfigSettings();
}

public class DescribeQuotasResponse extends AbstractResponse {
    public static Schema[] schemaVersions();

    public DescribeQuotasResponse(int throttleTimeMs, Map<QuotaConfigResourceTuple, DescribeConfigsResponse.Config> configs);
    public DescribeQuotasResponse(Struct struct);

    public Map<QuotaConfigResourceTuple, DescribeConfigsResponse.Config> quotaConfigSettings();
}

/**
 * Stores a child and its parent resource. This is used for representing a (user, client) resource tuple where
 * the user part is considered to be the parent resource and the client is the child.
 */
public final class QuotaConfigResourceTuple {
    private final Resource quotaConfigResource;
    private final Resource childQuotaConfigResource;

    public Resource quotaConfigResource();
    public Resource childQuotaConfigResource();

    public QuotaConfigResourceTuple(Resource quotaConfigResource);
    public QuotaConfigResourceTuple(Resource quotaConfigResource, Resource childQuotaConfigResource);
}
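To show how a (user, client) pair maps onto these classes, here is a rough construction sketch; it assumes the new USER and CLIENT ResourceType values from the Wire Format Types section, reuses the existing Resource class from KIP-133, and leaves the package of the new request classes open:

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.requests.Resource;
import org.apache.kafka.common.requests.ResourceType;

public class DescribeQuotasRequestSketch {
    public static DescribeQuotasRequest userClientExample() {
        // "user1" is the parent resource, "clientA" is its child
        QuotaConfigResourceTuple tuple = new QuotaConfigResourceTuple(
                new Resource(ResourceType.USER, "user1"),      // proposed new resource type
                new Resource(ResourceType.CLIENT, "clientA")); // proposed new resource type
        // Ask only for the producer byte rate quota of this (user, client) pair
        Map<QuotaConfigResourceTuple, Collection<String>> settings = new HashMap<>();
        settings.put(tuple, Collections.singleton("producer_byte_rate"));
        return new DescribeQuotasRequest.Builder(settings).build((short) 1);
    }
}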
New Command Line Interface
The kafka-configs.sh command line interface will change slightly in terms of its help message and output format, as we will use argparse4j for parsing arguments.
Help Message
usage: config-command [-h] --entity-type {topics,clients,users,brokers}
                      [--force FORCE] [--add-config ADDCONFIG]
                      [--delete-config DELETECONFIG]
                      (--entity-name ENTITYNAME | --entity-default)
                      (--describe | --alter)
                      (--bootstrap-servers BOOTSTRAPSERVERS | --config.properties CONFIGPROPERTIES)

Change configs for topics, clients, users, brokers dynamically.

optional arguments:
  -h, --help             show this help message and exit
  --entity-type {topics,clients,users,brokers}
                         REQUIRED: the type of entity (topics/clients/users/brokers)
  --force FORCE          Suppresses console prompts
  --add-config ADDCONFIG
                         Key Value pairs of configs to add. Square brackets can be used
                         to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'.
  --delete-config DELETECONFIG
                         Config keys to remove in the following form: 'k1, k2'.

  You can specify only one in --entity-name and --entity-default
  --entity-name ENTITYNAME
                         Name of entity (client id/user principal name)
  --entity-default       Default entity name for clients/users (applies to corresponding
                         entity type in command line)

  You can specify only one in --alter, --describe
  --describe             List configs for the given entity. (default: false)
  --alter                Alter the configuration for the entity. (default: false)

  You can specify only one in --bootstrap-servers, --config.properties
  --bootstrap-servers BOOTSTRAPSERVERS
                         REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
  --config.properties CONFIGPROPERTIES
                         REQUIRED: The config properties file for the Admin Client.
Output Format
CONFIGS FOR TOPIC topicA

Name                                        Value                 Sensitive  Read-only  Default
compression.type                          = producer              false      false      true
message.format.version                    = 1.0-IV0               false      false      true
file.delete.delay.ms                      = 60000                 false      false      true
leader.replication.throttled.replicas     =                       false      false      true
max.message.bytes                         = 1000012               false      false      true
min.compaction.lag.ms                     = 0                     false      false      true
message.timestamp.type                    = CreateTime            false      false      true
min.insync.replicas                       = 1                     false      false      true
segment.jitter.ms                         = 0                     false      false      true
preallocate                               = false                 false      false      true
index.interval.bytes                      = 4096                  false      false      true
min.cleanable.dirty.ratio                 = 0.5                   false      false      true
unclean.leader.election.enable            = false                 false      false      true
retention.bytes                           = 10                    false      false      false
delete.retention.ms                       = 86400000              false      false      true
cleanup.policy                            = delete                false      false      true
flush.ms                                  = 9223372036854775807   false      false      true
follower.replication.throttled.replicas   =                       false      false      true
segment.bytes                             = 1073741824            false      false      true
retention.ms                              = 604800000             false      false      true
segment.ms                                = 604800000             false      false      true
message.timestamp.difference.max.ms       = 9223372036854775807   false      false      true
flush.messages                            = 9223372036854775807   false      false      true
segment.index.bytes                       = 10485760              false      false      true
As seen above, the describe format becomes more organized and it will also return default properties (as the protocol currently supports that). In the case of alter, an extra describe will be executed after the alteration and the freshest state will be printed.
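For example, altering a user quota with the new options might look like the following (broker address, user name and quota value are illustrative):

bin/kafka-configs.sh --bootstrap-servers localhost:9092 --entity-type users --entity-name user1 --alter --add-config 'producer_byte_rate=1048576'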
Compatibility, Deprecation, And Migration Plan
Compatibility
The behavior of the --zookeeper command line parameter will change: after this change it will print a warning message saying it is ignored. Therefore every user will need to change --zookeeper to --bootstrap-servers.
Backward compatibility (e.g. a 1.1 client administering a 1.0 broker) will also be impacted, as some of the protocols are newly created and don't exist on old brokers. In these cases users should continue using the Scala version of the ConfigCommand by putting the core jar on their classpath and defining the USE_OLD_COMMAND=true environment variable. This variable will set the main class to the old command and invoke that. This way, users who aren't able to use the new command yet only need to make minimal changes in order to continue using the old one.
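For instance, using the old command through the environment variable might look like this (how the variable is wired into the launcher script is an implementation detail that is not finalized here):

USE_OLD_COMMAND=true bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name topicA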
Alternatively, the command could be launched through kafka-run-class.sh like this:
bin/kafka-run-class.sh kafka.admin.ConfigCommand --zookeeper localhost:2181 --describe --entity-type topics --entity-name topicA
Impact
Communicating through the broker instead of Zookeeper gives Zookeeper more protection, as it can be hidden behind a firewall that only the brokers are allowed through. This also allows much finer-grained authorization and auditing of admin operations in the brokers.
From the CLI point of view the impact should be minimal, as only the --zookeeper option will change. Since Zookeeper is a more protected resource than the CLIENT ports of the brokers, we can assume that users already know the broker endpoints and can switch with minimal effort.
From the compatibility point of view there might be a bigger impact, as mentioned above. Since the command now uses the wire protocols (including some newly introduced ones), backward compatibility will be affected: a user can't use a 1.1 client to administer a 1.0 broker, because some of the wire protocols don't exist on the older broker. This should be acceptable to most users, as most of the admin commands require the core jar on the classpath, which means that most of the time the commands are executed from an environment with the same version as the brokers. In the remaining cases users will have to switch to kafka-run-class.sh or the USE_OLD_COMMAND environment variable.
Deprecation
kafka.admin.ConfigCommand will print a warning message saying it is deprecated and will be removed in version 2.0.
To ease the migration for users who are stuck with this command, the USE_OLD_COMMAND environment variable will be introduced.
Special Migration Tools
There are no tools required.
Removal Of The Existing Behavior
The current --zookeeper option will be disabled with this change, as this has minimal impact on current users.
Test Plan
Most of the functionality can be covered with end-to-end tests. There will also be unit tests to verify the protocol handling and the broker-side logic.
Rejected Alternatives