
The goal behind the command line shell is fundamentally to provide centralized management of Kafka operations.

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Folks have created dozens of different systems to work with Kafka. If we provide a wire protocol that allows the brokers to execute administrative code, clients can execute commands much more simply. Today, administration has so many ties to ZooKeeper that it is nearly impossible for some code to do anything better than wrap kafka-topics.sh and similar shell scripts. With a wire protocol, a client written in any language should be able to administer Kafka. If someone wants a REST interface (for example), they can write it in whatever language they like. The project should also ship a client that is not only an example but a fully functional replacement for the kafka-topics, reassign-partitions and consumer offset tools.

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Command line tools and arguments

Proposed Changes

The proposed changes consist of four parts:

  1. Wire Protocol extensions - to add new Admin messages
  2. Server-side Admin command handlers (TopicCommand-like)
  3. Admin Client - an out-of-the-box client for performing administrative commands
  4. Interactive Shell / CLI tool supporting administrative commands

Open questions and items under discussion are marked with [x]; see the Open Questions section for more details.

1. Wire Protocol Extensions

Overview

For each type of Admin Request a separate type of Wire protocol message is created.

It is proposed to add / modify these 3 types of requests:

  • Topic commands: CreateTopic, AlterTopic, DeleteTopic, DescribeTopic and ListTopics (each a Request / Response pair).
  • Replication tools: ReassignPartitions, VerifyReassignPartitions and PreferredReplicaLeaderElection.
  • A special type of request to support Admin commands: an enriched TopicMetadataRequest (to add controllerId).

See the request/response schema proposals below for details.

Schema

The notation used here is the same as in A Guide To The Kafka Protocol.

Protocol Errors

It is proposed to add these error codes to the protocol.

| Error | Code | Description | Requests |
|---|---|---|---|
| NotControllerReceivedAdminRequest | 1001 | Target broker is not serving the controller's role. | All Admin requests |
| TopicAlreadyExists | 1002 | A topic with this name already exists. | CreateTopicRequest |
| InvalidArgumentPartitions | 1003 | Either the partitions field is invalid (e.g. negative), or it is not defined when needed. | CreateTopicRequest, AlterTopicRequest |
| DecreasePartitionsNotAllowed | 1004 | Invalid partitions argument: decreasing partitions is prohibited. | AlterTopicRequest |
| InvalidArgumentReplicationFactor | 1005 | Either the replication-factor field is invalid (e.g. negative), or it is not defined when needed. | CreateTopicRequest |
| InvalidArgumentReplicaAssignment | 1006 | Either the replica-assignment field is invalid (e.g. contains duplicates), or it is not defined when needed. | CreateTopicRequest, AlterTopicRequest, ReassignPartitionsRequest, VerifyReassignPartitionsRequest |
| InvalidTopicConfig | 1007 | Either a topic-level config setting or its value is incorrect. | CreateTopicRequest, AlterTopicRequest |
| PreferredReplicaLeaderElectionInProgress | 1008 | The preferred replica leader election procedure has already been started. | PreferredReplicaLeaderElectionRequest |
| InvalidArgumentPreferredReplicaElectionData | 1009 | The preferred replica leader election data is invalid (bad JSON, duplicates, etc.). | PreferredReplicaLeaderElectionRequest |
| ReassignPartitionsInProgress | 1010 | The reassign partitions procedure has already been started. | ReassignPartitionsRequest |

Generally, the Admin Client (see section 3) or any other request dispatcher should have enough context to provide a descriptive error message.

For example, on receiving InvalidArgumentPartitions, the client will be able to determine that:

a) for an AlterTopicRequest: the user provided an incorrect partitions argument (e.g. negative);

b) for a CreateTopicRequest: the user provided replication-factor but did not provide the partitions argument.
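The sketch below illustrates how a client might turn a response code into such a message. It assumes a hypothetical `AdminErrors` class: the numeric codes come from the table above, while the enum constant names and the message texts are illustrative only, and only InvalidArgumentPartitions gets request-specific wording here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical client-side mapping of the proposed admin error codes (values from the table above).
final class AdminErrors {

    enum AdminError {
        NONE((short) 0),
        NOT_CONTROLLER((short) 1001),
        TOPIC_ALREADY_EXISTS((short) 1002),
        INVALID_ARGUMENT_PARTITIONS((short) 1003),
        DECREASE_PARTITIONS_NOT_ALLOWED((short) 1004),
        INVALID_ARGUMENT_REPLICATION_FACTOR((short) 1005),
        INVALID_ARGUMENT_REPLICA_ASSIGNMENT((short) 1006),
        INVALID_TOPIC_CONFIG((short) 1007),
        PREFERRED_REPLICA_LEADER_ELECTION_IN_PROGRESS((short) 1008),
        INVALID_ARGUMENT_PREFERRED_REPLICA_ELECTION_DATA((short) 1009),
        REASSIGN_PARTITIONS_IN_PROGRESS((short) 1010);

        final short code;

        AdminError(short code) {
            this.code = code;
        }
    }

    private static final Map<Short, AdminError> BY_CODE = new HashMap<>();

    static {
        for (AdminError error : AdminError.values()) {
            BY_CODE.put(error.code, error);
        }
    }

    static AdminError forCode(short code) {
        AdminError error = BY_CODE.get(code);
        if (error == null) {
            throw new IllegalArgumentException("Unknown admin error code: " + code);
        }
        return error;
    }

    // The same code can have different causes, so the message also depends on the request that was sent.
    static String describe(short code, String requestType) {
        AdminError error = forCode(code);
        if (error == AdminError.INVALID_ARGUMENT_PARTITIONS) {
            return "AlterTopicRequest".equals(requestType)
                    ? "Invalid partitions argument (e.g. negative)."
                    : "Partitions argument is missing or invalid (e.g. replication-factor given without partitions).";
        }
        return error.name() + " returned for " + requestType;
    }
}
```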

ClusterMetadata Schema

Cluster Metadata Request

 

ClusterMetadataRequest =>

 

Cluster Metadata Response

 

ClusterMetadataResponse => ErrorCode [Broker] ?(Controller)
ErrorCode => int16
Broker => NodeId Host Port
NodeId => int32
Host => string
Port => int32
Controller => Broker

ClusterMetadataRequest is a request with no arguments.

ClusterMetadataResponse holds an error code (0 on success), the list of brokers in the cluster and, optionally, the broker serving the controller's role. An empty Controller most likely means either an error during request processing or that the cluster is in some intermediate state.

ClusterMetadataRequest is required for admin clients to discover the Kafka brokers, and specifically the controller's location, as only the controller may execute admin commands.
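As a minimal sketch, a client could hold the decoded response in a structure like the one below and refuse to send admin requests until a controller is known. `ClusterMetadataView` and `adminTarget()` are hypothetical names; only the fields mirror the schema above.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical in-memory form of a decoded ClusterMetadataResponse; only the fields follow the schema.
final class ClusterMetadataView {

    static final class Broker {
        final int nodeId;
        final String host;
        final int port;

        Broker(int nodeId, String host, int port) {
            this.nodeId = nodeId;
            this.host = host;
            this.port = port;
        }
    }

    final short errorCode;
    final List<Broker> brokers;
    final Optional<Broker> controller; // ?(Controller): absent when no controller is known

    ClusterMetadataView(short errorCode, List<Broker> brokers, Optional<Broker> controller) {
        this.errorCode = errorCode;
        this.brokers = brokers;
        this.controller = controller;
    }

    // Admin requests may only go to the controller. Without one (error or intermediate
    // cluster state), the caller should refresh the metadata and retry later.
    Broker adminTarget() {
        if (errorCode != 0) {
            throw new IllegalStateException("ClusterMetadataRequest failed with error code " + errorCode);
        }
        return controller.orElseThrow(
                () -> new IllegalStateException("No controller known; refresh cluster metadata and retry"));
    }
}
```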

Topic Admin Schema

The idea is to introduce wire protocol messages that cover all topic commands (create, alter, delete, list, describe). The proposed schema is motivated by the following:

1) Topic commands must inherit their options from the TopicCommand tool.

2) If an option is not used in a particular command (e.g. ReplicaAssignment in CreateTopicRequest), a special marker value is used instead (for ReplicaAssignment, an empty string).

3) Topic commands must support batching and report the command execution result per topic.

4) Topic commands can be executed only on the broker serving the controller's role; if a request is sent to an ordinary broker, a request-level error should reflect that.

Create Topic Request

 

CreateTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [ConfigEntry]]
TopicName => string
Partitions => int32
Replicas => int32
ReplicaAssignment => string
ConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string

 

Create Topic Response

 

CreateTopicResponse => ErrorCode ?(ErrorDescription)
ErrorCode => int16
ErrorDescription => string

CreateTopicRequest requires a topic name and either (partitions + replicas) or a replica assignment to create a topic (validation is done on the server side). You can also specify topic-level configs to create the topic with; to use the defaults, send an empty config array.

CreateTopicResponse is fairly simple: you receive an error code (0, as always, means NO_ERROR) and an optional error description, which usually holds the higher-level exception raised during command execution.
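To make the notation concrete, here is a sketch that serializes the body of a CreateTopicRequest using the primitive encodings from A Guide To The Kafka Protocol: big-endian integers, strings prefixed with an int16 byte length, and arrays prefixed with an int32 element count. The common request header is omitted, and `CreateTopicEncoder` and `TopicSpec` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Sketch: encodes only the body of a CreateTopicRequest (no request header). Names are hypothetical.
final class CreateTopicEncoder {

    static final class TopicSpec {
        final String name;
        final int partitions;
        final int replicas;
        final String replicaAssignment;    // "" marks "not used"
        final Map<String, String> configs; // empty map means "use defaults"

        TopicSpec(String name, int partitions, int replicas, String replicaAssignment, Map<String, String> configs) {
            this.name = name;
            this.partitions = partitions;
            this.replicas = replicas;
            this.replicaAssignment = replicaAssignment;
            this.configs = configs;
        }
    }

    static byte[] encode(List<TopicSpec> topics) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) { // DataOutputStream writes big-endian
            out.writeInt(topics.size()); // array: int32 element count
            for (TopicSpec topic : topics) {
                writeString(out, topic.name);
                out.writeInt(topic.partitions);
                out.writeInt(topic.replicas);
                writeString(out, topic.replicaAssignment);
                out.writeInt(topic.configs.size()); // [ConfigEntry]
                for (Map.Entry<String, String> entry : topic.configs.entrySet()) {
                    writeString(out, entry.getKey());
                    writeString(out, entry.getValue());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    // string: int16 byte length followed by the UTF-8 bytes
    private static void writeString(DataOutputStream out, String value) throws IOException {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        if (utf8.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("String too long for an int16 length prefix");
        }
        out.writeShort(utf8.length);
        out.write(utf8);
    }
}
```

For a single topic "t" with no configs, the body is 21 bytes: 4 for the array count, 3 for the name, 8 for the two int32 fields, 2 for the empty replica-assignment string and 4 for the empty config array.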

Alter Topic Request

 

AlterTopicRequest => TopicName ?(Partitions) ?(ReplicaAssignment) [AddedConfigEntry] [DeletedConfig]
TopicName => string
Partitions => int32
ReplicaAssignment => string
AddedConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
DeletedConfig => string

 

Alter Topic Response

 

AlterTopicResponse => ErrorCode ?(ErrorDescription)
ErrorCode => int16
ErrorDescription => string

AlterTopicRequest is similar to CreateTopicRequest. To remove topic-level settings, list their keys in the DeletedConfig array. The user can provide a new partitions value, a new replica assignment, or both.

AlterTopicResponse is similar to CreateTopicResponse.
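A possible server-side check for the Partitions field, assuming a hypothetical `AlterTopicValidation` helper, is sketched below. It maps bad input to InvalidArgumentPartitions (1003) and DecreasePartitionsNotAllowed (1004). Treating an unchanged partition count as a no-op is an assumption of this sketch, not something the proposal specifies.

```java
// Hypothetical server-side validation of AlterTopicRequest's ?(Partitions) field.
final class AlterTopicValidation {

    static final short NO_ERROR = 0;
    static final short INVALID_ARGUMENT_PARTITIONS = 1003;
    static final short DECREASE_PARTITIONS_NOT_ALLOWED = 1004;

    // requestedPartitions == null models the optional field being absent.
    static short validatePartitions(int currentPartitions, Integer requestedPartitions) {
        if (requestedPartitions == null) {
            return NO_ERROR; // partitions are left unchanged
        }
        if (requestedPartitions <= 0) {
            return INVALID_ARGUMENT_PARTITIONS;
        }
        if (requestedPartitions < currentPartitions) {
            return DECREASE_PARTITIONS_NOT_ALLOWED;
        }
        return NO_ERROR; // equal counts are treated as a no-op (assumption)
    }
}
```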

Delete Topic Request

 

DeleteTopicRequest => TopicName
TopicName => string

 

Delete Topic Response

 

DeleteTopicResponse => ErrorCode ?(ErrorDescription)
ErrorCode => int16
ErrorDescription => string

DeleteTopicRequest requires only the name of the topic to delete.

DeleteTopicResponse is similar to CreateTopicResponse.

Describe Topic Request

 

DescribeTopicRequest => TopicName
TopicName => string

 

Describe Topic Response

 

DescribeTopicResponse => ErrorCode ?(ErrorDescription) ?(TopicDescription)
ErrorCode => int16
ErrorDescription => string
TopicDescription => TopicName TopicConfigDetails [TopicPartitionDetails]
TopicName => string
TopicConfigDetails => Partitions ReplicationFactor [ConfigEntry]
Partitions => int32
ReplicationFactor => int32
ConfigEntry => string string
TopicPartitionDetails => PartitionId ?(Leader) [Replica] [ISR]
PartitionId => int32
Leader => int32
Replica => int32
ISR => int32

DescribeTopicRequest requires only the topic name.

DescribeTopicResponse, besides the errorCode and optional errorDescription (used in the same way as in the previous messages), holds an optional TopicDescription structure, which is non-empty if execution was successful. See the table below for details:

| Field | Description |
|---|---|
| TopicName | The name of the topic for which the description is provided. |
| TopicConfigDetails | A structure that holds basic replication details. |
| Partitions | Number of partitions in the given topic. |
| ReplicationFactor | Replication factor of the given topic. |
| ConfigEntry | A topic-level setting and its value that was overridden. |
| TopicPartitionDetails | List describing replication details for each partition. |
| PartitionId | Id of the partition. |
| Leader | Optional id of the broker that leads the described partition. |
| Replica | List of ids of the brokers serving a replica's role for the partition. |
| ISR | Same as Replica, but includes only brokers that are known to be "in-sync". |
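The sketch below shows one way a client might hold a decoded TopicDescription and render it for the CLI. `TopicDescriptionView` is hypothetical, ConfigEntry values are left out for brevity, and the output layout is illustrative rather than a specified format.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical view of DescribeTopicResponse's TopicDescription with a text rendering for the CLI.
final class TopicDescriptionView {

    static final class PartitionDetails {
        final int partitionId;
        final Integer leader; // ?(Leader): null when the partition has no leader
        final List<Integer> replicas;
        final List<Integer> isr;

        PartitionDetails(int partitionId, Integer leader, List<Integer> replicas, List<Integer> isr) {
            this.partitionId = partitionId;
            this.leader = leader;
            this.replicas = replicas;
            this.isr = isr;
        }
    }

    final String topicName;
    final int partitions;
    final int replicationFactor;
    final List<PartitionDetails> partitionDetails;

    TopicDescriptionView(String topicName, int partitions, int replicationFactor, List<PartitionDetails> partitionDetails) {
        this.topicName = topicName;
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
        this.partitionDetails = partitionDetails;
    }

    // One header line for the topic, then one indented line per partition.
    String render() {
        StringBuilder sb = new StringBuilder()
                .append("Topic: ").append(topicName)
                .append(" Partitions: ").append(partitions)
                .append(" ReplicationFactor: ").append(replicationFactor);
        for (PartitionDetails p : partitionDetails) {
            sb.append('\n')
              .append("  Partition: ").append(p.partitionId)
              .append(" Leader: ").append(p.leader == null ? "none" : String.valueOf(p.leader))
              .append(" Replicas: ").append(join(p.replicas))
              .append(" Isr: ").append(join(p.isr));
        }
        return sb.toString();
    }

    private static String join(List<Integer> brokerIds) {
        return brokerIds.stream().map(String::valueOf).collect(Collectors.joining(","));
    }
}
```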

 

List Topics Request

 

ListTopicsRequest =>

 

List Topics Response

 

ListTopicsResponse => ErrorCode ?(ErrorDescription) ?(TopicsList)
ErrorCode => int16
ErrorDescription => string
TopicsList => [TopicName]
TopicName => string

ListTopicsRequest is a request with no arguments.

ListTopicsResponse, besides the errorCode and optional errorDescription (used in the same way as in the previous messages), holds the list of topics in the Kafka cluster.

Replication Commands Schema

Reassign Partitions
Reassign Partitions Request

 

ReassignPartitionsRequest => ManualAssignment
ManualAssignment => string

 

Reassign Partitions Response

 

ReassignPartitionsResponse => ErrorCode ?(ErrorDescription)
ErrorCode => int16
ErrorDescription => string

ReassignPartitionsRequest requires a manual partition assignment string; parsing and validation are done on the server. This request only initiates partition reassignment and returns immediately. It is the client's responsibility to block the user, if required, while repeatedly sending VerifyReassignPartitionsRequest to check the reassignment status. The format is the following:

{
  "partitions": [
    {"topic": "foo", "partition": 1, "replicas": [1, 2, 3]}
  ],
  "version": 1
}

ReassignPartitionsResponse is similar to CreateTopicResponse.
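Building the ManualAssignment string by hand is error-prone, so a client might generate it. The hypothetical `ReassignmentJson` helper below produces the format shown above in compact form; it relies on Kafka's legal topic-name characters (ASCII letters, digits, '.', '_' and '-') so that no JSON escaping is needed.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that builds the ManualAssignment string for ReassignPartitionsRequest.
final class ReassignmentJson {

    static final class Move {
        final String topic;
        final int partition;
        final List<Integer> replicas; // target replica list, preferred replica first

        Move(String topic, int partition, List<Integer> replicas) {
            this.topic = topic;
            this.partition = partition;
            this.replicas = replicas;
        }
    }

    static String build(List<Move> moves) {
        String entries = moves.stream()
                .map(ReassignmentJson::entry)
                .collect(Collectors.joining(","));
        return "{\"partitions\":[" + entries + "],\"version\":1}";
    }

    private static String entry(Move move) {
        // Legal topic names need no escaping, so reject anything else up front.
        if (!move.topic.matches("[a-zA-Z0-9._-]+")) {
            throw new IllegalArgumentException("Illegal topic name: " + move.topic);
        }
        String replicas = move.replicas.stream().map(String::valueOf).collect(Collectors.joining(","));
        return "{\"topic\":\"" + move.topic + "\",\"partition\":" + move.partition
                + ",\"replicas\":[" + replicas + "]}";
    }
}
```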


Verify Reassign Partitions Request

 

VerifyReassignPartitionsRequest => ManualAssignment
ManualAssignment => string

 

Verify Reassign Partitions Response

 

VerifyReassignPartitionsResponse => [ReassignmentResult] ErrorCode ?(ErrorDescription)
ReassignmentResult => TopicAndPartition ResultCode
TopicAndPartition => string int32
ResultCode => int16
ErrorCode => int16
ErrorDescription => string

VerifyReassignPartitionsRequest requires the same manual partition assignment string that was passed to ReassignPartitionsRequest; it verifies the status of that reassignment.

VerifyReassignPartitionsResponse, as with other Admin requests, may return an error code and an optional error description in case of failure. Otherwise, a reassignment result map is returned, holding the reassignment status of each partition (-1: reassignment failed, 0: in progress, 1: completed successfully).
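Since ReassignPartitionsRequest returns immediately, a blocking client has to poll. The following is a minimal sketch, assuming a hypothetical `verify` supplier that sends VerifyReassignPartitionsRequest and returns the result map keyed by a topic-partition string; `ReassignmentPoller` is likewise a hypothetical name.

```java
import java.util.Map;
import java.util.function.Supplier;

// Sketch of a client-side blocking wait built on repeated VerifyReassignPartitions calls.
final class ReassignmentPoller {

    static final short FAILED = -1;
    static final short IN_PROGRESS = 0;
    static final short COMPLETED = 1;

    // Returns true once every partition has completed, false as soon as any partition failed.
    static boolean awaitReassignment(Supplier<Map<String, Short>> verify, long pollIntervalMs, int maxPolls) {
        for (int poll = 0; poll < maxPolls; poll++) {
            Map<String, Short> status = verify.get();
            if (status.containsValue(FAILED)) {
                return false;
            }
            if (status.values().stream().allMatch(code -> code == COMPLETED)) {
                return true;
            }
            try {
                Thread.sleep(pollIntervalMs); // some partitions are still IN_PROGRESS
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for reassignment", e);
            }
        }
        throw new IllegalStateException("Reassignment still in progress after " + maxPolls + " polls");
    }
}
```

Bounding the number of polls keeps a stuck reassignment from blocking the user indefinitely; the caller can decide whether to keep waiting.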


Preferred Replica Leader Election
Preferred Replica Leader Election Request

 

PreferredReplicaLeaderElectionRequest => PartitionsSerialized
PartitionsSerialized => string

 

Preferred Replica Leader Election Response

 

PreferredReplicaLeaderElectionResponse => ErrorCode ?(ErrorDescription)
ErrorCode => int16
ErrorDescription => string

PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Like ReassignPartitionsRequest, this request is intended to be non-blocking. The schema consists of one optional field: the partitions, in serialized (JSON) form, for which the procedure should be started. The format is the following:

{"partitions": [
  {"topic": "foo", "partition": 1},
  {"topic": "foobar", "partition": 2}
]}

PreferredReplicaLeaderElectionResponse is similar to CreateTopicResponse.

The status of the procedure can be checked with DescribeTopicRequest: for each partition, the leader broker should be the same as the head of the replicas list.
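That check can be expressed per partition as below; `PreferredLeaderCheck` is a hypothetical helper fed with the Leader and Replica fields of a DescribeTopicResponse.

```java
import java.util.List;

// Hypothetical check applied to each partition of a DescribeTopicResponse.
final class PreferredLeaderCheck {

    // The preferred replica is the head of the replica list; the election has taken
    // effect for a partition once its current leader is that broker.
    static boolean onPreferredLeader(Integer leader, List<Integer> replicas) {
        return leader != null && !replicas.isEmpty() && leader.equals(replicas.get(0));
    }
}
```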

2. Server-side Admin Request handlers

All incoming requests will be handled by specific helper classes called from KafkaApis: TopicCommandHelper for topic admin commands, ReassignPartitionsCommandHelper and PreferredReplicaLeaderElectionCommandHelper.

All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Unfortunately, most of the command classes are strongly coupled with CLI logic and are hard to refactor, so for now (until the standalone CLI commands are removed) the logic from those classes will most likely be extracted and copied to separate classes (as proposed, TopicCommandHelper [3] etc.).
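The routing step can be sketched as follows, assuming a hypothetical `AdminRequestDispatcher` standing in for the KafkaApis changes, with handlers registered per request type (in the proposal these would be TopicCommandHelper and the other helpers). It shows the controller check that produces NotControllerReceivedAdminRequest (1001) before any handler runs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical routing layer for admin requests on a broker.
final class AdminRequestDispatcher {

    static final short NOT_CONTROLLER = 1001; // NotControllerReceivedAdminRequest

    interface Handler {
        short handle(String requestBody); // returns the response error code
    }

    private final Map<String, Handler> handlers = new HashMap<>();
    private final BooleanSupplier isController;

    AdminRequestDispatcher(BooleanSupplier isController) {
        this.isController = isController;
    }

    void register(String requestType, Handler handler) {
        handlers.put(requestType, handler);
    }

    short dispatch(String requestType, String requestBody) {
        // Every admin request is rejected up front unless this broker is the controller.
        if (!isController.getAsBoolean()) {
            return NOT_CONTROLLER;
        }
        Handler handler = handlers.get(requestType);
        if (handler == null) {
            throw new IllegalArgumentException("No handler registered for " + requestType);
        }
        return handler.handle(requestBody);
    }
}
```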

3. Admin Client

This component is intended to be a Kafka out-of-box client implementation for Admin commands.

The Admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, the client will maintain a cluster metadata cache and will provide users with a convenient way of handling long-running commands (e.g. reassign partitions).

Proposed API [4]:

public class AdminClient {
    
    /**
     * A client is instantiated by providing a set of key-value pairs as configuration. Most
     * of the settings will be related to NetworkClient
     *
     * @param properties settings related to NetworkClient and at least one broker from the Kafka cluster to connect to
     */
    public AdminClient(Properties properties);
    
    /**
     * Create topic with given number of partitions and replication factor, replica assignment will be handled by Kafka cluster
     *
     * @throws ApiException
     */
    public void createTopic(String topicName, int partitions, int replicationFactor, List<ConfigEntry> configs) throws ApiException;
    
    /**
     * Create topic with specified replica assignment (number of partitions and replication factor will be taken
     * from replica assignment string)
     *
     * @throws ApiException
     */
    public void createTopic(String topicName, String replicaAssignment, List<ConfigEntry> configs) throws ApiException;
 
    /**
     * Alter existing topic partitions and/or replica assignment among Kafka brokers
     *
     * @throws ApiException
     */
    public void alterTopic(String topicName, Integer partitions, String replicaAssignment,
                           List<ConfigEntry> addedConfigs, List<String> deletedConfigs) throws ApiException;

    /**
     * Delete Kafka topic by name
     *
     * @throws ApiException
     */
    public void deleteTopic(String topicName) throws ApiException;
    
    /**
     * List all existing topics in Kafka cluster
     *
     * @throws ApiException
     */
    public List<String> listTopics() throws ApiException;
    
    /**
     * Request replication information about Kafka topic
     *
     * @throws ApiException
     */
    public DescribeTopicOutput describeTopic(String topicName) throws ApiException;
    
    /**
     * Initiate long-running reassign partitions procedure
     *
     * @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
     * @return future of the reassignment result which is completed once server-side partitions reassignment has succeeded or
     * an error occurred so that partitions reassignment cannot be started
     * @throws ApiException
     */
    public Future<ReassignPartitionsResponse> reassignPartitions(String partitionsReassignment) throws ApiException;

    /**
     * Check the interim status of the partitions reassignment
     *
     * @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
     * @return partition to reassignment result code (completed, in-progress, failed)
     * @throws ApiException
     */
    public Map<TopicPartition, Short> verifyReassignPartitions(String partitionsReassignment) throws ApiException;
    
    /**
     * Initiate long-running preferred replica leader election procedure
     *
     * @param partitions serialized partitions for which preferred replica leader election will be started
     *                   (according to PreferredReplicaLeaderElectionCommand)
     * @return future of the election result which is completed once server-side preferred replica is elected for provided partitions or
     * an error has occurred
     * @throws ApiException
     */
    public Future<PreferredReplicaLeaderElectionResponse> preferredReplicaLeaderElection(String partitions) throws ApiException;

    /**
     * Check the interim status of the preferred replica leader election
     *
     * @param partitions for which preferred replica leader election was started (according to PreferredReplicaLeaderElectionCommand)
     * @return status of the preferred replica leader election for the given partitions
     * @throws ApiException
     */
    public VerifyPreferredReplicaLeaderElectionResponse verifyPreferredReplicaLeaderElection(String partitions)
            throws ApiException;
    
    /**
     * A generic facility to send Admin request and return response counterpart
     *
     * @param adminRequest AdminRequest message
     * @param <T>          concrete AdminRequest type
     * @return response counterpart
     * @throws ApiException
     */
    private <T extends AbstractAdminResponse> T sendAdminRequest(AbstractAdminRequest<T> adminRequest) throws ApiException;

 
    /**
     * Refreshes cluster metadata cache - list of brokers and controller
     * 
     * @throws ApiException
     */
    private void updateClusterMetadata() throws ApiException;

}

4. Interactive Shell / CLI tool

This component wraps AdminClient and provides an interactive shell-like environment for executing administrative commands. The goal of these changes is to let people use the functionality of the existing standalone tools from a single script, optionally running commands in a shell, so command names and arguments are kept the same as in the existing tools where possible.

The tool supports two modes:

  • command line interface

  • interactive shell mode

Command Line Interface

This mode lets users run commands from a shell script. List of available commands:

(Note: not all possible options are listed - e.g. alter topic's config)

# Topic Commands - options are ported from TopicCommand.scala
bin/kafka.sh --create-topic --topic my_topic --partitions 5 --replication-factor 3 --config key=value --broker-list <host : port>
bin/kafka.sh --alter-topic --topic my_topic --partitions 10 --broker-list <host : port>
bin/kafka.sh --delete-topic --topic my_topic --broker-list <host : port>
bin/kafka.sh --list-topics --broker-list <host : port>
bin/kafka.sh --describe-topic --topic my_topic --broker-list <host : port>

# Reassign Partitions - options are ported from ReassignPartitionsCommand.scala
bin/kafka.sh --reassign-partitions --reassignment-json-file /user/file.json --blocking --broker-list <host : port>

 
# Preferred Replica Leader Elections - options are ported from PreferredReplicaLeaderElectionCommand.scala
bin/kafka.sh --preferred-replica-leader-election --preferred-replica-leader-election /user/file.json --blocking --broker-list <host : port>

# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host : port>

Every command must be supplied with --broker-list <host:port> to specify at least one broker in the target cluster.
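A sketch of how the tool might parse the --broker-list value follows. Accepting several comma-separated host:port entries mirrors how existing Kafka tools take broker lists, but `BrokerList.parse` itself is hypothetical.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for the --broker-list value (comma-separated host:port entries).
final class BrokerList {

    static List<InetSocketAddress> parse(String brokerList) {
        List<InetSocketAddress> brokers = new ArrayList<>();
        for (String raw : brokerList.split(",")) {
            String entry = raw.trim();
            int colon = entry.lastIndexOf(':');
            if (colon <= 0 || colon == entry.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + entry + "'");
            }
            int port = Integer.parseInt(entry.substring(colon + 1));
            // Unresolved: the host name is looked up only when the client actually connects.
            brokers.add(InetSocketAddress.createUnresolved(entry.substring(0, colon), port));
        }
        return brokers;
    }
}
```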

Interactive Shell Mode

Interactive shell mode provides extended facilities for executing admin commands. Command names and options are the same, but the user has to specify --broker-list only once: in interactive mode the CLI tool manages the cluster metadata cache and sends commands to the proper broker.

This mode also provides facilities to switch context: for example, after switching to a topic, all topic commands are applied to that topic, so there is no need to specify the topic name for each topic command.

The proposed use case is the following:

# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host1:port1>
Connected to Kafka Controller at <host2:port2>.

kafka> create-topic --topic my_topic --partitions 5 --replication-factor 3
Topic "my_topic" is created.

kafka> alter-topic --topic my_topic --partitions 10
Topic "my_topic" is changed.

# Switch topic context
kafka> topic my_topic
Switched to "my_topic" topic.

# Execute topic command for switched topic
kafka my_topic> describe-topic
"my_topic" details:
Topic: my_topic Partitions: 10 ...

# Switch off topic context
kafka my_topic> topic
kafka>
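The topic-context behavior in this session can be sketched as a small state holder that tracks the current topic, renders the prompt and appends --topic to topic commands that omit it. `ShellContext` is a hypothetical name, and recognizing topic commands by a name ending in "-topic" is an assumption of this sketch.

```java
import java.util.Optional;

// Hypothetical topic-context handling for the interactive shell.
final class ShellContext {

    private String currentTopic; // null when no topic context is active

    String prompt() {
        return currentTopic == null ? "kafka> " : "kafka " + currentTopic + "> ";
    }

    // Returns the command line to execute, or empty if the input only switched context.
    Optional<String> process(String input) {
        String line = input.trim();
        String[] tokens = line.split("\\s+");
        if (tokens[0].equals("topic")) {
            currentTopic = tokens.length > 1 ? tokens[1] : null; // "topic" alone clears the context
            return Optional.empty();
        }
        // create-/alter-/delete-/describe-topic take --topic; list-topics does not.
        boolean takesTopic = tokens[0].endsWith("-topic");
        if (takesTopic && currentTopic != null && !line.contains("--topic")) {
            return Optional.of(line + " --topic " + currentTopic);
        }
        return Optional.of(line);
    }
}
```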

Open questions:

  1. ClusterMetadata duplicates TopicMetadata: we could instead extend TopicMetadata with controllerId information (and probably something else). Another alternative is a generic server-side re-routing facility (see KAFKA-1912 for details).
  2. We might extend the error codes to cover all possible failures and stop using the outcome / errorDescription field as a generic result description.
  3. It is proposed to create a separate ticket to rework the topic command so that commands are executed directly by the controller instead of using the ZooKeeper admin path to notify the controller about the change.
  4. AdminClient may need to support batching admin operations. We are considering whether this can be covered by allowing the user to supply a regexp for the topic name in AlterTopic, DeleteTopic and DescribeTopic requests (similarly to TopicCommand.scala).

Compatibility, Deprecation, and Migration Plan

The current tools will still be available in the 0.8.3 release but will be removed for 0.9 (tracked here https://issues.apache.org/jira/browse/KAFKA-1776 ).

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
