The goals behind the command line shell are fundamentally to provide centralized management for Kafka operations.
Status
Current state: Accepted
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Users of Kafka have created dozens of different systems to work with Kafka. Providing a wire protocol that allows the brokers to execute administrative code, together with a public API/client, has many benefits, including:
- Allows clients in any language to administrate Kafka
- Wire protocol is supported by any language
- Provides public client for performing admin operations
- Ensures integration test code in other projects and clients maintains compatibility
- Prevents users from needing to use the Command classes and work around standard output and system exits
- Removes the need for admin scripts (kafka-topics.sh, kafka-acls.sh, etc.) to talk directly to Zookeeper
- Allows ZNodes to be completely locked down via ACLs
- Further hides the Zookeeper details of Kafka
Public Interfaces
- Changes to Wire Protocol:
- Adds the following new Request/Response messages:
- CreateTopic
- AlterTopic
- DeleteTopic
- ListAcls
- AlterAcls
- DescribeConfig
- AlterConfig
- Modifies Metadata Response to allow polling for in-progress or complete admin operations. Added fields include:
- Boolean indicating if a topic is marked for deletion
- Boolean indicating if a topic is an internal topic
- Rack information (if not added by KIP-36 Rack aware replica assignment)
- New Java client: AdminClient - a Wire Protocol client for administrative operations
Proposed Changes
Proposed changes include 4 parts:
- Wire protocol additions and changes
- Server-side message handlers and authorization
- New Java AdminClient implementation
- Refactor admin scripts and code to use new client where appropriate
Follow Up Changes
Changes that should be done shortly after, or that are enabled by, this KIP include:
1. Wire Protocol Extensions
Overview
It is proposed to add / modify these 4 types of requests:
- Topic commands, which include CreateTopic(Request | Response), AlterTopic and DeleteTopic
Details are given in the request/response schema for each message below.
Schema
Topic Admin requests are likely to be used not only by the CLI tool, where the common use case is to create, change or delete a single entity. Since Kafka can maintain a huge number of topics, it is vital that users can efficiently issue many commands at once. That is why all Topic Admin messages are batch requests: commands of one type for many topics can be grouped into one batch, reducing network calls. At the same time, to keep schema usage transparent and consistent with existing requests (such as Produce and Fetch), if a batch request includes more than one instruction for a specific topic, only the last one in the list will be executed and the others will be silently ignored.
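The last-instruction-wins rule can be sketched in Java as follows. `TopicInstruction` and `BatchSemantics.lastWins` are hypothetical names used only for illustration, and the sketch assumes the handler sees a batch as an ordered list of per-topic instructions. A `LinkedHashMap` keeps one entry per topic, and a later `put` for the same topic overwrites the earlier instruction.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-topic instruction inside a batch request.
final class TopicInstruction {
    final String topic;
    final int partitions;

    TopicInstruction(String topic, int partitions) {
        this.topic = topic;
        this.partitions = partitions;
    }
}

final class BatchSemantics {
    // Keeps only the last instruction for each topic; earlier duplicates are
    // dropped silently. Topics are returned in order of first appearance.
    static List<TopicInstruction> lastWins(List<TopicInstruction> batch) {
        Map<String, TopicInstruction> byTopic = new LinkedHashMap<>();
        for (TopicInstruction instruction : batch) {
            byTopic.put(instruction.topic, instruction); // a later put overwrites the earlier value
        }
        return new ArrayList<>(byTopic.values());
    }
}
```

Because re-inserting an existing key does not change a `LinkedHashMap`'s iteration order, each topic keeps the position of its first appearance but carries the values of its last instruction.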
New Protocol Errors
It is proposed to use the following error codes, adding the new ones to the protocol (existing codes are marked as such).
Error | Description | Requests |
---|---|---|
TopicAlreadyExists | Topic with this name already exists. | Create |
InvalidTopic (existing) | Topic name contains invalid characters or doesn't exist. | Create, Alter, Delete |
InvalidPartitions | Partitions field is invalid (e.g. negative or decreasing number of partitions in existing topic) | Create, Alter |
InvalidReplicationFactor | ReplicationFactor field is invalid (e.g. negative) | Create, |
InvalidReplicaAssignment | ReplicaAssignment field is invalid (e.g. contains duplicates) | |
InvalidTopicConfiguration | Either topic-level config setting or value is incorrect. | Create |
DecreasePartitionsNotAllowed | Invalid Partitions argument: decreasing partitions is prohibited when altering topic. | Alter |
ReassignPartitionsInProgress | Reassign partitions procedure has been already started. | Alter |
Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide a descriptive error message.
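As an illustration of how some of these errors could be detected for a single CreateTopic instruction, here is a hedged Java sketch. The `AdminError` enum and `CreateTopicValidator` are hypothetical; no numeric error codes are assumed, and treating zero as invalid (the table only gives negative values as an example) is an assumption of this sketch.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Error names taken from the table above; numeric codes are deliberately not assigned.
enum AdminError {
    NONE, INVALID_PARTITIONS, INVALID_REPLICATION_FACTOR, INVALID_REPLICA_ASSIGNMENT
}

final class CreateTopicValidator {
    // Pass null for partitions/replicationFactor when the instruction uses ReplicaAssignment.
    static AdminError validate(Integer partitions, Integer replicationFactor,
                               Map<Integer, List<Integer>> replicaAssignment) {
        if (replicaAssignment != null && !replicaAssignment.isEmpty()) {
            for (List<Integer> replicas : replicaAssignment.values()) {
                Set<Integer> distinct = new HashSet<>(replicas);
                if (distinct.size() != replicas.size()) {
                    return AdminError.INVALID_REPLICA_ASSIGNMENT; // same broker listed twice
                }
            }
            return AdminError.NONE;
        }
        // Assumption: zero is rejected as well as negative values.
        if (partitions == null || partitions <= 0) {
            return AdminError.INVALID_PARTITIONS;
        }
        if (replicationFactor == null || replicationFactor <= 0) {
            return AdminError.INVALID_REPLICATION_FACTOR;
        }
        return AdminError.NONE;
    }
}
```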
The same notation as in A Guide To The Kafka Protocol is used here.
Topic Admin Schema
Create Topic Request
CreateTopicRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment [ConfigEntry]]
  TopicName => string
  Partitions => int32
  ReplicationFactor => int32
  ReplicaAssignment => [PartitionId [ReplicaId]]
  ConfigEntry => ConfigKey ConfigValue
  ConfigKey => string
  ConfigValue => string
CreateTopicRequest is a batch asynchronous request to initiate topic creation with either predefined or automatic replica assignment and, optionally, topic configuration.
- Only one of (Partitions + ReplicationFactor) or ReplicaAssignment can be defined in one instruction. If both are specified, ReplicaAssignment takes precedence. (Note: there is a special use case, automatic topic creation for TopicMetadataRequest; to trigger it, the user should set client_id=consumer and define only the topic name.)
- If ReplicaAssignment is defined, the number of partitions and replicas will be calculated from the supplied ReplicaAssignment. If (Partitions + ReplicationFactor) is defined, the replica assignment will be generated automatically by the server.
- If a request contains multiple instructions for the same topic, only the last one in the list will be executed; the others will be silently ignored.
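The precedence rules above could be expressed roughly as follows. This is a sketch under stated assumptions: `ResolvedLayout` and `CreateTopicResolver` are hypothetical names, a null argument stands for an undefined field, and requiring every partition in ReplicaAssignment to list the same number of replicas is an assumption, not something the schema states.

```java
import java.util.List;
import java.util.Map;

// Hypothetical result: the layout the server would work with for one instruction.
final class ResolvedLayout {
    final int partitions;
    final int replicationFactor;
    final boolean serverGeneratesAssignment;

    ResolvedLayout(int partitions, int replicationFactor, boolean serverGeneratesAssignment) {
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
        this.serverGeneratesAssignment = serverGeneratesAssignment;
    }
}

final class CreateTopicResolver {
    // A null argument means the field was not defined in the instruction.
    static ResolvedLayout resolve(Integer partitions, Integer replicationFactor,
                                  Map<Integer, List<Integer>> replicaAssignment) {
        if (replicaAssignment != null && !replicaAssignment.isEmpty()) {
            // ReplicaAssignment takes precedence; Partitions/ReplicationFactor are ignored.
            int rf = -1;
            for (List<Integer> replicas : replicaAssignment.values()) {
                if (rf != -1 && replicas.size() != rf) {
                    // Assumption: all partitions must have the same number of replicas.
                    throw new IllegalArgumentException("InvalidReplicaAssignment");
                }
                rf = replicas.size();
            }
            return new ResolvedLayout(replicaAssignment.size(), rf, false);
        }
        if (partitions == null || replicationFactor == null) {
            throw new IllegalArgumentException("Partitions and ReplicationFactor are both required");
        }
        // The server generates the replica assignment automatically.
        return new ResolvedLayout(partitions, replicationFactor, true);
    }
}
```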
Create Topic Response
CreateTopicResponse => [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string
CreateTopicResponse maps each topic to the error code of its creation result (see New Protocol Errors).
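To make the schema notation concrete, the sketch below encodes a CreateTopicRequest body and decodes a CreateTopicResponse using the primitive encodings from A Guide To The Kafka Protocol (big-endian int16/int32, strings as an int16 length followed by UTF-8 bytes, arrays as an int32 count followed by the elements). It covers only the message body, not the request header. The class names are hypothetical, and treating PartitionId and ReplicaId as int32 and sending -1 or an empty array for unused fields are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// One CreateTopicRequest instruction; unused fields are sent as -1 / empty (assumption).
final class CreateTopicEntry {
    final String topic;
    final int partitions;
    final int replicationFactor;
    final Map<Integer, List<Integer>> replicaAssignment; // partition id -> broker ids
    final Map<String, String> configs;

    CreateTopicEntry(String topic, int partitions, int replicationFactor,
                     Map<Integer, List<Integer>> replicaAssignment, Map<String, String> configs) {
        this.topic = topic;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
        this.replicaAssignment = replicaAssignment;
        this.configs = configs;
    }
}

final class CreateTopicCodec {
    // Encodes the request body only. DataOutputStream writes big-endian values.
    static byte[] encodeRequest(List<CreateTopicEntry> entries) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(entries.size());                     // array count: int32
            for (CreateTopicEntry e : entries) {
                writeString(out, e.topic);
                out.writeInt(e.partitions);
                out.writeInt(e.replicationFactor);
                out.writeInt(e.replicaAssignment.size());
                for (Map.Entry<Integer, List<Integer>> p : e.replicaAssignment.entrySet()) {
                    out.writeInt(p.getKey());                 // PartitionId (assumed int32)
                    out.writeInt(p.getValue().size());
                    for (int replica : p.getValue()) {
                        out.writeInt(replica);                // ReplicaId (assumed int32)
                    }
                }
                out.writeInt(e.configs.size());
                for (Map.Entry<String, String> c : e.configs.entrySet()) {
                    writeString(out, c.getKey());
                    writeString(out, c.getValue());
                }
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    // Decodes [TopicName ErrorCode] into topic -> error code (0 means no error).
    static Map<String, Short> decodeResponse(ByteBuffer buf) {
        int count = buf.getInt();
        Map<String, Short> result = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buf.getShort()];
            buf.get(name);
            result.put(new String(name, StandardCharsets.UTF_8), buf.getShort());
        }
        return result;
    }

    // Protocol string: int16 byte length followed by UTF-8 bytes.
    private static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        out.writeShort(utf8.length);
        out.write(utf8);
    }
}
```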
Alter Topic Request
AlterTopicRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment]
  TopicName => string
  ReplicationFactor => int32
  Partitions => int32
  ReplicaAssignment => [PartitionId [ReplicaId]]
AlterTopicRequest is a batch asynchronous request to initiate topic alteration: replication parameters and replica assignment.
1. If ReplicaAssignment is defined, the ReplicationFactor and Partitions arguments are ignored. For each partition in ReplicaAssignment:
  1.1 If the partition exists and its assignment differs from the current replica assignment, it is a "reassign partition" request: add it to the reassign-partitions JSON.
  1.2 If the partition doesn't exist, it is an "add partition" request: change the topic metadata in Zookeeper to trigger the increase-partitions logic.
2. Else if ReplicationFactor is defined:
  2.1 If Partitions is defined, regenerate the replica assignment for all existing and newly added partitions, then go to 1.
  2.2 If Partitions is not defined, regenerate the replica assignment only for existing partitions, then go to 1.
3. Else if Partitions is defined (ReplicaAssignment and ReplicationFactor are not defined):
  3.1 If Partitions is less than the current number of partitions, return error code InvalidPartitions (since decreasing the number of partitions is not allowed).
  3.2 Otherwise, automatically generate the replica assignment for newly added partitions, then go to 1.
If a request contains multiple instructions for the same topic, only the last one in the list will be executed; the others will be silently ignored.
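The decision procedure above can be sketched in Java. `AlterTopicPlanner`, `AlterAction` and the round-robin `generate` helper are hypothetical; `generate` is a simplified stand-in for the server's automatic replica assignment, not a copy of it. The sketch also assumes partition ids run from 0 to n-1 and signals an error by throwing an exception named after the protocol error rather than returning an error code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical outcome for one partition of an AlterTopic instruction.
enum AlterAction { REASSIGN, ADD_PARTITION }

final class AlterTopicPlanner {
    // current: existing partition id -> replica broker ids. Null means "not defined".
    static Map<Integer, AlterAction> plan(Map<Integer, List<Integer>> current, List<Integer> brokers,
                                          Integer partitions, Integer replicationFactor,
                                          Map<Integer, List<Integer>> replicaAssignment) {
        int currentCount = current.size();
        if (replicaAssignment == null) {
            if (replicationFactor != null) {
                // Steps 2.1 / 2.2: regenerate for existing partitions, plus new ones if Partitions is set.
                int target = (partitions != null) ? partitions : currentCount;
                replicaAssignment = generate(0, target, replicationFactor, brokers);
            } else if (partitions != null) {
                // Step 3.1: decreasing the number of partitions is rejected.
                if (partitions < currentCount) {
                    throw new IllegalArgumentException("InvalidPartitions");
                }
                // Step 3.2: assign only the new partitions, reusing the existing replication factor.
                int rf = current.isEmpty() ? 1 : current.values().iterator().next().size();
                replicaAssignment = generate(currentCount, partitions, rf, brokers);
            } else {
                return new HashMap<>(); // nothing defined, nothing to do
            }
        }
        // Step 1: classify every partition in the given or generated assignment.
        Map<Integer, AlterAction> actions = new HashMap<>();
        for (Map.Entry<Integer, List<Integer>> e : replicaAssignment.entrySet()) {
            List<Integer> existing = current.get(e.getKey());
            if (existing == null) {
                actions.put(e.getKey(), AlterAction.ADD_PARTITION);  // 1.2
            } else if (!existing.equals(e.getValue())) {
                actions.put(e.getKey(), AlterAction.REASSIGN);       // 1.1
            }
        }
        return actions;
    }

    // Simplified round-robin placement for partitions in [from, to).
    static Map<Integer, List<Integer>> generate(int from, int to, int rf, List<Integer> brokers) {
        if (rf <= 0 || rf > brokers.size()) {
            throw new IllegalArgumentException("InvalidReplicationFactor");
        }
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        for (int p = from; p < to; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int i = 0; i < rf; i++) {
                replicas.add(brokers.get((p + i) % brokers.size()));
            }
            assignment.put(p, replicas);
        }
        return assignment;
    }
}
```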
Alter Topic Response
AlterTopicResponse => [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string
AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request
DeleteTopicRequest => [TopicName]
  TopicName => string
DeleteTopicRequest requires only the names of the topics to be deleted.
Delete Topic Response
DeleteTopicResponse => [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string
DeleteTopicResponse is similar to CreateTopicResponse.
2. Server-side Admin Request handlers
All incoming admin requests will be handled by a dedicated helper class, TopicCommandHelper, called from KafkaApis.
All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. However, most of the command classes are strongly coupled with CLI logic and can hardly be refactored, so for now (until the standalone CLI commands are removed) the logic from those classes will most likely be extracted and copied into a separate class (as proposed, TopicCommandHelper).
3. Admin Client
This component is intended to be Kafka's out-of-the-box client implementation for admin commands.
The Admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides admin commands, the client will maintain a cluster metadata cache and provide the user with a convenient way of handling long-running commands (e.g. reassign partitions).
Since topic commands will support batching (and so will AdminClient), besides the admin API the user will be provided with request builders that help create requests correctly.
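A request builder of the kind described above might look like the following hedged sketch. `CreateTopicsBuilder` and its methods are hypothetical and are not the proposed AdminClient API. The two `topic(...)` overloads make it impossible to define both (Partitions + ReplicationFactor) and ReplicaAssignment in one instruction.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical builder for a batch CreateTopic request.
final class CreateTopicsBuilder {
    static final class Instruction {
        final String topic;
        final Integer partitions;          // null when an explicit assignment is used
        final Integer replicationFactor;   // null when an explicit assignment is used
        final Map<Integer, List<Integer>> replicaAssignment; // null for automatic assignment
        final Map<String, String> configs;

        Instruction(String topic, Integer partitions, Integer replicationFactor,
                    Map<Integer, List<Integer>> replicaAssignment, Map<String, String> configs) {
            this.topic = topic;
            this.partitions = partitions;
            this.replicationFactor = replicationFactor;
            this.replicaAssignment = replicaAssignment;
            this.configs = configs;
        }
    }

    private final Map<String, Instruction> byTopic = new LinkedHashMap<>();

    // Automatic replica assignment: the server chooses the brokers.
    CreateTopicsBuilder topic(String name, int partitions, int replicationFactor,
                              Map<String, String> configs) {
        return add(new Instruction(name, partitions, replicationFactor, null, configs));
    }

    // Explicit replica assignment: partition id -> broker ids.
    CreateTopicsBuilder topic(String name, Map<Integer, List<Integer>> replicaAssignment,
                              Map<String, String> configs) {
        return add(new Instruction(name, null, null, replicaAssignment, configs));
    }

    private CreateTopicsBuilder add(Instruction instruction) {
        // Reject duplicates here instead of letting the broker silently keep only the last one.
        if (byTopic.putIfAbsent(instruction.topic, instruction) != null) {
            throw new IllegalStateException("duplicate instruction for topic " + instruction.topic);
        }
        return this;
    }

    List<Instruction> build() {
        return Collections.unmodifiableList(new ArrayList<>(byTopic.values()));
    }
}
```

Rejecting a duplicate topic at build time is a design choice of this sketch: it surfaces the mistake on the client instead of relying on the broker silently keeping only the last instruction.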
Proposed API:
4. Interactive Shell / CLI tool
This component wraps AdminClient and provides an interactive shell-like environment for executing administrative commands. The goal of these changes is to let people use the existing standalone tools from a single script, optionally running commands in a shell, so command names and arguments are unchanged compared to the existing tools where possible.
The tool supports two modes:
- command line interface
- interactive shell mode
Command Line Interface
This mode lets the user run commands from a shell script. List of available commands:
(Note: not all possible options are listed - e.g. alter replication factor)
# Topic Commands - options are ported from TopicCommand.scala
bin/kafka.sh --create-topic --topic my_topic --partitions 5 --replication-factor 3 --config key=value --broker-list <host : port>
bin/kafka.sh --alter-topic --topic my_topic --partitions 10 --broker-list <host : port>
bin/kafka.sh --delete-topic --topic my_topic --broker-list <host : port>
bin/kafka.sh --list-topics --broker-list <host : port>
bin/kafka.sh --describe-topic --topic my_topic --broker-list <host : port>
# Reassign Partitions - options are ported from ReassignPartitionsCommand.scala
bin/kafka.sh --reassign-partitions --reassignment-json-file /user/file.json --blocking --broker-list <host : port>
# Preferred Replica Leader Elections - options are ported from PreferredReplicaLeaderElectionCommand.scala
bin/kafka.sh --preferred-replica-leader-election --path-to-json-file /user/file.json --blocking --broker-list <host : port>
The user will have to supply every command with --broker-list <host : port> to define at least one broker from the target cluster.
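Supplying several brokers in one --broker-list value, as a comma-separated host:port list in the style of existing Kafka tools, is an assumption here. A minimal parsing sketch with a hypothetical `BrokerList` class follows; it does not handle IPv6 literals.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

final class BrokerList {
    // Parses "host1:port1,host2:port2" into unresolved socket addresses.
    static List<InetSocketAddress> parse(String value) {
        List<InetSocketAddress> brokers = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected host:port, got '" + trimmed + "'");
            }
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved skips the DNS lookup; it rejects ports outside 0-65535.
            brokers.add(InetSocketAddress.createUnresolved(trimmed.substring(0, colon), port));
        }
        return brokers;
    }
}
```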
Interactive Shell Mode
Interactive shell mode provides extended facilities for executing admin commands. Command names and options are the same, but the user has to define --broker-list only once: in interactive mode, the CLI tool manages the cluster metadata cache and sends commands to the proper broker.
This mode also provides facilities to switch context, so that, e.g., all topic commands are applied to the switched-to topic and there is no need to specify the topic name for each topic command.
The proposed use case is the following:
# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host1 : port1>
Connected to Kafka Controller at <host2 : port2>.
kafka> create-topic --topic my_topic --partitions 5 --replication-factor 3
Topic "my_topic" is created.
kafka> alter-topic --topic my_topic --partitions 10
Topic "my_topic" is changed.
# Switch topic context
kafka> topic my_topic
Switched to "my_topic" topic.
# Execute topic command for switched topic
kafka my_topic> describe-topic
"my_topic" details:
Topic: my_topic Partitions: 10 ...
# Switch off topic context
kafka my_topic> topic
kafka>
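The context switching in the session above could be implemented roughly as follows. `ShellContext` is hypothetical; the sketch assumes that the topic commands from the CLI listing (except list-topics) accept --topic, and that an explicit --topic on the command line overrides the switched context.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

final class ShellContext {
    // Commands that take --topic (list-topics is deliberately excluded).
    private static final Set<String> TOPIC_COMMANDS =
        Set.of("create-topic", "alter-topic", "delete-topic", "describe-topic");
    private String topic; // null when no topic context is active

    String prompt() {
        return topic == null ? "kafka> " : "kafka " + topic + "> ";
    }

    // Returns the arguments to execute, or null if the line only changed the context.
    List<String> handle(String line) {
        List<String> args = new ArrayList<>(Arrays.asList(line.trim().split("\\s+")));
        if (args.get(0).equals("topic")) {
            topic = args.size() > 1 ? args.get(1) : null; // bare "topic" switches the context off
            return null;
        }
        if (topic != null && TOPIC_COMMANDS.contains(args.get(0)) && !args.contains("--topic")) {
            args.add("--topic");
            args.add(topic);
        }
        return args;
    }
}
```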
The current tools will still be available in the 0.8.3 release but will be removed in 0.9 (tracked in https://issues.apache.org/jira/browse/KAFKA-1776).
Rejected Alternatives
TopicMetadataRequest/Response: After some debate we decided not to evolve the TopicMetadataResponse to remove the ISR field (which currently can return incorrect information). There is a use case for this in KAFKA-2225, so we will treat this as a bug and fix it going forward. See KAFKA-1367 for more details.