...
Protocol Errors
It is proposed to add these error codes to the protocol.
Error | Code | Description | Requests |
---|---|---|---|
NotControllerReceivedAdminRequest | 1001 | Target broker is not serving a controller's role. | For all Admin requests |
TopicAlreadyExists | 1002 | Topic with this name already exists. | CreateTopicRequest |
InvalidArgumentPartitions | 1003 | Either partition field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest , AlterTopicRequest |
DecreasePartitionsNotAllowed | 1004 | Invalid partitions argument: decreasing partitions is prohibited. | AlterTopicRequest |
InvalidArgumentReplicationFactor | 1005 | Either replication-factor field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest |
InvalidArgumentReplicaAssignment | 1006 | Either replica-assignment field is invalid (e.g. contains duplicates), or not defined when needed. | |
InvalidTopicConfig | 1007 | Either topic-level config setting or value is incorrect. | CreateTopicRequest , AlterTopicRequest |
PreferredReplicaLeaderElectionInProgress | 1008 | Preferred replica leader election procedure has already been started. | PreferredReplicaLeaderElectionRequest |
InvalidArgumentPreferredReplicaElectionData | 1009 | Preferred replica leader election data is invalid (bad JSON, duplicates, etc.). | PreferredReplicaLeaderElectionRequest |
ReassignPartitionsInProgress | 1010 | Reassign partitions procedure has already been started. | ReassignPartitionsRequest |
Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide a descriptive error message.
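
As an illustration of how a client might turn these codes into descriptive messages, here is a minimal Python sketch. The numeric values come from the table above; the enum name `AdminError`, the helper `describe_error`, and the message wording are hypothetical and not part of the proposal.

```python
from enum import IntEnum


class AdminError(IntEnum):
    """Proposed Admin error codes, copied from the Protocol Errors table."""
    NOT_CONTROLLER_RECEIVED_ADMIN_REQUEST = 1001
    TOPIC_ALREADY_EXISTS = 1002
    INVALID_ARGUMENT_PARTITIONS = 1003
    DECREASE_PARTITIONS_NOT_ALLOWED = 1004
    INVALID_ARGUMENT_REPLICATION_FACTOR = 1005
    INVALID_ARGUMENT_REPLICA_ASSIGNMENT = 1006
    INVALID_TOPIC_CONFIG = 1007
    PREFERRED_REPLICA_LEADER_ELECTION_IN_PROGRESS = 1008
    INVALID_ARGUMENT_PREFERRED_REPLICA_ELECTION_DATA = 1009
    REASSIGN_PARTITIONS_IN_PROGRESS = 1010


def describe_error(code: int, request: str, topic: str = "") -> str:
    """Build a client-side message (hypothetical helper, illustrative wording)."""
    try:
        name = AdminError(code).name
    except ValueError:
        # Codes outside the proposed range are reported as unknown.
        name = f"UNKNOWN({code})"
    target = f" for topic '{topic}'" if topic else ""
    return f"{request} failed{target}: {name}"
```

The client knows which request it sent and for which topic, which is the context the sentence above refers to.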
TopicMetadata_V1 Schema
[1] On startup, the admin client is provided with a list of Kafka cluster brokers. It is the client's responsibility to locate the controller so that Admin requests are sent to the correct node.
Currently there is no way to locate the controller with existing Wire Protocol messages. It is proposed to extend TopicMetadataRequest to version 1 for this purpose, since its response already contains the broker list.
Topic Metadata Request
TopicMetadataRequest remains unchanged compared to version V0.
TopicMetadataRequest_V1 => [TopicName]
  TopicName => string
Topic Metadata Response
TopicMetadataResponse_V1 adds a Controller field.
TopicMetadataResponse_V1 => Controller [Broker] [TopicMetadata]
  Controller => Broker
  Broker => NodeId Host Port (any number of brokers may be returned)
    NodeId => int32
    Host => string
    Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
    TopicErrorCode => int16
    PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
      PartitionErrorCode => int16
      PartitionId => int32
      Leader => int32
      Replicas => [int32]
      Isr => [int32]
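
To make the Broker layout concrete, the following Python sketch encodes and decodes a single Broker structure (NodeId, Host, Port). It assumes the usual Kafka wire conventions, which are not restated in this section: big-endian integers and strings prefixed with an int16 length. The names `Broker`, `encode_broker`, and `decode_broker` are hypothetical.

```python
import struct
from typing import NamedTuple, Tuple


class Broker(NamedTuple):
    node_id: int  # NodeId => int32
    host: str     # Host => string
    port: int     # Port => int32


def encode_broker(broker: Broker) -> bytes:
    """Serialize a Broker, assuming int16-length-prefixed UTF-8 strings."""
    host = broker.host.encode("utf-8")
    return (
        struct.pack(">i", broker.node_id)
        + struct.pack(">h", len(host))
        + host
        + struct.pack(">i", broker.port)
    )


def decode_broker(buf: bytes, offset: int = 0) -> Tuple[Broker, int]:
    """Read one Broker starting at offset; return it and the next offset."""
    (node_id,) = struct.unpack_from(">i", buf, offset)
    offset += 4
    (host_len,) = struct.unpack_from(">h", buf, offset)
    offset += 2
    host = buf[offset:offset + host_len].decode("utf-8")
    offset += host_len
    (port,) = struct.unpack_from(">i", buf, offset)
    offset += 4
    return Broker(node_id, host, port), offset
```

Under these assumptions, a client would decode the Controller first and then continue reading the broker array from the returned offset.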
Topic Admin Schema
The idea is to introduce Wire protocol messages that cover all topic commands (create, alter, delete, list, describe). The motivation behind the proposed schema is the following:
1) Topic commands must inherit options from the TopicCommand tool
2) If some of the options are not used in a particular command (e.g. ReplicaAssignment in CreateTopicRequest), a special marker value is used instead (e.g. an empty string in the case of ReplicaAssignment)
3) Topic commands must support batching and provide a command execution result per topic
4) Topic commands are asynchronous - the request to create/alter/delete just initiates the corresponding command and returns immediately
5) Topic commands can be executed only on a broker serving the controller's role - if a request is sent to an ordinary broker, a request-level error should reflect that
...
CreateTopicResponse => ErrorCode [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string
CreateTopicResponse is fairly simple - it contains a "global" error code (e.g. in case of NotControllerReceivedAdminRequest - see Protocol Errors) and a map between each topic and its creation result.
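
The two-level error reporting described above can be sketched in Python as follows. This is only an illustration: it assumes that error code 0 means success (as elsewhere in the Kafka protocol), and the function name `failed_topics` is hypothetical.

```python
from typing import Dict, List, Tuple

NO_ERROR = 0  # assumption: 0 denotes success


def failed_topics(error_code: int,
                  topic_results: List[Tuple[str, int]]) -> Dict[str, int]:
    """Return per-topic failures, or raise if the whole request was rejected."""
    if error_code != NO_ERROR:
        # Request-level ("global") error, e.g. 1001 when the broker
        # receiving the request is not the controller.
        raise RuntimeError(f"request rejected with error code {error_code}")
    # Keep only topics whose individual result is an error.
    return {topic: code for topic, code in topic_results if code != NO_ERROR}
```

For example, a response with global code 0 and results `[("a", 0), ("b", 1002)]` reports only topic `b` as failed (TopicAlreadyExists).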
Alter Topic Request
AlterTopicRequest => [TopicName Partitions ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
  TopicName => string
  Partitions => int32
  ReplicaAssignment => string
  AddedConfigEntry => ConfigKey ConfigValue
    ConfigKey => string
    ConfigValue => string
  DeletedConfig => string
AlterTopicRequest is similar to the previous request. To specify topic-level settings that should be removed, use the DeletedConfig array (setting keys only). The user can provide a new partitions value, a replica assignment, or both. This table defines the values used to denote "no value":
Field | Value |
---|---|
Partitions | -1 |
ReplicaAssignment | "" (empty string) |
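
A minimal Python sketch of how a client could apply these "no value" markers when building one AlterTopicRequest entry. The marker values (-1 and the empty string) come from the table above; the type `AlterTopicEntry` and the helper `alter_topic_entry` are hypothetical names used only for illustration.

```python
from typing import Dict, List, NamedTuple, Optional, Tuple

NO_PARTITIONS = -1          # marker for "Partitions not provided"
NO_REPLICA_ASSIGNMENT = ""  # marker for "ReplicaAssignment not provided"


class AlterTopicEntry(NamedTuple):
    topic: str
    partitions: int
    replica_assignment: str
    added_config: List[Tuple[str, str]]
    deleted_config: List[str]


def alter_topic_entry(topic: str,
                      partitions: Optional[int] = None,
                      replica_assignment: Optional[str] = None,
                      added_config: Optional[Dict[str, str]] = None,
                      deleted_config: Optional[List[str]] = None) -> AlterTopicEntry:
    """Fill omitted fields with the marker values from the table."""
    return AlterTopicEntry(
        topic,
        NO_PARTITIONS if partitions is None else partitions,
        NO_REPLICA_ASSIGNMENT if replica_assignment is None else replica_assignment,
        sorted((added_config or {}).items()),
        list(deleted_config or []),
    )
```

Calling `alter_topic_entry("foo", partitions=8)` leaves the replica assignment as the empty-string marker, while `alter_topic_entry("foo", deleted_config=["retention.ms"])` leaves partitions at -1.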
Alter Topic Response
AlterTopicResponse => ErrorCode [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string
AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request
DeleteTopicRequest => [TopicName]
  TopicName => string
DeleteTopicRequest requires only the names of the topics that should be deleted.
Delete Topic Response
DeleteTopicResponse => ErrorCode [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string
DeleteTopicResponse is similar to CreateTopicResponse.
Describe Topic Request
DescribeTopicRequest => [TopicName]
  TopicName => string
DescribeTopicRequest requires only topic names.
Describe Topic Response
DescribeTopicResponse => ErrorCode [TopicName ErrorCode TopicDescription]
  ErrorCode => int16
  TopicName => string
  TopicDescription => TopicName TopicConfigDetails [TopicPartitionDetails]
    TopicName => string
    TopicConfigDetails => Partitions ReplicationFactor [ConfigEntry]
      Partitions => int32
      ReplicationFactor => int32
      ConfigEntry => string string
    TopicPartitionDetails => PartitionId ?(Leader) [Replica] [ISR]
      PartitionId => int32
      Leader => int32
      Replica => int32
      ISR => int32
DescribeTopicResponse, besides the ErrorCode, which is used in the same way as in the previous messages, holds an optional (non-empty if execution was successful) TopicDescription structure per topic. See the table below for details:
Field | Description |
---|---|
TopicConfigDetails | A structure that holds basic replication details. |
Partitions | Number of partitions in the given topic. |
Config | Topic-level setting and value which was overridden. |
TopicPartitionDetails | List describing replication details for each partition. |
PartitionId | Id of the partition. |
Leader | Optional broker-leader id for the described partition. |
Replicas | List of broker ids serving a replica's role for the partition. |
ISR | Same as replicas but includes only brokers that are known to be "in-sync" |
In case of an error, the TopicDescription field will be returned in the response with default values.
List Topics Request
ListTopicsRequest =>
ListTopicsRequest is a request with no arguments.
List Topics Response
ListTopicsResponse => ErrorCode [TopicName]
  ErrorCode => int16
  TopicName => string
ListTopicsResponse, besides the ErrorCode, holds a list of the topics in the Kafka cluster.
Replication Commands Schema
Reassign Partitions
Reassign Partitions Request
ReassignPartitionsRequest => [Topic PartitionId [ReplicaId]]
  Topic => string
  PartitionId => int32
  ReplicaId => int32
ReassignPartitionsRequest requires a partition assignment: a mapping from each partition to an array of replicas. Validation of partition and replica existence is done on the server. This request only initiates partition reassignment and returns immediately. It is the client's responsibility to block the user while repeatedly sending VerifyReassignPartitionsRequest to check the reassignment status. The assignment has the following form:
{
"partitions": [
{"topic": "foo",
"partition": 1,
"replicas": [1,2,3] }
],
"version":1
}
The status of a ReassignPartitionsRequest can be checked with VerifyReassignPartitionsRequest.
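
As a sketch of how a client might turn the JSON form above into (Topic, PartitionId, [ReplicaId]) entries, consider the Python below. The duplicate checks are an assumption about useful client-side pre-validation, not something this proposal mandates; existence checks stay on the server as stated above. The function name `parse_assignment` is hypothetical.

```python
import json
from typing import List, Tuple


def parse_assignment(text: str) -> List[Tuple[str, int, List[int]]]:
    """Parse the JSON assignment into (topic, partition, replicas) entries."""
    doc = json.loads(text)
    entries = []
    seen = set()
    for item in doc["partitions"]:
        key = (item["topic"], item["partition"])
        replicas = list(item["replicas"])
        if key in seen:
            raise ValueError(f"partition listed more than once: {key}")
        if len(set(replicas)) != len(replicas):
            raise ValueError(f"duplicate replicas for {key}: {replicas}")
        seen.add(key)
        entries.append((key[0], key[1], replicas))
    return entries
```

Rejecting obviously malformed input before sending avoids a round trip that would presumably end in an InvalidArgumentReplicaAssignment error.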
Reassign Partitions Response
ReassignPartitionsResponse => ErrorCode
  ErrorCode => int16
ReassignPartitionsResponse holds just an error code, which is non-empty if the reassignment could not be started (e.g. due to a validation error).
Verify Reassign Partitions Request
VerifyReassignPartitionsRequest => [Topic PartitionId [ReplicaId]]
  Topic => string
  PartitionId => int32
  ReplicaId => int32
VerifyReassignPartitionsRequest requires a mapping from each partition to an array of replicas, the same as for the ReassignPartitionsRequest whose status is being verified, in order to check whether the reassignment has completed and the new assignment matches the requested one.
Verify Reassign Partitions Response
VerifyReassignPartitionsResponse => ErrorCode [ReassignmentResult]
  ErrorCode => int16
  ReassignmentResult => TopicAndPartition ResultCode
    TopicAndPartition => string int32
    ResultCode => int16
VerifyReassignPartitionsResponse, as with other Admin responses, may return an error code and a reassignment result map. The map holds the reassignment status per partition (-1 - reassignment failed, 0 - in progress, 1 - completed successfully).
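
Because reassignment is asynchronous, a client is expected to poll for these status codes. A minimal Python sketch of such a loop follows. The `verify` callable stands in for sending a VerifyReassignPartitionsRequest and decoding the result map; it, the function name, and the polling parameters are hypothetical.

```python
import time
from typing import Callable, Dict, Tuple

FAILED, IN_PROGRESS, COMPLETED = -1, 0, 1  # result codes listed above

TopicPartition = Tuple[str, int]


def wait_for_reassignment(verify: Callable[[], Dict[TopicPartition, int]],
                          poll_interval_s: float = 1.0,
                          max_polls: int = 60) -> Dict[TopicPartition, int]:
    """Poll until no partition is in progress, or until max_polls is reached."""
    results: Dict[TopicPartition, int] = {}
    for _ in range(max_polls):
        results = verify()
        if all(code != IN_PROGRESS for code in results.values()):
            return results
        time.sleep(poll_interval_s)
    # Give up and return the last observed statuses.
    return results
```

The caller can then inspect the returned map for any partition whose status is FAILED.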
Preferred Replica Leader Election
Preferred Replica Leader Election Request
PreferredReplicaLeaderElectionRequest => [Topic PartitionId]
  Topic => string
  PartitionId => int32
PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Similar to ReassignPartitionsRequest, this request is intended to be non-blocking. The schema consists of one optional field: an array of partitions for which the preferred replica leader election procedure should be started. The format is the following:
{"partitions":
[
{"topic": "foo", "partition": 1},
{"topic": "foobar", "partition": 2}
]
}
To start the preferred replica leader election procedure for all existing partitions, an empty partitions array should be sent.
Preferred Replica Leader Election Response
PreferredReplicaLeaderElectionResponse => ErrorCode
  ErrorCode => int16
PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.
The status of the procedure may be checked with DescribeTopicRequest - the head of the replicas list field and the leader broker should be the same.
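
The completion check described above amounts to a one-line comparison. A Python sketch, where the function name is hypothetical and the arguments are taken to be the Leader and replica list from a DescribeTopicResponse partition entry:

```python
from typing import List, Optional


def preferred_leader_elected(leader: Optional[int], replicas: List[int]) -> bool:
    """True when the current leader is the first (preferred) replica."""
    # Leader is optional in the response, so None means no leader is known.
    return bool(replicas) and leader == replicas[0]
```

A client would apply this to every partition it requested election for and keep polling until all of them return True.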
...