The goal of the command line shell is fundamentally to provide centralized management for Kafka operations.
Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
People have created dozens of different systems to work with Kafka. If we provide a wire protocol that allows the brokers to execute administrative code, then clients can execute commands more simply. We have many ties to ZooKeeper, which makes it nearly impossible for some code to do anything better than wrap kafka-topics.sh and similar tools in shell scripts. With the wire protocol, a client in any language should be able to administer Kafka. If someone wants a REST interface (for example), they can write it in whatever language they like. The project should ship a client that is not only an example but a fully functional replacement for the kafka-topics, reassign-partitions, and consumer offset tools.
Public Interfaces
- Changes to Wire Protocol:
Extending protocol with these messages:
- CreateTopic(Request | Response), AlterTopic, DeleteTopic, ListTopics, DescribeTopic
- PreferredReplicaLeaderElection
- ReassignPartitions
- New client: AdminClient - a Wire Protocol client for administrative operations
- New CLI tool: Kafka Shell - a single unified command line interface for administrative operations
Proposed Changes
Proposed changes include 4 parts:
- Wire Protocol extensions - to add new Admin messages
- Server-side Admin commands handlers (TopicCommand-like)
- Admin Client - an out-of-box client for performing administrative commands
- Interactive Shell / CLI tool supporting administrative commands
(You can try out some of the changes from this KIP. Please follow this link to get the code and start the CLI tool).
1. Wire Protocol Extensions
Overview
It is proposed to add / modify these 3 types of requests:
- Topic commands, which include CreateTopic(Request | Response), AlterTopic, DeleteTopic
- Replication tools - PreferredReplicaLeaderElection
- Extend TopicMetadataRequest to include topic configuration and partition fetch lag per replica
Details are given under the specific request/response schema proposals.
Schema
The same notation as in A Guide To The Kafka Protocol is used here.
New Protocol Errors
It is proposed to add these error codes to the protocol.
Error | Code | Description | Requests |
---|---|---|---|
TopicAlreadyExists | 1001 | Topic with this name already exists. | CreateTopicRequest |
InvalidArgumentPartitions | 1002 | Either partition field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest , AlterTopicRequest |
DecreasePartitionsNotAllowed | 1003 | Invalid partitions argument: decreasing partitions is prohibited. | AlterTopicRequest |
InvalidArgumentReplicationFactor | 1004 | Either replication-factor field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest |
InvalidArgumentReplicaAssignment | 1005 | Either replica-assignment field is invalid (e.g. contains duplicates), or not defined when needed. | |
InvalidTopicConfig | 1006 | Either topic-level config setting or value is incorrect. | CreateTopicRequest , AlterTopicRequest |
PreferredReplicaLeaderElectionInProgress | 1007 | Preferred replica leader election procedure has already been started. | PreferredReplicaLeaderElectionRequest |
InvalidArgumentPreferredReplicaElectionData | 1008 | Preferred replica leader election data is invalid (bad JSON, duplicates, etc.). | PreferredReplicaLeaderElectionRequest |
ReassignPartitionsInProgress | 1009 | Reassign partitions procedure has already been started. | ReassignPartitionsRequest |
Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide a descriptive error message.
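As an illustration of how a client might turn these codes into descriptive messages, here is a minimal Python sketch. The enum member names, the `describe_error` helper and the message wording are hypothetical; only the numeric codes come from the table above, and 0 is assumed to keep its existing Kafka meaning of "no error".

```python
from enum import IntEnum


class AdminErrorCode(IntEnum):
    """Proposed admin error codes, copied from the table above."""
    TOPIC_ALREADY_EXISTS = 1001
    INVALID_ARGUMENT_PARTITIONS = 1002
    DECREASE_PARTITIONS_NOT_ALLOWED = 1003
    INVALID_ARGUMENT_REPLICATION_FACTOR = 1004
    INVALID_ARGUMENT_REPLICA_ASSIGNMENT = 1005
    INVALID_TOPIC_CONFIG = 1006
    PREFERRED_REPLICA_LEADER_ELECTION_IN_PROGRESS = 1007
    INVALID_ARGUMENT_PREFERRED_REPLICA_ELECTION_DATA = 1008
    REASSIGN_PARTITIONS_IN_PROGRESS = 1009


def describe_error(code: int, topic: str) -> str:
    """Turn a per-topic error code into a message (wording is illustrative)."""
    if code == 0:  # assumption: 0 keeps its existing "no error" meaning
        return f'Topic "{topic}": success'
    try:
        name = AdminErrorCode(code).name
    except ValueError:
        # Codes outside the proposed range are reported without a name.
        name = "UNKNOWN"
    return f'Topic "{topic}": error {code} ({name})'
```

A dispatcher could call `describe_error` once per entry of a batched response, since every topic command returns one error code per topic.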
Topic Admin Schema
The idea is to introduce Wire protocol messages that cover all topic commands (create, alter, delete, list, describe). The motivation behind the proposed schema is the following:
1) Topic commands must inherit options from the TopicCommand tool
2) If an option is not used in a particular command (e.g. ReplicaAssignment in CreateTopicRequest), a special marker value is used instead (e.g. an empty string in the case of ReplicaAssignment)
3) Topic commands must support batching and provide a per-topic command execution result
4) Topic commands are asynchronous - a request to create/alter/delete just initiates the corresponding command and returns immediately
Create Topic Request
CreateTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [ConfigEntry]]
TopicName => string
Partitions => int32
Replicas => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
ConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
CreateTopicRequest requires a topic name and either (Partitions + Replicas) or ReplicaAssignment to create a topic. A special value -1 should be used to denote an empty value for Partitions and Replicas. The user will also be able to specify topic-level configs for the created topic (to use the defaults, an empty array should be provided). The (Partitions, Replicas)/ReplicaAssignment semantics are the following:
- ReplicaAssignment is specified - Partitions and Replicas are not taken into account; the topic is created with the provided replica assignment, and the number of partitions and the replication factor are derived from ReplicaAssignment
- ReplicaAssignment is empty - the number of topic partitions and the replication factor must be defined with Partitions and Replicas; the replica assignment for the topic is automatically generated on the server
Create Topic Response
CreateTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string
CreateTopicResponse contains a map between topic and topic creation result error code (see New Protocol Errors).
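The request semantics and the per-topic response map can be sketched together. The following Python is only an illustration under stated assumptions: the dictionary layout of a batch entry, the function names, the order of the checks, and the treatment of zero as an invalid value are all hypothetical; the error codes and the -1 marker come from this proposal, and 0 is assumed to mean "no error".

```python
NO_ERROR = 0  # assumption: existing Kafka "no error" code
TOPIC_ALREADY_EXISTS = 1001
INVALID_ARGUMENT_PARTITIONS = 1002
INVALID_ARGUMENT_REPLICATION_FACTOR = 1004
INVALID_ARGUMENT_REPLICA_ASSIGNMENT = 1005
EMPTY = -1  # marker for an unset Partitions / Replicas field


def validate_create(entry, existing_topics):
    """Return the error code for one topic of a batched create request."""
    if entry["topic"] in existing_topics:
        return TOPIC_ALREADY_EXISTS
    assignment = entry.get("replica_assignment", {})
    if assignment:
        # ReplicaAssignment given: Partitions and Replicas are ignored.
        for replicas in assignment.values():
            if not replicas or len(set(replicas)) != len(replicas):
                return INVALID_ARGUMENT_REPLICA_ASSIGNMENT
        return NO_ERROR
    # ReplicaAssignment empty: both counts must be defined.
    if entry.get("partitions", EMPTY) <= 0:
        return INVALID_ARGUMENT_PARTITIONS
    if entry.get("replicas", EMPTY) <= 0:
        return INVALID_ARGUMENT_REPLICATION_FACTOR
    return NO_ERROR


def create_topics(batch, existing_topics):
    """Build the topic -> error code map carried by the response."""
    return {e["topic"]: validate_create(e, existing_topics) for e in batch}
```

For example, `create_topics([{"topic": "a", "partitions": 5, "replicas": 3}], set())` maps `"a"` to 0, while the same entry with `"partitions": -1` and no replica assignment maps it to 1002.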
Alter Topic Request
AlterTopicRequest => [TopicName Partitions ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
TopicName => string
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AddedConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
DeletedConfig => string
AlterTopicRequest is similar to the previous request. To specify topic-level settings that should be removed, use the DeletedConfig array (setting keys only). The user can provide a new partitions value, a replica assignment, or both. AlterTopicRequest contains an optional field Partitions. A special value -1 should be used to denote an empty value. The Partitions/ReplicaAssignment semantics are the following:
1) Partitions is used only to increase the number of topic partitions
2) If the Partitions value is empty (-1), ReplicaAssignment is not taken into account and topic partitions are not increased
3) If Partitions doesn't increase the existing number of partitions, the error code DecreasePartitionsNotAllowed is returned
4) If the Partitions value is not empty and increases the number of existing partitions, a new replica assignment for topic partitions is either automatically generated or defined by ReplicaAssignment (if nonempty)
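The four rules above can be read as a small decision function. This Python sketch is illustrative only: `plan_partition_change` and its return shape are hypothetical names, and 0 is assumed to mean "no error".

```python
NO_ERROR = 0  # assumption: existing Kafka "no error" code
DECREASE_PARTITIONS_NOT_ALLOWED = 1003
EMPTY = -1  # marker for an unset Partitions field


def plan_partition_change(current, requested, replica_assignment=None):
    """Return (error_code, new_partition_count, assignment_source).

    assignment_source is None when nothing changes, "provided" when the
    request carries a ReplicaAssignment, and "generated" when the server
    would have to generate one.
    """
    if requested == EMPTY:
        # Rule 2: no Partitions value, so ReplicaAssignment is ignored.
        return NO_ERROR, current, None
    if requested <= current:
        # Rule 3: the value must strictly increase the partition count.
        return DECREASE_PARTITIONS_NOT_ALLOWED, current, None
    # Rule 4: use the supplied assignment if present, else generate one.
    source = "provided" if replica_assignment else "generated"
    return NO_ERROR, requested, source
```

Note that under rule 3 a request for the same number of partitions is also rejected, because it does not increase the existing count.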
Alter Topic Response
AlterTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string
AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request
DeleteTopicRequest => [TopicName]
TopicName => string
DeleteTopicRequest requires only the names of the topics that should be deleted.
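To make the schema notation concrete, here is a minimal Python sketch that serializes the body of a DeleteTopicRequest. It assumes the primitive encodings described in A Guide To The Kafka Protocol (big-endian integers, strings prefixed with an int16 length, arrays prefixed with an int32 element count). The function names are hypothetical, and the common request header (API key, version, correlation id, client id) is deliberately left out.

```python
import struct


def encode_string(value: str) -> bytes:
    """Kafka protocol string: int16 length followed by UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_array(items, encode_item) -> bytes:
    """Kafka protocol array: int32 element count followed by the elements."""
    return struct.pack(">i", len(items)) + b"".join(encode_item(i) for i in items)


def encode_delete_topic_request(topic_names) -> bytes:
    """Body of DeleteTopicRequest => [TopicName], without the request header."""
    return encode_array(topic_names, encode_string)
```

The other topic requests would be built from the same two helpers, with one extra encoder per structured element such as ConfigEntry.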
Delete Topic Response
DeleteTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string
DeleteTopicResponse is similar to CreateTopicResponse.
Topic Metadata Request V1
TopicMetadataRequest_V1 is an evolved version of TopicMetadataRequest. This request is intended to support two admin operations: describing topic information, and checking whether a particular admin command (all of which are designed to be asynchronous) has completed. The new version of TopicMetadataResponse will additionally include the topic-level configuration for each topic and the replica fetch lag per partition, i.e. how far each partition replica is behind the leader broker.
TopicMetadataRequest_V1 => [TopicName]
TopicName => string
TopicMetadataRequest_V1 requires only topic names. As with the first version, an empty topic name set results in returning information for all existing topics.
Topic Metadata Response V1
TopicMetadataResponse_V1 => [Broker][TopicMetadata]
Broker => NodeId Host Port (any number of brokers may be returned)
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] [ConfigEntry]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader ReplicasLag Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
ReplicasLag => [int32 int32]
Isr => [int32]
Besides the error code, which is used in the same way as in the previous messages, TopicMetadataResponse_V1 holds an optional (non-empty if execution was successful) TopicDescription structure per topic. See the table below for details:
Field | Description |
---|---|
TopicConfigDetails | A structure that holds basic replication details. |
ConfigEntry | Topic-level setting and value which was overridden. |
TopicPartitionDetails | List describing replication details for each partition. |
PartitionId | Id of the partition. |
Leader | Broker-leader id for the described partition (or -1 if not defined). |
ReplicasLag | List of broker ids serving a replica's role for the partition and fetch lag for the replica. |
ISR | Same as replicas but includes only brokers that are known to be "in-sync" |
In case of an error, the TopicDescription field will be returned in the response with default values.
Replication Commands Schema
Preferred Replica Leader Election Request
PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]]
Topic => string
PartitionId => int32
PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Similar to ReassignPartitionsRequest, this request is intended to be non-blocking. The schema consists of one field - an array of partitions for which the preferred replica leader should be elected. To start the preferred replica leader election procedure for all existing partitions, an empty partitions array should be sent.
Preferred Replica Leader Election Response
PreferredReplicaLeaderElectionResponse => [Topic ErrorCode]
Topic => string
ErrorCode => int16
PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.
The status of the procedure may be checked with TopicMetadataRequest - the head of the replicas list field and the leader broker should be the same.
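The status check described above can be sketched in a few lines of Python. The dictionary layout of a decoded partition entry and the function names are hypothetical. The sketch also assumes that each ReplicasLag element is a (broker id, lag) pair in that order and that the list preserves the replica assignment order, neither of which is stated explicitly in the schema.

```python
def preferred_leader_elected(partition) -> bool:
    """True if the leader equals the first (preferred) replica."""
    replicas_lag = partition["replicas_lag"]  # list of (replica_id, lag) pairs
    if not replicas_lag:
        return False
    preferred_replica = replicas_lag[0][0]
    return partition["leader"] == preferred_replica


def election_complete(partitions) -> bool:
    """True once every partition is led by its preferred replica."""
    return all(preferred_leader_elected(p) for p in partitions)
```

A client would run this check against the partition metadata returned by TopicMetadataRequest for the partitions named in the election request.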
2. Server-side Admin Request handlers
All incoming requests will be handled by specific helper classes called from KafkaApis - TopicCommandHelper for topic admin commands, ReassignPartitionsCommandHelper and PreferredReplicaLeaderElectionCommandHelper.
All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Unfortunately, most of the command classes are strongly coupled with CLI logic and can hardly be refactored, so for now (before we remove the standalone CLI commands) the logic from those classes will most likely be extracted and copied to separate classes (as proposed - TopicCommandHelper etc.).
3. Admin Client
This component is intended to be a Kafka out-of-box client implementation for Admin commands.
The Admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, the client will handle the cluster metadata cache and will provide the user with a convenient way of handling long-running commands (e.g. reassign partitions).
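Because the admin requests only initiate a command, handling a long-running command amounts to polling until a completion check passes. The helper below is a generic Python sketch of that pattern and not part of the proposed API; `wait_until` and all of its parameters are hypothetical names.

```python
import time


def wait_until(is_done, timeout_s=30.0, poll_interval_s=0.5, sleep=time.sleep):
    """Poll is_done() until it returns True or the timeout expires.

    is_done is any zero-argument callable, for example one that sends a
    metadata request and inspects the result. Returns True on completion
    and False on timeout.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if is_done():
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(poll_interval_s)
```

The `sleep` parameter exists only so the delay can be replaced in tests.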
Since Topic commands will support batching (and so will AdminClient), the user will be provided, besides the Admin API, with request builders that help to create requests correctly.
Proposed API:
4. Interactive Shell / CLI tool
This component wraps AdminClient and provides an interactive shell-like environment for executing administrative commands. The goal of these changes is to let people use the existing standalone tools from a single script, optionally running commands in a shell, so command arguments and names are unchanged compared to the existing tools where possible.
The tool supports two modes:
- command line interface
- interactive shell mode
Command Line Interface
This mode lets the user run commands from a shell script. List of available commands:
(Note: not all possible options are listed - e.g. alter topic's config)
# Topic Commands - options are ported from TopicCommand.scala
bin/kafka.sh --create-topic --topic my_topic --partitions 5 --replication-factor 3 --config key=value --broker-list <host : port>
bin/kafka.sh --alter-topic --topic my_topic --partitions 10 --broker-list <host : port>
bin/kafka.sh --delete-topic --topic my_topic --broker-list <host : port>
bin/kafka.sh --list-topics --broker-list <host : port>
bin/kafka.sh --describe-topic --topic my_topic --broker-list <host : port>
# Reassign Partitions - options are ported from ReassignPartitionsCommand.scala
bin/kafka.sh --reassign-partitions --reassignment-json-file /user/file.json --blocking --broker-list <host : port>
# Preferred Replica Leader Elections - options are ported from PreferredReplicaLeaderElectionCommand.scala
bin/kafka.sh --preferred-replica-leader-election --preferred-replica-leader-election /user/file.json --blocking --broker-list <host : port>
# Start kafka.sh in shell mode
The user will have to supply all commands with --broker-list <host : port> to define at least one broker from the target cluster.
Interactive Shell Mode
Interactive shell mode provides extended facilities for executing admin commands. Command names and options are the same, but the user will have to define --broker-list only once - the CLI tool in interactive mode will manage the cluster metadata cache and send commands to the proper broker.
This mode also provides facilities to switch context, so that e.g. all topic commands are applied to the switched topic, with no need to specify the topic name for each topic command.
Proposed use-case is the following:
# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host1 : port1>
Connected to Kafka Controller at <host2 : port2>.
kafka> create-topic --topic my_topic --partitions 5 --replication-factor 3
Topic "my_topic" is created.
kafka> alter-topic --topic my_topic --partitions 10
Topic "my_topic" is changed.
# Switch topic context
kafka> topic my_topic
Switched to "my_topic" topic.
# Execute topic command for switched topic
kafka my_topic> describe-topic
"my-topic" details:
Topic: my_topic
Partitions: 10
...
# Switch off topic context
kafka my_topic> topic
kafka>
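The topic context shown in the session above could be tracked with very little state. The Python sketch below is a hypothetical illustration rather than the tool's implementation: the `ShellContext` class and its method names are invented here, and only the prompt format and the `topic` command behaviour are taken from the session.

```python
class ShellContext:
    """Tracks the currently selected topic of an interactive session."""

    def __init__(self):
        self.topic = None

    def prompt(self) -> str:
        """Prompt text, including the topic when a context is active."""
        return f"kafka {self.topic}> " if self.topic else "kafka> "

    def switch_topic(self, args):
        """'topic my_topic' sets the context; a bare 'topic' clears it."""
        self.topic = args[0] if args else None

    def resolve_args(self, args):
        """Append --topic from the context unless the user passed one."""
        if self.topic and "--topic" not in args:
            return args + ["--topic", self.topic]
        return args
```

With the context set to `my_topic`, `resolve_args(["describe-topic"])` yields `["describe-topic", "--topic", "my_topic"]`, which matches the argument form of the non-interactive mode.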
The current tools will still be available in the 0.8.3 release but will be removed for 0.9 (tracked here https://issues.apache.org/jira/browse/KAFKA-1776 ).
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.