The goal of the command line shell is to provide centralized management for Kafka operations.
Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Folks have created dozens of different systems to work with Kafka. If we provide a wire protocol that allows the brokers to execute administrative code, clients can execute commands much more simply. Today we have so many ties to ZooKeeper that it is nearly impossible for some code to do anything better than wrap shell scripts such as kafka-topics.sh. With the wire protocol, clients in any language should be able to administer Kafka. If someone wants a REST interface (for example), they can write it in whatever language they like. The project should also ship a client that is not only an example but a fully functional replacement for the kafka-topics, reassign-partitions, and consumer offset tools.
Public Interfaces
Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.
A public interface is any change to the following:
Command line tools and arguments
Proposed Changes
Proposed changes include 4 parts:
- Wire Protocol extensions - to add new Admin messages
- Server-side Admin commands handlers (TopicCommand-like)
- Admin Client - an out-of-box client for performing administrative commands
- Interactive Shell / CLI tool supporting administrative commands
Some open questions and items under discussion are marked with [x]. See the Open Questions section for more details.
1. Wire Protocol Extensions
Overview
For each type of Admin request, a separate type of Wire Protocol message is created.
Currently there are 3 types of requests:
- Topic commands, which include CreateTopic(Request | Response), AlterTopic, DeleteTopic, DescribeTopic, and ListTopics.
- Replication tools: ReassignPartition, VerifyReassignPartitions, and PreferredReplicaLeaderElection.
- A special type of request to support Admin commands: ClusterMetadata [1].
Details are given in the request/response schema proposal for each message.
Schema
The same notation as in A Guide To The Kafka Protocol is used here. The only difference is a new Kafka Protocol metatype, MaybeOf ("?" in the notation), which means the value is optional in the message. To indicate whether a value is present, a special control byte is prepended before each such value (0 means the field is absent; otherwise the value is read normally) [2].
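The MaybeOf convention can be sketched in Python as follows. This is a minimal illustration, assuming big-endian integers as elsewhere in the Kafka protocol and a control value of 1 for "present"; the helper names are hypothetical and not part of any Kafka library.

```python
import struct
from typing import Callable, Optional, Tuple

# Hypothetical helpers illustrating the proposed MaybeOf encoding:
# one control byte (0 = absent, 1 = present) followed by the value.

def encode_int32(value: int) -> bytes:
    return struct.pack(">i", value)  # big-endian, as in the Kafka protocol

def decode_int32(buf: bytes, offset: int) -> Tuple[int, int]:
    return struct.unpack_from(">i", buf, offset)[0], offset + 4

def encode_maybe(value, encoder: Callable[[object], bytes]) -> bytes:
    if value is None:
        return b"\x00"  # field absent: only the control byte is written
    return b"\x01" + encoder(value)

def decode_maybe(buf: bytes, offset: int,
                 decoder: Callable[[bytes, int], Tuple[object, int]]
                 ) -> Tuple[Optional[object], int]:
    """Returns (value or None, offset just past the field)."""
    if buf[offset] == 0:
        return None, offset + 1
    return decoder(buf, offset + 1)
```

For example, `encode_maybe(5, encode_int32)` yields `b"\x01\x00\x00\x00\x05"`, while `encode_maybe(None, encode_int32)` yields the single byte `b"\x00"`.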
All admin messages listed below must be sent only to the Controller broker; only the controller will process them. If an Admin message is sent to an ordinary broker, a special error code (22) is returned. If any other failure occurs while processing the message, AdminRequestFailedError is returned [3].
Error | Code | Description |
---|---|---|
AdminRequestFailedError | 21 | Unexpected error occurred while processing Admin request. |
NotControllerForAdminRequest | 22 | Target broker (id=<this_broker_id>) is not serving a controller's role. |
ClusterMetadata Schema
Cluster Metadata Request
ClusterMetadataRequest =>
Cluster Metadata Response
ClusterMetadataResponse => ErrorCode [Broker] ?(Controller)
  ErrorCode => int16
  Broker => NodeId Host Port
  NodeId => int32
  Host => string
  Port => int32
  Controller => Broker
ClusterMetadataRequest is a request with no arguments.
ClusterMetadataResponse holds an error code (0 on success), the list of brokers in the cluster and, optionally, the broker serving as Controller (an empty Controller most likely means either an error during request processing or a cluster in some intermediate state).
ClusterMetadataRequest is required so that admin clients can locate the Kafka brokers, and specifically the controller, since only the controller may execute admin commands.
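To make the response layout concrete, here is a minimal Python decoder for ClusterMetadataResponse. It assumes the usual Kafka encoding (big-endian integers, strings as an int16 length followed by UTF-8 bytes, arrays as an int32 count followed by the elements), that Host is a string, and that a MaybeOf control byte precedes Controller; all class and function names are illustrative.

```python
import struct
from dataclasses import dataclass
from typing import List, Optional, Tuple

@dataclass
class Broker:
    node_id: int
    host: str
    port: int

def _int16(buf: bytes, off: int) -> Tuple[int, int]:
    return struct.unpack_from(">h", buf, off)[0], off + 2

def _int32(buf: bytes, off: int) -> Tuple[int, int]:
    return struct.unpack_from(">i", buf, off)[0], off + 4

def _string(buf: bytes, off: int) -> Tuple[str, int]:
    length, off = _int16(buf, off)  # int16 length prefix
    return buf[off:off + length].decode("utf-8"), off + length

def _broker(buf: bytes, off: int) -> Tuple[Broker, int]:
    node_id, off = _int32(buf, off)
    host, off = _string(buf, off)
    port, off = _int32(buf, off)
    return Broker(node_id, host, port), off

def decode_cluster_metadata(buf: bytes) -> Tuple[int, List[Broker], Optional[Broker]]:
    """Returns (error_code, brokers, controller or None)."""
    error_code, off = _int16(buf, 0)
    count, off = _int32(buf, off)
    brokers = []
    for _ in range(count):
        broker, off = _broker(buf, off)
        brokers.append(broker)
    controller = None
    if buf[off] != 0:  # MaybeOf control byte: non-zero means Controller is present
        controller, off = _broker(buf, off + 1)
    return error_code, brokers, controller
```

A client would pick the controller from the third element and fall back to re-sending the request when it is None.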
Topic Admin Schema
Create Topic Request
CreateTopicRequest => TopicName ?(Partitions) ?(Replicas) ?(ReplicaAssignment) [ConfigEntry]
  TopicName => string
  Partitions => int32
  Replicas => int32
  ReplicaAssignment => string
  ConfigEntry => ConfigKey ConfigValue
  ConfigKey => string
  ConfigValue => string
Create Topic Response
CreateTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string
CreateTopicRequest requires the topic name and either (partitions + replicas) or a replica assignment to create the topic (validation is done on the server side). You can also specify topic-level configs to create the topic with (to use the defaults, send an empty array).
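The encoding of this request, together with a client-side mirror of the server's "partitions + replicas, or an assignment" rule, might look like the Python sketch below. It assumes Kafka-style primitives and the MaybeOf control byte; `encode_create_topic` is a hypothetical name, and the replica assignment is treated as an opaque string (for example TopicCommand-style "0:1,1:2").

```python
import struct
from typing import Dict, Optional

def _int32(value: int) -> bytes:
    return struct.pack(">i", value)

def _string(value: str) -> bytes:
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data  # int16 length prefix

def _maybe(value, encode) -> bytes:
    # MaybeOf: 0 = absent, 1 = present followed by the value
    return b"\x00" if value is None else b"\x01" + encode(value)

def encode_create_topic(topic: str,
                        partitions: Optional[int] = None,
                        replicas: Optional[int] = None,
                        replica_assignment: Optional[str] = None,
                        configs: Optional[Dict[str, str]] = None) -> bytes:
    # Client-side check only; the server still performs the real validation.
    if replica_assignment is None:
        if partitions is None or replicas is None:
            raise ValueError("give partitions and replicas, or a replica assignment")
    elif partitions is not None or replicas is not None:
        raise ValueError("a replica assignment excludes partitions/replicas")
    configs = configs or {}  # empty array means broker defaults
    body = _string(topic)
    body += _maybe(partitions, _int32)
    body += _maybe(replicas, _int32)
    body += _maybe(replica_assignment, _string)
    body += _int32(len(configs))  # [ConfigEntry] array count
    for key, value in configs.items():
        body += _string(key) + _string(value)
    return body
```

Failing fast on the client saves a round trip, but the server remains the source of truth for validation.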
CreateTopicResponse is fairly simple: you receive an error code (0, as always, identifies NO_ERROR) and an optional error description. The description usually holds the higher-level exception that occurred during command execution.
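Since several Admin responses share the ErrorCode ?(ErrorDescription) layout, a client can decode all of them with one helper. A minimal Python sketch, assuming an int16 error code, a one-byte MaybeOf flag and an int16-length-prefixed UTF-8 description; `decode_admin_response`, `check_admin_response` and `AdminError` are hypothetical names.

```python
import struct
from typing import Optional, Tuple

# Error codes used by the Admin messages; 0 is NO_ERROR.
ERROR_NAMES = {0: "NoError", 21: "AdminRequestFailedError",
               22: "NotControllerForAdminRequest"}

class AdminError(Exception):
    """Hypothetical exception raised for a non-zero ErrorCode."""

def decode_admin_response(buf: bytes) -> Tuple[int, Optional[str]]:
    # ErrorCode => int16, then the MaybeOf control byte for ErrorDescription.
    error_code = struct.unpack_from(">h", buf, 0)[0]
    if buf[2] == 0:
        return error_code, None
    length = struct.unpack_from(">h", buf, 3)[0]
    return error_code, buf[5:5 + length].decode("utf-8")

def check_admin_response(buf: bytes) -> Optional[str]:
    # Raises on failure; otherwise returns the (usually absent) description.
    code, description = decode_admin_response(buf)
    if code != 0:
        name = ERROR_NAMES.get(code, "UnknownError")
        raise AdminError(f"{name} ({code}): {description or ''}")
    return description
```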
Alter Topic Request
AlterTopicRequest => TopicName ?(Partitions) ?(ReplicaAssignment) [AddedConfigEntry] [DeletedConfig]
  TopicName => string
  Partitions => int32
  ReplicaAssignment => string
  AddedConfigEntry => ConfigKey ConfigValue
  ConfigKey => string
  ConfigValue => string
  DeletedConfig => string
Alter Topic Response
AlterTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string
AlterTopicRequest is similar to CreateTopicRequest. To specify topic-level settings that should be removed, use the DeletedConfig array (config keys only). The user can provide a new partition count, a new replica assignment, or both.
AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request
DeleteTopicRequest => TopicName
  TopicName => string
Delete Topic Response
DeleteTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string
DeleteTopicRequest requires only the name of the topic to be deleted.
DeleteTopicResponse is similar to CreateTopicResponse.
Describe Topic Request
DescribeTopicRequest => TopicName
  TopicName => string
Describe Topic Response
DescribeTopicResponse => ErrorCode ?(ErrorDescription) ?(TopicDescription)
  ErrorCode => int16
  ErrorDescription => string
  TopicDescription => TopicName TopicConfigDetails [TopicPartitionDetails]
  TopicName => string
  TopicConfigDetails => Partitions ReplicationFactor [ConfigEntry]
  Partitions => int32
  ReplicationFactor => int32
  ConfigEntry => string string
  TopicPartitionDetails => PartitionId ?(Leader) [Replica] [ISR]
  PartitionId => int32
  Leader => int32
  Replica => int32
  ISR => int32
DescribeTopicRequest requires only the topic name.
Besides ErrorCode and the optional ErrorDescription, which are used in the same way as in the previous messages, DescribeTopicResponse holds an optional TopicDescription structure (non-empty if execution was successful). See the table below for details:
Field | Description |
---|---|
TopicName | The name of the topic being described. |
TopicConfigDetails | A structure that holds basic replication details. |
Partitions | Number of partitions in the given topic. |
ReplicationFactor | Replication factor of the given topic. |
ConfigEntry | A topic-level setting and its overridden value. |
TopicPartitionDetails | List describing replication details for each partition. |
PartitionId | Id of the partition. |
Leader | Optional id of the broker leading the described partition. |
Replica | List of ids of the brokers serving as replicas for the partition. |
ISR | Same as Replica, but includes only brokers known to be "in sync". |
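On the client side, a decoded DescribeTopicResponse could be held in plain data classes that mirror the table. The Python sketch below is illustrative only: the class names and the two derived views (partitions whose ISR is smaller than their replica list, and partitions without a leader) are assumptions, not part of the proposal.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

@dataclass
class PartitionDetails:
    partition_id: int
    leader: Optional[int]  # ?(Leader): None when no leader is reported
    replicas: List[int]
    isr: List[int]

@dataclass
class TopicDescription:
    topic_name: str
    partitions: int
    replication_factor: int
    configs: Dict[str, str] = field(default_factory=dict)
    partition_details: List[PartitionDetails] = field(default_factory=list)

    def under_replicated(self) -> List[int]:
        # Partitions whose in-sync set is smaller than the assigned replica set.
        return [p.partition_id for p in self.partition_details
                if len(p.isr) < len(p.replicas)]

    def leaderless(self) -> List[int]:
        # Partitions for which the optional Leader field was absent.
        return [p.partition_id for p in self.partition_details if p.leader is None]
```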
List Topics Request
ListTopicsRequest =>
List Topics Response
ListTopicsResponse => ErrorCode ?(ErrorDescription) ?(TopicsList)
  ErrorCode => int16
  ErrorDescription => string
  TopicsList => [TopicName]
  TopicName => string
ListTopicsRequest is a request with no arguments.
Besides ErrorCode and the optional ErrorDescription, which are used in the same way as in the previous messages, ListTopicsResponse holds the list of topics in the Kafka cluster.
Replication Commands Schema
Reassign Partitions
Reassign Partitions Request
ReassignPartitionRequest => ManualAssignment
  ManualAssignment => string
Reassign Partitions Response
ReassignPartitionResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string
ReassignPartitionsRequest requires a manual partition assignment string; parsing and validation are done on the server. This request only initiates partition reassignment and returns immediately. If the user should be blocked until the reassignment finishes, it is the client's responsibility to keep sending VerifyReassignPartitionsRequest to check the reassignment status. The format is the following:
{
"partitions": [
{"topic": "foo",
"partition": 1,
"replicas": [1,2,3] }
],
"version":1
}
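A client would usually build this string programmatically rather than by hand. A minimal Python sketch, assuming the JSON layout shown above; `reassignment_json` is a hypothetical helper, and its check for empty or duplicate replica lists is only a client-side convenience, since validation happens on the server.

```python
import json
from typing import Dict, List, Tuple

def reassignment_json(plan: Dict[Tuple[str, int], List[int]]) -> str:
    """Builds the ManualAssignment string from {(topic, partition): replicas}."""
    partitions = []
    for (topic, partition), replicas in sorted(plan.items()):
        if not replicas or len(set(replicas)) != len(replicas):
            raise ValueError(f"{topic}-{partition}: replicas must be non-empty and distinct")
        partitions.append({"topic": topic, "partition": partition, "replicas": replicas})
    return json.dumps({"partitions": partitions, "version": 1})
```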
ReassignPartitionResponse is similar to CreateTopicResponse.
Verify Reassign Partitions Request
VerifyReassignPartitionRequest => ManualAssignment
  ManualAssignment => string
Verify Reassign Partitions Response
VerifyReassignPartitionResponse => [ReassignmentResult] ErrorCode ?(ErrorDescription)
  ReassignmentResult => TopicAndPartition ResultCode
  TopicAndPartition => string int32
  ResultCode => int16
  ErrorCode => int16
  ErrorDescription => string
VerifyReassignPartitionsRequest requires the same manual partition assignment string that was sent in the ReassignPartitionsRequest whose status is being verified.
Like the other Admin responses, VerifyReassignPartitionResponse may return an error code and an optional error description in case of failure. Otherwise, a reassignment result map is returned, holding the reassignment status of each partition (-1: reassignment failed, 0: in progress, 1: completed successfully).
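Because the reassignment request only starts the work, a client offering a blocking mode has to poll. The sketch below shows one way to do that in Python; `verify` stands in for sending VerifyReassignPartitionRequest and returning the decoded result map, and the poll interval, timeout and stop-on-first-failure policy are all assumptions.

```python
import time
from typing import Callable, Dict, Tuple

# Result codes from VerifyReassignPartitionResponse.
FAILED, IN_PROGRESS, COMPLETED = -1, 0, 1
Status = Dict[Tuple[str, int], int]

def wait_for_reassignment(verify: Callable[[], Status],
                          poll_interval_s: float = 1.0,
                          timeout_s: float = 300.0) -> Status:
    """Polls `verify` until every partition completed or any partition failed."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = verify()
        if any(code == FAILED for code in status.values()):
            return status  # stop early: at least one partition failed
        if all(code == COMPLETED for code in status.values()):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError("reassignment still in progress")
        time.sleep(poll_interval_s)
```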
Preferred Replica Leader Election
Preferred Replica Leader Election Request
PreferredReplicaLeaderElectionRequest => PartitionsSerialized
  PartitionsSerialized => string
Preferred Replica Leader Election Response
PreferredReplicaLeaderElectionResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string
PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Like ReassignPartitionsRequest, this request is intended to be non-blocking. The schema consists of one optional field: the partitions, in serialized (JSON) form, for which the procedure should be started. The format is the following:
{"partitions":
 [
  {"topic": "foo", "partition": 1},
  {"topic": "foobar", "partition": 2}
 ]
}
PreferredReplicaLeaderElectionResponse is similar to CreateTopicResponse.
The status of the procedure may be checked with DescribeTopicRequest: the head of the replicas list and the leader broker should be the same.
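Both halves of this flow can be sketched in Python: serializing the PartitionsSerialized JSON in the format shown above, and scanning DescribeTopic output for partitions whose leader is not yet the head of the replica list. The helper names and the (partition, leader, replicas) tuple shape are hypothetical.

```python
import json
from typing import List, Optional, Sequence, Tuple

def election_json(partitions: Sequence[Tuple[str, int]]) -> str:
    """Serializes [(topic, partition), ...] into the PartitionsSerialized field."""
    return json.dumps({"partitions": [{"topic": t, "partition": p} for t, p in partitions]})

def pending_election(details: Sequence[Tuple[int, Optional[int], List[int]]]) -> List[int]:
    """Returns ids of partitions whose leader differs from the preferred replica.

    Each entry is (partition_id, leader, replicas); the preferred replica is
    replicas[0], and a missing leader (None) also counts as pending.
    """
    return [pid for pid, leader, replicas in details
            if replicas and leader != replicas[0]]
```

An empty result from `pending_election` means the election has taken effect for every described partition.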
2. Server-side Admin Request handlers
All incoming requests will be handled by specific helper classes called from KafkaApis: TopicCommandHelper for topic admin commands, ReassignPartitionsCommandHelper, and PreferredReplicaLeaderElectionCommandHelper.
All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Unfortunately, most of the command classes are tightly coupled with CLI logic and can hardly be refactored, so for now (until the standalone CLI commands are removed) the logic from those classes will most likely be extracted and copied into separate classes (as proposed, TopicCommandHelper [4], etc.).
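The request routing described here can be pictured with a small Python model. It is not KafkaApis code: `AdminRequestRouter`, the string request types and the handler callables are hypothetical, and the sketch only illustrates the two error paths defined for Admin messages (code 22 on a non-controller broker, code 21 for any failure inside a helper).

```python
from typing import Callable, Dict, Optional, Tuple

NO_ERROR, ADMIN_REQUEST_FAILED, NOT_CONTROLLER = 0, 21, 22
Response = Tuple[int, Optional[str]]

class AdminRequestRouter:
    """Hypothetical stand-in for dispatching Admin requests to helpers."""

    def __init__(self, broker_id: int, is_controller: Callable[[], bool]):
        self.broker_id = broker_id
        self.is_controller = is_controller
        self.handlers: Dict[str, Callable[[dict], None]] = {}

    def register(self, request_type: str, handler: Callable[[dict], None]) -> None:
        self.handlers[request_type] = handler

    def handle(self, request_type: str, request: dict) -> Response:
        if not self.is_controller():
            return NOT_CONTROLLER, (f"Target broker (id={self.broker_id}) "
                                    "is not serving a controller's role.")
        try:
            self.handlers[request_type](request)  # e.g. a TopicCommandHelper call
            return NO_ERROR, None
        except Exception as exc:  # any failure maps to AdminRequestFailedError
            return ADMIN_REQUEST_FAILED, str(exc)
```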
3. Admin Client
This component is intended to be a Kafka out-of-box client implementation for Admin commands.
The Admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, the client will maintain a cluster metadata cache and will provide the user with a convenient way of handling long-running commands (e.g. reassign partitions).
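One way the client could keep its controller location fresh is sketched below in Python. It deliberately does not model NetworkClient: `fetch_controller` stands in for a ClusterMetadataRequest and `send` for delivering an admin request, both hypothetical callables, and the retry budget is an assumption.

```python
from typing import Callable, Optional, Tuple

NOT_CONTROLLER = 22
Address = Tuple[str, int]

class ControllerAwareClient:
    """Hypothetical sketch: caches the controller address from ClusterMetadata
    and refreshes it when a broker answers NotControllerForAdminRequest."""

    def __init__(self, fetch_controller: Callable[[], Optional[Address]],
                 send: Callable[[Address, dict], Tuple[int, Optional[str]]],
                 max_attempts: int = 3):
        self._fetch_controller = fetch_controller
        self._send = send
        self._max_attempts = max_attempts
        self._controller: Optional[Address] = None

    def execute(self, request: dict) -> Tuple[int, Optional[str]]:
        for _ in range(self._max_attempts):
            if self._controller is None:
                self._controller = self._fetch_controller()
            if self._controller is None:
                continue  # no controller reported (cluster may be in transition)
            code, description = self._send(self._controller, request)
            if code != NOT_CONTROLLER:
                return code, description
            self._controller = None  # controller moved: refresh metadata next time
        raise RuntimeError("could not reach the controller")
```

A production client would add a backoff between attempts; it is omitted here to keep the sketch short.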
Proposed API [5]:
4. Interactive Shell / CLI tool
This component wraps AdminClient and provides an interactive shell-like environment for executing administrative commands. The goal of these changes is to let people use the existing standalone tools from a single script, optionally running commands in a shell, so command names and arguments are kept the same as in the existing tools wherever possible.
The tool supports two modes:
- command line interface
- interactive shell mode
Command Line Interface
This mode lets users run commands from a shell script. List of available commands:
(Note: not all possible options are listed, e.g. altering a topic's config.)
# Topic Commands - options are ported from TopicCommand.scala
bin/kafka.sh --create-topic --topic my_topic --partitions 5 --replication-factor 3 --config key=value --broker-list <host:port>
bin/kafka.sh --alter-topic --topic my_topic --partitions 10 --broker-list <host:port>
bin/kafka.sh --delete-topic --topic my_topic --broker-list <host:port>
bin/kafka.sh --list-topics --broker-list <host:port>
bin/kafka.sh --describe-topic --topic my_topic --broker-list <host:port>
# Reassign Partitions - options are ported from ReassignPartitionsCommand.scala
bin/kafka.sh --reassign-partitions --reassignment-json-file /user/file.json --blocking --broker-list <host:port>
# Preferred Replica Leader Elections - options are ported from PreferredReplicaLeaderElectionCommand.scala
bin/kafka.sh --preferred-replica-leader-election --path-to-json-file /user/file.json --blocking --broker-list <host:port>
Every command must be supplied with --broker-list <host:port> to identify at least one broker in the target cluster.
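The topic part of this command line could be parsed as sketched below with Python's argparse. Only a subset of the flags listed above is covered, and the rule that every topic command except --list-topics needs --topic is an assumption about how the tool would validate input.

```python
import argparse
from typing import List, Optional

def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="kafka.sh")
    # Exactly one command (or --shell) per invocation.
    action = parser.add_mutually_exclusive_group(required=True)
    for flag in ("--create-topic", "--alter-topic", "--delete-topic",
                 "--list-topics", "--describe-topic", "--shell"):
        action.add_argument(flag, action="store_true")
    parser.add_argument("--broker-list", required=True,
                        help="host:port of at least one broker in the cluster")
    parser.add_argument("--topic")
    parser.add_argument("--partitions", type=int)
    parser.add_argument("--replication-factor", type=int)
    parser.add_argument("--config", action="append", default=[],
                        help="topic-level override as key=value (repeatable)")
    return parser

def parse(argv: Optional[List[str]] = None) -> argparse.Namespace:
    parser = build_parser()
    args = parser.parse_args(argv)
    needs_topic = (args.create_topic or args.alter_topic
                   or args.delete_topic or args.describe_topic)
    if needs_topic and not args.topic:
        parser.error("--topic is required for this command")  # exits with status 2
    return args
```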
Interactive Shell Mode
Interactive shell mode provides extended facilities for executing admin commands. Command names and options are the same, but the user has to define --broker-list only once: in interactive mode the CLI tool manages the cluster metadata cache and sends commands to the proper broker.
This mode also provides facilities for switching context: for example, after switching to a topic, all topic commands are applied to that topic, so there is no need to specify the topic name for each topic command.
The proposed use case is the following:
# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host1:port1>
Connected to Kafka Controller at <host2:port2>.
kafka> create-topic --topic my_topic --partitions 5 --replication-factor 3
Topic "my_topic" is created.
kafka> alter-topic --topic my_topic --partitions 10
Topic "my_topic" is changed.
# Switch topic context
kafka> topic my_topic
Switched to "my_topic" topic.
# Execute topic command for switched topic
kafka my_topic> describe-topic
"my_topic" details:
Topic: my_topic
Partitions: 10
...
# Switch off topic context
kafka my_topic> topic
kafka>
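The context switching in this session could work roughly as follows. A minimal Python sketch: `ShellContext` and the set of topic commands it rewrites are assumptions, and it only rewrites the argument list (appending --topic when none is given), leaving execution to the CLI layer.

```python
import shlex
from typing import List, Optional

# Hypothetical set of commands that accept --topic.
TOPIC_COMMANDS = {"create-topic", "alter-topic", "delete-topic", "describe-topic"}

class ShellContext:
    """Hypothetical sketch of the topic context in interactive mode."""

    def __init__(self) -> None:
        self.topic: Optional[str] = None

    def prompt(self) -> str:
        return f"kafka {self.topic}> " if self.topic else "kafka> "

    def expand(self, line: str) -> Optional[List[str]]:
        """Returns the command's argv, or None if the line only switched context."""
        argv = shlex.split(line)
        if argv and argv[0] == "topic":
            # "topic <name>" switches context; a bare "topic" switches it off.
            self.topic = argv[1] if len(argv) > 1 else None
            return None
        if argv and argv[0] in TOPIC_COMMANDS and self.topic and "--topic" not in argv:
            argv += ["--topic", self.topic]
        return argv
```

An explicit --topic on the command line still wins over the switched context.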
Open questions:
- ClusterMetadata duplicates TopicMetadata: we could extend TopicMetadata with controllerId information and probably something else. Another alternative is a generic server-side re-routing facility (see KAFKA-1912 for details).
- People expressed concerns about the optional MaybeOf type because it is inconsistent with the way we currently handle empty values in the Wire Protocol.
- We might extend error codes to cover all possible failures and stop using the outcome/errorDescription field as a generic result description.
- It is proposed to create a separate ticket to rework the topic command so that commands are executed directly by the controller instead of using the ZooKeeper admin path to notify the controller about the change.
- AdminClient may need to support batching admin operations. We are considering whether this can be covered by allowing the user to supply a regexp for the topic name in AlterTopic, DeleteTopic and DescribeTopic requests (similarly to TopicCommand.scala).
Compatibility, Deprecation, and Migration Plan
The current tools will still be available in the 0.8.3 release but will be removed in 0.9 (tracked here https://issues.apache.org/jira/browse/KAFKA-1776 ).
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.