...
Extending protocol with these messages:
- CreateTopic(Request | Response), AlterTopic, DeleteTopic, ListTopics, DescribeTopic
- PreferredReplicaLeaderElection
- ReassignPartitions, VerifyReassignPartitions
- New client: AdminClient - a Wire Protocol client for administrative operations
- New CLI tool: Kafka Shell - a single unified command line interface for administrative operations
...
- Topic commands, which include CreateTopic(Request | Response), AlterTopic, DeleteTopic, DescribeTopic, ListTopics.
- Replication tools: ReassignPartitions, VerifyReassignPartitions, PreferredReplicaLeaderElection.
...
New Protocol Errors
It is proposed to add these error codes to the protocol.
Error | Code | Description | Requests |
---|---|---|---|
TopicAlreadyExists | 1001 | Topic with this name already exists. | CreateTopicRequest |
InvalidArgumentPartitions | 1002 | Either partition field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest , AlterTopicRequest |
DecreasePartitionsNotAllowed | 1003 | Invalid partitions argument: decreasing partitions is prohibited. | AlterTopicRequest |
InvalidArgumentReplicationFactor | 1004 | Either replication-factor field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest |
InvalidArgumentReplicaAssignment | 1005 | Either replica-assignment field is invalid (e.g. contains duplicates), or not defined when needed. | |
InvalidTopicConfig | 1006 | Either topic-level config setting or value is incorrect. | CreateTopicRequest , AlterTopicRequest |
PreferredReplicaLeaderElectionInProgress | 1007 | Preferred replica leader election procedure has already been started. | PreferredReplicaLeaderElectionRequest |
InvalidArgumentPreferredReplicaElectionData | 1008 | Preferred replica leader election data is invalid (bad json, duplicates etc). | PreferredReplicaLeaderElectionRequest |
ReassignPartitionsInProgress | 1009 | Reassign partitions procedure has already been started. | ReassignPartitionsRequest |
Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide a descriptive error message.
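As an illustration of how a request dispatcher might turn these codes into descriptive messages, here is a minimal Python sketch. The codes and error names come from the table above; the dictionary `ADMIN_ERRORS`, the function `describe_error`, and the treatment of code 0 as "no error" are assumptions made for the example and are not part of the proposal.

```python
# Codes and names are copied from the table above; every Python name here is
# hypothetical and only illustrates one possible client-side mapping.
ADMIN_ERRORS = {
    1001: ("TopicAlreadyExists", "Topic with this name already exists."),
    1002: ("InvalidArgumentPartitions", "Partition field is invalid or not defined when needed."),
    1003: ("DecreasePartitionsNotAllowed", "Decreasing partitions is prohibited."),
    1004: ("InvalidArgumentReplicationFactor", "Replication-factor field is invalid or not defined when needed."),
    1005: ("InvalidArgumentReplicaAssignment", "Replica assignment is invalid or not defined when needed."),
    1006: ("InvalidTopicConfig", "Topic-level config setting or value is incorrect."),
    1007: ("PreferredReplicaLeaderElectionInProgress", "Preferred replica leader election has already been started."),
    1008: ("InvalidArgumentPreferredReplicaElectionData", "Preferred replica leader election data is invalid."),
    1009: ("ReassignPartitionsInProgress", "Reassign partitions procedure has already been started."),
}


def describe_error(topic: str, code: int) -> str:
    """Build a human-readable message for a per-topic error code."""
    if code == 0:  # assumption: 0 means "no error"
        return f"{topic}: ok"
    name, text = ADMIN_ERRORS.get(code, ("Unknown", "Unrecognized error code."))
    return f"{topic}: {name} ({code}) - {text}"
```

For example, `describe_error("orders", 1001)` yields a message naming `TopicAlreadyExists` for the topic `orders`.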
Topic Admin Schema
The idea is to introduce Wire protocol messages that cover all topic commands (create, alter, delete, list, describe). The motivation behind the proposed schema is the following:
1) Topic commands must inherit options from the TopicCommand tool
2) If some of the options are not used in a particular command (e.g. ReplicaAssignment in CreateTopicRequest), a special marker value is used instead (e.g. an empty string in the case of ReplicaAssignment)
3) Topic commands must support batching and provide command execution result per-topic
4) Topic commands are asynchronous - the request to create/alter/delete just initiates the corresponding commands and returns immediately
...
CreateTopicResponse => [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string
CreateTopicResponse contains a map between topic and topic creation result error code (see New Protocol Errors).
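To make the `[TopicName ErrorCode]` layout concrete, the following Python sketch encodes and decodes such an array. It assumes the usual Kafka wire conventions (big-endian integers, arrays prefixed with an int32 element count, strings prefixed with an int16 byte length) and covers only the response body, not the request/response header. The function names are hypothetical.

```python
import struct
from typing import Dict


def encode_topic_errors(results: Dict[str, int]) -> bytes:
    """Encode a topic -> error code map as [TopicName ErrorCode]."""
    out = [struct.pack(">i", len(results))]        # int32 array length
    for topic, error_code in results.items():
        name = topic.encode("utf-8")
        out.append(struct.pack(">h", len(name)))   # int16 string length
        out.append(name)
        out.append(struct.pack(">h", error_code))  # int16 error code
    return b"".join(out)


def decode_topic_errors(buf: bytes) -> Dict[str, int]:
    """Decode a [TopicName ErrorCode] array into a topic -> error code map."""
    offset = 0
    (count,) = struct.unpack_from(">i", buf, offset)
    offset += 4
    results = {}
    for _ in range(count):
        (name_len,) = struct.unpack_from(">h", buf, offset)
        offset += 2
        topic = buf[offset:offset + name_len].decode("utf-8")
        offset += name_len
        (error_code,) = struct.unpack_from(">h", buf, offset)
        offset += 2
        results[topic] = error_code
    return results
```

Because the response carries one entry per topic, a client can report partial success for a batch: some topics may come back with 0 while others carry a code such as 1001.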
Alter Topic Request
AlterTopicRequest => [TopicName Partitions ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
  TopicName => string
  Partitions => int32
  ReplicaAssignment => [Topic [PartitionId [ReplicaId]]]
  AddedConfigEntry => ConfigKey ConfigValue
  ConfigKey => string
  ConfigValue => string
  DeletedConfig => string
AlterTopicRequest is similar to the previous request. To specify topic-level settings that should be removed, use the DeletedConfig array (setting keys only). The user can provide a new Partitions value, a replica assignment, or both. AlterTopicRequest contains an optional field Partitions; the special value -1 denotes an empty value. The Partitions/ReplicaAssignment semantics are the following:
1) Partitions is used only to increase the number of topic partitions
2) If the Partitions value is empty (-1), ReplicaAssignment is not taken into account and topic partitions are not increased
3) If Partitions doesn't increase the existing number of partitions, the error code DecreasePartitionsNotAllowed is returned
4) If the Partitions value is not empty and increases the number of existing partitions, a new replica assignment for the topic partitions is either automatically generated or defined by ReplicaAssignment (if non-empty)
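The rules above can be sketched as a small decision function. This is only an illustration: the function name, the action labels, and the use of 0 as the "no error" code are assumptions; the -1 marker and the 1003 code come from this document.

```python
from typing import Dict, List, Optional, Tuple

NO_ERROR = 0                            # assumption: 0 means "no error"
DECREASE_PARTITIONS_NOT_ALLOWED = 1003  # from New Protocol Errors
EMPTY_PARTITIONS = -1                   # marker for "Partitions not set"


def plan_partition_change(
    current: int,
    requested: int,
    replica_assignment: Optional[Dict[int, List[int]]] = None,
) -> Tuple[int, str]:
    """Return (error_code, action) for the Partitions/ReplicaAssignment fields."""
    if requested == EMPTY_PARTITIONS:
        # Rule 2: ReplicaAssignment is ignored, partitions stay as they are.
        return NO_ERROR, "unchanged"
    if requested <= current:
        # Rule 3: the value does not increase the partition count.
        return DECREASE_PARTITIONS_NOT_ALLOWED, "unchanged"
    if replica_assignment:
        # Rule 4, explicit assignment supplied by the user.
        return NO_ERROR, "use_replica_assignment"
    # Rule 4, assignment generated automatically.
    return NO_ERROR, "auto_assign"
```

Note that, read literally, rule 3 also rejects a request whose Partitions value equals the current partition count, since that value does not increase it.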
Alter Topic Response
AlterTopicResponse => [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string
AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request
DeleteTopicRequest => [TopicName]
  TopicName => string
DeleteTopicRequest requires only the names of the topics that should be deleted.
Delete Topic Response
DeleteTopicResponse => [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string
DeleteTopicResponse is similar to CreateTopicResponse.
Describe Topic Request
DescribeTopicRequest is intended to replace TopicMetadataRequest in future versions.
DescribeTopicRequest => [TopicName]
  TopicName => string
DescribeTopicRequest requires only topic names.
Describe Topic Response
DescribeTopicResponse => [TopicName ErrorCode TopicDescription]
  ErrorCode => int16
  TopicName => string
  TopicDescription => TopicConfigDetails [TopicPartitionDetails]
  TopicConfigDetails => [ConfigEntry]
  ConfigEntry => string string
  TopicPartitionDetails => PartitionId Leader [Replica] [ISR]
  PartitionId => int32
  Leader => int32
  Replica => int32
  ISR => int32
DescribeTopicResponse, besides ErrorCode, which is used in the same way as in the previous messages, holds an optional (non-empty if execution was successful) TopicDescription structure per topic. See the table below for details:
Field | Description |
---|---|
TopicConfigDetails | A structure that holds basic replication details. |
ConfigEntry | Topic-level setting and value which was overridden. |
TopicPartitionDetails | List describing replication details for each partition. |
PartitionId | Id of the partition. |
Leader | Broker-leader id for the described partition (or -1 if not defined). |
Replicas | List of broker ids serving a replica's role for the partition. |
ISR | Same as replicas but includes only brokers that are known to be "in-sync" |
In case of an error, the TopicDescription field is returned in the response with default values.
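One way a client could model this response is sketched below with Python dataclasses. The class and attribute names are hypothetical and simply mirror the schema fields. The choice of empty collections as the "default values" returned on error is an assumption, since the proposal does not spell the defaults out; the -1 leader default follows the table above.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class TopicPartitionDetails:
    partition_id: int
    leader: int = -1                                   # -1 if not defined
    replicas: List[int] = field(default_factory=list)  # broker ids
    isr: List[int] = field(default_factory=list)       # in-sync subset


@dataclass
class TopicDescription:
    # Overridden topic-level settings (ConfigEntry => string string).
    config: Dict[str, str] = field(default_factory=dict)
    partitions: List[TopicPartitionDetails] = field(default_factory=list)


@dataclass
class DescribeTopicResult:
    topic: str
    error_code: int = 0  # assumption: 0 means "no error"
    # Assumed defaults: an empty description when the request failed.
    description: TopicDescription = field(default_factory=TopicDescription)
```

With this shape, a caller checks `error_code` first and reads `description` only when the code indicates success.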
List Topics Request
ListTopicsRequest =>
ListTopicsRequest is a request with no arguments.
List Topics Response
ListTopicsResponse => ErrorCode [TopicName]
  ErrorCode => int16
  TopicName => string
ListTopicsResponse, besides ErrorCode, holds a list of the topics in the Kafka cluster.
Replication Commands Schema
Reassign Partitions
Reassign Partitions Request
ReassignPartitionsRequest => [Topic [PartitionId [ReplicaId]]]
  Topic => string
  PartitionId => int32
  ReplicaId => int32
ReassignPartitionsRequest requires a partition assignment, i.e. a mapping from partition to an array of replicas. Validation of partition and replica existence is done on the server. The status of a ReassignPartitionsRequest can be checked with VerifyReassignPartitionsRequest.
Reassign Partitions Response
ReassignPartitionsResponse => [Topic ErrorCode]
  Topic => string
  ErrorCode => int32
ReassignPartitionsResponse holds an error code per topic, non-empty if the reassignment could not be started (e.g. due to a validation error).
Verify Reassign Partitions Request
VerifyReassignPartitionsRequest => [Topic [PartitionId [ReplicaId]]]
  Topic => string
  PartitionId => int32
  ReplicaId => int32
VerifyReassignPartitionsRequest requires a partition assignment (a mapping from partition to an array of replicas, the same as for ReassignPartitionsRequest) to check whether the reassignment is completed and the new assignment matches the requested one.
Verify Reassign Partitions Response
VerifyReassignPartitionsResponse => [ReassignmentResult]
  ReassignmentResult => Topic [PartitionId ResultCode]
  Topic => string
  PartitionId => int32
  ResultCode => int16
VerifyReassignPartitionsResponse returns a reassignment result map. It holds the reassignment status (-1 - reassignment failed, 0 - in progress, 1 - completed successfully) per topic and partition. Reassignment status can also be checked with the DescribeTopicRequest field AssignedReplicas.
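Since the response carries one result code per topic and partition, a caller usually needs to collapse them into a single answer. The sketch below shows one possible way to do that; the enum, the function name, and the aggregation policy (any failure wins, then any in-progress partition) are assumptions, while the three code values come from the description above.

```python
from enum import IntEnum
from typing import Dict


class ReassignmentStatus(IntEnum):
    FAILED = -1
    IN_PROGRESS = 0
    COMPLETED = 1


def overall_status(results: Dict[str, Dict[int, int]]) -> ReassignmentStatus:
    """Collapse a topic -> partition -> ResultCode map into one status."""
    statuses = [
        ReassignmentStatus(code)
        for partitions in results.values()
        for code in partitions.values()
    ]
    if ReassignmentStatus.FAILED in statuses:
        return ReassignmentStatus.FAILED
    if ReassignmentStatus.IN_PROGRESS in statuses:
        return ReassignmentStatus.IN_PROGRESS
    # All partitions completed (an empty map is also treated as completed).
    return ReassignmentStatus.COMPLETED
```

A client that polls with VerifyReassignPartitionsRequest could stop as soon as the combined status is no longer `IN_PROGRESS`.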
Preferred Replica Leader Election
Preferred Replica Leader Election Request
PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]]
  Topic => string
  PartitionId => int32
PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Similar to ReassignPartitionsRequest, this request is intended to be non-blocking. The schema consists of one optional field: an array of partitions for which the preferred replica leader should be elected. To start the preferred replica leader election procedure for all existing partitions, an empty partitions array should be sent.
Preferred Replica Leader Election Response
PreferredReplicaLeaderElectionResponse => [Topic ErrorCode]
  Topic => string
  ErrorCode => int16
PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.
The status of the procedure may be checked with DescribeTopicRequest: the head of the replicas list field and the leader broker should be the same.
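The check described here is small enough to show directly. In this sketch the function name and argument shapes are hypothetical; `leader` and `replicas` stand for the Leader and replica list returned for one partition by DescribeTopicResponse.

```python
from typing import List


def preferred_leader_elected(leader: int, replicas: List[int]) -> bool:
    """True when the current leader is the preferred (first) replica."""
    # An empty replica list has no preferred replica, so report False.
    return bool(replicas) and replicas[0] == leader
```

A caller would evaluate this for every partition included in the election request and treat the procedure as finished once all of them return `True`.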
...