

Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-5722

Planned release: 1.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation


  • KIP-4 defines the high-level motivation for using the admin client, and KAFKA-3268 with its subtasks acts as the umbrella JIRA for this KIP.
  • The current implementation of the ConfigCommand (kafka.admin.ConfigCommand), which is used by kafka-configs.sh, connects to Zookeeper directly. This prevents the tool from being used in deployments where only the brokers are exposed to clients (i.e. where the Zookeeper servers are intentionally not exposed).
  • There is a general push to refactor/rewrite/replace tools which need Zookeeper access with equivalents which use the AdminClient API. Thus it is necessary to change the ConfigCommand so that it no longer talks to Zookeeper directly but goes through a broker instead.
  • Routing requests through the brokers makes the ConfigCommand subject to the authorization mechanism and therefore enables a higher level of security: the broker will be able to authorize config admin requests.
  • The AdminClient API currently lacks any functionality for changing broker, user and client configurations, which is possible with the current Zookeeper-based ConfigCommand implementation.
  • Changing the ConfigCommand will improve the user experience for KIP-226 for the above-mentioned reasons.

Public Interfaces

Command Line Tools And Arguments

The options accepted by the kafka-configs.sh command will change:

  • --zookeeper will be deprecated, which means the tool will display a warning message saying that it is ignored.
  • a new --bootstrap-server option will be added
  • a new --adminclient.config option will be added (an example invocation is shown below)
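
For example, describing a topic's configs against a broker instead of Zookeeper might then look like this (illustrative only; the admin.properties file name is an example value and the exact option spelling is shown in the help message further below):

bin/kafka-configs.sh --bootstrap-server localhost:9092 --adminclient.config admin.properties --describe --entity-type topics --entity-name topicA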

Protocol Changes

KIP-133 introduced the describe and alter admin protocols, and KIP-140 introduced a wire format representation for ResourceType. We will modify these to accommodate the new requirements.

Wire Format Types

ResourceType
  • 0: Unknown
  • 1: Any
  • 2: Topic
  • 3: Group
  • 4: Broker
  • 5: User (new)
  • 6: Client (new)

Describe Quotas

To implement the use cases of kafka-configs.sh where a quota is modified, whether for a user, a client or a (user, client) pair, we have to create a protocol for listing quotas. The justification for a new protocol is that a quota is quite different from a broker or topic config: a quota can be identified by a single user, a single client or even a (user, client) tuple, while a topic or broker config can be identified only by the topic's name or the broker's ID.

DescribeQuotas Request
DescribeQuotas Request (Version: 1) => [resource]
  resource => [quota_config_resource] [config_name]
    quota_config_resource => type name
      type => INT8
      name => STRING
    config_name => STRING

Request semantics:

  1. Can be sent to any broker
  2. If the name is <default>, it means that the default quota is being listed. Responses will be returned the same way for defaults.
  3. If the config_name array is null, all configs are returned. Otherwise, configs with the provided names are returned.
  4. Authorization: "DescribeQuotas" can only be interpreted on the "Cluster" resource and is represented by the DescribeConfigs ACL due to the similarity in use cases. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
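
For example, describing the producer_byte_rate quota of client clientA owned by user user1 (the names here are only illustrative) would send a single resource whose quota_config_resource array carries two entries:

  quota_config_resource = [(type=5 <User>, name="user1"), (type=6 <Client>, name="clientA")]
  config_name = ["producer_byte_rate"]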

 

DescribeQuotas Response
DescribeQuotas Response (Version: 1) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => [quota_config_resource] [config]
    quota_config_resource => type name
      type => INT8
      name => STRING
    config => error_code error_message [config_entry]
      error_code => INT16
      error_message => NULLABLE_STRING
      config_entry =>
        config_name => STRING
        config_value => STRING
        read_only => BOOLEAN
        is_default => BOOLEAN
        is_sensitive => BOOLEAN

Alter Quotas

AlterQuotas Request
AlterQuotas Request (Version: 0) => validate_only [resource]
  validate_only => BOOLEAN
  resource => [quota_config_resource] [config]
    quota_config_resource => type name
      type => INT8
      name => STRING
    config => config_name config_value
      config_name => STRING
      config_value => STRING

Request Semantics

  1. Can be sent to any broker
  2. If the name is <default>, it means that the default quota is being altered.
  3. If an Alter operation is attempted on a read-only config, an InvalidRequestException error will be returned for the relevant resource.
  4. Authorization: "AlterQuotas" can only be interpreted on the "Cluster" resource and is represented by the AlterConfigs ACL due to the similarity in use cases. Unauthorized requests will receive an appropriate AuthorizationFailed error code.
  5. For tools that allow users to alter quota configs, a validation/dry-run mode where validation errors are reported but no alteration is attempted is available via the validate_only parameter.

 

AlterQuotas Response
AlterQuotas Response (Version: 0) => throttle_time_ms [resource]
  throttle_time_ms => INT32
  resource => [quota_config_resource] [config]
    quota_config_resource => type name
      type => INT8
      name => STRING
    config =>
      error_code => INT16
      error_message => NULLABLE_STRING
      config_name => STRING

AdminClient APIs

org.apache.kafka.clients.admin
public class AdminClient {
    public DescribeQuotasResult describeQuotas(String userId, String clientId, Collection<String> configs, final DescribeQuotasOptions options);
    public AlterQuotasResult alterQuotas(Map<QuotaConfigResourceTuple, Config> configs, AlterQuotasOptions options);
}
public class DescribeQuotasOptions { 
    public DescribeQuotasOptions timeoutMs(Integer timeout);
}

public class DescribeQuotasResult {
    public Map<QuotaConfigResourceTuple, KafkaFuture<Config>> values();
}
 
public class AlterQuotasOptions { 
    public AlterQuotasOptions timeoutMs(Integer timeout);
    public AlterQuotasOptions validateOnly(boolean validateOnly);
}

public class AlterQuotasResult {
    public Map<QuotaConfigResourceTuple, KafkaFuture<Void>> results();
}
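
A minimal usage sketch of the proposed API (the bootstrap address, the entity names and the no-arg DescribeQuotasOptions constructor are assumptions of this example, not part of the proposal):

// Describe the byte-rate quotas of client "clientA" owned by user "user1".
void describeClientQuotas() throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
        DescribeQuotasResult result = admin.describeQuotas(
                "user1", "clientA",
                Arrays.asList("producer_byte_rate", "consumer_byte_rate"),
                new DescribeQuotasOptions().timeoutMs(30000));
        for (Map.Entry<QuotaConfigResourceTuple, KafkaFuture<Config>> entry : result.values().entrySet()) {
            Config config = entry.getValue().get();   // blocks until the broker responds
            config.entries().forEach(e -> System.out.println(e.name() + " = " + e.value()));
        }
    }
}

A similar alterQuotas call could pass new AlterQuotasOptions().validateOnly(true) to dry-run a change, matching the validate_only semantics described above.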

Request API

org.apache.kafka.common.requests
public class DescribeQuotasRequest extends AbstractRequest {

    public static Schema[] schemaVersions();
    public static DescribeQuotasRequest parse(ByteBuffer buffer, short version);

    public static class Builder extends AbstractRequest.Builder {
        public Builder(Map<QuotaConfigResourceTuple, Collection<String>> quotaConfigSettings);
        public DescribeQuotasRequest build(short version);
    }

    public DescribeQuotasRequest(short version, Map<QuotaConfigResourceTuple, Collection<String>> quotaConfigSettings);
    public DescribeQuotasRequest(Struct struct, short version);

    public Map<QuotaConfigResourceTuple, Collection<String>> quotaConfigSettings();
}

public class DescribeQuotasResponse extends AbstractResponse {
    public static Schema[] schemaVersions();

    public DescribeQuotasResponse(int throttleTimeMs, Map<QuotaConfigResourceTuple, DescribeConfigsResponse.Config> configs);
    public DescribeQuotasResponse(Struct struct);

    public Map<QuotaConfigResourceTuple, DescribeConfigsResponse.Config> quotaConfigSettings();
}
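
The QuotaConfigResourceTuple type used throughout these signatures is not spelled out in this KIP. One plausible shape, assuming it simply pairs the KIP-140 style resources (type and name) that identify a quota, is sketched below; either part could be null when only a user or only a client is addressed.

public class QuotaConfigResourceTuple {
    private final Resource userResource;    // e.g. (ResourceType.USER, "user1"), may be null
    private final Resource clientResource;  // e.g. (ResourceType.CLIENT, "clientA"), may be null

    public QuotaConfigResourceTuple(Resource userResource, Resource clientResource) {
        this.userResource = userResource;
        this.clientResource = clientResource;
    }

    public Resource userResource() { return userResource; }
    public Resource clientResource() { return clientResource; }

    // equals() and hashCode() omitted here, but both are required since the tuple
    // is used as a map key in the request and AdminClient APIs above.
}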

 

New Command Line Interface

The kafka-configs.sh command line interface will change slightly in terms of help message and output format, as we will use argparse4j for parsing arguments.

Help Message

usage: config-command [-h] --entity-type {topics,clients,users,brokers}
                      [--force FORCE] [--add-config ADDCONFIG]
                      [--delete-config DELETECONFIG]
                      (--entity-name ENTITYNAME | --entity-default)
                      (--describe | --alter)
                      (--bootstrap-servers BOOTSTRAPSERVERS |
                      --config.properties CONFIGPROPERTIES)

Change configs for topics, clients, users, brokers dynamically.

optional arguments:
  -h, --help             show this help message and exit
  --entity-type {topics,clients,users,brokers}
                         REQUIRED:      the       type       of      entity
                         (topics/clients/users/brokers)
  --force FORCE          Suppresses console prompts
  --add-config ADDCONFIG
                         Key  Value  pairs  of   configs   to  add.  Square
                         brackets  can  be  used   to  group  values  which
                         contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'.
  --delete-config DELETECONFIG
                         Config keys to remove in  the following form: 'k1,
                         k2'.

  You can specify only one in --entity-name and --entity-default

  --entity-name ENTITYNAME
                         Name of entity (client id/user principal name)
  --entity-default       Default entity name for  clients/users (applies to
                         corresponding entity type in command line)

  You can specify only one in --alter, --describe

  --describe             List  configs  for  the  given  entity.  (default:
                         false)
  --alter                Alter the configuration for  the entity. (default:
                         false)

  You can specify only one in --bootstrap-servers, --config.properties

  --bootstrap-servers BOOTSTRAPSERVERS
                         REQUIRED: The  broker  list  string  in  the  form
                         HOST1:PORT1,HOST2:PORT2.
  --config.properties CONFIGPROPERTIES
                         REQUIRED:  The  config  properties  file  for  the
                         Admin Client.


Output Format

                                                    CONFIGS FOR TOPIC topicA

                                     Name   Value                                Sensitive  Read-only  Default
                         compression.type = producer                                 false      false     true
                   message.format.version = 1.0-IV0                                  false      false     true
                     file.delete.delay.ms = 60000                                    false      false     true
    leader.replication.throttled.replicas =                                          false      false     true
                        max.message.bytes = 1000012                                  false      false     true
                    min.compaction.lag.ms = 0                                        false      false     true
                   message.timestamp.type = CreateTime                               false      false     true
                      min.insync.replicas = 1                                        false      false     true
                        segment.jitter.ms = 0                                        false      false     true
                              preallocate = false                                    false      false     true
                     index.interval.bytes = 4096                                     false      false     true
                min.cleanable.dirty.ratio = 0.5                                      false      false     true
           unclean.leader.election.enable = false                                    false      false     true
                          retention.bytes = 10                                       false      false    false
                      delete.retention.ms = 86400000                                 false      false     true
                           cleanup.policy = delete                                   false      false     true
                                 flush.ms = 9223372036854775807                      false      false     true
  follower.replication.throttled.replicas =                                          false      false     true
                            segment.bytes = 1073741824                               false      false     true
                             retention.ms = 604800000                                false      false     true
                               segment.ms = 604800000                                false      false     true
      message.timestamp.difference.max.ms = 9223372036854775807                      false      false     true
                           flush.messages = 9223372036854775807                      false      false     true
                      segment.index.bytes = 10485760                                 false      false     true

As seen above, the describe format becomes more organized and it will also return the default properties (as the protocol already supports that). In the case of alter we will also perform an extra describe after executing the alter and print the freshest state.

Compatibility, Deprecation, And Migration Plan

Compatibility

The behavior of the --zookeeper command line parameter will change. After this change it will print a warning message saying it is ignored. Therefore every user will need to change --zookeeper to --bootstrap-servers.

Backward compatibility (say, a 1.1 client administering a 1.0 server) will also be impacted, as some of the protocols are newly created and don't exist in old servers. In these cases users should continue using the Scala version of the ConfigCommand by putting the core jar on their classpath and setting the USE_OLD_COMMAND=true environment variable. This variable switches the launcher to the old command's main class and invokes that, so users who aren't able to use the new command yet need only minimal changes in order to keep using the old one.
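
For example (illustrative; assumes the launcher honours the variable as described above):

USE_OLD_COMMAND=true bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name topicA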

Alternatively, the command could be launched through kafka-run-class.sh like this:

bin/kafka-run-class.sh kafka.admin.ConfigCommand --zookeeper localhost:2181 --describe --entity-type topics --entity-name topicA

Impact

Communicating through the brokers instead of Zookeeper allows us to give more protection to Zookeeper, as it can be hidden behind a firewall that admits only the brokers. This also allows much finer-grained authorization and auditing of admin operations in the brokers.

From the CLI point of view the impact should be minimal, as only the --zookeeper option will change. Moreover, since Zookeeper is usually a more protected resource than the CLIENT ports of the brokers, we can assume that users who could reach Zookeeper also know the broker endpoints and can switch with minimal effort.

From the compatibility point of view there might be a bigger impact, as mentioned above. Since the command now uses the wire protocols (including some newly introduced ones), backward compatibility will be impacted. That means a user can't use a 1.1 client to administer a 1.0 broker, as some of the wire protocols don't exist in the older broker. This should again be acceptable to most users, since most of the admin commands require the core jar on the classpath, which means the commands are usually executed from an environment running the same version as the brokers. In the remaining cases users will have to fall back to kafka-run-class.sh or the USE_OLD_COMMAND environment variable.

Deprecation

kafka.admin.ConfigCommand will print a warning message saying it is deprecated and will be removed in version 2.0.

To ease the migration for users who are stuck with this command, the USE_OLD_COMMAND environment variable will be introduced.

Special Migration Tools

There are no tools required.

Removal Of The Existing Behavior

The current --zookeeper option will be disabled with this change, as doing so has minimal impact on current users.

Test Plan

Most of the functionality can be covered with end-to-end tests. There will also be unit tests to verify the protocol and the broker-side logic.

Future Considerations

At this moment the ConfigCommand can describe all topics and all brokers with one command, but it can't describe all clients or users. The reason for this is that we can obtain the required information for topics and brokers via a MetadataRequest, but we have no such protocol for getting a list of users or clients.

Therefore we could have a ConfigEntityList protocol tailored to the needs of the admin client. This protocol would send a list of config entities in the request and get a list of matching entities in the response. For instance, requesting the (type:USER name:user1, type:CLIENT name:) resource would return all the clients of user1.
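
A sketch of what such a protocol could look like, following the wire-format conventions used earlier in this KIP (this is only an illustration of the idea, not part of the proposal):

ConfigEntityList Request (Version: 0) => [entity]
  entity => type name
    type => INT8
    name => NULLABLE_STRING

ConfigEntityList Response (Version: 0) => throttle_time_ms error_code error_message [entity]
  throttle_time_ms => INT32
  error_code => INT16
  error_message => NULLABLE_STRING
  entity => type name
    type => INT8
    name => STRING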
