
...

  1. Wire Protocol extensions - to add new Admin messages
  2. Server-side Admin command handlers (TopicCommand-like)
  3. Admin Client - an out-of-the-box client for performing administrative commands
  4. Interactive Shell / CLI tool supporting administrative commands

 

Some open questions and items under discussion are marked with [x]. Please see the Open Questions section for more details.

 

1. Wire Protocol Extensions

...

All admin messages listed below must be sent only to the controller broker; only the controller will process such messages. If an Admin message is sent to an ordinary broker, a special error code is returned (code 22). In case of any other failure during message processing, AdminRequestFailedError is returned [2].

Error                        | Code | Description
AdminRequestFailed           | 21   | Unexpected error occurred while processing Admin request.
NotControllerForAdminRequest | 22   | Target broker (id=<this_broker_id>) is not serving a controller's role.

ClusterMetadata Schema [3]
Cluster Metadata Request

 

ClusterMetadataRequest =>

 

Cluster Metadata Response

 

ClusterMetadataResponse => ErrorCode [Broker] ?(Controller)
  ErrorCode => int16
  Broker => NodeId Host Port
    NodeId => int32
    Host => string
    Port => int32
  Controller => Broker

ClusterMetadataRequest is a request with no arguments.

ClusterMetadataResponse holds the error code (0 in case of a successful result), the list of brokers in the cluster, and optionally the broker serving the controller's role (an empty Controller most likely means either an error during request processing or the cluster being in some intermediate state).

ClusterMetadataRequest is required for admin clients to discover the Kafka brokers, and specifically the controller's location, as only the controller may execute admin commands [2].
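
For illustration only, here is a minimal sketch of how a client could interpret ClusterMetadataResponse to locate the controller. The Broker and ClusterMetadataResponse classes below are assumptions that merely mirror the schema above; the real classes would be part of the proposed /clients changes.

import java.util.Arrays;
import java.util.List;

public class ClusterMetadataExample {

    // Mirrors Broker => NodeId Host Port
    static class Broker {
        final int nodeId;
        final String host;
        final int port;

        Broker(int nodeId, String host, int port) {
            this.nodeId = nodeId;
            this.host = host;
            this.port = port;
        }
    }

    // Mirrors ClusterMetadataResponse => ErrorCode [Broker] ?(Controller)
    static class ClusterMetadataResponse {
        final short errorCode;
        final List<Broker> brokers;
        final Broker controller; // may be absent while the cluster is in an intermediate state

        ClusterMetadataResponse(short errorCode, List<Broker> brokers, Broker controller) {
            this.errorCode = errorCode;
            this.brokers = brokers;
            this.controller = controller;
        }
    }

    // Returns the controller, or null when the response signals an error or no controller yet,
    // in which case the caller should refresh metadata later or report the failure.
    static Broker controllerOrNull(ClusterMetadataResponse response) {
        if (response.errorCode != 0 || response.controller == null)
            return null;
        return response.controller;
    }

    public static void main(String[] args) {
        Broker b1 = new Broker(1, "broker1", 9092);
        Broker b2 = new Broker(2, "broker2", 9092);
        ClusterMetadataResponse response =
                new ClusterMetadataResponse((short) 0, Arrays.asList(b1, b2), b2);
        System.out.println("Controller is " + controllerOrNull(response).host);
    }
}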

Topic Admin Schema

...

All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Unfortunately, most of the command classes are strongly coupled with CLI logic and can hardly be refactored, so for now (before we remove the standalone CLI commands) the logic from those classes will most likely be extracted and copied to separate classes (as proposed - TopicCommandHelper [4] etc.), as sketched below.
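
For illustration only, a rough sketch of the shape such an extracted helper might take; only the TopicCommandHelper name comes from the proposal [4], the signatures below are assumptions.

import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical CLI-independent helper extracted from TopicCommand: the same operations,
// minus argument parsing and console output, so server-side Admin handlers can reuse them.
public interface TopicCommandHelper {

    void createTopic(String topic, int partitions, int replicationFactor, Properties configs);

    void alterTopic(String topic, Integer partitions, Properties addedConfigs, List<String> deletedConfigs);

    void deleteTopic(String topic);

    List<String> listTopics();

    Map<String, String> describeTopic(String topic);
}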

3. Admin Client

This component is intended to be the Kafka out-of-the-box client implementation for Admin commands.

The Admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, the client will maintain a cluster metadata cache and provide the user with a convenient way of handling long-running commands (e.g. reassign partitions).

Proposed API [5]:

public class AdminClient {

    /**
     * An AdminClient is instantiated by providing a set of key-value pairs as configuration. Most
     * of the settings will be related to NetworkClient
     *
     * @param properties settings related to Network client and at least one broker from KafkaCluster to connect to
     */
    public AdminClient(Properties properties)

    /**
     * Create topic with given number of partitions and replication factor, replica assignment will be handled by Kafka cluster
     *
     * @throws ApiException
     */
    public void createTopic(String topicName, int partitions, int replicationFactor, List<ConfigEntry> configs) throws ApiException;

    /**
     * Create topic with specified replica assignment (number of partitions and replication factor will be taken
     * from replica assignment string)
     *
     * @throws ApiException
     */
    public void createTopic(String topicName, String replicaAssignment, List<ConfigEntry> configs) throws ApiException;

    /**
     * Alter existing topic partitions and/or replica assignment among Kafka brokers
     *
     * @throws ApiException
     */
    public void alterTopic(String topicName, Integer partitions, String replicaAssignment,
                                    List<ConfigEntry> addedConfigs, List<String> deletedConfigs) throws ApiException;
    /**
     * Delete Kafka topic by name
     *
     * @throws ApiException
     */
    public void deleteTopic(String topicName) throws ApiException;

    /**
     * List all existing topics in Kafka cluster
     *
     * @throws ApiException
     */
    public List<String> listTopics() throws ApiException;

    /**
     * Request replication information about Kafka topic
     *
     * @throws ApiException
     */
    public DescribeTopicOutput describeTopic(String topicName) throws ApiException;

    /**
     * Initiate long-running reassign partitions procedure
     *
     * @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
     * @return future of the reassignment result which is completed once server-side partitions reassignment has succeeded or
     * an error occurred so that partitions reassignment cannot be started
     * @throws ApiException
     */
    public Future<ReassignPartitionsResponse> reassignPartitions(String partitionsReassignment) throws ApiException;

    /**
     * Check the interim status of the partitions reassignment
     *
     * @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
     * @return partition to reassignment result code (completed, in-progress, failed)
     * @throws ApiException
     */
    public Map<TopicPartition, Short> verifyReassignPartitions(String partitionsReassignment) throws ApiException;

    /**
     * Initiate long-running preferred replica leader election procedure
     *
     * @param partitions serialized partitions for which preferred replica leader election will be started
     *                   (according to PreferredReplicaLeaderElectionCommand)
     * @return future of the election result which is completed once server-side preferred replica is elected for provided partitions or
     * an error has occurred
     * @throws ApiException
     */
    public Future<PreferredReplicaLeaderElectionResponse> preferredReplicaLeaderElection(String partitions) throws ApiException;

    /**
     * Check the interim status of the preferred replica leader election
     *
     * @param partitions for which preferred replica leader election was started (according to PreferredReplicaLeaderElectionCommand)
     * @return partition to reassignment result code (completed, in-progress, failed)
     * @throws ApiException
     */
    public VerifyPreferredReplicaLeaderElectionResponse verifyPreferredReplicaLeaderElection(String partitions)
            throws ApiException;

    /**
     * A generic facility to send Admin request and return response counterpart
     *
     * @param adminRequest AdminRequest message
     * @param <T>          concrete AdminRequest type
     * @return response counterpart
     * @throws ApiException
     */
    private <T extends AbstractAdminResponse> T sendAdminRequest(AbstractAdminRequest<T> adminRequest) throws ApiException;

 
	/**
     * Refreshes cluster metadata cache - list of brokers and controller
     * 
     * @throws ApiException
     */
    private void updateClusterMetadata() throws Exception;

}
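
An illustrative usage sketch of the proposed AdminClient API above. The bootstrap property name, the imports of the proposed classes (AdminClient, ConfigEntry, ReassignPartitionsResponse), and the reassignment JSON string are assumptions made for the sake of the example.

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;

public class AdminClientUsage {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // At least one broker from the target cluster, as required by the constructor contract.
        props.put("bootstrap.servers", "broker1:9092");

        AdminClient admin = new AdminClient(props);

        // Create a topic, letting the cluster pick the replica assignment.
        admin.createTopic("my_topic", 5, 3, Collections.<ConfigEntry>emptyList());

        // List all topics in the cluster.
        System.out.println(admin.listTopics());

        // Kick off a long-running reassignment and block until the server side completes it
        // (the JSON layout follows ReassignPartitionsCommand).
        String reassignment =
            "{\"version\":1,\"partitions\":[{\"topic\":\"my_topic\",\"partition\":0,\"replicas\":[1,2,3]}]}";
        Future<ReassignPartitionsResponse> result = admin.reassignPartitions(reassignment);
        result.get();
    }
}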

4. Interactive Shell / CLI tool

This component will wrap AdminClient and provide an interactive shell-like environment for executing administrative commands. The goal of these changes is to let people use the existing standalone tools from a single script, optionally running commands in a shell, so command arguments/names are not changed compared to the existing tools, where possible.

The tool supports two modes:

  • Command line interface

  • Shell-like mode

Installation

These are instructions on how to build and start the Kafka Command Line Tool (hereinafter, the Shell). The implementation is in progress under KAFKA-1694.

To start the Shell you need a running Kafka cluster built from the given patch (attached under KAFKA-1694), and you need to build the Shell itself.

Get the code: download the KAFKA-1772_1802_1775_1774_v2.patch attached to KAFKA-1694.
The patch was built against trunk, on top of revision 7e9368b, so reset to this commit and then run the following to apply the patch:

git am KAFKA-1772_1802_1775_1774_v2.patch

Build the code. Run:

./gradlew releaseTarGz_2_10_4

...

Command Line Interface

This mode lets the user run commands from a shell script. List of available commands:

(Note: not all possible options are listed - e.g. altering a topic's config)

# Topic Commands - options are ported from TopicCommand.scala
bin/kafka.sh --create-topic --topic my_topic --partitions 5 --replication-factor 3 --config key=value --broker-list <host : port>
bin/kafka.sh --alter-topic --topic my_topic --partitions 10 --broker-list <host : port>
bin/kafka.sh --delete-topic --topic my_topic --broker-list <host : port>
bin/kafka.sh --list-topics --broker-list <host : port>
bin/kafka.sh --describe-topic --topic my_topic --broker-list <host : port>

# Reassign Partitions - options are ported from ReassignPartitionsCommand.scala
bin/kafka.sh --reassign-partitions --reassignment-json-file /user/file.json --blocking --broker-list <host : port>

 
# Preferred Replica Leader Elections - options are ported from PreferredReplicaLeaderElectionCommand.scala
bin/kafka.sh --preferred-replica-leader-election --preferred-replica-leader-election /user/file.json --blocking --broker-list <host : port>

# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host : port>

The user will have to supply --broker-list <host : port> with every command to define at least one broker from the target cluster.

Shell-like Mode

Shell-like mode provides extended facilities for executing admin commands. Command names and options are the same, but the user has to define --broker-list only once - the CLI tool in shell mode will manage the cluster metadata cache and send commands to the proper broker.

Shell-like mode also provides facilities for switching context, so that e.g. all topic commands are applied to the switched topic - there is no need to specify the topic name for each topic command.

The proposed use case is the following:

# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host1 : port1>
Connected to Kafka Controller at <host2 : port2>.
kafka> create-topic --topic my_topic --partitions 5 --replication-factor 3
Topic "my_topic" is created.
kafka> alter-topic --topic my_topic --partitions 10
Topic "my_topic" is changed.
# Switch topic context
kafka> topic my_topic
Switched to "my_topic" topic.
# Execute topic command for switched topic
kafka my_topic> describe-topic
"my-topic" details:
Topic: my_topic Partitions: 10 ...
# Switch off topic context
kafka my_topic> topic
kafka>

Open questions:

  1. People expressed concerns about the optional MaybeOf type because it is inconsistent with the way we currently handle empty values in the Wire Protocol.
  2. We might extend the error codes to cover all possible failures and give up using the outcome / errorDescription field as a generic result description.
  3. ClusterMetadata duplicates TopicMetadata - we can extend TopicMetadata with controllerId information and probably something else. Another alternative is a generic server-side re-routing facility (see KAFKA-1912 for details).
  4. It is proposed to create a separate ticket to rework the topic command to execute commands directly on the controller instead of using the zookeeper admin path to notify the controller about the change.
  5. AdminClient may need to support batching admin operations. It is being considered whether we can cover this by allowing the user to supply a regexp for the topic name in AlterTopic, DeleteTopic, and DescribeTopic requests (similarly to TopicCommand.scala).

Sample usage

You can use the Kafka Command Line Tool in two ways: 1) as an interactive shell, 2) as a simple CLI.

E.g. to get the list of topics you can:

1) Start Shell and run:

sudo bin/kafka.sh --shell --broker <host : port>

kafka> list-topics

Or

2) Run right from kafka.sh:

sudo bin/kafka.sh --list-topics --broker <host : port>


Compatibility, Deprecation, and Migration Plan

...