...
The same notation as in A Guide To The Kafka Protocol is used here.
New Protocol Errors
It is proposed to add these error codes to the protocol.
Error | Code | Description | Requests
---|---|---|---
NotControllerReceivedAdminRequest | 1001 | Target broker is not serving the controller role. | All Admin requests
TopicAlreadyExists | 1002 | A topic with this name already exists. | CreateTopicRequest
InvalidArgumentPartitions | 1003 | The partitions field is either invalid (e.g. negative) or not defined when needed. | CreateTopicRequest, AlterTopicRequest
DecreasePartitionsNotAllowed | 1004 | Invalid partitions argument: decreasing partitions is prohibited. | AlterTopicRequest
InvalidArgumentReplicationFactor | 1005 | The replication-factor field is either invalid (e.g. negative) or not defined when needed. | CreateTopicRequest
InvalidArgumentReplicaAssignment | 1006 | The replica-assignment field is either invalid (e.g. contains duplicates) or not defined when needed. |
InvalidTopicConfig | 1007 | Either the topic-level config setting or its value is incorrect. | CreateTopicRequest, AlterTopicRequest
PreferredReplicaLeaderElectionInProgress | 1008 | The preferred replica leader election procedure has already been started. | PreferredReplicaLeaderElectionRequest
InvalidArgumentPreferredReplicaElectionData | 1009 | The preferred replica leader election data is invalid (bad JSON, duplicates, etc.). | PreferredReplicaLeaderElectionRequest
ReassignPartitionsInProgress | 1010 | The reassign partitions procedure has already been started. | ReassignPartitionsRequest
Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide a descriptive error message.
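As an illustration of how a dispatcher might turn a raw code into a descriptive message, here is a minimal Python sketch. The numeric values assume the codes run from 1001 in table order, and `AdminError`, `DESCRIPTIONS` and `describe_error` are hypothetical names that are not part of the proposal.

```python
from enum import IntEnum
from typing import Optional


class AdminError(IntEnum):
    # Assumption: codes are assigned in table order, starting at 1001.
    NOT_CONTROLLER_RECEIVED_ADMIN_REQUEST = 1001
    TOPIC_ALREADY_EXISTS = 1002
    INVALID_ARGUMENT_PARTITIONS = 1003
    DECREASE_PARTITIONS_NOT_ALLOWED = 1004
    INVALID_ARGUMENT_REPLICATION_FACTOR = 1005
    INVALID_ARGUMENT_REPLICA_ASSIGNMENT = 1006
    INVALID_TOPIC_CONFIG = 1007
    PREFERRED_REPLICA_LEADER_ELECTION_IN_PROGRESS = 1008
    INVALID_ARGUMENT_PREFERRED_REPLICA_ELECTION_DATA = 1009
    REASSIGN_PARTITIONS_IN_PROGRESS = 1010


# Only a few descriptions are spelled out; other codes fall back to their name.
DESCRIPTIONS = {
    AdminError.NOT_CONTROLLER_RECEIVED_ADMIN_REQUEST: "target broker is not the controller",
    AdminError.TOPIC_ALREADY_EXISTS: "a topic with this name already exists",
    AdminError.DECREASE_PARTITIONS_NOT_ALLOWED: "decreasing partitions is prohibited",
}


def describe_error(code: int, request: str, topic: Optional[str] = None) -> str:
    """Combine the raw code with the request context the dispatcher already has."""
    try:
        error = AdminError(code)
    except ValueError:
        return f"{request}: unknown error code {code}"
    detail = DESCRIPTIONS.get(error, error.name)
    target = f" for topic '{topic}'" if topic else ""
    return f"{request} failed{target}: {detail} ({error.name}, code {code})"
```

The point of the sketch is that the wire protocol only carries the code; the request name and topic come from the caller's own context.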
TopicMetadata_V1 Schema
[1] On startup, the admin client will be provided with a list of Kafka cluster brokers. It is then the client's responsibility to locate the controller so that Admin requests are sent to the correct node.
Currently there is no way to locate the controller with the existing Wire Protocol messages. It is proposed to extend TopicMetadataRequest to version 1 for this purpose, since the metadata exchange already carries the broker list.
Topic Metadata Request
TopicMetadataRequest remains unchanged compared to version V0.

    TopicMetadataRequest_V1 => [TopicName]
      TopicName => string
Topic Metadata Response
TopicMetadataResponse_V1 adds a Controller field.

    TopicMetadataResponse_V1 => Controller [Broker] [TopicMetadata]
      Controller => Broker
      Broker => NodeId Host Port  (any number of brokers may be returned)
      NodeId => int32
      Host => string
      Port => int32
      TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
      TopicErrorCode => int16
      PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
      PartitionErrorCode => int16
      PartitionId => int32
      Leader => int32
      Replicas => [int32]
      Isr => [int32]
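To make the controller lookup concrete, the following Python sketch decodes the leading Controller and [Broker] parts of a TopicMetadataResponse_V1 body. It assumes the usual Kafka wire conventions (big-endian integers, strings prefixed with an int16 length, arrays prefixed with an int32 count) and that the response header has already been stripped. The names `Broker`, `read_controller_and_brokers` and `encode_broker` are hypothetical.

```python
import struct
from typing import List, NamedTuple, Tuple


class Broker(NamedTuple):
    node_id: int
    host: str
    port: int


def _read_broker(buf: bytes, pos: int) -> Tuple[Broker, int]:
    """Read NodeId (int32), Host (int16 length + bytes) and Port (int32)."""
    (node_id,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    (host_len,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    host = buf[pos:pos + host_len].decode("utf-8")
    pos += host_len
    (port,) = struct.unpack_from(">i", buf, pos)
    return Broker(node_id, host, port), pos + 4


def read_controller_and_brokers(buf: bytes) -> Tuple[Broker, List[Broker], int]:
    """Return the controller, the broker list, and the offset where [TopicMetadata] starts."""
    controller, pos = _read_broker(buf, 0)
    (count,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    brokers = []
    for _ in range(count):
        broker, pos = _read_broker(buf, pos)
        brokers.append(broker)
    return controller, brokers, pos


def encode_broker(broker: Broker) -> bytes:
    """Inverse of _read_broker; handy for building test payloads."""
    host = broker.host.encode("utf-8")
    return struct.pack(">ih", broker.node_id, len(host)) + host + struct.pack(">i", broker.port)
```

A client could call `read_controller_and_brokers` on the response from any broker in its bootstrap list and then direct Admin requests to the returned controller's host and port.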
Topic Admin Schema
The idea is to introduce Wire protocol messages that cover all topic commands (create, alter, delete, list, describe). The motivation behind the proposed schema is the following:
1) Topic commands must inherit their options from the TopicCommand tool.
2) If an option is not used in a particular command (e.g. ReplicaAssignment in CreateTopicRequest), a special marker value is used instead (for ReplicaAssignment, an empty string).
3) Topic commands must support batching and provide a command execution result per topic.
4) Topic commands are asynchronous: a request to create/alter/delete only initiates the corresponding command and returns immediately.
5) Topic commands can be executed only on the broker serving the controller role; if a request is sent to an ordinary broker, a request-level error should reflect that.
...
    CreateTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [ConfigEntry]]
      TopicName => string
      Partitions => int32
      Replicas => int32
      ReplicaAssignment => string
      ConfigEntry => ConfigKey ConfigValue
      ConfigKey => string
      ConfigValue => string
CreateTopicRequest requires a topic name and either (Partitions + Replicas) or a ReplicaAssignment to create a topic (validation is done on the server side). The user can also specify topic-level configs to create the topic with (to use the defaults, an empty array should be provided). This table defines the values used to denote "no value":
Field | Value |
---|---|
Partitions | -1 |
Replicas | -1 |
ReplicaAssignment | "" (empty string) |
The (Partitions, Replicas)/ReplicaAssignment semantics are as follows:
1) If ReplicaAssignment is specified, Partitions and Replicas are not taken into account; the topic is created with the provided replica assignment, and the number of partitions and the replication factor are derived from ReplicaAssignment.
2) If ReplicaAssignment is empty, the number of topic partitions and the replication factor must be defined with Partitions and Replicas; the replica assignment for the topic is generated automatically on the server.
Create Topic Response
    CreateTopicResponse => ErrorCode [TopicName ErrorCode]
      ErrorCode => int16
      TopicName => string
CreateTopicResponse is fairly simple: it contains a "global" error code (e.g. NotControllerReceivedAdminRequest, see Protocol Errors) and a map from each topic to the error code of its creation result (see Protocol Errors).
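The way per-topic results could be produced for a batched CreateTopicRequest can be sketched as follows. This is an illustration only: errors are represented by their names rather than numeric codes, `"NoError"` is a hypothetical success marker, non-positive Partitions or Replicas are assumed to be invalid, and a non-empty ReplicaAssignment is accepted without parsing because its string format is not defined here.

```python
from typing import Dict, Iterable, NamedTuple, Set

NO_VALUE_INT = -1     # marker for an unset Partitions / Replicas
NO_VALUE_STR = ""     # marker for an unset ReplicaAssignment
NO_ERROR = "NoError"  # hypothetical name for "no error"


class CreateTopicArgs(NamedTuple):
    topic: str
    partitions: int = NO_VALUE_INT
    replicas: int = NO_VALUE_INT
    replica_assignment: str = NO_VALUE_STR


def validate_create(args: CreateTopicArgs, existing: Set[str]) -> str:
    """Return the error name for one topic in the batch."""
    if args.topic in existing:
        return "TopicAlreadyExists"
    if args.replica_assignment != NO_VALUE_STR:
        # An explicit assignment wins; Partitions and Replicas are ignored.
        return NO_ERROR
    if args.partitions <= 0:
        return "InvalidArgumentPartitions"
    if args.replicas <= 0:
        return "InvalidArgumentReplicationFactor"
    return NO_ERROR


def create_topic_results(batch: Iterable[CreateTopicArgs], existing: Set[str]) -> Dict[str, str]:
    """Build the per-topic part of a response: topic name -> error name."""
    return {args.topic: validate_create(args, existing) for args in batch}
```

One invalid entry does not fail the whole batch; each topic gets its own result, which matches the batching requirement stated for topic commands.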
Alter Topic Request
    AlterTopicRequest => [TopicName Partitions ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
      TopicName => string
      Partitions => int32
      ReplicaAssignment => string
      AddedConfigEntry => ConfigKey ConfigValue
      ConfigKey => string
      ConfigValue => string
      DeletedConfig => string
AlterTopicRequest is similar to the previous request. To specify topic-level settings that should be removed, use the DeletedConfig array (setting keys only). The user can provide a new Partitions value, a replica assignment, or both. This table defines the values used to denote "no value":
Field | Value |
---|---|
Partitions | -1 |
ReplicaAssignment | "" (empty string) |
The Partitions/ReplicaAssignment semantics are as follows:
1) Partitions is used only to increase the number of topic partitions.
2) If the Partitions value is empty (-1), ReplicaAssignment is not taken into account and topic partitions are not increased.
3) If Partitions does not increase the existing number of partitions, the error code DecreasePartitionsNotAllowed is returned.
4) If the Partitions value is not empty and increases the number of existing partitions, a new replica assignment for the topic partitions is either generated automatically or defined by ReplicaAssignment (if non-empty).
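The rules above can be read as a small decision function. The Python sketch below is one possible reading, not the server implementation; `PartitionPlan` and `plan_partition_change` are hypothetical names, and an `assignment` of `None` stands for "generate automatically".

```python
from typing import NamedTuple, Optional

NO_PARTITIONS = -1   # marker for an unset Partitions
NO_ASSIGNMENT = ""   # marker for an unset ReplicaAssignment


class PartitionPlan(NamedTuple):
    action: str                       # "unchanged", "increase" or "error"
    error: Optional[str] = None       # error name when action == "error"
    assignment: Optional[str] = None  # explicit assignment, or None to auto-generate


def plan_partition_change(existing: int,
                          partitions: int = NO_PARTITIONS,
                          replica_assignment: str = NO_ASSIGNMENT) -> PartitionPlan:
    """Decide what an AlterTopicRequest entry does to the partition count."""
    if partitions == NO_PARTITIONS:
        # Rule 2: without Partitions, ReplicaAssignment is ignored.
        return PartitionPlan("unchanged")
    if partitions <= existing:
        # Rule 3: the value must strictly increase the partition count.
        return PartitionPlan("error", error="DecreasePartitionsNotAllowed")
    if replica_assignment != NO_ASSIGNMENT:
        # Rule 4, explicit assignment supplied by the caller.
        return PartitionPlan("increase", assignment=replica_assignment)
    # Rule 4, assignment left to the server.
    return PartitionPlan("increase")
```

Note that, following rule 3 literally, a Partitions value equal to the current count is also rejected.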
Alter Topic Response
    AlterTopicResponse => ErrorCode [TopicName ErrorCode]
      ErrorCode => int16
      TopicName => string

AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request
    DeleteTopicRequest => [TopicName]
      TopicName => string
DeleteTopicRequest requires only the names of the topics that should be deleted.
Delete Topic Response
    DeleteTopicResponse => ErrorCode [TopicName ErrorCode]
      ErrorCode => int16
      TopicName => string

DeleteTopicResponse is similar to CreateTopicResponse.
Describe Topic Request
    DescribeTopicRequest => [TopicName]
      TopicName => string

DescribeTopicRequest requires only topic names.
Describe Topic Response
    DescribeTopicResponse => ErrorCode [TopicName ErrorCode TopicDescription]
      ErrorCode => int16
      TopicName => string
      TopicDescription => TopicName TopicConfigDetails [TopicPartitionDetails]
      TopicName => string
      TopicConfigDetails => Partitions ReplicationFactor [ConfigEntry]
      Partitions => int32
      ReplicationFactor => int32
      ConfigEntry => string string
      TopicPartitionDetails => PartitionId ?(Leader) [Replica] [ISR]
      PartitionId => int32
      Leader => int32
      Replica => int32
      ISR => int32
Besides ErrorCode, which is used in the same way as in the previous messages, DescribeTopicResponse holds an optional (non-empty if execution was successful) TopicDescription structure per topic. See the table below for details:
Field | Description
---|---
TopicConfigDetails | A structure that holds basic replication details.
Partitions | Number of partitions in the given topic.
Config | Topic-level setting and value which was overridden.
TopicPartitionDetails | List describing replication details for each partition.
PartitionId | Id of the partition.
Leader | Optional broker-leader id for the described partition (or -1 if not defined).
Replicas | List of broker ids serving a replica's role for the partition.
ISR | Same as Replicas, but includes only brokers that are known to be "in-sync".
In case of an error, the TopicDescription field will be returned in the response with default values.
List Topics Request
    ListTopicsRequest =>

ListTopicsRequest is a request with no arguments.
List Topics Response
    ListTopicsResponse => ErrorCode [TopicName]
      ErrorCode => int16
      TopicName => string

Besides ErrorCode, ListTopicsResponse holds a list of the topics in the Kafka cluster.
Replication Commands Schema
Reassign Partitions
Reassign Partitions Request
    ReassignPartitionsRequest => [Topic [PartitionId [ReplicaId]]]
      Topic => string
      PartitionId => int32
      ReplicaId => int32

ReassignPartitionsRequest requires a partition assignment: a mapping from each partition to an array of replicas. Validation of partition and replica existence is done on the server. The status of a ReassignPartitionsRequest can be checked with VerifyReassignPartitionsRequest.
Reassign Partitions Response
    ReassignPartitionsResponse => [Topic ErrorCode]
      Topic => string
      ErrorCode => int16

ReassignPartitionsResponse holds just an error code per topic, non-empty if the reassignment could not be started (e.g. due to a validation error).
Verify Reassign Partitions Request
    VerifyReassignPartitionsRequest => [Topic [PartitionId [ReplicaId]]]
      Topic => string
      PartitionId => int32
      ReplicaId => int32

VerifyReassignPartitionsRequest requires a partition assignment (a mapping from each partition to an array of replicas, the same as for ReassignPartitionsRequest) in order to check whether the reassignment is completed and the new assignment matches the requested one.
Verify Reassign Partitions Response
    VerifyReassignPartitionsResponse => ErrorCode [ReassignmentResult]
      ReassignmentResult => Topic [PartitionId ResultCode]
      Topic => string
      PartitionId => int32
      ResultCode => int16

VerifyReassignPartitionsResponse, as with other Admin requests, may return an error code and a reassignment result map. The map holds the reassignment status (-1: reassignment failed, 0: in progress, 1: completed successfully) per topic and partition.
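A client polling for completion might fold the per-partition result codes into a single status. The sketch below is only one possible policy (any failure wins, then any in-progress partition); the aggregation rule, `ReassignmentStatus` and `overall_status` are assumptions of this example, not part of the proposal.

```python
from enum import IntEnum
from typing import Dict, Tuple


class ReassignmentStatus(IntEnum):
    FAILED = -1
    IN_PROGRESS = 0
    COMPLETED = 1


def overall_status(results: Dict[Tuple[str, int], int]) -> ReassignmentStatus:
    """Collapse a (topic, partition) -> ResultCode map into one status."""
    statuses = {ReassignmentStatus(code) for code in results.values()}
    if ReassignmentStatus.FAILED in statuses:
        return ReassignmentStatus.FAILED
    if ReassignmentStatus.IN_PROGRESS in statuses:
        return ReassignmentStatus.IN_PROGRESS
    # Also reached for an empty map: nothing is pending.
    return ReassignmentStatus.COMPLETED
```

A caller would typically repeat VerifyReassignPartitionsRequest while the combined status is `IN_PROGRESS`.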
Preferred Replica Leader Election
Preferred Replica Leader Election Request
    PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]]
      Topic => string
      PartitionId => int32

PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Like ReassignPartitionsRequest, this request is intended to be non-blocking. The schema consists of one optional field: an array of partitions for which the preferred replica leader should be elected. To start the procedure for all existing partitions, an empty partitions array should be sent.
Preferred Replica Leader Election Response
    PreferredReplicaLeaderElectionResponse => [Topic ErrorCode]
      Topic => string
      ErrorCode => int16

PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.
The status of the procedure may be checked with DescribeTopicRequest: the head of the Replicas list field and the Leader broker should be the same.
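The status check described here amounts to comparing two fields of each partition's details. A minimal Python sketch, assuming the DescribeTopicResponse has already been decoded into a hypothetical `PartitionDetails` structure:

```python
from typing import Iterable, List, NamedTuple


class PartitionDetails(NamedTuple):
    partition_id: int
    leader: int          # -1 if no leader is defined
    replicas: List[int]
    isr: List[int]


def preferred_leader_elected(partition: PartitionDetails) -> bool:
    """True when the leader is the first (preferred) replica in the list."""
    return bool(partition.replicas) and partition.leader == partition.replicas[0]


def pending_partitions(partitions: Iterable[PartitionDetails]) -> List[int]:
    """Ids of partitions whose leader is not yet the preferred replica."""
    return [p.partition_id for p in partitions if not preferred_leader_elected(p)]
```

An empty result from `pending_partitions` would indicate that the election has taken effect for every described partition.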
...