The goal of the command line shell is fundamentally to provide centralized management for Kafka operations.
Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Folks have created dozens of different systems to work with Kafka. If we can provide a wire protocol that allows the brokers to execute administrative code, then clients can execute commands more simply. We have a lot of ties to ZooKeeper, which makes it nearly impossible for some code to do anything better than wrap kafka-topics.sh etc. in shell scripts. With the wire protocol we should be able to have clients in any language administer Kafka. If someone wants a REST interface (for example), they can write it in whatever language they like. We should have a client from the project that is not only an example but a fully functional replacement for the kafka-topics, reassign-partitions and consumer offset tools.
Public Interfaces
- Changes to Wire Protocol:
Extending protocol with these messages:
- CreateTopic(Request | Response), AlterTopic, DeleteTopic, ListTopics
- PreferredReplicaLeaderElection
- Evolving TopicMetadataRequest to V1
- New client: AdminClient - a Wire Protocol client for administrative operations
- New CLI tool: Kafka Shell - a single unified command line interface for administrative operations
Proposed Changes
Proposed changes include 4 parts:
- Wire Protocol extensions - to add new Admin messages
- Server-side Admin commands handlers (TopicCommand-like)
- Admin Client - an out-of-box client for performing administrative commands
- Interactive Shell / CLI tool supporting administrative commands
(You can try out some of the changes from this KIP. Please follow this link to get the code and start the CLI tool).
1. Wire Protocol Extensions
Overview
It is proposed to add / modify these 3 types of requests:
- Topic commands, which include CreateTopic(Request | Response), AlterTopic, DeleteTopic
- Replication tools - PreferredReplicaLeaderElection
- Extend TopicMetadataRequest to include topic configuration and partition fetch lag per replica
Details are given under each request/response (RQ/RP) schema proposal.
Schema
Command | Wire Protocol Message | Note |
---|---|---|
create-topic | CreateTopicRequest | |
alter-topic | AlterTopicRequest | |
delete-topic | DeleteTopicRequest | |
describe-topic | TopicMetadataRequest_V1 | Using the new version of TopicMetadataRequest, which will include topic-level config |
list-topics | TopicMetadataRequest | Using an empty list as input for the request, which results in returning metadata for all existing topics |
reassign-partitions | AlterTopicRequest | Using batch AlterTopicRequest with replica assignment specified |
preferred-replica-leader-election | PreferredReplicaLeaderElectionRequest | |
Note that all Admin requests are intended to be asynchronous. This means a request will only initiate a particular command and will not wait until the command has actually completed. There are different reasons for that, but we want to make sure that we give users all the tools needed to verify whether an issued command has completed. E.g. one can consider a topic created once it is possible to consume from / produce to this topic. In this case the user can leverage TopicMetadataRequest to check that all brokers have received metadata about the newly created topic.
Finally, Topic Admin schema requests are likely to be used not only in CLI tool, where the common use case is create/change/delete a single entity. Since Kafka is able to maintain a huge number of topics it is important that user can efficiently issue many requests at one time. That's why all Topic Admin messages essentially are batch requests, i.e. it is possible to group commands of one type for many topics in one batch reducing network calls.
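Because the requests are asynchronous, a caller that needs to know when a command has taken effect has to poll. The following is a minimal sketch of such a loop, not part of the proposal: `wait_until_visible` and the `topics_known_to` callback are hypothetical names, and the callback stands in for whatever code sends a TopicMetadataRequest to one broker and returns the topic names it reports.

```python
import time
from typing import Callable, Iterable, Set


def wait_until_visible(
    topic: str,
    brokers: Iterable[str],
    topics_known_to: Callable[[str], Set[str]],  # hypothetical: wraps a TopicMetadataRequest
    timeout_s: float = 30.0,
    poll_interval_s: float = 0.5,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll until every broker reports `topic`, or until the timeout expires."""
    brokers = list(brokers)
    deadline = clock() + timeout_s
    while True:
        # The command is considered complete once all brokers know the topic.
        if all(topic in topics_known_to(broker) for broker in brokers):
            return True
        if clock() >= deadline:
            return False
        sleep(poll_interval_s)
```

The clock and sleep functions are parameters only so that the loop can be exercised without real delays.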
New Protocol Errors
It is proposed to add these error codes to the protocol.
Error | Description | Requests |
---|---|---|
TopicAlreadyExists | Topic with this name already exists. | CreateTopicRequest |
InvalidArgumentTopicName | Topic name contains invalid characters. | CreateTopicRequest |
InvalidArgumentPartitions | Partitions field is invalid (e.g. negative). | CreateTopicRequest, AlterTopicRequest |
InvalidArgumentReplicationFactor | Replication-factor field is invalid (e.g. negative). | CreateTopicRequest, AlterTopicRequest |
InvalidArgumentReplicaAssignment | Replica assignment field is invalid (e.g. contains duplicates). | CreateTopicRequest, AlterTopicRequest |
InvalidArgumentTopicConfig | Either topic-level config setting or value is incorrect. | CreateTopicRequest , AlterTopicRequest |
DecreasePartitionsNotAllowed | Invalid partitions argument: decreasing partitions is prohibited when altering topic. | AlterTopicRequest |
PreferredReplicaLeaderElectionInProgress | Preferred replica leader election procedure has been already started. | PreferredReplicaLeaderElectionRequest |
ReassignPartitionsInProgress | Reassign partitions procedure has been already started. | AlterTopicRequest |
MultipleInstructionsForOneTopic | Only one mutation is allowed at once: e.g. change topic replication factor or change topic config. | CreateTopicRequest, AlterTopicRequest |
MultipleTopicInstructionsInOneBatch | Multiple topic instructions for the same topic in one batch request | CreateTopicRequest, AlterTopicRequest, DeleteTopicRequest |
Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide a descriptive error message.
The same notation as in A Guide To The Kafka Protocol is used here.
Topic Admin Schema
Create Topic Request
    CreateTopicRequest => [TopicName Partitions Replicas ReplicaAssignment]
      TopicName => string
      Partitions => int32
      Replicas => int32
      ReplicaAssignment => [PartitionId [ReplicaId]]
User can define only one of (Partitions + Replicas) or ReplicaAssignment in one instruction. (Note: there is a special use case - automatic topic creation for TopicMetadataRequest; to trigger it the user should set client_id=consumer and define only the topic name.) Otherwise, MultipleInstructionsForOneTopic is returned.
If ReplicaAssignment is defined, the number of partitions and replicas will be calculated from the supplied ReplicaAssignment. If (Partitions + Replicas) is defined, the replica assignment will be automatically generated by the server.
One CreateTopicRequest may include only one topic creation command for the topic with the given name in one batch; otherwise MultipleTopicInstructionsInOneBatch is returned.
Create Topic Response
    CreateTopicResponse => [TopicName ErrorCode]
      ErrorCode => int16
      TopicName => string
CreateTopicResponse contains a map between topic and topic creation result error code (see New Protocol Errors).
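To make the CreateTopicRequest layout concrete, here is a rough sketch of how a client might serialize the request body with the primitive encodings from A Guide To The Kafka Protocol (big-endian integers, strings prefixed with an int16 length, arrays prefixed with an int32 count). Several things here are assumptions rather than part of the proposal: the proposal does not say how an undefined field is encoded, so `-1` and an empty array are used as placeholders; the function names are illustrative; the request header is omitted; and the special auto-creation case (topic name only) is not handled.

```python
import struct
from typing import Dict, List, Optional, Sequence

UNSET = -1  # assumption: the proposal does not specify how "not defined" is encoded


def encode_string(value: str) -> bytes:
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data  # int16 length + bytes


def encode_array(items: Sequence[bytes]) -> bytes:
    return struct.pack(">i", len(items)) + b"".join(items)  # int32 count + elements


def encode_topic(
    name: str,
    partitions: int = UNSET,
    replicas: int = UNSET,
    assignment: Optional[Dict[int, List[int]]] = None,
) -> bytes:
    """Encode one [TopicName Partitions Replicas ReplicaAssignment] entry."""
    assignment = assignment or {}
    counts_given = partitions != UNSET or replicas != UNSET
    counts_complete = partitions != UNSET and replicas != UNSET
    valid = (counts_complete and not assignment) or (bool(assignment) and not counts_given)
    if not valid:
        # Mirrors the rule behind MultipleInstructionsForOneTopic.
        raise ValueError("define either (partitions + replicas) or a replica assignment")
    encoded_assignment = encode_array([
        struct.pack(">i", partition_id)
        + encode_array([struct.pack(">i", replica_id) for replica_id in replica_ids])
        for partition_id, replica_ids in sorted(assignment.items())
    ])
    return encode_string(name) + struct.pack(">ii", partitions, replicas) + encoded_assignment


def encode_create_topic_request(topics: Sequence[bytes]) -> bytes:
    """The request body is a batch: an array of encoded topic entries."""
    return encode_array(topics)
```

For example, `encode_create_topic_request([encode_topic("my_topic", partitions=5, replicas=3)])` produces a one-element batch.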
Alter Topic Request
    AlterTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
      TopicName => string
      Replicas => int32
      Partitions => int32
      ReplicaAssignment => [PartitionId [ReplicaId]]
      AddedConfigEntry => ConfigKey ConfigValue
      ConfigKey => string
      ConfigValue => string
      DeletedConfig => string
User can define only one of Partitions, Replicas, ReplicaAssignment, AddedConfigEntry, DeletedConfig. Otherwise, MultipleInstructionsForOneTopic is returned.
One AlterTopicRequest may include only one topic alteration command for the topic with the given name in one batch; otherwise MultipleTopicInstructionsInOneBatch is returned.
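The two batch rules for AlterTopicRequest can be illustrated with a small pre-check that a client or request builder might run before sending a batch. This is only a sketch under assumptions: the `AlterTopic` class and `precheck` function are hypothetical, and the proposal itself places these checks on the server, which reports them as error codes in the response.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class AlterTopic:
    """One alteration instruction; field names follow the schema loosely."""
    topic: str
    partitions: Optional[int] = None
    replicas: Optional[int] = None
    replica_assignment: Dict[int, List[int]] = field(default_factory=dict)
    added_config: Dict[str, str] = field(default_factory=dict)
    deleted_config: List[str] = field(default_factory=list)

    def mutation_count(self) -> int:
        # Count how many of the five possible mutations are defined.
        return sum([
            self.partitions is not None,
            self.replicas is not None,
            bool(self.replica_assignment),
            bool(self.added_config),
            bool(self.deleted_config),
        ])


def precheck(batch: List[AlterTopic]) -> Dict[str, str]:
    """Return topic -> error name for instructions that break the batch rules."""
    per_topic = Counter(instruction.topic for instruction in batch)
    errors: Dict[str, str] = {}
    for instruction in batch:
        if per_topic[instruction.topic] > 1:
            errors[instruction.topic] = "MultipleTopicInstructionsInOneBatch"
        elif instruction.mutation_count() > 1:
            errors[instruction.topic] = "MultipleInstructionsForOneTopic"
    return errors
```

An empty result means the batch satisfies both rules; it says nothing about the other validation errors listed under New Protocol Errors.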
Alter Topic Response
    AlterTopicResponse => [TopicName ErrorCode]
      ErrorCode => int16
      TopicName => string
AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request
    DeleteTopicRequest => [TopicName]
      TopicName => string
DeleteTopicRequest requires only the names of the topics which should be deleted.
One DeleteTopicRequest may include only one topic deletion command for the topic with the given name in one batch; otherwise MultipleTopicInstructionsInOneBatch is returned.
Delete Topic Response
    DeleteTopicResponse => [TopicName ErrorCode]
      ErrorCode => int16
      TopicName => string
DeleteTopicResponse is similar to CreateTopicResponse.
Topic Metadata Request V1
    TopicMetadataRequest_V1 => [TopicName]
      TopicName => string
TopicMetadataRequest_V1 is an evolved version of TopicMetadataRequest. This request is intended to support two admin operations: getting topic metadata, and checking whether a particular admin command (all of which are asynchronous) has completed.
TopicMetadataRequest_V1 requires only topic names. As with the first version, an empty topic name set results in returning information for all existing topics.
Unlike the previous version, TopicMetadataRequest_V1 won't trigger topic creation automatically if the topic with the given name doesn't exist.
Topic Metadata Response V1
    TopicMetadataResponse_V1 => [Broker][TopicMetadata]
      Broker => NodeId Host Port  (any number of brokers may be returned)
      NodeId => int32
      Host => string
      Port => int32
      TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] [ConfigEntry]
      TopicErrorCode => int16
      PartitionMetadata => PartitionErrorCode PartitionId Leader ReplicasLag Isr
      PartitionErrorCode => int16
      PartitionId => int32
      Leader => int32
      ReplicasLag => [int32 int32]
      Isr => [int32]
      ConfigEntry => string string
The new version of TopicMetadataResponse, in addition to the fields of TopicMetadataResponse_V0, will include topic-level configuration for each topic and replica fetch lag per partition, i.e. how far each partition replica is behind the leader replica.
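As an illustration of how the PartitionMetadata part of this schema could be read off the wire, the sketch below decodes one entry using the usual Kafka primitive encodings (big-endian integers, arrays prefixed with an int32 count). It rests on one assumption the proposal does not spell out: each `[int32 int32]` element of ReplicasLag is taken to be a (replica id, lag) pair. The class and function names are illustrative only.

```python
import struct
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class PartitionMetadata:
    error_code: int
    partition_id: int
    leader: int
    replicas_lag: Dict[int, int]  # assumption: replica id -> fetch lag
    isr: List[int]


def decode_partition_metadata(buf: bytes, offset: int = 0) -> Tuple[PartitionMetadata, int]:
    """Decode one PartitionMetadata entry; return it with the next read offset."""
    # PartitionErrorCode (int16), PartitionId (int32), Leader (int32)
    error_code, partition_id, leader = struct.unpack_from(">hii", buf, offset)
    offset += struct.calcsize(">hii")

    # ReplicasLag => [int32 int32]
    (count,) = struct.unpack_from(">i", buf, offset)
    offset += 4
    replicas_lag: Dict[int, int] = {}
    for _ in range(count):
        replica_id, lag = struct.unpack_from(">ii", buf, offset)
        offset += 8
        replicas_lag[replica_id] = lag

    # Isr => [int32]
    (count,) = struct.unpack_from(">i", buf, offset)
    offset += 4
    isr = list(struct.unpack_from(f">{count}i", buf, offset))
    offset += 4 * count

    return PartitionMetadata(error_code, partition_id, leader, replicas_lag, isr), offset
```

Returning the next offset lets a caller decode the enclosing `[PartitionMetadata]` array by calling the function in a loop.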
Replication Commands Schema
Preferred Replica Leader Election Request
    PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]]
      Topic => string
      PartitionId => int32
PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Similar to Topic Admin requests, this request is intended to be non-blocking. The schema consists of one field: an array of partitions for which the preferred replica leader should be elected. To start the preferred replica leader election procedure for all existing partitions, an empty partitions array should be sent.
Preferred Replica Leader Election Response
    PreferredReplicaLeaderElectionResponse => [Topic ErrorCode]
      Topic => string
      ErrorCode => int16
PreferredReplicaLeaderElectionResponse contains an error code per topic, similar to CreateTopicResponse.
Status of the procedure may be checked with TopicMetadataRequest - the head of the replicas list field and the leader broker should be the same.
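The completion check described above is simple enough to state in code. The following sketch assumes the caller has already extracted, for each partition, the replicas list and the leader id from a metadata response; the function names and the input shape are hypothetical.

```python
from typing import Dict, Sequence, Tuple


def preferred_leader_elected(replicas: Sequence[int], leader: int) -> bool:
    """True when the head of the replicas list is the current leader."""
    return bool(replicas) and replicas[0] == leader


def election_complete(partitions: Dict[int, Tuple[Sequence[int], int]]) -> bool:
    """`partitions` maps partition id -> (replicas list, leader id)."""
    return all(
        preferred_leader_elected(replicas, leader)
        for replicas, leader in partitions.values()
    )
```

A client would call `election_complete` repeatedly on fresh metadata until it returns True or a timeout is reached.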
2. Server-side Admin Request handlers
All incoming requests will be handled by specific helper classes called from KafkaApis - TopicCommandHelper for topic admin commands, ReassignPartitionsCommandHelper and PreferredReplicaLeaderElectionCommandHelper.
All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Unfortunately most of the command classes are strongly coupled with CLI logic and can hardly be refactored, so for now (before we remove the standalone CLI commands) the logic from those classes will most likely be extracted and copied to separate classes (as proposed - TopicCommandHelper etc).
3. Admin Client
This component is intended to be Kafka's out-of-box client implementation for Admin commands.
The Admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, the client will handle the cluster metadata cache and will provide the user with a convenient way of handling long-running commands (e.g. reassign partitions).
Since Topic commands will support batching (and so will AdminClient), the user will be provided, besides the Admin API, with request builders which help to create requests correctly.
Proposed API:
4. Interactive Shell / CLI tool
This component wraps AdminClient and provides an interactive shell-like environment for executing administrative commands. The goal of these changes is to let people use the existing standalone tools from a single script, optionally running commands in a shell, so command arguments/names are not changed compared to the existing tools, where possible.
The tool supports two modes:
- command line interface
- interactive shell mode
Command Line Interface
This mode lets the user run commands from a shell script. List of available commands:
(Note: not all possible options are listed - e.g. alter topic's config)
    # Topic Commands - options are ported from TopicCommand.scala
    bin/kafka.sh --create-topic --topic my_topic --partitions 5 --replication-factor 3 --config key=value --broker-list <host : port>
    bin/kafka.sh --alter-topic --topic my_topic --partitions 10 --broker-list <host : port>
    bin/kafka.sh --delete-topic --topic my_topic --broker-list <host : port>
    bin/kafka.sh --list-topics --broker-list <host : port>
    bin/kafka.sh --describe-topic --topic my_topic --broker-list <host : port>
    # Reassign Partitions - options are ported from ReassignPartitionsCommand.scala
    bin/kafka.sh --reassign-partitions --reassignment-json-file /user/file.json --blocking --broker-list <host : port>
    # Preferred Replica Leader Elections - options are ported from PreferredReplicaLeaderElectionCommand.scala
    bin/kafka.sh --preferred-replica-leader-election --preferred-replica-leader-election /user/file.json --blocking --broker-list <host : port>
    # Start kafka.sh in shell mode
    bin/kafka.sh --shell --broker-list <host : port>
User will have to supply all commands with --broker-list <host : port> to define at least one broker from the target cluster.
Interactive Shell Mode
Interactive shell mode provides extended facilities for executing admin commands. Command names and options are the same, but the user will have to define --broker-list only once - the CLI tool in interactive mode will manage the cluster metadata cache and send commands to the proper broker.
This mode also provides facilities to switch context, so that e.g. all topic commands are applied to the switched topic - no need to specify the topic name for each topic command.
Proposed use-case is the following:
    # Start kafka.sh in shell mode
    bin/kafka.sh --shell --broker-list <host1 : port1>
    Connected to Kafka Controller at <host2 : port2>.
    kafka> create-topic --topic my_topic --partitions 5 --replication-factor 3
    Topic "my_topic" is created.
    kafka> alter-topic --topic my_topic --partitions 10
    Topic "my_topic" is changed.
    # Switch topic context
    kafka> topic my_topic
    Switched to "my_topic" topic.
    # Execute topic command for switched topic
    kafka my_topic> describe-topic
    "my_topic" details:
    Topic: my_topic Partitions: 10 ...
    # Switch off topic context
    kafka my_topic> topic
    kafka>
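The topic context switching shown in this session could be modelled roughly as follows. This is a sketch of the idea only, not the proposed implementation: the `ShellContext` class is hypothetical, and the rule that any command ending in `-topic` receives the switched topic is an assumption made for the example.

```python
import shlex
from typing import List, Optional


class ShellContext:
    """Tracks the switched topic and fills it into topic commands."""

    def __init__(self) -> None:
        self.topic: Optional[str] = None

    @property
    def prompt(self) -> str:
        return f"kafka {self.topic}> " if self.topic else "kafka> "

    def expand(self, line: str) -> Optional[List[str]]:
        """Return the command tokens to execute, or None for context commands."""
        tokens = shlex.split(line)
        if not tokens:
            return None
        if tokens[0] == "topic":
            # "topic <name>" switches the context on; bare "topic" switches it off.
            self.topic = tokens[1] if len(tokens) > 1 else None
            return None
        if self.topic and tokens[0].endswith("-topic") and "--topic" not in tokens:
            # Assumption: topic commands are recognised by their "-topic" suffix.
            tokens += ["--topic", self.topic]
        return tokens
```

An explicit `--topic` on the command line still wins over the switched context, which keeps the command syntax identical in both modes.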
The current tools will still be available in the 0.8.3 release but will be removed for 0.9 (tracked here https://issues.apache.org/jira/browse/KAFKA-1776 ).
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.