You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

The goals behind the command line shell are fundamentally to provide a centralized management for Kafka operations.


Current stateUnder Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Folks have created dozens of different systems to work with Kafka. If we can provide a wire protocol that allows the brokers to execute administrative code then clients can more simply execute commands. We have a lot of ties with zookeeper making it near impossible for some code to-do nothing better/more than shell script wrapping etc. With the wire protocol we should be able to have client in any language work with administrating Kafka. If someone wants a REST interface (for example) then can write that in whatever language they like. We should have a client from the project that is not only an example but a fully functionality replacement for the kafka-topics, reassign-partitions, consumer offset tools.

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Command line tools and arguments

Proposed Changes


This is an instruction how to build and start Kafka Command Line Tool (hereinafter - Shell). The implementation is in progress under KAFKA-1694.

To start Shell you need to have a running Kafka Cluster built from the given patch (attached under KAFKA-1694) and build the Shell itself.

  1. Get the code.
    Get the KAFKA-1772_1802_1775_1774_v2.patch attached to KAFKA-1694.
    The patch was built against trunk, on top of revision 7e9368b . So reset to this commit and then run to apply the patch:

    git am KAFKA-1772_1802_1775_1774_v2.patch

  2. Build the code. Run:

    ./gradlew releaseTarGz_2_10_4

  3. Start somewhere Kafka Cluster from archive under ./core/build/distributions/kafka_2.10-0.8.3-SNAPSHOT.tgz

  4. Unpack build archive and start Shell:
    #cd <kafka_home>/core/build/distributions/ && rm -rf kafka_2.10-0.8.3-SNAPSHOT && tar -xf kafka_2.10-0.8.3-SNAPSHOT.tgz

  5. Start the Shell:
    sudo <kafka_home>/core/build/distributions/kafka_2.10-0.8.3-SNAPSHOT/bin/ --shell --broker <host : port>
    Where <host : port> is location of one of the running brokers from the Cluster.

  6. To get Shell help run:
    sudo <kafka_home>/core/build/distributions/kafka_2.10-0.8.3-SNAPSHOT/bin/ --help

Sample usage

You can use Kafka Command Line Tool in two ways: 1) as a interactive shell 2) as a simple CLI.

E.g. to get list of topics you can:

1) Start Shell and run:

sudo bin/ --shell --broker <host : port>

kafka> list-topics


2) Run right from

sudo bin/ --list-topics --broker <host : port>

Proposed RQ/RP Format

For each type of Admin Request a separate type of Wire protocol message is created.

Currently there are 5 types of messages which support TopicCommand - CreateTopic(Request | Response)AlterTopicDeleteTopicDescribeTopic,ListTopics. And a special message type to identify cluster info - ClusterMetadata (read Kafka Admin Command Line Internals for details).

The same notation as in  A Guide To The Kafka Protocol is used here. The only difference - new Kafka Protocol metatype - MaybeOf ("?" in notation), when used means value is optional in message. To define value existence special control byte is prepended before each value (0 - field is absent, otherwise - read value normally).

Cluster Metadata Request


ClusterMetadataRequest =>


Cluster Metadata Response


ClusterMEtadataResponse => ErrorCode [Broker] ?(Controller)
  ErrorCode => int16
  Broker => NodeId Host Port
    NodeId => int32
    Host => string
    Port => int32
Controller => Broker

ClusteMetadataRequest is a request with no arguments.

ClusterMetadataResponse holds error code (0 in case of successful result), list of brokers in cluster and optionally broker serving a Controller's role (returning empty Controller most likely means either error during request processing or cluster being in some intermediate state).

Admin RQ/RP format

All admin messages listed below are required to be sent only to Controller broker. Only controller will process such messages. If Admin message is sent to an ordinary broker a special error code is returned (code 22). In case of other failure during processing message AdminRequestFailedError is returned.






Unexpected error occurred while processing Admin request.

22Target broker (id=<this_broker_id>) is not serving a controller's role.


Create Topic Request


CreateTopicRequest => TopicName ?(Partitions) ?(Replicas) ?(ReplicaAssignment) [Config]
  TopicName => string
  Partitions => int32
  Replicas => int32
  ReplicaAssignment => string
  Config => string 


Create Topic Response


CreateTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string

CreateTopicRequest requires topic name and either (partitions+replicas) or replicas assignment to create topic (validation is done on server side). You can also specify topic-level configs to create topic with (to use default set an empty array), format key=value.

CreateTopicResponse is fairly simple - you receive error code (0 as always identifies NO_ERROR) and optionally error description. Usually it will hold the higher level exception that happened during command execution.

Alter Topic Request


AlterTopicRequest => TopicName ?(Partitions) ?(ReplicaAssignment) [AddedConfig] [DeletedConfig]
  TopicName => string
  Partitions => int32
  Replicas => int32
  AddedConfig => string
  DeletedConfig => string


Alter Topic Response


AlterTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string

AlterTopicRequest is similar to previous, to specify topic level settings that should be removed, use DeletedConfig array (just setting keys).

AlterTopicResponse is similar to CreateTopicResponse.

Delete Topic Request


DeleteTopicRequest => TopicName
  TopicName => string


Delete Topic Response


DeleteTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string

DeleteTopicRequest requires only topic name which should be deleted.

DeleteTopicResponse is similar to CreateTopicResponse.

Describe Topic Request


DescribeTopicRequest =>
  TopicName => string


Describe Topic Response


DescribeTopicResponse => ErrorCode ?(ErrorDescription) ?(TopicDescription)
  ErrorCode => int16
  ErrorDescription => string
  TopicDescription => TopicName TopicConfigDetails [TopicPartitionDetails]
    TopicName => string
    TopicConfigDetails => Partitions ReplicationFactor [Config]
      Partitions => int32
      ReplicationFactor => int32
      Config => overridden topic-level configs
    TopicPartitionsDetails => PartitionId ?(Leader) [Replica] [ISR]
      PartitionId => int32
      Leader => int32
      Replica => int32
      ISR => int32
DescribeTopicRequest requires only topic name.

DescribeTopicResponse besides errorCode and optional errorDescription which are used in the same way as in previous messages, holds optional (non empty if execution was successful) TopicDescription structure. Its structure is the following:




The name of the topic for which description is provided.


A structure that holds basic replication details.


Number of partitions in give topic.


Topic-level setting and value which was overridden.


List describing replication details for each partition.


Id of the partition.

LeaderOptional broekr-leader id for the described partition.
ReplicasList of broker ids serving a replica's role for the partition.
ISRSame as replicas but includes only brokers that are known to be "in-sync"


List Topics Request


ListTopicsRequest =>


List Topics Response


ListTopicsResponse => ErrorCode ?(ErrorDescription) ?(TopicsList)
  ErrorCode => int16
  ErrorDescription => string
  TopicsList => [TopicMarkedForDeletion] [AliveTopic]
    TopicMarkedForDeletion => string
    AliveTopic => string
ListTopicsRequest is a request with no arguments.

ListTopicsResponse besides errorCode and optional errorDescription which are used in the same way as in previous messages, holds optional (non empty if execution was successful) two list of topic names - one for deleted topics (marked for deletion) and the second one for ordinary, alive topics.


Compatibility, Deprecation, and Migration Plan

  • When will we remove the existing behavior?

I don't know if that has to be decided now. Folks have already built wrapper tools, they can still keep using them if they want. We should code freeze them though.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

  • No labels