...
Please find details under specific RQ/RP schema proposal.
Schema
The same notation as in A Guide To The Kafka Protocol is used here.
New Protocol Errors
It is proposed to add these error codes to the protocol.
Error | Code | Description | Requests |
---|---|---|---|
TopicAlreadyExists | 1001 | Topic with this name already exists. | CreateTopicRequest |
InvalidArgumentPartitions | 1002 | Either partition field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest , AlterTopicRequest |
DecreasePartitionsNotAllowed | 1003 | Invalid partitions argument: decreasing partitions is prohibited. | AlterTopicRequest |
InvalidArgumentReplicationFactor | 1004 | Either replication-factor field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest |
InvalidArgumentReplicaAssignment | 1005 | Either replica-assignment field is invalid (e.g. contains duplicates), or not defined when needed. | |
InvalidTopicConfig | 1006 | Either topic-level config setting or value is incorrect. | CreateTopicRequest , AlterTopicRequest |
PreferredReplicaLeaderElectionInProgress | 1007 | Preferred replica leader election procedure has been already started. | PreferredReplicaLeaderElectionRequest |
InvalidArgumentPreferredReplicaElectionData | 1008 | Preferred replica leader election data is invalid (bad JSON, duplicates, etc.). | PreferredReplicaLeaderElectionRequest |
ReassignPartitionsInProgress | 1009 | Reassign partitions procedure has been already started. | ReassignPartitionsRequest |
Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide a descriptive error message.
Admin Schema
Command | Wire Protocol Message | Note |
---|---|---|
create-topic | CreateTopicRequest | |
alter-topic | AlterTopicRequest | |
delete-topic | DeleteTopicRequest | |
describe-topic | TopicMetadataRequest_V1 | Using the new version of TopicMetadataRequest, which will include topic-level config |
list-topics | TopicMetadataRequest | Using an empty list as input for the request, which results in returning metadata for all existing topics |
reassign-partitions | AlterTopicRequest | Using batch AlterTopicRequest with replica assignment specified |
preferred-replica-leader-election | PreferredReplicaLeaderElectionRequest | |
It is also important that all Admin requests will be asynchronous. This means a request will only initiate a particular command and will not wait until the command is actually completed. There are different reasons for that, but we want to make sure that we give users all the tools needed to verify whether an issued command has been completed. E.g. one can consider a topic created once it is possible to consume from / produce to this topic. In this case the user can leverage TopicMetadataRequest to check that all brokers have received metadata about the newly created topic.
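The completion check described above can be sketched as a polling loop. This is only an illustration, not part of the proposal: `fetch_broker_views` is a hypothetical callable standing in for one TopicMetadataRequest round trip per broker, and the timeout and poll interval values are arbitrary assumptions.

```python
import time
from typing import Callable, Dict, Set


def wait_for_topic(
    topic: str,
    fetch_broker_views: Callable[[], Dict[int, Set[str]]],
    timeout_s: float = 30.0,
    poll_interval_s: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll until every broker reports `topic`, or until the timeout expires.

    `fetch_broker_views` is a hypothetical helper (not a Kafka API): it returns
    a mapping of broker id -> set of topic names that broker currently knows.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        views = fetch_broker_views()
        # The topic counts as created only when all brokers have its metadata.
        if views and all(topic in names for names in views.values()):
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(poll_interval_s)
```

The same loop could be reused for other asynchronous commands by swapping the predicate that inspects the metadata.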
Finally, Topic Admin schema requests are likely to be used not only in the CLI tool, where the common use case is to create/change/delete/get a single entity. Since Kafka is able to maintain a huge number of topics, it is important that the user can efficiently issue many requests at one time. That's why all Topic Admin messages are essentially batch requests, i.e. it is possible to group commands of one type for many topics in one batch, reducing network calls.
New Protocol Errors
It is proposed to add these error codes to the protocol.
Error | Description | Requests |
---|---|---|
TopicAlreadyExists | Topic with this name already exists. | CreateTopicRequest |
InvalidArgumentTopicName | Topic name contains invalid characters | CreateTopicRequest |
InvalidArgumentPartitions | Either partition field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest , AlterTopicRequest |
InvalidArgumentReplicationFactor | Either replication-factor field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest |
InvalidArgumentReplicaAssignment | Either replica-assignment field is invalid (e.g. contains duplicates), or not defined when needed. | |
InvalidArgumentTopicConfig | Either topic-level config setting or value is incorrect. | CreateTopicRequest , AlterTopicRequest |
DecreasePartitionsNotAllowed | Invalid partitions argument: decreasing partitions is prohibited when altering topic. | AlterTopicRequest |
PreferredReplicaLeaderElectionInProgress | Preferred replica leader election procedure has been already started. | PreferredReplicaLeaderElectionRequest |
ReassignPartitionsInProgress | Reassign partitions procedure has been already started. | AlterTopicRequest |
MultipleInstructionsForOneTopic | Only one mutation is allowed at once: e.g. change topic replication factor or change topic config | AlterTopicRequest |
MultipleTopicInstructionsInOnebatch | Multiple topic instructions for the same topic in one batch request | CreateTopicRequest, AlterTopicRequest, DeleteTopicRequest |
Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide a descriptive error message.
The same notation as in A Guide To The Kafka Protocol is used here.
Create Topic Request
CreateTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [ConfigEntry]]
TopicName => string
Partitions => int32
Replicas => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
ConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
CreateTopicRequest requires a topic name and either (Partitions + Replicas) or ReplicaAssignment to create a topic. A special value -1 should be used to denote an empty value for Partitions and Replicas. The user will also be able to specify topic-level configs for the created topic (to use the defaults, an empty array should be provided).
The (Partitions, Replicas)/ReplicaAssignment semantics are the following:
ReplicaAssignment is specified - Partitions and Replicas are not taken into account; the topic is created with the provided replica assignment, and the number of partitions and the replication factor are derived from ReplicaAssignment
ReplicaAssignment is empty - the number of topic partitions and the replication factor must be defined with Partitions and Replicas; the replica assignment for the topic is automatically generated on the server...
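The two cases above can be illustrated with a small validation sketch. It is not the proposed broker implementation: the function name, the return shape, the rejection of zero values, and the requirement that every partition lists the same number of replicas are assumptions made for the example. Only the error names and the -1 convention come from this page.

```python
from typing import Dict, List, Optional, Tuple

EMPTY = -1  # special value denoting an empty Partitions / Replicas field


def resolve_create_topic(
    partitions: int,
    replicas: int,
    replica_assignment: Dict[int, List[int]],
) -> Tuple[Optional[str], Optional[Tuple[int, int]]]:
    """Return (error_name, (num_partitions, replication_factor)).

    Hypothetical helper: exactly one element of the returned pair is None.
    """
    if replica_assignment:
        # ReplicaAssignment wins; Partitions and Replicas are ignored.
        replica_lists = list(replica_assignment.values())
        factors = {len(r) for r in replica_lists}
        has_duplicates = any(len(set(r)) != len(r) for r in replica_lists)
        # Assumption: an empty replica list or mixed list lengths are invalid.
        if has_duplicates or len(factors) != 1 or 0 in factors:
            return "InvalidArgumentReplicaAssignment", None
        return None, (len(replica_assignment), factors.pop())
    # ReplicaAssignment is empty: both numeric fields must be defined.
    if partitions <= 0:  # covers EMPTY (-1) and other invalid values
        return "InvalidArgumentPartitions", None
    if replicas <= 0:
        return "InvalidArgumentReplicationFactor", None
    return None, (partitions, replicas)
```

For instance, `resolve_create_topic(EMPTY, EMPTY, {0: [1, 2], 1: [2, 3]})` derives two partitions with replication factor two from the assignment alone.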
CreateTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string
CreateTopicResponse contains a map between topic and topic creation result error code (see New Protocol Errors).
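To make the response layout concrete, here is a minimal decoding sketch. It assumes the primitive encodings from A Guide To The Kafka Protocol (big-endian integers, arrays prefixed with an int32 element count, strings prefixed with an int16 byte length) and handles only the response body shown above, not the surrounding size and correlation id framing. The function name is hypothetical.

```python
import struct
from typing import Dict


def decode_create_topic_response(payload: bytes) -> Dict[str, int]:
    """Decode a [TopicName ErrorCode] body into {topic_name: error_code}."""
    offset = 0
    # Array header: int32 number of (TopicName, ErrorCode) entries.
    (count,) = struct.unpack_from(">i", payload, offset)
    offset += 4
    results: Dict[str, int] = {}
    for _ in range(count):
        # TopicName: int16 length followed by that many UTF-8 bytes.
        (name_len,) = struct.unpack_from(">h", payload, offset)
        offset += 2
        name = payload[offset:offset + name_len].decode("utf-8")
        offset += name_len
        # ErrorCode: int16.
        (error_code,) = struct.unpack_from(">h", payload, offset)
        offset += 2
        results[name] = error_code
    return results
```

Since AlterTopicResponse and DeleteTopicResponse are described as similar to CreateTopicResponse, the same sketch would presumably apply to them as well.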
Alter Topic Request
AlterTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
TopicName => string
Replicas => int32
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AddedConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
DeletedConfig => string
AlterTopicRequest is similar to the previous request. To specify topic-level settings that should be removed, the user can provide a DeletedConfig array (just the setting keys). The user can provide a new partitions value, a replica assignment, or both.
AlterTopicRequest contains an optional field Partitions. A special value -1 should be used to denote an empty value. The Partitions/ReplicaAssignment semantics are the following:
1) Partitions is used only to increase the number of topic partitions
2) If the Partitions value is empty (-1), ReplicaAssignment is not taken into account and topic partitions are not increased
3) If Partitions doesn't increase the existing number of partitions, the error code DecreasePartitionsNotAllowed is returned
4) If the Partitions value is not empty and increases the number of existing partitions, a new replica assignment for the topic partitions is either automatically generated or defined by ReplicaAssignment (if nonempty)
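The rules above can be summarised as a small decision function. This is a sketch under assumptions: the function name and the action labels ("unchanged", "use-provided-assignment", "auto-generate-assignment") are invented for illustration, and only the -1 convention and the DecreasePartitionsNotAllowed error come from this page.

```python
from typing import Dict, List, Optional, Tuple

EMPTY = -1  # special value denoting an empty Partitions field


def resolve_alter_partitions(
    current_partitions: int,
    partitions: int,
    replica_assignment: Dict[int, List[int]],
) -> Tuple[Optional[str], str]:
    """Return (error_name, action) for the partition part of an alter request."""
    if partitions == EMPTY:
        # Empty Partitions: ReplicaAssignment is ignored, nothing is added.
        return None, "unchanged"
    if partitions <= current_partitions:
        # The new value must strictly increase the partition count.
        return "DecreasePartitionsNotAllowed", "unchanged"
    if replica_assignment:
        # Partitions increases and an explicit assignment was supplied.
        return None, "use-provided-assignment"
    # Partitions increases with no assignment: the server generates one.
    return None, "auto-generate-assignment"
```

Note that a request repeating the current partition count is rejected too, since the third rule applies whenever the value does not increase the count.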
Alter Topic Response
AlterTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string
AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request
DeleteTopicRequest => [TopicName]
TopicName => string
DeleteTopicRequest requires only the names of the topics which should be deleted.
Delete Topic Response
DeleteTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string
DeleteTopicResponse is similar to CreateTopicResponse.
Topic Metadata Request V1
TopicMetadataRequest_V1 is an evolved version of TopicMetadataRequest. This request is intended to support two admin operations: describing topic information, and checking whether a particular admin command (all of which are designed as asynchronous) has been completed. The new version of TopicMetadataResponse will additionally include the topic-level configuration for each topic and the replica fetch lag per partition, i.e. how far a partition replica is behind the leader broker.
TopicMetadataRequest_V1 => [TopicName]
TopicName => string
TopicMetadataRequest_V1 requires only topic names. As with the first version, an empty topic name set results in returning information for all existing topics.
Topic Metadata Response V1
TopicMetadataResponse_V1 => [Broker][TopicMetadata]
Broker => NodeId Host Port (any number of brokers may be returned)
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] [ConfigEntry]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader ReplicasLag Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
ReplicasLag => [int32 int32]
Isr => [int32]
TopicMetadataResponse_V1, besides errorCode which is used in the same way as in previous messages, holds an optional (non-empty if execution was successful) TopicDescription structure per topic. See the table below for details:
Field | Description |
---|---|
TopicConfigDetails | A structure that holds basic replication details. |
ConfigEntry | Topic-level setting and value which was overridden. |
TopicPartitionDetails | List describing replication details for each partition. |
PartitionId | Id of the partition. |
Leader | Broker-leader id for the described partition (or -1 if not defined). |
ReplicasLag | List of broker ids serving a replica's role for the partition and fetch lag for the replica. |
ISR | Same as replicas but includes only brokers that are known to be "in-sync" |
In case of error, the TopicDescription field will be returned in the response with default values.
Replication Commands Schema
Preferred Replica Leader Election Request
PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]]
Topic => string
PartitionId => int32
PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Similar to ReassignPartitionsRequest, this request is intended to be non-blocking. The schema consists of one field: an array of partitions for which the preferred replica leader should be elected. To start the preferred replica leader election procedure for all existing partitions, an empty partitions array should be sent.
Preferred Replica Leader Election Response
PreferredReplicaLeaderElectionResponse => [Topic ErrorCode]
Topic => string
ErrorCode => int16
PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.
Status of the procedure may be checked with TopicMetadataRequest: the head of the replicas list field and the leader broker should be the same.
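The status check can be sketched as a comparison over already-decoded partition metadata. The `PartitionState` type and the function name are hypothetical, and how the replicas list is extracted from the metadata response is left out as an assumption.

```python
from typing import Dict, List, NamedTuple


class PartitionState(NamedTuple):
    """Hypothetical view of one partition taken from topic metadata."""
    leader: int          # broker id of the current leader (-1 if not defined)
    replicas: List[int]  # broker ids of the replicas, preferred replica first


def pending_preferred_elections(partitions: Dict[int, PartitionState]) -> List[int]:
    """Return ids of partitions whose leader is not yet the preferred replica."""
    return sorted(
        partition_id
        for partition_id, state in partitions.items()
        # Election is complete when the head of the replicas list is the leader.
        if not state.replicas or state.replicas[0] != state.leader
    )
```

An empty result means the procedure has finished for every partition that was inspected.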
...