
The fundamental goal of the command line shell is to provide centralized management of Kafka operations.

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Folks have created dozens of different systems to work with Kafka. If we provide a wire protocol that allows the brokers to execute administrative code, clients can execute commands much more simply. Today the tooling is tightly tied to ZooKeeper, which makes it nearly impossible for some code to do anything better than wrap shell scripts such as kafka-topics.sh. With the wire protocol, clients in any language should be able to administer Kafka. If someone wants a REST interface (for example), they can write it in whatever language they like. The project should also ship a client that is not only an example but a fully functional replacement for the kafka-topics, reassign-partitions and consumer offset tools.

Public Interfaces

  • Changes to Wire Protocol:

 Extending the protocol with these messages:
- CreateTopic(Request | Response), AlterTopic, DeleteTopic, ListTopics, DescribeTopic
- PreferredReplicaLeaderElection
- ReassignPartitions

  • New client: AdminClient - a Wire Protocol client for administrative operations
  • New CLI tool: Kafka Shell - a single unified command line interface for administrative operations

 

Proposed Changes

Proposed changes include 4 parts:

  1. Wire Protocol extensions - to add new Admin messages
  2. Server-side Admin commands handlers (TopicCommand-like)
  3. Admin Client - an out-of-box client for performing administrative commands
  4. Interactive Shell / CLI tool supporting administrative commands

(You can try out some of the changes from this KIP. Please follow this link to get the code and start the CLI tool).

1. Wire Protocol Extensions

Overview

For each type of Admin Request a separate type of Wire protocol message is created.

It is proposed to add the following types of requests:

  • Topic commands: CreateTopic, AlterTopic, DeleteTopic, DescribeTopic and ListTopics (each as a Request/Response pair).
  • Replication tools: ReassignPartitions and PreferredReplicaLeaderElection.

Details are given in the request/response schema proposal for each message.

Schema

The notation used here is the same as in A Guide To The Kafka Protocol.

New Protocol Errors

It is proposed to add these error codes to the protocol.

Error | Code | Description | Requests
TopicAlreadyExists | 1001 | Topic with this name already exists. | CreateTopicRequest
InvalidArgumentPartitions | 1002 | Either the partitions field is invalid (e.g. negative) or it is not defined when needed. | CreateTopicRequest, AlterTopicRequest
DecreasePartitionsNotAllowed | 1003 | Invalid partitions argument: decreasing partitions is prohibited. | AlterTopicRequest
InvalidArgumentReplicationFactor | 1004 | Either the replication-factor field is invalid (e.g. negative) or it is not defined when needed. | CreateTopicRequest
InvalidArgumentReplicaAssignment | 1005 | Either the replica-assignment field is invalid (e.g. contains duplicates) or it is not defined when needed. | CreateTopicRequest, AlterTopicRequest, ReassignPartitionsRequest
InvalidTopicConfig | 1006 | Either a topic-level config setting or its value is incorrect. | CreateTopicRequest, AlterTopicRequest
PreferredReplicaLeaderElectionInProgress | 1007 | A preferred replica leader election procedure has already been started. | PreferredReplicaLeaderElectionRequest
InvalidArgumentPreferredReplicaElectionData | 1008 | Preferred replica leader election data is invalid (bad JSON, duplicates, etc.). | PreferredReplicaLeaderElectionRequest
ReassignPartitionsInProgress | 1009 | A reassign partitions procedure has already been started. | ReassignPartitionsRequest

Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide a descriptive error message.
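As an illustration of how a client might turn the int16 codes above into descriptive messages, here is a minimal Java sketch. The enum name AdminError and its constant names are hypothetical (this is not Kafka's existing Errors class); only the numeric codes come from the table, and code 0 is assumed to mean "no error", as elsewhere in the Kafka protocol.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical enum mirroring the proposed error-code table (illustrative names only).
enum AdminError {
    NONE((short) 0, "No error."),
    TOPIC_ALREADY_EXISTS((short) 1001, "Topic with this name already exists."),
    INVALID_ARGUMENT_PARTITIONS((short) 1002, "Partitions field is invalid or missing."),
    DECREASE_PARTITIONS_NOT_ALLOWED((short) 1003, "Decreasing partitions is prohibited."),
    INVALID_ARGUMENT_REPLICATION_FACTOR((short) 1004, "Replication-factor field is invalid or missing."),
    INVALID_ARGUMENT_REPLICA_ASSIGNMENT((short) 1005, "Replica-assignment field is invalid or missing."),
    INVALID_TOPIC_CONFIG((short) 1006, "Topic-level config setting or value is incorrect."),
    PREFERRED_REPLICA_LEADER_ELECTION_IN_PROGRESS((short) 1007, "Preferred replica leader election has already been started."),
    INVALID_ARGUMENT_PREFERRED_REPLICA_ELECTION_DATA((short) 1008, "Preferred replica leader election data is invalid."),
    REASSIGN_PARTITIONS_IN_PROGRESS((short) 1009, "Partition reassignment has already been started.");

    // Lookup table from wire-level code to enum constant, filled once at class load.
    private static final Map<Short, AdminError> BY_CODE = new HashMap<>();
    static {
        for (AdminError e : values()) {
            BY_CODE.put(e.code, e);
        }
    }

    final short code;
    final String message;

    AdminError(short code, String message) {
        this.code = code;
        this.message = message;
    }

    // Resolves a code from a response; unknown codes yield null so the caller
    // can fall back to a generic message.
    static AdminError forCode(short code) {
        return BY_CODE.get(code);
    }
}
```

A client receiving a per-topic ErrorCode could then print AdminError.forCode(code).message together with the topic name it already knows from the request.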

Topic Admin Schema

The idea is to introduce Wire protocol messages that cover all topic commands (create, alter, delete, list, describe). The motivation behind the proposed schema is the following:

1) Topic commands must inherit options from the TopicCommand tool.

2) If an option is not used by a particular command (e.g. ReplicaAssignment in CreateTopicRequest), a special marker value is sent instead (e.g. an empty array for ReplicaAssignment).

3) Topic commands must support batching and provide command execution result per-topic

4) Topic commands are asynchronous - the request to create/alter/delete just initiates the corresponding commands and returns immediately

Create Topic Request

 

CreateTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [ConfigEntry]]
TopicName => string
Partitions => int32
Replicas => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
ConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
CreateTopicRequest requires a topic name and either (Partitions + Replicas) or ReplicaAssignment to create a topic. The special value -1 denotes an empty value for Partitions and Replicas. The user can also specify topic-level configs for the created topic (to use the defaults, provide an empty array).

The (Partitions, Replicas)/ReplicaAssignment semantics are as follows:

1) If ReplicaAssignment is specified, Partitions and Replicas are ignored: the topic is created with the provided replica assignment, and the number of partitions and the replication factor are derived from ReplicaAssignment.
2) If ReplicaAssignment is empty, the number of partitions and the replication factor must be defined by Partitions and Replicas, and the replica assignment for the topic is generated automatically on the server.
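The two rules can be sketched as a server-side check that yields one error code per topic. This is a minimal Java sketch under stated assumptions: the class name CreateTopicValidator is hypothetical, the error codes are taken from the New Protocol Errors table, and requiring every partition to list the same number of replicas is an assumption of the sketch rather than part of the proposal.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical check for one CreateTopicRequest entry; returns its int16 error code.
final class CreateTopicValidator {
    static final short NONE = 0;
    static final short INVALID_ARGUMENT_PARTITIONS = 1002;
    static final short INVALID_ARGUMENT_REPLICATION_FACTOR = 1004;
    static final short INVALID_ARGUMENT_REPLICA_ASSIGNMENT = 1005;

    static short validate(int partitions, int replicas, Map<Integer, List<Integer>> replicaAssignment) {
        if (!replicaAssignment.isEmpty()) {
            // Rule 1: the explicit assignment wins and Partitions/Replicas are ignored.
            // Partition count = number of entries; replication factor = replica list length
            // (assumed to be the same for every partition).
            int replicationFactor = -1;
            for (List<Integer> brokers : replicaAssignment.values()) {
                Set<Integer> distinct = new HashSet<>(brokers);
                boolean hasDuplicates = distinct.size() != brokers.size();
                boolean inconsistent = replicationFactor != -1 && replicationFactor != brokers.size();
                if (brokers.isEmpty() || hasDuplicates || inconsistent) {
                    return INVALID_ARGUMENT_REPLICA_ASSIGNMENT;
                }
                replicationFactor = brokers.size();
            }
            return NONE;
        }
        // Rule 2: no assignment, so both counts must be set; "<= 0" also covers the -1 marker.
        if (partitions <= 0) {
            return INVALID_ARGUMENT_PARTITIONS;
        }
        if (replicas <= 0) {
            return INVALID_ARGUMENT_REPLICATION_FACTOR;
        }
        return NONE;
    }
}
```

The check returns the first problem it finds, which fits the response format of a single ErrorCode per topic.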
Create Topic Response

 

CreateTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

CreateTopicResponse maps each topic to the error code of its creation result (see New Protocol Errors).

Alter Topic Request

 

AlterTopicRequest => [TopicName Partitions ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
TopicName => string
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AddedConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
DeletedConfig => string
AlterTopicRequest is similar to CreateTopicRequest. To remove topic-level settings, list their keys in the DeletedConfig array. The user can provide a new partitions value, a replica assignment, or both.

AlterTopicRequest contains an optional field Partitions; the special value -1 denotes an empty value. The Partitions/ReplicaAssignment semantics are as follows:

1) Partitions is used only to increase the number of topic partitions.

2) If the Partitions value is empty (-1), ReplicaAssignment is ignored and the topic partitions are not increased.

3) If Partitions does not increase the existing number of partitions, the error code DecreasePartitionsNotAllowed is returned.

4) If the Partitions value is not empty and increases the number of existing partitions, a new replica assignment for the topic partitions is either generated automatically or taken from ReplicaAssignment (if non-empty).
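A minimal Java sketch of these four rules, assuming the server already knows the current partition count. AlterPartitionsPlanner and Plan are hypothetical names; the sketch only decides which partitions to add and whether the provided assignment or a server-generated one is used, and leaves actual replica placement out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the Partitions/ReplicaAssignment rules of AlterTopicRequest.
final class AlterPartitionsPlanner {
    static final int EMPTY = -1;
    static final short NONE = 0;
    static final short DECREASE_PARTITIONS_NOT_ALLOWED = 1003;

    static final class Plan {
        final short errorCode;
        final List<Integer> newPartitionIds;   // ids of partitions to create (empty if none)
        final boolean useProvidedAssignment;   // false: the server generates the assignment

        Plan(short errorCode, List<Integer> newPartitionIds, boolean useProvidedAssignment) {
            this.errorCode = errorCode;
            this.newPartitionIds = newPartitionIds;
            this.useProvidedAssignment = useProvidedAssignment;
        }
    }

    static Plan plan(int currentPartitions, int requestedPartitions,
                     Map<Integer, List<Integer>> replicaAssignment) {
        if (requestedPartitions == EMPTY) {
            // Rule 2: partitions are left unchanged and ReplicaAssignment is ignored.
            return new Plan(NONE, Collections.<Integer>emptyList(), false);
        }
        if (requestedPartitions <= currentPartitions) {
            // Rule 3: the request does not increase the partition count.
            return new Plan(DECREASE_PARTITIONS_NOT_ALLOWED, Collections.<Integer>emptyList(), false);
        }
        // Rule 4: partitions are numbered from 0, so the new ids are current .. requested - 1.
        List<Integer> ids = new ArrayList<>();
        for (int p = currentPartitions; p < requestedPartitions; p++) {
            ids.add(p);
        }
        return new Plan(NONE, ids, !replicaAssignment.isEmpty());
    }
}
```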

Alter Topic Response

 

AlterTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request

 

DeleteTopicRequest => [TopicName]
TopicName => string
DeleteTopicRequest requires only the names of the topics to be deleted.
Delete Topic Response

 

DeleteTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

DeleteTopicResponse is similar to CreateTopicResponse.

Describe Topic Request

DescribeTopicRequest is intended to replace TopicMetadataRequest in future versions.

DescribeTopicRequest => [TopicName]
TopicName => string
DescribeTopicRequest requires only topic names.
Describe Topic Response

 

DescribeTopicResponse => [TopicName ErrorCode TopicDescription]
ErrorCode => int16
TopicName => string
TopicDescription => TopicConfigDetails [TopicPartitionDetails]
TopicConfigDetails => [ConfigEntry]
ConfigEntry => string string
TopicPartitionDetails => PartitionId Leader [Replica] [ISR]
PartitionId => int32
Leader => int32
Replica => int32
ISR => int32

 

Besides ErrorCode, which is used in the same way as in the previous messages, DescribeTopicResponse holds an optional TopicDescription structure per topic (non-empty if execution was successful). See the table below for details:

Field | Description
TopicConfigDetails | A structure that holds the topic-level config overrides (a list of ConfigEntry).
ConfigEntry | Topic-level setting and value which was overridden.
TopicPartitionDetails | List describing replication details for each partition.
PartitionId | Id of the partition.
Leader | Broker-leader id for the described partition (or -1 if not defined).
Replicas | List of broker ids serving a replica's role for the partition.
ISR | Same as Replicas, but includes only brokers that are known to be "in-sync".

In case of an error, the TopicDescription field is returned with default values.
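To show how a client might consume TopicPartitionDetails, the hypothetical Java class below models one entry and flags partitions that have no leader (-1) or whose ISR is shorter than the replica list. The class and method names are illustrative and not part of the proposed schema.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory view of one TopicPartitionDetails entry from DescribeTopicResponse.
final class PartitionDetails {
    static final int NO_LEADER = -1; // Leader value when no leader is defined

    final int partitionId;
    final int leader;
    final List<Integer> replicas;
    final List<Integer> isr;

    PartitionDetails(int partitionId, int leader, List<Integer> replicas, List<Integer> isr) {
        this.partitionId = partitionId;
        this.leader = leader;
        this.replicas = replicas;
        this.isr = isr;
    }

    boolean hasLeader() {
        return leader != NO_LEADER;
    }

    // ISR is a subset of Replicas, so a shorter ISR means at least one replica is not in sync.
    boolean isUnderReplicated() {
        return isr.size() < replicas.size();
    }

    // Ids of partitions that have no leader or are not fully in sync, in input order.
    static List<Integer> unhealthyPartitions(List<PartitionDetails> partitions) {
        List<Integer> result = new ArrayList<>();
        for (PartitionDetails p : partitions) {
            if (!p.hasLeader() || p.isUnderReplicated()) {
                result.add(p.partitionId);
            }
        }
        return result;
    }
}
```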

List Topics Request

 

ListTopicsRequest =>
ListTopicsRequest is a request with no arguments.
List Topics Response

 

ListTopicsResponse => ErrorCode [TopicName]
ErrorCode => int16
TopicName => string

Besides ErrorCode, ListTopicsResponse holds the list of topics in the Kafka cluster.

Replication Commands Schema

Reassign Partitions
Reassign Partitions Request

 

ReassignPartitionsRequest => [Topic [PartitionId [ReplicaId]]]
Topic => string
PartitionId => int32
ReplicaId => int32

ReassignPartitionsRequest requires a partition assignment: a mapping from each partition to an array of replicas. Validation of partition/replica existence is done on the server. The status of a ReassignPartitionsRequest can be checked with VerifyReassignPartitionsRequest.

Reassign Partitions Response

 

ReassignPartitionsResponse => [Topic ErrorCode]
Topic => string
ErrorCode => int16

ReassignPartitionsResponse holds an error code per topic, which is non-zero if the reassignment could not be started (e.g. due to a validation error).

Reassignment status can also be checked with DescribeTopicRequest by inspecting the assigned replicas (the Replicas field of DescribeTopicResponse).
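The completion check described above can be sketched by comparing the requested assignment with the replicas reported by DescribeTopicRequest. ReassignmentCheck is a hypothetical helper, and treating a partition as done only when its replica list exactly equals the target (including order) is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ReassignmentCheck {
    // target:  topic -> partition -> desired replicas (same shape as the request body)
    // current: topic -> partition -> Replicas as reported by DescribeTopicResponse
    // Returns "topic-partition" labels whose replicas do not yet match the target.
    static List<String> inProgress(Map<String, Map<Integer, List<Integer>>> target,
                                   Map<String, Map<Integer, List<Integer>>> current) {
        List<String> pending = new ArrayList<>();
        for (Map.Entry<String, Map<Integer, List<Integer>>> topic : target.entrySet()) {
            Map<Integer, List<Integer>> described = current.get(topic.getKey());
            for (Map.Entry<Integer, List<Integer>> partition : topic.getValue().entrySet()) {
                List<Integer> actual = described == null ? null : described.get(partition.getKey());
                // A missing topic or partition in the description also counts as pending.
                if (!partition.getValue().equals(actual)) {
                    pending.add(topic.getKey() + "-" + partition.getKey());
                }
            }
        }
        return pending;
    }
}
```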

Preferred Replica Leader Election
Preferred Replica Leader Election Request

 

PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]]
Topic => string
PartitionId => int32
PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure; like ReassignPartitionsRequest, this request is intended to be non-blocking. The schema consists of one optional field: an array of partitions for which the preferred replica leader should be elected.

To start the preferred replica leader election procedure for all existing partitions, send an empty partitions array.

Preferred Replica Leader Election Response

 

PreferredReplicaLeaderElectionResponse => [Topic ErrorCode]
Topic => string
ErrorCode => int16

 

PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.

The status of the procedure can be checked with DescribeTopicRequest: the election has completed for a partition when its leader broker equals the head of its Replicas list.
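The rule "leader equals the head of the Replicas list" translates directly into code. Below is a minimal, hypothetical Java sketch that, given per-partition leaders and replica lists taken from DescribeTopicResponse, returns the partitions whose election is still pending. The class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PreferredLeaderCheck {
    // The preferred replica is the first broker in the partition's Replicas list.
    static boolean isPreferredLeader(int leader, List<Integer> replicas) {
        return !replicas.isEmpty() && replicas.get(0) == leader;
    }

    // leaders: partition -> Leader; replicas: partition -> Replicas.
    // Returns, in ascending order, partitions whose election has not completed yet.
    static List<Integer> pending(Map<Integer, Integer> leaders, Map<Integer, List<Integer>> replicas) {
        List<Integer> result = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : new TreeMap<>(leaders).entrySet()) {
            List<Integer> assigned = replicas.get(e.getKey());
            if (assigned == null || !isPreferredLeader(e.getValue(), assigned)) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```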

2. Server-side Admin Request handlers

All incoming requests will be handled by specific helper classes called from KafkaApis: TopicCommandHelper for topic admin commands, ReassignPartitionsCommandHelper and PreferredReplicaLeaderElectionCommandHelper.

All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Unfortunately, most of the command classes are tightly coupled with CLI logic and are hard to refactor, so for now (until the standalone CLI commands are removed) the logic from those classes will most likely be extracted and copied to separate classes (as proposed, TopicCommandHelper etc.).

3. Admin Client

This component is intended to be an out-of-the-box Kafka client implementation for Admin commands.

The Admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, the client will maintain a cluster metadata cache and provide the user with a convenient way of handling long-running commands (e.g. reassign partitions).

Since topic commands support batching (and so will AdminClient), besides the Admin API the user will be provided with request builders that help to create requests correctly.
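The proposal mentions request builders (and the AdminClient API refers to a CreateTopicRequestBody "built by means of respective Builder") but does not define them. The Java sketch below is one possible shape, assuming a builder whose two addTopic overloads fill in the -1 and empty-array markers automatically; all member names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CreateTopicRequestBody {
    static final int EMPTY = -1; // marker for unset Partitions/Replicas

    // One batch entry: TopicName Partitions Replicas ReplicaAssignment [ConfigEntry]
    static final class TopicEntry {
        final String topic;
        final int partitions;
        final int replicas;
        final Map<Integer, List<Integer>> replicaAssignment;
        final Map<String, String> configs;

        TopicEntry(String topic, int partitions, int replicas,
                   Map<Integer, List<Integer>> replicaAssignment, Map<String, String> configs) {
            this.topic = topic;
            this.partitions = partitions;
            this.replicas = replicas;
            this.replicaAssignment = replicaAssignment;
            this.configs = configs;
        }
    }

    final List<TopicEntry> entries;

    private CreateTopicRequestBody(List<TopicEntry> entries) {
        this.entries = Collections.unmodifiableList(entries);
    }

    static final class Builder {
        private final List<TopicEntry> entries = new ArrayList<>();

        // Server-generated assignment: ReplicaAssignment is sent as an empty array.
        Builder addTopic(String topic, int partitions, int replicas, Map<String, String> configs) {
            entries.add(new TopicEntry(topic, partitions, replicas,
                    new HashMap<Integer, List<Integer>>(), new HashMap<>(configs)));
            return this;
        }

        // Explicit assignment: Partitions and Replicas are sent as the -1 marker.
        Builder addTopic(String topic, Map<Integer, List<Integer>> replicaAssignment,
                         Map<String, String> configs) {
            entries.add(new TopicEntry(topic, EMPTY, EMPTY,
                    new HashMap<>(replicaAssignment), new HashMap<>(configs)));
            return this;
        }

        CreateTopicRequestBody build() {
            return new CreateTopicRequestBody(new ArrayList<>(entries));
        }
    }
}
```

Separate overloads make it impossible to send both counts and an explicit assignment for the same topic, which mirrors rule 1 of CreateTopicRequest.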

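Proposed API:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.Errors;

public class AdminClient {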

/**
* A client is instantiated by providing a set of key-value pairs as configuration. Most
* of the settings will be related to NetworkClient
*
* @param properties settings related to Network client and at least one broker from KafkaCluster to connect to
*/
public AdminClient(Properties properties);

/**
* Initiates topics creation.
* This is an asynchronous call: it returns immediately once the server has accepted the request and stored the respective data in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that metadata about newly created topics was propagated
* to all brokers
*
* @param createTopicRequestBody holder (built by means of respective Builder) of all required arguments to create topics
* @return java.util.concurrent.Future which holds topics creation result - a map topic-name - error code
*
* @throws ApiException in case of global error, which means topic creation was not even started
*/
public Future<Map<String, Errors>> createTopics(CreateTopicRequestBody createTopicRequestBody) throws ApiException;

/**
* Initiates topics alteration.
* This is an asynchronous call: it returns immediately once the server has accepted the request and stored/changed the respective data in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that updated metadata about altered topics was propagated
* to all brokers
*
* @param alterTopicRequestBody holder (built by means of respective Builder) of all required arguments to alter topics
* @return java.util.concurrent.Future which holds topics alteration result - a map topic-name - error code
*
* @throws ApiException in case of global error, which means topic alteration was not even started
*/
public Future<Map<String, Errors>> alterTopics(AlterTopicRequestBody alterTopicRequestBody) throws ApiException;

/**
* Initiates topic deletion.
* This is an asynchronous call: it returns immediately once the server has accepted the request and marked the requested topics for deletion in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that metadata with updated topic list was propagated to
* all brokers
*
* @param topics topic names to be deleted
* @return java.util.concurrent.Future which holds topics deletion result - a map topic-name - error code
*
* @throws ApiException in case of global error, which means topic deletion was not even started
*/
public Future<Map<String, Errors>> deleteTopics(List<String> topics) throws ApiException;

/**
* Lists all available topics in Kafka cluster.
* Topic is considered available if all brokers in cluster have received and cached metadata about it
*
* @return list of topic names
*
* @throws ApiException
*/
public List<String> listTopics() throws ApiException;

/**
* Checks whether topic with the given name exists
*
* @param topic name to be checked
* @return true if all brokers in cluster have received and cached metadata about it
*
* @throws ApiException
*/
public boolean topicExists(String topic) throws ApiException;

/**
* TODO: not finalized yet
* Request replication information about Kafka topics
*
* @return a mapping between topic name and topic description
* @throws ApiException in case of global error, which means topic description cannot be fetched for all topics
*/
public Map<String, DescribeTopicOutput> describeTopics(List<String> topicNames) throws ApiException;

/**
* Initiates long-running reassign partitions procedure.
* This is an asynchronous call: it returns immediately once the server has accepted the request and created the admin path in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that all partition reassignments have completed.
* Note: currently there are only two possible states for reassigned partition: Completed, In Progress.
*
* @param reassignment mapping topic -> partition -> replicas to which the partition will be reassigned
*
* @return java.util.concurrent.Future which is completed once all partitions have been reassigned
*
* @throws ApiException in case partition reassignment wasn't initiated on server
*/
public Future<Void> reassignPartitions(Map<String, Map<Integer, List<Integer>>> reassignment) throws ApiException;
/**
* Checks the interim status of the partitions reassignment.
* Reassignment for a given partition is considered completed if the partition has been removed from the
* admin zookeeper path and all cluster brokers have received and cached relevant AR metadata for the
* given partition
*
* @param reassignment schema same as was used for reassign partitions request
*
* @return two maps - completed and partitions for which reassignment is still in progress
* @throws ApiException in case reassignment verification wasn't initiated on server
*/
public ReassignmentResult verifyReassignPartitions(Map<String, Map<Integer, List<Integer>>> reassignment) throws ApiException;
/**
* Initiates long-running preferred replica leader election procedure
* This is an asynchronous call: it returns immediately once the server has accepted the request and created the admin path in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that the leaders of all partitions have moved to the
* preferred replica.
* Note: currently there are only two possible states for preferred replica leader election: Completed, In Progress.
*
* @param partitions mapping topic -> partitions whose leader should be moved to the preferred replica
*
* @return java.util.concurrent.Future which is completed once the leaders of all partitions have moved to the preferred replica
*
* @throws ApiException in case preferred replica leader election wasn't initiated on server
*/
public Future<Void> preferredReplicaLeaderElection(Map<String, List<Integer>> partitions) throws ApiException;
/**
* Checks the interim status of the preferred replica leader election.
* Preferred replica leader election for a given partition is considered completed if all cluster brokers have received and cached
* relevant metadata for the given partition
*
* @param partitions same partitions as for preferred replica leader election request
*
* @return two maps - completed and partitions for which procedure is still in progress
* @throws ApiException in case preferred replica election verification wasn't initiated on server
*/
public PreferredReplicaLeaderElectionResult verifyPreferredReplicaLeaderElection(Map<String, List<Integer>> partitions)
throws ApiException;
}
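Every mutating call above returns a java.util.concurrent.Future, and blocking is achieved with Future.get. The sketch below illustrates that pattern with a hypothetical StubAdminClient standing in for AdminClient: it makes no network calls, and error codes are plain shorts rather than Errors values. Using the timed get(long, TimeUnit) overload is a suggested choice so that a stalled metadata propagation cannot block the caller forever.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for AdminClient.deleteTopics: returns at once and completes later.
final class StubAdminClient {
    Future<Map<String, Short>> deleteTopics(List<String> topics) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Short> result = new HashMap<>();
            for (String topic : topics) {
                result.put(topic, (short) 0); // 0 = no error
            }
            return result;
        });
    }

    // Turns the asynchronous call into a blocking one, with an upper bound on the wait.
    static Map<String, Short> deleteAndWait(StubAdminClient client, List<String> topics, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        return client.deleteTopics(topics).get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```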

4. Interactive Shell / CLI tool

This component wraps AdminClient and provides an interactive shell-like environment for executing administrative commands. The goal of these changes is to let people use the existing standalone tools from a single script, optionally running commands in a shell, so command names and arguments are kept the same as in the existing tools where possible.

The tool supports two modes:

  • command line interface

  • interactive shell mode

Command Line Interface

This mode lets users run commands from shell scripts. List of available commands:

(Note: not all possible options are listed - e.g. alter topic's config)

# Topic Commands - options are ported from TopicCommand.scala
bin/kafka.sh --create-topic --topic my_topic --partitions 5 --replication-factor 3 --config key=value --broker-list <host : port>
bin/kafka.sh --alter-topic --topic my_topic --partitions 10 --broker-list <host : port>
bin/kafka.sh --delete-topic --topic my_topic --broker-list <host : port>
bin/kafka.sh --list-topics --broker-list <host : port>
bin/kafka.sh --describe-topic --topic my_topic --broker-list <host : port>

# Reassign Partitions - options are ported from ReassignPartitionsCommand.scala
bin/kafka.sh --reassign-partitions --reassignment-json-file /user/file.json --blocking --broker-list <host : port>

 
# Preferred Replica Leader Elections - options are ported from PreferredReplicaLeaderElectionCommand.scala
bin/kafka.sh --preferred-replica-leader-election --path-to-json-file /user/file.json --blocking --broker-list <host : port>

# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host : port>

Every command must be supplied with --broker-list <host : port> to define at least one broker of the target cluster.
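A hypothetical Java sketch of parsing the --broker-list value. It assumes the value is a comma-separated list of host:port pairs, as accepted by existing Kafka command line tools; the proposal itself only requires at least one broker. BrokerListParser is an illustrative name, and the sketch does not handle bracketed IPv6 literals.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

final class BrokerListParser {
    // Parses e.g. "host1:9092,host2:9092"; at least one broker is required.
    static List<InetSocketAddress> parse(String brokerList) {
        List<InetSocketAddress> brokers = new ArrayList<>();
        for (String entry : brokerList.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved skips the DNS lookup; resolution happens when connecting.
            brokers.add(InetSocketAddress.createUnresolved(host, port));
        }
        if (brokers.isEmpty()) {
            throw new IllegalArgumentException("--broker-list must name at least one broker");
        }
        return brokers;
    }
}
```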

Interactive Shell Mode

Interactive shell mode provides extended facilities for executing admin commands. Command names and options are the same, but the user has to define --broker-list only once: in interactive mode the CLI tool manages the cluster metadata cache and sends commands to the proper broker.

This mode also provides facilities to switch context: after switching to a topic, all topic commands are applied to it, so there is no need to specify the topic name for each topic command.

The proposed use case is the following:

# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host1 : port1>
Connected to Kafka Controller at <host2 : port2>.

kafka> create-topic --topic my_topic --partitions 5 --replication-factor 3
Topic "my_topic" is created.

kafka> alter-topic --topic my_topic --partitions 10
Topic "my_topic" is changed.
# Switch topic context
kafka> topic my_topic
Switched to "my_topic" topic.

# Execute topic command for switched topic
kafka my_topic> describe-topic
"my_topic" details:
Topic: my_topic Partitions: 10 ...

# Switch off topic context
kafka my_topic> topic
kafka>
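The topic-context behaviour in the session above can be modelled as a small piece of state in the shell. In this hypothetical Java sketch, "topic <name>" sets the context, a bare "topic" clears it, and commands ending in "-topic" that lack --topic get the current topic appended. The proposal does not specify how the tool tokenizes input; splitting on whitespace is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ShellContext {
    private String currentTopic; // null when no topic context is active

    String prompt() {
        return currentTopic == null ? "kafka> " : "kafka " + currentTopic + "> ";
    }

    // Returns the argument list that would be passed to the underlying command;
    // an empty list means the line only changed the context.
    List<String> handle(String line) {
        List<String> tokens = new ArrayList<>(Arrays.asList(line.trim().split("\\s+")));
        if (tokens.get(0).equals("topic")) {
            currentTopic = tokens.size() > 1 ? tokens.get(1) : null;
            return new ArrayList<>();
        }
        // create-topic, alter-topic, delete-topic and describe-topic end with "-topic";
        // list-topics does not, so it is left untouched.
        if (currentTopic != null && tokens.get(0).endsWith("-topic") && !tokens.contains("--topic")) {
            tokens.add("--topic");
            tokens.add(currentTopic);
        }
        return tokens;
    }
}
```

An explicit --topic on the command line still takes precedence over the switched context, so commands copied from scripts behave the same inside the shell.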

Compatibility, Deprecation, and Migration Plan

The current tools will still be available in the 0.8.3 release but will be removed in 0.9 (tracked in https://issues.apache.org/jira/browse/KAFKA-1776).

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
