The fundamental goal of the command line shell is to provide centralized management of Kafka operations.

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Folks have created dozens of different systems to work with Kafka. If we provide a wire protocol that allows the brokers to execute administrative code, then clients can execute commands much more simply. Today there are so many ties to ZooKeeper that it is nearly impossible for some code to do anything better than wrap shell scripts such as kafka-topics.sh. With the wire protocol, a client in any language should be able to administer Kafka. If someone wants a REST interface, for example, they can write it in whatever language they like. The project should provide a client that is not only an example but also a fully functional replacement for the kafka-topics, reassign-partitions, and consumer offset tools.

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Command line tools and arguments

Proposed Changes

Installation

These instructions describe how to build and start the Kafka Command Line Tool (hereinafter, the Shell). The implementation is in progress under KAFKA-1694.

To start the Shell, you need a running Kafka cluster built from the patch attached to KAFKA-1694, and you need to build the Shell itself.

  1. Get the code.
    Get KAFKA-1772_1802_1775_1774_v2.patch, attached to KAFKA-1694.
    The patch was built against trunk on top of revision 7e9368b, so reset to that commit and then apply the patch:

    git reset --hard 7e9368b
    git am KAFKA-1772_1802_1775_1774_v2.patch


  2. Build the code. Run:

    ./gradlew releaseTarGz_2_10_4


  3. Start a Kafka cluster somewhere from the archive at ./core/build/distributions/kafka_2.10-0.8.3-SNAPSHOT.tgz.

  4. Unpack the build archive:
    cd <kafka_home>/core/build/distributions/ && rm -rf kafka_2.10-0.8.3-SNAPSHOT && tar -xf kafka_2.10-0.8.3-SNAPSHOT.tgz

  5. Start the Shell:
    sudo <kafka_home>/core/build/distributions/kafka_2.10-0.8.3-SNAPSHOT/bin/kafka.sh --shell --broker <host : port>
    where <host : port> is the address of one of the running brokers in the cluster.

  6. To print the Shell help, run:
    sudo <kafka_home>/core/build/distributions/kafka_2.10-0.8.3-SNAPSHOT/bin/kafka.sh --help

Sample usage

You can use the Kafka Command Line Tool in two ways: 1) as an interactive shell, or 2) as a simple CLI.

For example, to get the list of topics, you can either:

1) Start the Shell and run:

sudo bin/kafka.sh --shell --broker <host : port>

kafka> list-topics

Or

2) Run the command directly with kafka.sh:

sudo bin/kafka.sh --list-topics --broker <host : port>


Proposed RQ/RP Format

A separate wire protocol message type is created for each type of admin request.

Currently there are five message types that support TopicCommand: CreateTopic(Request | Response), AlterTopic, DeleteTopic, DescribeTopic and ListTopics. There is also a special message type for retrieving cluster information, ClusterMetadata (see Kafka Admin Command Line Internals for details).

The notation is the same as in A Guide To The Kafka Protocol, with one addition: a new Kafka Protocol metatype, MaybeOf ("?" in the notation), which marks a value as optional in the message. To indicate whether the value is present, a special control byte is prepended to each optional value (0 means the field is absent; any other value means the value follows and is read normally).
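To illustrate the MaybeOf encoding, here is a minimal Python sketch. It assumes the primitive encodings from A Guide To The Kafka Protocol (big-endian integers, strings as an int16 length followed by UTF-8 bytes) and writes 1 as the "present" control byte, since the proposal only says that any non-zero value means present. All function names are hypothetical helpers, not part of any Kafka client.

```python
import struct
from typing import Any, Callable, Optional, Tuple

# Decoders take (buffer, offset) and return (value, new_offset).
Decoder = Callable[[bytes, int], Tuple[Any, int]]


def encode_int32(value: int) -> bytes:
    return struct.pack(">i", value)  # big-endian int32


def encode_string(value: str) -> bytes:
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data  # int16 length prefix


def decode_int32(buf: bytes, offset: int) -> Tuple[int, int]:
    (value,) = struct.unpack_from(">i", buf, offset)
    return value, offset + 4


def decode_string(buf: bytes, offset: int) -> Tuple[str, int]:
    (length,) = struct.unpack_from(">h", buf, offset)
    start = offset + 2
    return buf[start:start + length].decode("utf-8"), start + length


def encode_maybe(value: Optional[Any], encoder: Callable[[Any], bytes]) -> bytes:
    """MaybeOf ("?"): control byte 0 if absent, otherwise 1 followed by the value."""
    if value is None:
        return b"\x00"
    return b"\x01" + encoder(value)


def decode_maybe(buf: bytes, offset: int, decoder: Decoder) -> Tuple[Optional[Any], int]:
    """Read the control byte; 0 means absent, any other value means the value follows."""
    control = buf[offset]
    if control == 0:
        return None, offset + 1
    return decoder(buf, offset + 1)
```

For example, an absent ?(Partitions) is the single byte 0x00, while ?(Partitions) = 7 is 0x01 followed by the four bytes 00 00 00 07.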

Cluster Metadata Request

 

ClusterMetadataRequest =>

 

Cluster Metadata Response

 

ClusterMetadataResponse => ErrorCode [Broker] ?(Controller)
  ErrorCode => int16
  Broker => NodeId Host Port
    NodeId => int32
    Host => string
    Port => int32
  Controller => Broker

ClusterMetadataRequest is a request with no arguments.

ClusterMetadataResponse holds an error code (0 on success), the list of brokers in the cluster, and, optionally, the broker serving as the controller. An empty Controller most likely means either that an error occurred while processing the request or that the cluster is in some intermediate state.

Admin RQ/RP format

All admin messages listed below must be sent to the controller broker, since only the controller processes them. If an admin message is sent to an ordinary broker, a special error code (22, InvalidRequestTarget) is returned. If any other failure occurs while processing the message, the AdminRequestFailed error (code 21) is returned.

Error                 Code  Description
AdminRequestFailed    21    Unexpected error occurred while processing Admin request.
InvalidRequestTarget  22    Target broker (id=<this_broker_id>) is not serving a controller's role.
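Because only the controller processes admin messages, a client has to handle InvalidRequestTarget (22). The Python sketch below shows one possible client-side policy, which is an assumption and not part of the proposal: on error 22 it asks ClusterMetadata for the controller and retries there. `send` and `find_controller` are hypothetical callables standing in for real network I/O.

```python
from typing import Callable, Optional

NO_ERROR = 0
ADMIN_REQUEST_FAILED = 21
INVALID_REQUEST_TARGET = 22


class AdminError(Exception):
    """Raised when an admin request cannot be completed."""


def send_admin_request(
    send: Callable[[str], int],
    find_controller: Callable[[], Optional[str]],
    broker: str,
    max_attempts: int = 3,
) -> str:
    """Send an admin request, redirecting to the controller on error 22.

    `send(target)` returns the response ErrorCode; `find_controller()` returns
    the controller's "host:port" from a ClusterMetadataResponse, or None.
    Returns the target that accepted the request.
    """
    target = broker
    for _ in range(max_attempts):
        code = send(target)
        if code == NO_ERROR:
            return target
        if code == INVALID_REQUEST_TARGET:
            controller = find_controller()
            if controller is None:
                # An empty Controller may mean the cluster is in an intermediate state.
                raise AdminError("no controller reported by ClusterMetadata")
            target = controller
            continue
        raise AdminError(f"admin request failed with error code {code}")
    raise AdminError(f"controller not reached after {max_attempts} attempts")
```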

 

Create Topic Request

 

CreateTopicRequest => TopicName ?(Partitions) ?(Replicas) ?(ReplicaAssignment) [Config]
  TopicName => string
  Partitions => int32
  Replicas => int32
  ReplicaAssignment => string
  Config => string 

 

Create Topic Response

 

CreateTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string

CreateTopicRequest requires the topic name and either both Partitions and Replicas or a ReplicaAssignment (validation is done on the server side). You can also specify topic-level configs to create the topic with, in key=value format; to use the defaults, send an empty array.
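A minimal Python sketch of encoding CreateTopicRequest follows. It assumes the primitive encodings of A Guide To The Kafka Protocol, 1 as the MaybeOf "present" byte, and that ReplicaAssignment is sent as an opaque string (its format is not defined in this section). The either/or pre-check only mirrors the rule stated above; the broker still performs the authoritative validation. Function names are hypothetical.

```python
import struct
from typing import Dict, List, Optional


def _string(value: str) -> bytes:
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def _maybe(payload: Optional[bytes]) -> bytes:
    # MaybeOf: 0 = absent; 1 followed by the encoded value = present.
    return b"\x00" if payload is None else b"\x01" + payload


def _string_array(items: List[str]) -> bytes:
    return struct.pack(">i", len(items)) + b"".join(_string(item) for item in items)


def encode_create_topic_request(
    topic: str,
    partitions: Optional[int] = None,
    replicas: Optional[int] = None,
    replica_assignment: Optional[str] = None,
    configs: Optional[Dict[str, str]] = None,
) -> bytes:
    has_counts = partitions is not None and replicas is not None
    if has_counts == (replica_assignment is not None):
        raise ValueError("give either partitions and replicas, or replica_assignment")
    config_entries = [f"{key}={value}" for key, value in (configs or {}).items()]
    return (
        _string(topic)
        + _maybe(None if partitions is None else struct.pack(">i", partitions))
        + _maybe(None if replicas is None else struct.pack(">i", replicas))
        + _maybe(None if replica_assignment is None else _string(replica_assignment))
        + _string_array(config_entries)  # empty array = use default configs
    )
```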

CreateTopicResponse is fairly simple: you receive an error code (0, as always, means NO_ERROR) and an optional error description, which usually holds the higher-level exception that occurred during command execution.
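CreateTopicResponse, AlterTopicResponse and DeleteTopicResponse all share the layout ErrorCode ?(ErrorDescription), so one decoder can serve all three. A minimal Python sketch, assuming big-endian integers, an int16-length string and the MaybeOf control byte; the function names are hypothetical.

```python
import struct
from typing import Optional, Tuple


def decode_admin_response(buf: bytes) -> Tuple[int, Optional[str]]:
    """Decode ErrorCode ?(ErrorDescription) into (error_code, description or None)."""
    (error_code,) = struct.unpack_from(">h", buf, 0)
    if buf[2] == 0:  # MaybeOf control byte: description absent
        return error_code, None
    (length,) = struct.unpack_from(">h", buf, 3)
    return error_code, buf[5:5 + length].decode("utf-8")


def raise_for_error(buf: bytes) -> None:
    """Raise if the response carries a non-zero ErrorCode (0 is NO_ERROR)."""
    error_code, description = decode_admin_response(buf)
    if error_code != 0:
        raise RuntimeError(f"error {error_code}: {description or 'no description'}")
```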

Alter Topic Request

 

AlterTopicRequest => TopicName ?(Partitions) ?(ReplicaAssignment) [AddedConfig] [DeletedConfig]
  TopicName => string
  Partitions => int32
  ReplicaAssignment => string
  AddedConfig => string
  DeletedConfig => string

 

Alter Topic Response

 

AlterTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string

AlterTopicRequest is similar to CreateTopicRequest. To specify topic-level settings that should be removed, use the DeletedConfig array (setting keys only).
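When a tool knows a topic's current overrides and the desired ones, it could derive the two arrays as in this Python sketch. It assumes AddedConfig entries use the same key=value format as CreateTopicRequest and that an added key replaces any existing value; `diff_topic_configs` is a hypothetical helper.

```python
from typing import Dict, List, Tuple


def diff_topic_configs(
    current: Dict[str, str], desired: Dict[str, str]
) -> Tuple[List[str], List[str]]:
    """Return (AddedConfig, DeletedConfig) entries that turn `current` into `desired`."""
    # New or changed overrides are sent as key=value pairs.
    added = [
        f"{key}={value}"
        for key, value in sorted(desired.items())
        if current.get(key) != value
    ]
    # Overrides that should disappear are sent as bare keys.
    deleted = sorted(key for key in current if key not in desired)
    return added, deleted
```

For example, going from retention.ms=1000 and cleanup.policy=delete to only retention.ms=2000 yields AddedConfig ["retention.ms=2000"] and DeletedConfig ["cleanup.policy"].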

AlterTopicResponse is similar to CreateTopicResponse.

Delete Topic Request

 

DeleteTopicRequest => TopicName
  TopicName => string

 

Delete Topic Response

 

DeleteTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string

DeleteTopicRequest requires only the name of the topic to delete.

DeleteTopicResponse is similar to CreateTopicResponse.

Describe Topic Request

 

DescribeTopicRequest => TopicName
  TopicName => string

 

Describe Topic Response

 

DescribeTopicResponse => ErrorCode ?(ErrorDescription) ?(TopicDescription)
  ErrorCode => int16
  ErrorDescription => string
  TopicDescription => TopicName TopicConfigDetails [TopicPartitionDetails]
    TopicName => string
    TopicConfigDetails => Partitions ReplicationFactor [Config]
      Partitions => int32
      ReplicationFactor => int32
      Config => overridden topic-level configs
    TopicPartitionDetails => PartitionId ?(Leader) [Replica] [ISR]
      PartitionId => int32
      Leader => int32
      Replica => int32
      ISR => int32

DescribeTopicRequest requires only the topic name.

Besides ErrorCode and the optional ErrorDescription, which are used the same way as in the previous messages, DescribeTopicResponse holds an optional TopicDescription structure (non-empty if execution was successful). Its fields are the following:

Field                   Description
TopicName               The name of the topic for which the description is provided.
TopicConfigDetails      A structure that holds basic replication details.
  Partitions            Number of partitions in the given topic.
  ReplicationFactor     Replication factor of the topic.
  Config                A topic-level setting and value that was overridden.
TopicPartitionDetails   List describing the replication details of each partition.
  PartitionId           Id of the partition.
  Leader                Optional id of the broker leading the described partition.
  Replicas              List of ids of the brokers serving as replicas for the partition.
  ISR                   Same as Replicas, but includes only brokers that are known to be "in-sync".
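Once a TopicDescription is decoded, the partition details support simple health checks. A minimal Python sketch, using a hypothetical dataclass that mirrors TopicPartitionDetails (Leader is None when the optional field is absent): a partition whose ISR is smaller than its replica list is reported as under-replicated, and a partition reported without a Leader is flagged as leaderless.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TopicPartitionDetails:
    partition_id: int
    leader: Optional[int]  # ?(Leader): None when absent
    replicas: List[int]
    isr: List[int]


def under_replicated(partitions: List[TopicPartitionDetails]) -> List[int]:
    """Ids of partitions whose in-sync replica set is smaller than the replica list."""
    return [p.partition_id for p in partitions if len(set(p.isr)) < len(set(p.replicas))]


def leaderless(partitions: List[TopicPartitionDetails]) -> List[int]:
    """Ids of partitions reported without a leader."""
    return [p.partition_id for p in partitions if p.leader is None]
```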

 

List Topics Request

 

ListTopicsRequest =>

 

List Topics Response

 

ListTopicsResponse => ErrorCode ?(ErrorDescription) ?(TopicsList)
  ErrorCode => int16
  ErrorDescription => string
  TopicsList => [TopicMarkedForDeletion] [AliveTopic]
    TopicMarkedForDeletion => string
    AliveTopic => string

ListTopicsRequest is a request with no arguments.

Besides ErrorCode and the optional ErrorDescription, which are used the same way as in the previous messages, ListTopicsResponse holds an optional TopicsList (non-empty if execution was successful) with two lists of topic names: one for topics marked for deletion and one for ordinary, alive topics.
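A minimal Python sketch of decoding ListTopicsResponse, assuming big-endian integers, int16-length strings, int32-count arrays and the MaybeOf control byte for both optional fields. The function names are hypothetical; the result keeps topics marked for deletion separate from alive ones.

```python
import struct
from typing import List, Optional, Tuple


def _string(buf: bytes, off: int) -> Tuple[str, int]:
    (length,) = struct.unpack_from(">h", buf, off)
    off += 2
    return buf[off:off + length].decode("utf-8"), off + length


def _string_array(buf: bytes, off: int) -> Tuple[List[str], int]:
    (count,) = struct.unpack_from(">i", buf, off)
    off += 4
    items = []
    for _ in range(count):
        item, off = _string(buf, off)
        items.append(item)
    return items, off


def decode_list_topics_response(
    buf: bytes,
) -> Tuple[int, Optional[str], Optional[Tuple[List[str], List[str]]]]:
    """Return (error_code, description, (marked_for_deletion, alive) or None)."""
    (error_code,) = struct.unpack_from(">h", buf, 0)
    off = 2
    description = None
    control = buf[off]  # ?(ErrorDescription)
    off += 1
    if control != 0:
        description, off = _string(buf, off)
    control = buf[off]  # ?(TopicsList)
    off += 1
    if control == 0:
        return error_code, description, None
    marked, off = _string_array(buf, off)
    alive, off = _string_array(buf, off)
    return error_code, description, (marked, alive)
```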

 

Compatibility, Deprecation, and Migration Plan

  • When will we remove the existing behavior?

This does not necessarily have to be decided now. Folks have already built wrapper tools, and they can keep using them if they want. We should code freeze the existing tools, though.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
