...

1. Wire Protocol Extensions

Overview

It is proposed to add / modify these 4 types of requests:

  • Topic commands, which include CreateTopic(Request | Response), AlterTopic(Request | Response), DeleteTopic(Request | Response)

Please find details under specific RQ/RP schema proposal.

Schema

Overall the idea is to extend the Wire Protocol to cover all existing topic admin commands - create-topic, alter-topic, delete-topic, describe-topic, list-topics, reassign-partitions - so that a user does not need to talk directly to Zookeeper and all commands can be authenticated via Kafka. At the same time, since the Wire Protocol is a public API to the Kafka cluster, it was agreed that the new Admin schema needs to be "orthogonal", i.e. new messages shouldn't duplicate each other or existing requests, if those already cover particular use cases.

It is also important that all admin requests are likely to be asynchronous. This means requests will only initiate a particular command, and will not wait until the command is actually completed. There are different reasons for that, but we want to make sure that we give users all the tools needed to verify whether an issued command has been completed. E.g. one can consider a topic created once it is possible to consume from / produce to this topic. In this case the user can leverage TopicMetadataRequest to check that all brokers have received metadata about the newly created topic.
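Since the request only initiates the command, a client that wants blocking behaviour has to poll for completion itself. Below is a minimal, library-agnostic sketch of such a poll loop; the availability check passed in is a hypothetical placeholder for a real TopicMetadataRequest round-trip, not part of the proposed API:

```java
import java.util.function.BooleanSupplier;

public class PollUntil {
    /**
     * Polls the given condition until it returns true or the timeout expires.
     * Returns true if the condition was met within timeoutMs.
     */
    static boolean pollUntil(BooleanSupplier condition, long timeoutMs, long intervalMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(intervalMs);
        }
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical check: in a real client this condition would issue a
        // TopicMetadataRequest and verify all brokers know about the topic.
        long start = System.currentTimeMillis();
        boolean visible = pollUntil(() -> System.currentTimeMillis() - start > 50, 1000, 10);
        System.out.println("topic visible: " + visible);
    }
}
```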

Finally, Topic Admin schema requests are likely to be used not only in CLI tools, where the common use case is creating/changing/deleting a single entity. Since Kafka is able to maintain a huge number of topics, it is vital that a user can efficiently issue many commands at one time. That's why all Topic Admin messages are essentially batch requests, i.e. it is possible to group commands of one type for many topics in one batch, reducing network calls. At the same time, to make Schema usage transparent and compliant with existing requests (such as Produce and Fetch), if a batch request includes more than one instruction for a specific topic, only the last one from the list will be executed, and the others will be silently ignored.
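The "last instruction wins" rule can be sketched as a simple de-duplication pass on the server side. The TopicCommand type below is a hypothetical stand-in for one batch entry, not a class from the proposal:

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchDedup {
    // Hypothetical stand-in for a single topic instruction in a batch.
    record TopicCommand(String topic, int partitions) {}

    // Collapse a batch so that only the last instruction per topic survives,
    // matching the "last one executed, others silently ignored" rule above.
    static Collection<TopicCommand> dedup(List<TopicCommand> batch) {
        Map<String, TopicCommand> byTopic = new LinkedHashMap<>();
        for (TopicCommand cmd : batch) {
            byTopic.put(cmd.topic(), cmd); // later entries overwrite earlier ones
        }
        return byTopic.values();
    }

    public static void main(String[] args) {
        List<TopicCommand> batch = List.of(
                new TopicCommand("a", 3),
                new TopicCommand("b", 1),
                new TopicCommand("a", 5)); // duplicate topic: only this one executes
        System.out.println(dedup(batch));
    }
}
```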


New Protocol Errors

It is proposed to use existing / add these error codes to the protocol.

Error | Description | Requests
TopicAlreadyExists | Topic with this name already exists. | Create
InvalidTopic (existing) | Topic name contains invalid characters or doesn't exist. | Create, Alter, Delete
InvalidPartitions | Partitions field is invalid (e.g. negative or increasing number of partitions in an existing topic) | Create, Alter
InvalidReplicationFactor | ReplicationFactor field is invalid (e.g. negative) | Create, Alter
InvalidReplicaAssignment | ReplicaAssignment field is invalid (e.g. contains duplicates) | Create, Alter
InvalidTopicConfiguration | Either topic-level config setting or value is incorrect. | Create
DecreasePartitionsNotAllowed | Invalid Partitions argument: decreasing partitions is prohibited when altering a topic. | Alter
ReassignPartitionsInProgress | Reassign partitions procedure has been already started. | Alter
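For illustration, a client could model these codes roughly as below so it can surface descriptive messages to the user. The enum and message strings are only a sketch taken from the table above; numeric protocol codes are deliberately not assigned here:

```java
public class AdminErrors {
    // Proposed error codes from the table above; message text mirrors the
    // table descriptions and is not a final protocol definition.
    enum AdminError {
        TOPIC_ALREADY_EXISTS("Topic with this name already exists."),
        INVALID_TOPIC("Topic name contains invalid characters or doesn't exist."),
        INVALID_PARTITIONS("Partitions field is invalid."),
        INVALID_REPLICATION_FACTOR("ReplicationFactor field is invalid."),
        INVALID_REPLICA_ASSIGNMENT("ReplicaAssignment field is invalid."),
        INVALID_TOPIC_CONFIGURATION("Topic-level config setting or value is incorrect."),
        DECREASE_PARTITIONS_NOT_ALLOWED("Decreasing partitions is prohibited when altering a topic."),
        REASSIGN_PARTITIONS_IN_PROGRESS("Reassign partitions procedure has been already started.");

        final String message;

        AdminError(String message) {
            this.message = message;
        }
    }

    public static void main(String[] args) {
        // A CLI tool could print the message instead of the raw code.
        System.out.println(AdminError.TOPIC_ALREADY_EXISTS.message);
    }
}
```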

Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide a descriptive error message.

The same notation as in A Guide To The Kafka Protocol is used here.

...

AdminClient API

public class AdminClient {

/**
* A client is instantiated by providing a set of key-value pairs as configuration. Most
* of the settings will be related to NetworkClient
*
* @param properties settings related to Network client and at least one broker from KafkaCluster to connect to
*/
public AdminClient(Properties properties);

/**
* Initiates topics creation.
* This is an asynchronous call, it returns immediately once the server has accepted request and stored respective data in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that metadata about newly created topics was propagated
* to all brokers
*
* @param createTopicRequestBody holder (built by means of respective Builder) of all required arguments to create topics
* @return java.util.concurrent.Future which holds topics creation result - a map topic-name - error code
*
* @throws ApiException in case of global error, which means topic creation was not even started
*/
public Future<Map<String, Errors>> createTopics(CreateTopicRequestBody createTopicRequestBody) throws ApiException;

/**
* Initiates topics alteration.
* This is an asynchronous call, it returns immediately once the server has accepted request and stored/changed respective data in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that updated metadata about altered topics was propagated
* to all brokers
*
* @param alterTopicRequestBody holder (built by means of respective Builder) of all required arguments to alter topics
* @return java.util.concurrent.Future which holds topics alteration result - a map topic-name - error code
*
* @throws ApiException in case of global error, which means topic alteration was not even started
*/
public Future<Map<String, Errors>> alterTopics(AlterTopicRequestBody alterTopicRequestBody) throws ApiException;

/**
* Initiates topic deletion.
* This is an asynchronous call, it returns immediately once server has accepted request and marked requested topics for deletion in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that metadata with updated topic list was propagated to
* all brokers
*
* @param topics topic names to be deleted
* @return java.util.concurrent.Future which holds topics deletion result - a map topic-name - error code
*
* @throws ApiException in case of global error, which means topic deletion was not even started
*/
public Future<Map<String, Errors>> deleteTopics(List<String> topics) throws ApiException;

/**
* Lists all available topics in Kafka cluster.
* Topic is considered available if all brokers in cluster have received and cached metadata about it
*
* @return list of topic names
*
* @throws ApiException in case of global error, which means the topic list could not be fetched
*/
public List<String> listTopics() throws ApiException;

/**
* TODO: not finalized yet
* Request replication information about Kafka topics
*
* @return a mapping between topic name and topic description
* @throws ApiException in case of global error, which means topic description cannot be fetched for all topics
*/

public Map<String, DescribeTopicOutput> describeTopics(List<String> topicNames) throws ApiException;

/**
* Initiates config alteration. This is an asynchronous call, it returns immediately once the server has accepted the request and stored/changed respective data in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that updated configs were persisted
*
* @param entityType Type of entity being altered (topic, client etc..)
* @param alterConfigRequest holder (built by means of respective Builder) of all required arguments to alter configs
* @return java.util.concurrent.Future which holds the config alteration result - a map entity-name - complete after image of the entity configs
*
* @throws ApiException in case the configs could not be altered for any entity
*/
public Future<Map<String, EntityConfig>> alterConfig(String entityType, AlterConfigRequest alterConfigRequest) throws ApiException;

/**
* Describes config for any entity
*
* @param entityType Type of entity being described (topic, client etc..)
* @param entityNames Array of entity names to describe (topic names, client id's etc)
*
* @return a mapping between entity name and its config. If config cannot be fetched for a particular entity, an error value of EntityConfig is returned
* @throws ApiException in case config cannot be fetched for all entities
*/
public Map<String, EntityConfig> describeConfig(String entityType, List<String> entityNames) throws ApiException;


/**
* Initiates long-running reassign partitions procedure.
* This is an asynchronous call, it returns immediately once server has accepted request, and created admin path in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that all partition reassignments have completed.
* Note: currently there are only two possible states for reassigned partition: Completed, In Progress.
*
* @param reassignmentData schema describing among which replicas the partitions will be reassigned
*
* @return java.util.concurrent.Future which is completed once all partitions have been reassigned
*
* @throws ApiException in case partition reassignment wasn't initiated on server
*/
public Future<Void> reassignPartitions(PartitionReassignmentData reassignmentData) throws ApiException;

/**
* Checks the interim status of the partitions reassignment.
* Reassignment for concrete partition is considered completed if partition has been removed from
* admin zookeeper path and all cluster brokers have received and cached relevant AR metadata for the
* given partition
*
* @param reassignmentData schema same as was used for reassign partitions request
*
* @return two maps - completed partitions and partitions for which reassignment is still in progress
* @throws ApiException in case reassignment verification wasn't initiated on server
*/
public ReassignmentResult verifyReassignPartitions(PartitionReassignmentData reassignmentData) throws ApiException;

}
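A sketch of how a caller might drive the proposed API. To keep the example self-contained and runnable, a minimal in-memory stub stands in for the real client; only the deleteTopics signature and the Future-based result shape are taken from the API above:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class AdminClientUsage {
    // Stand-in for the per-topic protocol error codes.
    enum Errors { NONE, TOPIC_ALREADY_EXISTS }

    // In-memory stub mirroring the deleteTopics signature from the API above;
    // a real client would send the request to a broker instead.
    static class StubAdminClient {
        public Future<Map<String, Errors>> deleteTopics(List<String> topics) {
            Map<String, Errors> result = new HashMap<>();
            for (String topic : topics) {
                result.put(topic, Errors.NONE);
            }
            return CompletableFuture.completedFuture(result);
        }
    }

    public static void main(String[] args) throws Exception {
        StubAdminClient client = new StubAdminClient();
        // The call returns immediately; Future.get() simulates a blocking call,
        // as described in the javadoc above.
        Map<String, Errors> perTopic = client.deleteTopics(List.of("my_topic")).get();
        perTopic.forEach((topic, error) -> System.out.println(topic + " -> " + error));
    }
}
```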

4. Interactive Shell / CLI tool

This component wraps AdminClient and provides an interactive shell-like environment for executing administrative commands. The goal of these changes is to let people use the existing standalone tools, but from a single script, optionally running commands in a shell, so command arguments/names are not changed compared to the existing tools, where possible.

The tool supports two modes:

...

Command Line Interface

This mode lets the user run commands from a shell script. List of available commands:

(Note: not all possible options are listed - e.g. alter replication factor)

# Topic Commands - options are ported from TopicCommand.scala
bin/kafka.sh --create-topic --topic my_topic --partitions 5 --replication-factor 3 --config key=value --broker-list <host : port>
bin/kafka.sh --alter-topic --topic my_topic --partitions 10 --broker-list <host : port>
bin/kafka.sh --delete-topic --topic my_topic --broker-list <host : port>
bin/kafka.sh --list-topics --broker-list <host : port>
bin/kafka.sh --describe-topic --topic my_topic --broker-list <host : port>
# Reassign Partitions - options are ported from ReassignPartitionsCommand.scala
bin/kafka.sh --reassign-partitions --reassignment-json-file /user/file.json --blocking --broker-list <host : port>

...

# Preferred Replica Leader Elections - options are ported from PreferredReplicaLeaderElectionCommand.scala
bin/kafka.sh --preferred-replica-leader-election --preferred-replica-leader-election /user/file.json --blocking --broker-list <host : port>

...

The user will have to supply --broker-list <host : port> with every command to define at least one broker from the target cluster.

Interactive Shell Mode

Interactive shell mode provides extended facilities for admin command execution. Command names and options are the same, but the user will have to define --broker-list only once - the CLI tool in interactive mode will manage the cluster metadata cache and send commands to the proper broker.

This mode also provides facilities to switch context, so that e.g. all topic commands are applied to the selected topic - there is no need to specify the topic name for each topic command.

The proposed use case is the following:

# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host1 : port1>
Connected to Kafka Controller at <host2 : port2>.
kafka> create-topic --topic my_topic --partitions 5 --replication-factor 3
Topic "my_topic" is created.
kafka> alter-topic --topic my_topic --partitions 10
Topic "my_topic" is changed.
# Switch topic context
kafka> topic my_topic
Switched to "my_topic" topic.
# Execute topic command for switched topic
kafka my_topic> describe-topic
"my_topic" details:
Topic: my_topic Partitions: 10 ...


# Switch off topic context
kafka my_topic> topic
kafka>

Compatibility, Deprecation, and Migration Plan

The current tools will still be available in the 0.8.3 release but will be removed for 0.9 (tracked here https://issues.apache.org/jira/browse/KAFKA-1776 ).

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

...