The goal of the command line shell is to provide centralized management for Kafka operations.
Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Folks have created dozens of different systems to work with Kafka. If we can provide a wire protocol that allows the brokers to execute administrative code, then clients can execute commands more simply. We have a lot of ties to ZooKeeper, which makes it nearly impossible for some code to do anything better than wrap kafka-topics.sh and similar tools in shell scripts. With the wire protocol, a client in any language should be able to administer Kafka. If someone wants a REST interface (for example), they can write it in whatever language they like. The project should ship a client that is not only an example but a fully functional replacement for the kafka-topics, reassign-partitions, and consumer offset tools.
Public Interfaces
Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.
A public interface is any change to the following:
Command line tools and arguments
Proposed Changes
Proposed changes include 4 parts:
- Wire Protocol extensions - to add new Admin messages
- Server-side Admin commands handlers (TopicCommand-like)
- Admin Client - an out-of-box client for performing administrative commands
- Interactive Shell / CLI tool supporting administrative commands
Some open questions and items under discussion are marked with a red asterisk (*). Please see the Open Questions section for more details.
1. Wire Protocol Extensions
Overview
For each type of Admin Request, a separate type of wire protocol message is created.
Currently there are 3 types of requests:
- Topic commands, which include CreateTopic(Request | Response), AlterTopic, DeleteTopic, DescribeTopic, ListTopics.
- Replication tools: ReassignPartition, VerifyReassignPartitions; PreferredReplicaLeaderElection, VerifyPreferredReplicaLeaderElection.
- A special type of request to support Admin commands: ClusterMetadata.
Details are given under each request/response (RQ/RP) schema proposal.
Schema
The same notation as in A Guide To The Kafka Protocol is used here. The only difference is a new Kafka Protocol metatype, MaybeOf ("?" in notation), which means that the value is optional in the message. To indicate whether the value is present, a special control byte is prepended before each such value (0 means the field is absent; otherwise the value is read normally).
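The MaybeOf rule can be illustrated with a minimal Python sketch. The function names are hypothetical, and writing 1 as the "present" control byte is an assumption: the text above only fixes 0 as "absent" and treats any other value as "present".

```python
import struct
from typing import Callable, Optional, Tuple, TypeVar

T = TypeVar("T")


def encode_int32(value: int) -> bytes:
    # Kafka protocol integers are big-endian.
    return struct.pack(">i", value)


def decode_int32(buf: bytes, offset: int) -> Tuple[int, int]:
    (value,) = struct.unpack_from(">i", buf, offset)
    return value, offset + 4


def encode_maybe(value: Optional[T], encode: Callable[[T], bytes]) -> bytes:
    """Prepend the control byte: 0 if absent, 1 (assumed) if present."""
    if value is None:
        return b"\x00"
    return b"\x01" + encode(value)


def decode_maybe(buf: bytes, offset: int,
                 decode: Callable[[bytes, int], Tuple[T, int]]
                 ) -> Tuple[Optional[T], int]:
    """Read the control byte; any non-zero byte means the value follows."""
    control = buf[offset]
    offset += 1
    if control == 0:
        return None, offset
    return decode(buf, offset)
```

Both decoders return the decoded value together with the next read offset, so optional and mandatory fields can be chained in schema order.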
Cluster Metadata Request
ClusterMetadataRequest =>
Cluster Metadata Response
ClusterMetadataResponse => ErrorCode [Broker] ?(Controller)
  ErrorCode => int16
  Broker => NodeId Host Port
  NodeId => int32
  Host => string
  Port => int32
  Controller => Broker
ClusterMetadataRequest is a request with no arguments.
ClusterMetadataResponse holds an error code (0 in case of a successful result), the list of brokers in the cluster and, optionally, the broker serving the Controller's role (an empty Controller most likely means either an error during request processing or a cluster in some intermediate state).
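As an illustration, the response body could be decoded as in the following Python sketch. It assumes the primitive encodings from A Guide To The Kafka Protocol (big-endian integers, strings prefixed with an int16 length, arrays prefixed with an int32 count) and covers only the body, without the response header. All class and function names are hypothetical.

```python
import struct
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Broker:
    node_id: int
    host: str
    port: int


@dataclass
class ClusterMetadataResponse:
    error_code: int
    brokers: List[Broker]
    controller: Optional[Broker]


def read_int16(buf: bytes, pos: int) -> Tuple[int, int]:
    return struct.unpack_from(">h", buf, pos)[0], pos + 2


def read_int32(buf: bytes, pos: int) -> Tuple[int, int]:
    return struct.unpack_from(">i", buf, pos)[0], pos + 4


def read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    length, pos = read_int16(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def read_broker(buf: bytes, pos: int) -> Tuple[Broker, int]:
    node_id, pos = read_int32(buf, pos)
    host, pos = read_string(buf, pos)
    port, pos = read_int32(buf, pos)
    return Broker(node_id, host, port), pos


def decode_cluster_metadata_response(buf: bytes) -> ClusterMetadataResponse:
    error_code, pos = read_int16(buf, 0)
    count, pos = read_int32(buf, pos)
    brokers = []
    for _ in range(count):
        broker, pos = read_broker(buf, pos)
        brokers.append(broker)
    # MaybeOf control byte: 0 means no Controller follows.
    control = buf[pos]
    pos += 1
    controller = None
    if control != 0:
        controller, pos = read_broker(buf, pos)
    return ClusterMetadataResponse(error_code, brokers, controller)
```

A caller would treat `controller is None` as a signal to retry later, since the text above associates it with an error or an intermediate cluster state.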
Admin RQ/RP format
All admin messages listed below must be sent only to the Controller broker; only the controller will process such messages. If an Admin message is sent to an ordinary broker, a special error code is returned (code 22). In case of any other failure during message processing, AdminRequestFailedError is returned.
Error | Code | Description |
---|---|---|
AdminRequestFailedError | 21 | Unexpected error occurred while processing Admin request. |
InvalidRequestTarget | 22 | Target broker (id=<this_broker_id>) is not serving a controller's role. |
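One way a client might honor the controller-only rule is sketched below in Python. The two callbacks, the exception class and the retry policy are all hypothetical and not part of this proposal; only the error codes 21 and 22 come from the table above.

```python
from typing import Any, Callable, Optional, Tuple

ADMIN_REQUEST_FAILED = 21    # AdminRequestFailedError
INVALID_REQUEST_TARGET = 22  # InvalidRequestTarget

Address = Tuple[str, int]


class AdminRequestError(Exception):
    def __init__(self, message: str, error_code: Optional[int] = None):
        super().__init__(message)
        self.error_code = error_code


def send_to_controller(
    find_controller: Callable[[], Optional[Address]],
    send: Callable[[Address, Any], Tuple[int, Any]],
    request: Any,
    max_attempts: int = 3,
) -> Any:
    """Send an admin request to the controller, re-resolving it on code 22.

    find_controller() is assumed to issue a ClusterMetadataRequest to any
    broker and return the controller address, or None if it is unknown.
    send(address, request) is assumed to return (error_code, payload).
    """
    for _ in range(max_attempts):
        controller = find_controller()
        if controller is None:
            continue  # cluster may be in an intermediate state
        error_code, payload = send(controller, request)
        if error_code == INVALID_REQUEST_TARGET:
            continue  # the controller moved; look it up again
        if error_code != 0:
            raise AdminRequestError("admin request failed", error_code)
        return payload
    raise AdminRequestError(
        "no controller accepted the request after %d attempts" % max_attempts)
```

Any non-zero code other than 22 is surfaced to the caller immediately, because retrying against a different broker would not help in that case.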
Create Topic Request
CreateTopicRequest => TopicName ?(Partitions) ?(Replicas) ?(ReplicaAssignment) [Config]
  TopicName => string
  Partitions => int32
  Replicas => int32
  ReplicaAssignment => string
Create Topic Response
CreateTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string
CreateTopicRequest requires a topic name and either (partitions + replicas) or a replica assignment to create the topic (validation is done on the server side). You can also specify topic-level configs to create the topic with, in key=value format (to use the defaults, send an empty array).
CreateTopicResponse is fairly simple: you receive an error code (0, as always, identifies NO_ERROR) and optionally an error description. Usually it will hold the higher-level exception that happened during command execution.
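A minimal Python sketch of how a client might serialize the CreateTopicRequest body follows. It makes several assumptions: each Config entry is sent as a single "key=value" string (the schema above does not define Config), the "present" control byte is 1, and the client-side argument check is only a convenience that mirrors the server-side validation. The function names are hypothetical.

```python
import struct
from typing import Callable, Optional, Sequence


def _int32(value: int) -> bytes:
    return struct.pack(">i", value)


def _string(value: str) -> bytes:
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def _maybe(value, encode: Callable) -> bytes:
    # MaybeOf: control byte 0 if absent, 1 (assumed) followed by the value.
    return b"\x00" if value is None else b"\x01" + encode(value)


def encode_create_topic_request(
    topic: str,
    partitions: Optional[int] = None,
    replicas: Optional[int] = None,
    replica_assignment: Optional[str] = None,
    configs: Sequence[str] = (),
) -> bytes:
    has_counts = partitions is not None and replicas is not None
    if not has_counts and replica_assignment is None:
        raise ValueError(
            "need either partitions and replicas, or a replica assignment")
    body = _string(topic)
    body += _maybe(partitions, _int32)
    body += _maybe(replicas, _int32)
    body += _maybe(replica_assignment, _string)
    # Array of "key=value" entries; an empty array keeps the defaults.
    body += _int32(len(configs))
    for entry in configs:
        body += _string(entry)
    return body
```

For example, `encode_create_topic_request("events", partitions=3, replicas=2)` produces a body with both counts present, no replica assignment, and an empty config array.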
Alter Topic Request
AlterTopicRequest => TopicName ?(Partitions) ?(ReplicaAssignment) [AddedConfig] [DeletedConfig]
  TopicName => string
  Partitions => int32
  Replicas => int32
  DeletedConfig => string
Alter Topic Response
AlterTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string
AlterTopicRequest is similar to the previous request. To specify topic-level settings that should be removed, use the DeletedConfig array (containing just the setting keys).
AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request
DeleteTopicRequest => TopicName
  TopicName => string
Delete Topic Response
DeleteTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string
DeleteTopicRequest requires only the name of the topic to be deleted.
DeleteTopicResponse is similar to CreateTopicResponse.
Describe Topic Request
DescribeTopicRequest => TopicName
  TopicName => string
Describe Topic Response
DescribeTopicResponse => ErrorCode ?(ErrorDescription) ?(TopicDescription)
  ErrorCode => int16
  ErrorDescription => string
  TopicDescription => TopicName TopicConfigDetails [TopicPartitionDetails]
  TopicName => string
  TopicConfigDetails => Partitions ReplicationFactor [Config]
  Partitions => int32
  ReplicationFactor => int32
  Config => overridden topic-level configs
  TopicPartitionDetails => PartitionId ?(Leader) [Replica] [ISR]
  PartitionId => int32
  Leader => int32
  Replica => int32
  ISR => int32
DescribeTopicRequest requires only the topic name.
DescribeTopicResponse, besides errorCode and the optional errorDescription, which are used in the same way as in previous messages, holds an optional TopicDescription structure (non-empty if execution was successful). Its structure is the following:
Field | Description |
---|---|
TopicName | The name of the topic for which description is provided. |
TopicConfigDetails | A structure that holds basic replication details. |
Partitions | Number of partitions in the given topic. |
Config | Topic-level setting and value which was overridden. |
TopicPartitionDetails | List describing replication details for each partition. |
PartitionId | Id of the partition. |
Leader | Optional id of the leader broker for the described partition. |
Replicas | List of broker ids serving a replica's role for the partition. |
ISR | Same as replicas but includes only brokers that are known to be "in-sync" |
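To make the per-partition layout concrete, the following Python sketch decodes a single TopicPartitionDetails entry. It assumes big-endian int32 values and int32-prefixed arrays, as in A Guide To The Kafka Protocol. The dataclass and function names are hypothetical.

```python
import struct
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class TopicPartitionDetails:
    partition_id: int
    leader: Optional[int]  # None when the MaybeOf control byte is 0
    replicas: List[int]
    isr: List[int]


def _read_int32(buf: bytes, pos: int) -> Tuple[int, int]:
    return struct.unpack_from(">i", buf, pos)[0], pos + 4


def _read_int32_array(buf: bytes, pos: int) -> Tuple[List[int], int]:
    count, pos = _read_int32(buf, pos)
    values = []
    for _ in range(count):
        value, pos = _read_int32(buf, pos)
        values.append(value)
    return values, pos


def read_partition_details(
    buf: bytes, pos: int = 0
) -> Tuple[TopicPartitionDetails, int]:
    partition_id, pos = _read_int32(buf, pos)
    control = buf[pos]
    pos += 1
    leader = None
    if control != 0:
        leader, pos = _read_int32(buf, pos)
    replicas, pos = _read_int32_array(buf, pos)
    isr, pos = _read_int32_array(buf, pos)
    return TopicPartitionDetails(partition_id, leader, replicas, isr), pos
```

The function returns the next read offset along with the decoded entry, so it can be called in a loop over the [TopicPartitionDetails] array.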
List Topics Request
ListTopicsRequest =>
List Topics Response
ListTopicsResponse => ErrorCode ?(ErrorDescription) ?(TopicsList)
  ErrorCode => int16
  ErrorDescription => string
  TopicsList => [TopicMarkedForDeletion] [AliveTopic]
  TopicMarkedForDeletion => string
  AliveTopic => string
ListTopicsRequest is a request with no arguments.
ListTopicsResponse, besides errorCode and the optional errorDescription, which are used in the same way as in previous messages, holds an optional structure (present if execution was successful) with two lists of topic names: one for deleted topics (marked for deletion) and one for ordinary, alive topics.
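The two-list layout of ListTopicsResponse can be read as in this Python sketch, which again assumes the primitive encodings from A Guide To The Kafka Protocol and decodes only the response body. The names are hypothetical.

```python
import struct
from typing import List, NamedTuple, Optional, Tuple


class ListTopicsResponse(NamedTuple):
    error_code: int
    error_description: Optional[str]
    marked_for_deletion: Optional[List[str]]
    alive: Optional[List[str]]


def _read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    return buf[pos:pos + length].decode("utf-8"), pos + length


def _read_string_array(buf: bytes, pos: int) -> Tuple[List[str], int]:
    (count,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    items = []
    for _ in range(count):
        item, pos = _read_string(buf, pos)
        items.append(item)
    return items, pos


def decode_list_topics_response(buf: bytes) -> ListTopicsResponse:
    (error_code,) = struct.unpack_from(">h", buf, 0)
    pos = 2
    # ?(ErrorDescription): control byte, then the string if present.
    description = None
    if buf[pos] != 0:
        description, pos = _read_string(buf, pos + 1)
    else:
        pos += 1
    # ?(TopicsList): control byte, then both arrays if present.
    deleted = alive = None
    if buf[pos] != 0:
        deleted, pos = _read_string_array(buf, pos + 1)
        alive, pos = _read_string_array(buf, pos)
    return ListTopicsResponse(error_code, description, deleted, alive)
```

Keeping the two lists as `None` rather than empty lists on failure preserves the difference between "no topics" and "the request did not succeed".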
Installation
These instructions describe how to build and start the Kafka Command Line Tool (hereinafter, Shell). The implementation is in progress under KAFKA-1694.
To start the Shell you need a running Kafka cluster built from the given patch (attached under KAFKA-1694), and you need to build the Shell itself.
- Get the code.
- Get the KAFKA-1772_1802_1775_1774_v2.patch attached to KAFKA-1694.
- The patch was built against trunk, on top of revision 7e9368b. Reset to this commit and then apply the patch:
  git am KAFKA-1772_1802_1775_1774_v2.patch
- Build the code:
  ./gradlew releaseTarGz_2_10_4
- Start a Kafka cluster somewhere from the archive under
  ./core/build/distributions/kafka_2.10-0.8.3-SNAPSHOT.tgz
- Unpack the build archive:
  cd <kafka_home>/core/build/distributions/ && rm -rf kafka_2.10-0.8.3-SNAPSHOT && tar -xf kafka_2.10-0.8.3-SNAPSHOT.tgz
- Start the Shell:
  sudo <kafka_home>/core/build/distributions/kafka_2.10-0.8.3-SNAPSHOT/bin/kafka.sh --shell --broker <host : port>
  where <host : port> is the location of one of the running brokers from the cluster.
- To get Shell help, run:
  sudo <kafka_home>/core/build/distributions/kafka_2.10-0.8.3-SNAPSHOT/bin/kafka.sh --help
Sample usage
You can use the Kafka Command Line Tool in two ways: 1) as an interactive shell, or 2) as a simple CLI.
For example, to get the list of topics you can:
1) Start Shell and run:
sudo bin/kafka.sh --shell --broker <host : port>
kafka> list-topics
Or
2) Run right from kafka.sh:
sudo bin/kafka.sh --list-topics --broker <host : port>
Open questions:
Compatibility, Deprecation, and Migration Plan
- When will we remove the existing behavior?
I don't know if that has to be decided now. Folks have already built wrapper tools, they can still keep using them if they want. We should code freeze them though.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.