...

New Protocol Errors

It is proposed to add these error codes to the protocol.

Error | Description | Requests
TopicAlreadyExists | Topic with this name already exists. | CreateTopicRequest
InvalidArgumentTopicName | Topic name contains invalid characters. | CreateTopicRequest
InvalidArgumentPartitions | Either partition field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest, AlterTopicRequest
InvalidArgumentReplicationFactor | Either replication-factor field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest, AlterTopicRequest
InvalidArgumentReplicaAssignment | Either replica-assignment field is invalid (e.g. contains duplicates), or not defined when needed. | CreateTopicRequest, AlterTopicRequest
InvalidArgumentTopicConfig | Either topic-level config setting or value is incorrect. | CreateTopicRequest, AlterTopicRequest
DecreasePartitionsNotAllowed | Invalid partitions argument: decreasing partitions is prohibited when altering topic. | AlterTopicRequest
PreferredReplicaLeaderElectionInProgress | Preferred replica leader election procedure has been already started. | PreferredReplicaLeaderElectionRequest
ReassignPartitionsInProgress | Reassign partitions procedure has been already started. | AlterTopicRequest
MultipleInstructionsForOneTopic | Only one mutation is allowed at once: e.g. change topic replication factor or change topic config. | CreateTopicRequest, AlterTopicRequest
MultipleTopicInstructionsInOneBatch | Multiple topic instructions for the same topic in one batch request. | CreateTopicRequest, AlterTopicRequest, DeleteTopicRequest

Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide a descriptive error message.
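As an illustration of how a client might turn the proposed errors into descriptive messages, here is a minimal sketch. The enum, its constant names and the `describe` helper are hypothetical and not part of the proposal; only three of the errors are mirrored, and no numeric error codes are assumed because the proposal does not list any.

```java
import java.util.EnumSet;
import java.util.Set;

final class ErrorMessageDemo {

    /** Request types named in the error table (hypothetical enum, for illustration only). */
    enum AdminRequest { CREATE_TOPIC, ALTER_TOPIC, DELETE_TOPIC }

    /** Hypothetical client-side mirror of a few proposed errors; numeric codes are deliberately omitted. */
    enum ProposedError {
        TOPIC_ALREADY_EXISTS("Topic with this name already exists.",
                EnumSet.of(AdminRequest.CREATE_TOPIC)),
        DECREASE_PARTITIONS_NOT_ALLOWED("Decreasing partitions is prohibited when altering topic.",
                EnumSet.of(AdminRequest.ALTER_TOPIC)),
        MULTIPLE_TOPIC_INSTRUCTIONS_IN_ONE_BATCH("Multiple topic instructions for the same topic in one batch request.",
                EnumSet.allOf(AdminRequest.class));

        final String description;
        final Set<AdminRequest> requests;

        ProposedError(String description, Set<AdminRequest> requests) {
            this.description = description;
            this.requests = requests;
        }

        /** Adds the context a dispatcher already has (the topic name) to the generic description. */
        String describe(String topic) {
            return "Topic '" + topic + "': " + description;
        }
    }

    public static void main(String[] args) {
        System.out.println(ProposedError.TOPIC_ALREADY_EXISTS.describe("orders"));
    }
}
```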

The same notation as in  A Guide To The Kafka Protocol is used here. 

...

 

CreateTopicRequest => [TopicName Partitions Replicas ReplicaAssignment]
TopicName => string
Partitions => int32
Replicas => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
Request semantics:
A user may define only one of (Partitions + Replicas) or ReplicaAssignment in one instruction; otherwise, MultipleInstructionsForOneTopic is returned. (Note: there is a special use case, automatic topic creation for TopicMetadataRequest. To trigger it, the user should set client_id=consumer and define only the topic name.)
If ReplicaAssignment is defined, the number of partitions and replicas is extracted from the supplied ReplicaAssignment. If (Partitions + Replicas) is defined, the replica assignment is generated automatically by the server.
One CreateTopicRequest may include only one topic creation command for a given topic name in one batch; otherwise, MultipleTopicInstructionsInOneBatch is returned.
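The two batch rules above can be sketched as a small validation routine. This is only an illustration under stated assumptions: the `Instruction` holder, the use of -1 and null for "not defined", and the "NoError" placeholder are hypothetical, and the automatic-creation special case and all other argument checks are left out.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class CreateTopicValidation {

    /** One entry of the CreateTopicRequest array; -1 and null mean "not defined" (an assumption). */
    static final class Instruction {
        final String topic;
        final int partitions;
        final int replicas;
        final Map<Integer, List<Integer>> replicaAssignment; // PartitionId -> [ReplicaId]

        Instruction(String topic, int partitions, int replicas,
                    Map<Integer, List<Integer>> replicaAssignment) {
            this.topic = topic;
            this.partitions = partitions;
            this.replicas = replicas;
            this.replicaAssignment = replicaAssignment;
        }
    }

    /** Returns topic name -> error name; "NoError" is a placeholder for success. */
    static Map<String, String> validate(List<Instruction> batch) {
        Map<String, String> result = new LinkedHashMap<>();
        Set<String> seen = new HashSet<>();
        for (Instruction i : batch) {
            if (!seen.add(i.topic)) {
                // Second instruction for the same topic in one batch: the topic as a whole is rejected.
                result.put(i.topic, "MultipleTopicInstructionsInOneBatch");
                continue;
            }
            boolean countsDefined = i.partitions != -1 || i.replicas != -1;
            boolean assignmentDefined = i.replicaAssignment != null;
            result.put(i.topic, countsDefined && assignmentDefined
                    ? "MultipleInstructionsForOneTopic"
                    : "NoError");
        }
        return result;
    }

    /** Partition count extracted from an explicit assignment. */
    static int partitionCount(Map<Integer, List<Integer>> assignment) {
        return assignment.size();
    }

    /** Replication factor extracted from an explicit assignment; assumes every partition lists the same number of replicas. */
    static int replicationFactor(Map<Integer, List<Integer>> assignment) {
        return assignment.isEmpty() ? 0 : assignment.values().iterator().next().size();
    }
}
```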
Create Topic Response

 

CreateTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

CreateTopicResponse contains a map between topic and topic creation result error code (see New Protocol Errors). 
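To make the notation concrete, the following sketch encodes and decodes the CreateTopicResponse body, assuming the usual Kafka protocol primitives: an array is an int32 element count followed by the elements, a string is an int16 length followed by UTF-8 bytes, and integers are big-endian. The class name is hypothetical, and the common response framing (size prefix, correlation id) is not handled here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

final class CreateTopicResponseCodec {

    /** Encodes [TopicName ErrorCode] as int32 count, then (int16 length + bytes, int16 code) per topic. */
    static ByteBuffer encode(Map<String, Short> results) {
        int size = 4; // int32 array length
        for (String topic : results.keySet()) {
            size += 2 + topic.getBytes(StandardCharsets.UTF_8).length + 2;
        }
        ByteBuffer buf = ByteBuffer.allocate(size); // ByteBuffer is big-endian by default
        buf.putInt(results.size());
        for (Map.Entry<String, Short> e : results.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) name.length);
            buf.put(name);
            buf.putShort(e.getValue());
        }
        buf.flip();
        return buf;
    }

    /** Reads the same layout back into a topic -> error code map, preserving order. */
    static Map<String, Short> decode(ByteBuffer buf) {
        int count = buf.getInt();
        Map<String, Short> results = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buf.getShort()];
            buf.get(name);
            results.put(new String(name, StandardCharsets.UTF_8), buf.getShort());
        }
        return results;
    }
}
```

The same layout applies to AlterTopicResponse and DeleteTopicResponse, since they share the [TopicName ErrorCode] schema.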

Alter Topic Request

 

AlterTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
TopicName => string
Replicas => int32
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AddedConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
DeletedConfig => string
AlterTopicRequest will be used to change topic's replication characteristics, alter topic-level config or reassign partitions.
Request semantics:
 

A user may define only one of Partitions, Replicas, ReplicaAssignment, AddedConfigEntry, DeletedConfig. Otherwise, MultipleInstructionsForOneTopic is returned.

One AlterTopicRequest may include only one topic alteration command for the topic with the given name in one batch, otherwise MultipleTopicInstructionsInOneBatch is returned.
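A rough sketch of the single-mutation rule for one alter instruction, combined with the DecreasePartitionsNotAllowed check from the error table. Representing "not defined" as null, the method name, and the "NoError" placeholder are assumptions made for illustration; the proposal does not say what happens when no field is defined, so this sketch lets that case through.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Stream;

final class AlterTopicValidation {

    /** Returns the error name for a single alter instruction; null arguments mean "not defined". */
    static String check(int currentPartitions,
                        Integer partitions,
                        Integer replicas,
                        Map<Integer, List<Integer>> replicaAssignment,
                        Map<String, String> addedConfig,
                        List<String> deletedConfig) {
        // Count how many of the five mutually exclusive fields were supplied.
        long defined = Stream.<Object>of(partitions, replicas, replicaAssignment, addedConfig, deletedConfig)
                .filter(Objects::nonNull)
                .count();
        if (defined > 1) {
            return "MultipleInstructionsForOneTopic";
        }
        if (partitions != null && partitions < currentPartitions) {
            return "DecreasePartitionsNotAllowed";
        }
        return "NoError";
    }
}
```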

Alter Topic Response

 

AlterTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request

 

DeleteTopicRequest => [TopicName]
TopicName => string

 

DeleteTopicRequest requires only the names of the topics to be deleted.
One DeleteTopicRequest may include only one topic deletion command for a given topic name in one batch; otherwise, MultipleTopicInstructionsInOneBatch is returned.
Delete Topic Response

 

 

DeleteTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

DeleteTopicResponse is similar to CreateTopicResponse.

Topic Metadata Request V1

 

TopicMetadataRequest_V1 => [TopicName]
TopicName => string
TopicMetadataRequest_V1 is an evolved version of TopicMetadataRequest. This request is intended to support two admin operations: getting topic metadata, and checking whether a particular admin command (all of which are asynchronous) has been completed.
TopicMetadataRequest_V1 requires only topic names. As with the first version, an empty topic name set results in returning information for all existing topics.
Topic Metadata Response V1

 

TopicMetadataResponse_V1 => [Broker][TopicMetadata]
Broker => NodeId Host Port  (any number of brokers may be returned)
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] [ConfigEntry]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader ReplicasLag Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
ReplicasLag => [int32 int32]
Isr => [int32]

 

Besides errorCode, which is used in the same way as in the previous messages, TopicMetadataResponse_V1 holds an optional TopicDescription structure per topic (non-empty if execution was successful). See the table below for details:

Field | Description
TopicConfigDetails | A structure that holds basic replication details.
ConfigEntry | Topic-level setting and value which was overridden.
TopicPartitionDetails | List describing replication details for each partition.
PartitionId | Id of the partition.
Leader | Broker-leader id for the described partition (or -1 if not defined).
ReplicasLag | List of broker ids serving a replica's role for the partition and fetch lag for the replica.
ISR | Same as replicas but includes only brokers that are known to be "in-sync".

In case of an error, the TopicDescription field is returned in the response with default values.

In addition to the contents of TopicMetadataResponse_V0, the new version of TopicMetadataResponse will include the topic-level configuration for each topic and the replica fetch lag per partition, that is, how far each partition replica is behind the leader replica.
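As a small illustration of how a client might consume the ReplicasLag field, the sketch below treats it as an array of (replica id, lag) pairs, following `ReplicasLag => [int32 int32]`. The class and method names are hypothetical, and the unit of the lag value is not specified in the proposal, so it is carried through as a plain integer.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ReplicaLagView {

    /** Converts [replicaId, lag] pairs into a replicaId -> lag map, keeping the original order. */
    static Map<Integer, Integer> toMap(int[][] replicasLag) {
        Map<Integer, Integer> lagByReplica = new LinkedHashMap<>();
        for (int[] pair : replicasLag) {
            lagByReplica.put(pair[0], pair[1]);
        }
        return lagByReplica;
    }

    /** Largest lag among the replicas of one partition, or 0 when there are none. */
    static int maxLag(Map<Integer, Integer> lagByReplica) {
        int max = 0;
        for (int lag : lagByReplica.values()) {
            max = Math.max(max, lag);
        }
        return max;
    }
}
```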


Replication Commands Schema

Preferred Replica Leader Election Request

 

PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]]
Topic => string
PartitionId => int32
PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Similar to the Topic Admin requests, this request is intended to be non-blocking. The schema consists of one field: an array of partitions for which the preferred replica leader should be elected.

To start the preferred replica leader election procedure for all existing partitions, an empty partitions array should be sent.

Preferred Replica Leader Election Response

 

PreferredReplicaLeaderElectionResponse => [Topic ErrorCode]
Topic => string
ErrorCode => int16

 

PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.

The status of the procedure may be checked with TopicMetadataRequest: for each partition, the head of the replicas list and the leader broker should be the same.
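The status check described above amounts to a one-line comparison per partition. A minimal sketch, with a hypothetical class and method name, taking the leader id and replica list as they would be read from the metadata response:

```java
import java.util.List;

final class PreferredLeaderCheck {

    /** True when the current leader is the first (preferred) replica in the partition's replica list. */
    static boolean isCompleted(int leader, List<Integer> replicas) {
        return !replicas.isEmpty() && replicas.get(0) == leader;
    }
}
```

A leader of -1 (not defined) never matches a valid replica id, so such a partition is reported as still in progress.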

...

AdminClient API

public class AdminClient {

/**
* A client is instantiated by providing a set of key-value pairs as configuration. Most
* of the settings will be related to NetworkClient
*
* @param properties settings related to Network client and at least one broker from KafkaCluster to connect to
*/
public AdminClient(Properties properties);

/**
* Initiates topics creation.
* This is an asynchronous call, it returns immediately once the server has accepted request and stored respective data in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that metadata about newly created topics was propagated
* to all brokers
*
* @param createTopicRequestBody holder (built by means of respective Builder) of all required arguments to create topics
* @return java.util.concurrent.Future which holds topics creation result - a map topic-name - error code
*
* @throws ApiException in case of global error, which means topic creation was not even started
*/
public Future<Map<String, Errors>> createTopics(CreateTopicRequestBody createTopicRequestBody) throws ApiException;

/**
* Initiates topics alteration.
* This is an asynchronous call, it returns immediately once the server has accepted request and stored/changed respective data in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that updated metadata about altered topics was propagated
* to all brokers
*
* @param alterTopicRequestBody holder (built by means of respective Builder) of all required arguments to alter topics
* @return java.util.concurrent.Future which holds topics alteration result - a map topic-name - error code
*
* @throws ApiException in case of global error, which means topic alteration was not even started
*/
public Future<Map<String, Errors>> alterTopics(AlterTopicRequestBody alterTopicRequestBody) throws ApiException;

/**
* Initiates topic deletion.
* This is an asynchronous call, it returns immediately once server has accepted request and marked requested topics for deletion in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that metadata with updated topic list was propagated to
* all brokers
*
* @param topics topic names to be deleted
* @return java.util.concurrent.Future which holds topics deletion result - a map topic-name - error code
*
* @throws ApiException in case of global error, which means topic deletion was not even started
*/
public Future<Map<String, Errors>> deleteTopics(List<String> topics) throws ApiException;

/**
* Lists all available topics in Kafka cluster.
* Topic is considered available if all brokers in cluster have received and cached metadata about it
*
* @return list of topic names
*
* @throws ApiException
*/
public List<String> listTopics() throws ApiException;

/**
* TODO: not finalized yet
* Request replication information about Kafka topics
*
* @return a mapping between topic name and topic description
* @throws ApiException in case of global error, which means topic description cannot be fetched for all topics
*/
public Map<String, DescribeTopicOutput> describeTopics(List<String> topicNames) throws ApiException;

/**
* Initiates long-running reassign partitions procedure.
* This is an asynchronous call, it returns immediately once server has accepted request, and created admin path in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that all partition reassignments have completed.
* Note: currently there are only two possible states for reassigned partition: Completed, In Progress.
*
* @param reassignmentData reassignment schema: the replicas among which partitions will be reassigned
*
* @return java.util.concurrent.Future which is completed once all partitions have been reassigned
*
* @throws ApiException in case partition reassignment wasn't initiated on server
*/
public Future<Void> reassignPartitions(PartitionReassignmentData reassignmentData) throws ApiException;


/**
* Checks the interim status of the partitions reassignment.
* Reassignment for concrete partition is considered completed if partition has been removed from
* admin zookeeper path and all cluster brokers have received and cached relevant AR metadata for the
* given partition
*
* @param reassignmentData the same schema as was used for the reassign partitions request
*
* @return two maps - completed and partitions for which reassignment is still in progress
* @throws ApiException in case reassignment verification wasn't initiated on server
*/
public ReassignmentResult verifyReassignPartitions(PartitionReassignmentData reassignmentData) throws ApiException;


/**
* Initiates long-running preferred replica leader election procedure
* This is an asynchronous call, it returns immediately once server has accepted request, and created admin path in zookeeper.
* To simulate a simple blocking call Future.get can be called. This will ensure that the leader of every partition has moved to
* the preferred replica.
* Note: currently there are only two possible states for preferred replica leader election: Completed, In Progress.
*
* @param preferredReplicaElectionData partitions whose leader needs to be moved to the preferred replica
*
* @return java.util.concurrent.Future which is completed once all partitions have moved leader to preferred replica
*
* @throws ApiException in case preferred replica leader election wasn't initiated on server
*/
public Future<Void> preferredReplicaLeaderElection(PreferredReplicaLeaderElectionData preferredReplicaElectionData) throws ApiException;


/**
* Checks the interim status of the preferred replica leader election.
* Preferred replica leader election for concrete partition is considered completed if all cluster brokers have received and cached
* relevant metadata for the given partition
*
* @param preferredReplicaElectionData the same partitions as for the preferred replica leader election request
*
* @return two maps - completed and partitions for which procedure is still in progress
* @throws ApiException in case preferred replica election verification wasn't initiated on server
*/
public PreferredReplicaLeaderElectionResult verifyPreferredReplicaLeaderElection(PreferredReplicaLeaderElectionData preferredReplicaElectionData)
throws ApiException;

}
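The Javadoc above notes that a blocking call can be simulated with Future.get. The sketch below shows that pattern against a hypothetical stand-in interface (`TopicDeleter`), since the proposed AdminClient is not an existing class; error codes are represented as plain strings and the fake implementation completes immediately.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingCallSketch {

    /** Hypothetical stand-in for the proposed deleteTopics method. */
    interface TopicDeleter {
        Future<Map<String, String>> deleteTopics(List<String> topics);
    }

    /** Turns the asynchronous call into a blocking one, bounded by a timeout. */
    static Map<String, String> deleteAndWait(TopicDeleter admin, List<String> topics, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        return admin.deleteTopics(topics).get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Fake admin that reports success for the first requested topic right away.
        TopicDeleter fake = topics ->
                CompletableFuture.completedFuture(Collections.singletonMap(topics.get(0), "NoError"));
        System.out.println(deleteAndWait(fake, Collections.singletonList("orders"), 1000));
    }
}
```

Using the timed variant of get avoids blocking forever if metadata propagation never completes; whether a timeout is appropriate depends on the caller.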

4. Interactive Shell / CLI tool

...