The goal of the command line shell is to provide centralized management of Kafka operations.

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Folks have created dozens of different systems to work with Kafka. If we provide a wire protocol that allows the brokers to execute administrative code, then clients can execute commands more simply. We have a lot of ties to ZooKeeper, which makes it nearly impossible for some code to do anything better than wrap kafka-topics.sh etc. in shell scripts. With the wire protocol, a client in any language should be able to administer Kafka. If someone wants a REST interface (for example), they can write it in whatever language they like. The project should ship a client that is not only an example but a fully functional replacement for the kafka-topics, reassign-partitions, and consumer offset tools.

Public Interfaces

  • Changes to Wire Protocol:
      - Extending protocol with these messages: CreateTopic(Request | Response), AlterTopic, DeleteTopic
      - Evolving TopicMetadataRequest_V0 to the next version

  • New client: AdminClient - a Wire Protocol client for administrative operations
  • New CLI tool: Kafka Shell - a single unified command line interface for administrative operations

Proposed Changes

Proposed changes include 4 parts:

  1. Wire Protocol extensions - to add new Admin messages
  2. Server-side Admin commands handlers (TopicCommand-like)
  3. Admin Client - an out-of-box client for performing administrative commands
  4. Interactive Shell / CLI tool supporting administrative commands

(You can try out some of the changes from this KIP. Please follow this link to get the code and start the CLI tool).

1. Wire Protocol Extensions

Overview

It is proposed to add or modify the following requests:

  • Topic commands, which include CreateTopic(Request | Response), AlterTopic, DeleteTopic
  • Extend TopicMetadataRequest to include topic configuration and remove ISR field

Details are given under each request/response schema proposal below.

Schema

Overall, the idea is to extend the Wire Protocol to cover all existing topic commands: create-topic, alter-topic, delete-topic, describe-topic, list-topics, reassign-partitions. At the same time, since the Wire Protocol is a public API to the Kafka cluster, it was agreed that the new Admin schema needs to be "orthogonal", i.e. new messages shouldn't duplicate each other or existing messages where those already cover particular use cases.
It is also important that all Admin requests are intended to be asynchronous. This means a request only initiates a particular command and does not wait until the command is actually completed. There are different reasons for that, but we want to make sure that we give users all the tools needed to verify whether an issued command has completed. E.g. one can consider a topic created once it is possible to consume from / produce to this topic. In this case the user can leverage TopicMetadataRequest to check that all brokers have received metadata about the newly created topic.
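
Because the requests only initiate a command, a caller that needs blocking behaviour has to poll for completion. The following is a minimal sketch of such a polling loop, not part of the proposal: the class AwaitCompletion and its await method are hypothetical names, and the check argument stands in for "send a TopicMetadataRequest and inspect the response".

```java
import java.util.function.BooleanSupplier;

public class AwaitCompletion {

    /**
     * Polls check until it returns true or timeoutMs elapses.
     * The check is a stand-in (assumption) for sending a TopicMetadataRequest
     * and deciding from the response whether the command has completed.
     *
     * @return true if the check succeeded before the timeout, false otherwise
     */
    public static boolean await(BooleanSupplier check, long timeoutMs, long pollIntervalMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (check.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A caller would pass a check such as "the topic appears in the metadata returned by every broker"; how that check is implemented depends on the client and is left open here.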

Finally, Topic Admin requests are likely to be used not only in the CLI tool, where the common use case is to create, change, or delete a single entity. Since Kafka can maintain a huge number of topics, it is vital that users can efficiently issue many commands at once. That is why all Topic Admin messages are essentially batch requests: commands of one type for many topics can be grouped in one batch, reducing network calls. At the same time, to keep schema usage transparent and consistent with existing requests (such as Produce and Fetch), if a batch request includes more than one instruction for a specific topic, only the last one in the list is executed; the others are silently ignored.
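
The "last instruction per topic wins" rule can be illustrated with a small sketch. This is only an illustration under assumptions: BatchDedup, Instruction, and lastPerTopic are hypothetical names, and the payload string stands in for whatever arguments a real instruction carries.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchDedup {

    /** Hypothetical instruction: a topic name plus an opaque payload. */
    public static final class Instruction {
        public final String topic;
        public final String payload;

        public Instruction(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    /**
     * Keeps only the last instruction for each topic.
     * The result is ordered by the first appearance of each topic in the batch,
     * because LinkedHashMap keeps a key's original position when its value is replaced.
     */
    public static List<Instruction> lastPerTopic(List<Instruction> batch) {
        Map<String, Instruction> byTopic = new LinkedHashMap<>();
        for (Instruction instruction : batch) {
            byTopic.put(instruction.topic, instruction); // later entries overwrite earlier ones
        }
        return new ArrayList<>(byTopic.values());
    }
}
```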

 

New Protocol Errors

It is proposed to add these error codes to the protocol.

Error | Description | Requests
TopicAlreadyExists | Topic with this name already exists. | CreateTopicRequest
InvalidArgumentTopicName | Topic name contains invalid characters. | CreateTopicRequest
InvalidArgumentPartitions | Partitions field is invalid (e.g. negative). | CreateTopicRequest, AlterTopicRequest
InvalidArgumentReplicationFactor | Replication-factor field is invalid (e.g. negative). | CreateTopicRequest, AlterTopicRequest
InvalidArgumentReplicaAssignment | Replica assignment field is invalid (e.g. contains duplicates). | CreateTopicRequest, AlterTopicRequest
InvalidArgumentTopicConfig | Either topic-level config setting or value is incorrect. | CreateTopicRequest, AlterTopicRequest
DecreasePartitionsNotAllowed | Invalid partitions argument: decreasing partitions is prohibited when altering topic. | AlterTopicRequest
PreferredReplicaLeaderElectionInProgress | Preferred replica leader election procedure has already been started. | PreferredReplicaLeaderElectionRequest
ReassignPartitionsInProgress | Reassign partitions procedure has already been started. | AlterTopicRequest
MultipleInstructionsForOneTopic | Only one mutation is allowed at once: e.g. change topic replication factor or change topic config. | CreateTopicRequest, AlterTopicRequest
MultipleTopicInstructionsInOneBatch | Multiple topic instructions for the same topic in one batch request. | CreateTopicRequest, AlterTopicRequest, DeleteTopicRequest

Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide descriptive error message.

The same notation as in  A Guide To The Kafka Protocol is used here. 

Topic Admin Schema

Create Topic Request

 

CreateTopicRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment [ConfigEntry]]
TopicName => string
Partitions => int32
ReplicationFactor => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
ConfigEntry => ConfigKey ConfigValue
 ConfigKey => string
 ConfigValue => string

CreateTopicRequest is a batch asynchronous request to initiate topic creation with either predefined or automatic replica assignment and, optionally, topic configuration.
Request semantics:
  1. Only one of (Partitions + ReplicationFactor) or ReplicaAssignment should be defined in one instruction. If both are specified, ReplicaAssignment takes precedence. (Note: there is a special use case, automatic topic creation for TopicMetadataRequest; to trigger it the user should set client_id=consumer and define only the topic name.)
  2. If ReplicaAssignment is defined, the number of partitions and replicas is calculated from the supplied ReplicaAssignment. If (Partitions + ReplicationFactor) is defined, the replica assignment is generated automatically by the server.
  3. If one request contains multiple instructions for the same topic, only the last one in the list is executed; the others are silently ignored.

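
As an illustration of semantics 1 and 2, the sketch below derives the effective partition count and replication factor of one instruction. It is not the proposed server code: CreateTopicSemantics and effectiveCounts are hypothetical names, "not defined" is modelled as null or an empty map, and the replication factor is read from the first partition on the assumption that every partition lists the same number of replicas.

```java
import java.util.List;
import java.util.Map;

public class CreateTopicSemantics {

    /**
     * Returns {partitions, replicationFactor} for one create-topic instruction.
     * A non-empty replicaAssignment (partition id -> replica ids) takes precedence
     * over the explicit partitions / replicationFactor arguments.
     */
    public static int[] effectiveCounts(Integer partitions,
                                        Integer replicationFactor,
                                        Map<Integer, List<Integer>> replicaAssignment) {
        if (replicaAssignment != null && !replicaAssignment.isEmpty()) {
            // Counts are calculated from the supplied assignment.
            // Assumption: all partitions carry the same number of replicas.
            int replicas = replicaAssignment.values().iterator().next().size();
            return new int[] {replicaAssignment.size(), replicas};
        }
        if (partitions == null || replicationFactor == null) {
            throw new IllegalArgumentException(
                "either ReplicaAssignment or both Partitions and ReplicationFactor must be defined");
        }
        // The server would generate the replica assignment automatically in this case.
        return new int[] {partitions, replicationFactor};
    }
}
```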
Create Topic Response

 

CreateTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

CreateTopicResponse contains a map between topic and topic creation result error code (see New Protocol Errors). 

Alter Topic Request

 

AlterTopicRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
TopicName => string
ReplicationFactor => int32
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AddedConfigEntry => ConfigKey ConfigValue
 ConfigKey => string
 ConfigValue => string
DeletedConfig => string
AlterTopicRequest is a batch asynchronous request to initiate topic alteration: changing replication parameters or replica assignment, or adding/removing topic-level configuration.
Request semantics:

1. If ReplicaAssignment is defined

    ReplicationFactor and Partitions arguments are ignored in this case.

    For each partition in ReplicaAssignment:

    1.1 If such partition exists and assignment is different from the current replica assignment

        It's a "reassign partition" request - add it to the reassign-partitions JSON

    1.2 If such partition doesn't exist

        It's an "add partition" request - change topic metadata in zookeeper to trigger increase partition logic

2. Else if ReplicationFactor is defined

    2.1 If Partitions is defined    

        Regenerate replica assignment for all existing and newly added partitions, goto 1.

    2.2 If Partitions is not defined     

        Regenerate replica assignment only for existing partitions, goto 1.

3. Else if Partitions is defined (ReplicaAssignment and ReplicationFactor are not defined):

    3.1 If Partitions is less than current number of partitions return error code DecreasePartitionsNotAllowed

    3.2 Otherwise, automatically generate replica assignment for newly added partitions, goto 1.

 

Multiple instructions for the same topic in one request will be silently ignored, only the last from the list will be executed.
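
The top-level branching of the semantics above can be sketched as a small decision function. This is an illustration only, not the proposed handler: AlterTopicPlanner, Action, and plan are hypothetical names, "not defined" is modelled as null or an empty map, and the decrease case maps to the DecreasePartitionsNotAllowed error from the error table.

```java
import java.util.List;
import java.util.Map;

public class AlterTopicPlanner {

    /** Hypothetical outcomes of the top-level decision for one alter-topic instruction. */
    public enum Action {
        APPLY_GIVEN_ASSIGNMENT,          // step 1: ReplicaAssignment is defined
        REGENERATE_ALL_PARTITIONS,       // step 2.1: ReplicationFactor and Partitions defined
        REGENERATE_EXISTING_PARTITIONS,  // step 2.2: only ReplicationFactor defined
        ADD_PARTITIONS,                  // step 3.2: only Partitions defined, not a decrease
        DECREASE_PARTITIONS_NOT_ALLOWED, // step 3.1: error code returned to the client
        NO_REPLICATION_CHANGE            // none of the three arguments is defined
    }

    public static Action plan(int currentPartitions,
                              Integer partitions,
                              Integer replicationFactor,
                              Map<Integer, List<Integer>> replicaAssignment) {
        if (replicaAssignment != null && !replicaAssignment.isEmpty()) {
            // Partitions and ReplicationFactor are ignored in this case.
            return Action.APPLY_GIVEN_ASSIGNMENT;
        }
        if (replicationFactor != null) {
            return partitions != null
                ? Action.REGENERATE_ALL_PARTITIONS
                : Action.REGENERATE_EXISTING_PARTITIONS;
        }
        if (partitions != null) {
            return partitions < currentPartitions
                ? Action.DECREASE_PARTITIONS_NOT_ALLOWED
                : Action.ADD_PARTITIONS;
        }
        return Action.NO_REPLICATION_CHANGE;
    }
}
```

In the regenerate and add cases the generated assignment would then be processed as in step 1, partition by partition; that part is omitted here.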

   

Alter Topic Response

 

AlterTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request

 

DeleteTopicRequest => [TopicName]
TopicName => string

 

DeleteTopicRequest requires only the names of the topics to be deleted.
A DeleteTopicRequest batch may include only one deletion command per topic name; otherwise MultipleTopicInstructionsInOneBatch is returned.
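
To make the schema notation concrete, here is a minimal sketch of how the body [TopicName] could be serialized, following the conventions of A Guide To The Kafka Protocol: an array is an int32 element count followed by the elements, a string is an int16 length followed by UTF-8 bytes, and integers are big-endian. DeleteTopicBodyEncoder is a hypothetical name, and the common request header (API key, version, correlation id, client id) is deliberately left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class DeleteTopicBodyEncoder {

    /** Encodes only the request body "[TopicName]"; the request header is not included. */
    public static byte[] encode(List<String> topics) {
        byte[][] names = new byte[topics.size()][];
        int size = 4; // int32 array length
        for (int i = 0; i < names.length; i++) {
            names[i] = topics.get(i).getBytes(StandardCharsets.UTF_8);
            if (names[i].length > Short.MAX_VALUE) {
                throw new IllegalArgumentException("topic name too long for an int16 length prefix");
            }
            size += 2 + names[i].length; // int16 string length + bytes
        }
        ByteBuffer buffer = ByteBuffer.allocate(size); // ByteBuffer is big-endian by default
        buffer.putInt(names.length);
        for (byte[] name : names) {
            buffer.putShort((short) name.length);
            buffer.put(name);
        }
        return buffer.array();
    }
}
```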
Delete Topic Response

 

DeleteTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

DeleteTopicResponse is similar to CreateTopicResponse.

Topic Metadata Request V1

 

TopicMetadataRequest_V1 => [TopicName]
TopicName => string
TopicMetadataRequest_V1 is an evolved version of TopicMetadataRequest. This request is intended to support two admin operations: getting topic metadata, and checking whether a particular admin command (all of which are asynchronous) has completed.
TopicMetadataRequest_V1 requires only topic names. As with the first version, an empty topic name set results in returning information for all existing topics.
Unlike the previous version, TopicMetadataRequest_V1 does not trigger automatic topic creation if a topic with the given name doesn't exist.
Topic Metadata Response V1

 

TopicMetadataResponse_V1 => [Broker][TopicMetadata]
Broker => NodeId Host Port  (any number of brokers may be returned)
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] [ConfigEntry]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader ReplicasLag Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
ReplicasLag => [int32 int32]
Isr => [int32]
ConfigEntry => string string

In addition to the contents of TopicMetadataResponse_V0, the new version of TopicMetadataResponse includes topic-level configuration for each topic and the replica fetch lag per partition, i.e. how far each partition replica is behind the leader replica.


Replication Commands Schema

Preferred Replica Leader Election Request

 

PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]]
Topic => string
PartitionId => int32
PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Similar to Topic Admin requests, this request is intended to be non-blocking. The schema consists of one field: an array of partitions for which the preferred replica leader should be elected.

To start the preferred replica leader election procedure for all existing partitions, an empty partitions array should be sent.

Preferred Replica Leader Election Response

 

PreferredReplicaLeaderElectionResponse => [Topic ErrorCode]
Topic => string
ErrorCode => int16

 

PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.

Status of the procedure may be checked with TopicMetadataRequest  - the head of replicas list field and leader broker should be the same.
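
The completion check described above is simple enough to sketch directly. PreferredLeaderCheck and isComplete are hypothetical names; the leader and the ordered replica list are assumed to have been read from a topic metadata response for one partition.

```java
import java.util.List;

public class PreferredLeaderCheck {

    /**
     * The election for a partition is considered complete when the current leader
     * is the head (first entry) of the partition's replica list.
     */
    public static boolean isComplete(int leader, List<Integer> replicas) {
        return !replicas.isEmpty() && replicas.get(0).intValue() == leader;
    }
}
```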

2. Server-side Admin Request handlers

All incoming requests will be handled by specific helper classes called from KafkaApis: TopicCommandHelper for topic admin commands, ReassignPartitionsCommandHelper, and PreferredReplicaLeaderElectionCommandHelper.

All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Unfortunately, most of the command classes are strongly coupled with CLI logic and can hardly be refactored, so for now (until the standalone CLI commands are removed) the logic from those classes will most likely be extracted and copied to separate classes (as proposed: TopicCommandHelper etc.).

3. Admin Client

This component is intended to be a Kafka out-of-box client implementation for Admin commands.

The Admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, the client will maintain a cluster metadata cache and will give the user a convenient way of handling long-running commands (e.g. reassign partitions).

Since Topic commands will support batching (and so will AdminClient), the user will be provided, in addition to the Admin API, with request builders that help to create requests correctly.

Proposed API:

public class AdminClient {

/**
* A client is instantiated by providing a set of key-value pairs as configuration. Most
* of the settings will be related to NetworkClient
*
* @param properties settings related to Network client and at least one broker from KafkaCluster to connect to
*/
public AdminClient(Properties properties);

/**
* Initiates topics creation.
* This is an asynchronous call, it returns immediately once the server has accepted request and stored respective data in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that metadata about newly created topics was propagated
* to all brokers
*
* @param createTopicRequestBody holder (built by means of respective Builder) of all required arguments to create topics
* @return java.util.concurrent.Future which holds topics creation result - a map topic-name - error code
*
* @throws ApiException in case of global error, which means topic creation was not even started
*/
public Future<Map<String, Errors>> createTopics(CreateTopicRequestBody createTopicRequestBody) throws ApiException;

/**
* Initiates topics alteration.
* This is an asynchronous call, it returns immediately once the server has accepted request and stored/changed respective data in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that updated metadata about altered topics was propagated
* to all brokers
*
* @param alterTopicRequestBody holder (built by means of respective Builder) of all required arguments to alter topics
* @return java.util.concurrent.Future which holds topics alteration result - a map topic-name - error code
*
* @throws ApiException in case of global error, which means topic alteration was not even started
*/
public Future<Map<String, Errors>> alterTopics(AlterTopicRequestBody alterTopicRequestBody) throws ApiException;

/**
* Initiates topic deletion.
* This is an asynchronous call, it returns immediately once server has accepted request and marked requested topics for deletion in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that metadata with updated topic list was propagated to
* all brokers
*
* @param topics topic names to be deleted
* @return java.util.concurrent.Future which holds topics deletion result - a map topic-name - error code
*
* @throws ApiException in case of global error, which means topic deletion was not even started
*/
public Future<Map<String, Errors>> deleteTopics(List<String> topics) throws ApiException;

/**
* Lists all available topics in Kafka cluster.
* Topic is considered available if all brokers in cluster have received and cached metadata about it
*
* @return list of topic names
*
* @throws ApiException
*/
public List<String> listTopics() throws ApiException;

/**
* TODO: not finalized yet
* Request replication information about Kafka topics
*
* @return a mapping between topic name and topic description
* @throws ApiException in case of global error, which means topic description cannot be fetched for all topics
*/
public Map<String, DescribeTopicOutput> describeTopics(List<String> topicNames) throws ApiException;

/**
* Initiates long-running reassign partitions procedure.
* This is an asynchronous call, it returns immediately once server has accepted request, and created admin path in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that all partition reassignments have completed.
* Note: currently there are only two possible states for reassigned partition: Completed, In Progress.
*
* @param reassignmentData schema among which replicas partitions will be reassigned
*
* @return java.util.concurrent.Future which is completed once all partitions have been reassigned
*
* @throws ApiException in case partition reassignment wasn't initiated on server
*/
public Future<Void> reassignPartitions(PartitionReassignmentData reassignmentData) throws ApiException;


/**
* Checks the interim status of the partitions reassignment.
* Reassignment for concrete partition is considered completed if partition has been removed from
* admin zookeeper path and all cluster brokers have received and cached relevant AR metadata for the
* given partition
*
* @param reassignmentData schema same as was used for reassign partitions request
*
* @return two maps - completed and partitions for which reassignment is still in progress
* @throws ApiException in case reassignment verification wasn't initiated on server
*/
public ReassignmentResult verifyReassignPartitions(PartitionReassignmentData reassignmentData) throws ApiException;


/**
* Initiates long-running preferred replica leader election procedure
* This is an asynchronous call, it returns immediately once server has accepted request, and created admin path in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that all partitions leader has moved to
* preferred replica.
* Note: currently there are only two possible states for preferred replica leader election: Completed, In Progress.
*
* @param preferredReplicaElectionData partitions whose leader needs to be moved to the preferred replica
*
* @return java.util.concurrent.Future which is completed once all partitions have moved leader to preferred replica
*
* @throws ApiException in case preferred replica leader election wasn't initiated on server
*/
public Future<Void> preferredReplicaLeaderElection(PreferredReplicaLeaderElectionData preferredReplicaElectionData) throws ApiException;


/**
* Checks the interim status of the preferred replica leader election.
* Preferred replica leader election for concrete partition is considered completed if all cluster brokers have received and cached
* relevant metadata for the given partition
*
* @param preferredReplicaElectionData same partitions as for preferred replica leader election request
*
* @return two maps - completed and partitions for which procedure is still in progress
* @throws ApiException in case preferred replica election verification wasn't initiated on server
*/
public PreferredReplicaLeaderElectionResult verifyPreferredReplicaLeaderElection(PreferredReplicaLeaderElectionData preferredReplicaElectionData)
throws ApiException;

}

4. Interactive Shell / CLI tool

This component wraps AdminClient and provides an interactive shell-like environment for executing administrative commands. The goal of these changes is to let people use the existing standalone tools from a single script, optionally running commands in a shell, so command arguments and names are unchanged from the existing tools where possible.

The tool supports two modes:

  • command line interface

  • interactive shell mode

Command Line Interface

This mode lets the user run commands from a shell script. List of available commands:

(Note: not all possible options are listed - e.g. alter topic's config)

# Topic Commands - options are ported from TopicCommand.scala
bin/kafka.sh --create-topic --topic my_topic --partitions 5 --replication-factor 3 --config key=value --broker-list <host : port>
bin/kafka.sh --alter-topic --topic my_topic --partitions 10 --broker-list <host : port>
bin/kafka.sh --delete-topic --topic my_topic --broker-list <host : port>
bin/kafka.sh --list-topics --broker-list <host : port>
bin/kafka.sh --describe-topic --topic my_topic --broker-list <host : port>

# Reassign Partitions - options are ported from ReassignPartitionsCommand.scala
bin/kafka.sh --reassign-partitions --reassignment-json-file /user/file.json --blocking --broker-list <host : port>

 
# Preferred Replica Leader Elections - options are ported from PreferredReplicaLeaderElectionCommand.scala
bin/kafka.sh --preferred-replica-leader-election --preferred-replica-leader-election /user/file.json --blocking --broker-list <host : port>

# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host : port>

The user has to supply --broker-list <host : port> with every command to define at least one broker from the target cluster.

Interactive Shell Mode

Interactive shell mode provides extended facilities for executing admin commands. Command names and options are the same, but the user has to define --broker-list only once: in interactive mode the CLI tool manages a cluster metadata cache and sends commands to the proper broker.

This mode also provides a way to switch context, so that e.g. all topic commands are applied to the selected topic and there is no need to specify the topic name for each topic command.

Proposed use-case is the following:

# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host1 : port1>
Connected to Kafka Controller at <host2 : port2>.

kafka> create-topic --topic my_topic --partitions 5 --replication-factor 3
Topic "my_topic" is created.

kafka> alter-topic --topic my_topic --partitions 10
Topic "my_topic" is changed.

# Switch topic context
kafka> topic my_topic
Switched to "my_topic" topic.

# Execute topic command for switched topic
kafka my_topic> describe-topic
"my_topic" details:
Topic: my_topic Partitions: 10 ...

# Switch off topic context
kafka my_topic> topic
kafka>
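
The topic context shown in the session above amounts to a small piece of state in the shell. The following is a minimal sketch of that state, not the tool's implementation: ShellContext and its methods are hypothetical names, and letting an explicit --topic argument override the selected context is an assumption the proposal does not spell out.

```java
public class ShellContext {

    private String currentTopic; // null means no topic context is selected

    /** Handles the "topic [name]" command: a name switches context, no name clears it. */
    public void topic(String name) {
        currentTopic = (name == null || name.isEmpty()) ? null : name;
    }

    /** Builds the prompt in the form used in the example session. */
    public String prompt() {
        return currentTopic == null ? "kafka> " : "kafka " + currentTopic + "> ";
    }

    /**
     * Resolves the topic a command applies to.
     * Assumption: an explicit --topic value wins over the selected context.
     */
    public String resolveTopic(String explicitTopic) {
        if (explicitTopic != null) {
            return explicitTopic;
        }
        if (currentTopic == null) {
            throw new IllegalStateException("no topic specified and no topic context selected");
        }
        return currentTopic;
    }
}
```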

Compatibility, Deprecation, and Migration Plan

The current tools will still be available in the 0.8.3 release but will be removed for 0.9 (tracked here https://issues.apache.org/jira/browse/KAFKA-1776 ).

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
