...

Please find details under the specific request/response (RQ/RP) schema proposals.

Schema

The same notation as in A Guide To The Kafka Protocol is used here.

New Protocol Errors

It is proposed to add these error codes to the protocol.

Error | Code | Description | Requests
TopicAlreadyExists | 1001 | Topic with this name already exists. | CreateTopicRequest
InvalidArgumentPartitions | 1002 | Either the partitions field is invalid (e.g. negative), or it is not defined when needed. | CreateTopicRequest, AlterTopicRequest
DecreasePartitionsNotAllowed | 1003 | Invalid partitions argument: decreasing partitions is prohibited. | AlterTopicRequest
InvalidArgumentReplicationFactor | 1004 | Either the replication-factor field is invalid (e.g. negative), or it is not defined when needed. | CreateTopicRequest
InvalidArgumentReplicaAssignment | 1005 | Either the replica-assignment field is invalid (e.g. contains duplicates), or it is not defined when needed. | CreateTopicRequest, AlterTopicRequest, ReassignPartitionsRequest
InvalidTopicConfig | 1006 | Either the topic-level config setting or its value is incorrect. | CreateTopicRequest, AlterTopicRequest
PreferredReplicaLeaderElectionInProgress | 1007 | Preferred replica leader election procedure has already been started. | PreferredReplicaLeaderElectionRequest
InvalidArgumentPreferredReplicaElectionData | 1008 | Preferred replica leader election data is invalid (bad JSON, duplicates, etc.). | PreferredReplicaLeaderElectionRequest
ReassignPartitionsInProgress | 1009 | Reassign partitions procedure has already been started. | ReassignPartitionsRequest
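On the client side, the numeric codes in the table above could be mirrored as an enumeration so that an int16 ErrorCode from a response can be turned into a readable name. This is a minimal sketch, assuming exactly the values proposed in this table (they are a proposal here, not codes from a released Kafka version); the names `AdminErrorCode` and `describe` are hypothetical.

```python
from enum import IntEnum


class AdminErrorCode(IntEnum):
    """Hypothetical mirror of the proposed admin error codes (table above)."""
    TopicAlreadyExists = 1001
    InvalidArgumentPartitions = 1002
    DecreasePartitionsNotAllowed = 1003
    InvalidArgumentReplicationFactor = 1004
    InvalidArgumentReplicaAssignment = 1005
    InvalidTopicConfig = 1006
    PreferredReplicaLeaderElectionInProgress = 1007
    InvalidArgumentPreferredReplicaElectionData = 1008
    ReassignPartitionsInProgress = 1009


def describe(code: int) -> str:
    """Turn an int16 ErrorCode from a response into a readable name."""
    if code == 0:
        return "NoError"  # 0 means success in the Kafka protocol
    try:
        return AdminErrorCode(code).name
    except ValueError:
        # Codes outside this proposal (e.g. existing protocol errors).
        return f"Unknown({code})"
```

Keeping unknown codes as `Unknown(n)` rather than raising lets a client still report errors defined elsewhere in the protocol.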

Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide a descriptive error message.

Admin Schema

Overall, the idea is to extend the existing Wire Protocol to cover all existing topic commands: create-topic, alter-topic, delete-topic, describe-topic, list-topics, reassign-partitions and preferred-replica-leader-election. At the same time, since the Wire Protocol is a public API to a Kafka cluster, it was agreed that the new Admin schema needs to be "orthogonal", i.e. new messages shouldn't duplicate each other or existing messages that already cover a particular use case. It is proposed to map the existing commands to the Wire Protocol as follows:

 

Command | Wire Protocol Message | Note
create-topic | CreateTopicRequest |
alter-topic | AlterTopicRequest |
delete-topic | DeleteTopicRequest |
describe-topic | TopicMetadataRequest_V1 | Uses a new version of TopicMetadataRequest that includes topic-level config
list-topics | TopicMetadataRequest | Uses an empty topic list as input, which returns metadata for all existing topics
reassign-partitions | AlterTopicRequest | Uses a batch AlterTopicRequest with replica assignment specified
preferred-replica-leader-election | PreferredReplicaLeaderElectionRequest |

 

It is also important that all Admin requests will be asynchronous. This means a request only initiates a particular command and does not wait until the command has actually completed. There are various reasons for this, but we want to make sure that users have all the tools needed to verify whether an issued command has completed. E.g. one can consider a topic created once it is possible to consume from / produce to it. In this case the user can leverage TopicMetadataRequest to check that all brokers have received metadata about the newly created topic.
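A client-side completion check for an asynchronous topic creation could poll each broker until all of them report the topic. This is a sketch under stated assumptions: `fetch_topic_names(broker)` is a hypothetical caller-supplied function that sends a TopicMetadataRequest to one broker and returns the set of topic names in its response, and the timeout and polling interval are arbitrary choices, not part of the proposal.

```python
import time
from typing import Callable, Iterable, Set


def wait_for_topic(
    topic: str,
    brokers: Iterable[str],
    fetch_topic_names: Callable[[str], Set[str]],  # hypothetical transport
    timeout_s: float = 30.0,
    interval_s: float = 0.5,
) -> bool:
    """Poll every broker until all of them report `topic` in their metadata.

    Returns True once every broker knows the topic, False on timeout.
    """
    pending = set(brokers)
    deadline = time.monotonic() + timeout_s
    while True:
        # Keep only the brokers that still do not report the topic.
        pending = {b for b in pending if topic not in fetch_topic_names(b)}
        if not pending:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

Brokers that have already reported the topic are not queried again, so later polls only hit the stragglers.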

Finally, Topic Admin schema requests are likely to be used not only in the CLI tool, where the common use case is to create/change/delete/get a single entity. Since Kafka can maintain a huge number of topics, it is important that users can efficiently issue many requests at once. That's why all Topic Admin messages are essentially batch requests, i.e. it is possible to group commands of one type for many topics into one batch, reducing network calls.
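A client could group per-topic commands into one batch per command type before sending them. The sketch below assumes a hypothetical `(command type, topic, arguments)` tuple representation and, as an assumption about client behaviour, rejects a second instruction for the same topic within one batch instead of sending it to the server.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

# Hypothetical client-side representation of one per-topic command.
Command = Tuple[str, str, Dict[str, Any]]  # (command type, topic, arguments)
Batch = List[Tuple[str, Dict[str, Any]]]   # [(topic, arguments)]


def build_batches(commands: List[Command]) -> Dict[str, Batch]:
    """Group commands by type so each type can be sent as one batch request."""
    batches: Dict[str, Batch] = defaultdict(list)
    seen: Dict[str, set] = defaultdict(set)
    for kind, topic, args in commands:
        if topic in seen[kind]:
            # One batch may carry only one instruction per topic.
            raise ValueError(f"duplicate {kind} instruction for topic {topic!r}")
        seen[kind].add(topic)
        batches[kind].append((topic, args))
    return dict(batches)
```

Each resulting list would then become the top-level array of a single CreateTopicRequest, AlterTopicRequest or DeleteTopicRequest.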


New Protocol Errors

It is proposed to add these error codes to the protocol.

Error | Description | Requests
TopicAlreadyExists | Topic with this name already exists. | CreateTopicRequest
InvalidArgumentTopicName | Topic name contains invalid characters. | CreateTopicRequest
InvalidArgumentPartitions | Either the partitions field is invalid (e.g. negative), or it is not defined when needed. | CreateTopicRequest, AlterTopicRequest
InvalidArgumentReplicationFactor | Either the replication-factor field is invalid (e.g. negative), or it is not defined when needed. | CreateTopicRequest, AlterTopicRequest
InvalidArgumentReplicaAssignment | Either the replica-assignment field is invalid (e.g. contains duplicates), or it is not defined when needed. | CreateTopicRequest, AlterTopicRequest
InvalidArgumentTopicConfig | Either the topic-level config setting or its value is incorrect. | CreateTopicRequest, AlterTopicRequest
DecreasePartitionsNotAllowed | Invalid partitions argument: decreasing partitions is prohibited when altering a topic. | AlterTopicRequest
PreferredReplicaLeaderElectionInProgress | Preferred replica leader election procedure has already been started. | PreferredReplicaLeaderElectionRequest
ReassignPartitionsInProgress | Reassign partitions procedure has already been started. | AlterTopicRequest
MultipleInstructionsForOneTopic | Only one mutation is allowed at once, e.g. change the topic replication factor or change the topic config. | AlterTopicRequest
MultipleTopicInstructionsInOnebatch | Multiple topic instructions for the same topic in one batch request. | CreateTopicRequest, AlterTopicRequest, DeleteTopicRequest


Create Topic Request

 

CreateTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [ConfigEntry]]
TopicName => string
Partitions => int32
Replicas => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
ConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
CreateTopicRequest requires a topic name and either (Partitions + Replicas) or ReplicaAssignment to create a topic. A special value -1 should be used to denote an empty value for Partitions and Replicas. The user can also specify topic-level configs for the created topic (to use the defaults, an empty array should be provided).
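To make the notation concrete, the sketch below serializes the CreateTopicRequest body using the primitive encodings described in A Guide To The Kafka Protocol: big-endian integers, strings prefixed with an int16 length, and arrays prefixed with an int32 element count. It covers only the request body (not the common request header), and all function names are hypothetical.

```python
import struct
from typing import Dict, List, Tuple

EMPTY = -1  # special value for an unset Partitions/Replicas field


def enc_int16(v: int) -> bytes:
    return struct.pack(">h", v)


def enc_int32(v: int) -> bytes:
    return struct.pack(">i", v)


def enc_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return enc_int16(len(data)) + data


def enc_array(items: list, enc_item) -> bytes:
    return enc_int32(len(items)) + b"".join(enc_item(i) for i in items)


def encode_create_topic_request(
    entries: List[Tuple[str, int, int, Dict[int, List[int]], Dict[str, str]]],
) -> bytes:
    """Each entry is (TopicName, Partitions, Replicas, ReplicaAssignment, configs)."""

    def enc_entry(entry) -> bytes:
        name, partitions, replicas, assignment, configs = entry
        return (
            enc_string(name)
            + enc_int32(partitions)
            + enc_int32(replicas)
            # ReplicaAssignment => [PartitionId [ReplicaId]]
            + enc_array(
                sorted(assignment.items()),
                lambda pa: enc_int32(pa[0]) + enc_array(pa[1], enc_int32),
            )
            # [ConfigEntry] => ConfigKey ConfigValue; an empty array keeps defaults
            + enc_array(
                sorted(configs.items()),
                lambda kv: enc_string(kv[0]) + enc_string(kv[1]),
            )
        )

    return enc_array(entries, enc_entry)
```

For example, `encode_create_topic_request([("events", 3, 2, {}, {})])` asks the server to generate the assignment, while `[("events", EMPTY, EMPTY, {0: [1, 2]}, {})]` supplies it explicitly.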

The (Partitions, Replicas)/ReplicaAssignment semantics are as follows:

1) If ReplicaAssignment is specified, Partitions and Replicas are not taken into account: the topic is created with the provided replica assignment, and the number of partitions and the replication factor are derived from ReplicaAssignment.
2) If ReplicaAssignment is empty, the number of topic partitions and the replication factor must be defined with Partitions and Replicas, and the replica assignment for the topic is generated automatically on the server.
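Rules 1) and 2) can be read as a small decision procedure. The sketch below is one interpretation: the error names come from the New Protocol Errors table, but the specific validation checks (every partition must have the same non-zero number of distinct replicas) are assumptions, and `resolve_create_args` is a hypothetical name.

```python
from typing import Dict, List, Optional, Tuple

EMPTY = -1  # special "not set" value for Partitions and Replicas


def resolve_create_args(
    partitions: int, replicas: int, assignment: Dict[int, List[int]]
) -> Tuple[Optional[str], int, int]:
    """Return (error name or None, partition count, replication factor)."""
    if assignment:
        # Rule 1: ignore Partitions/Replicas and derive both from the assignment.
        sizes = {len(r) for r in assignment.values()}
        has_dupes = any(len(set(r)) != len(r) for r in assignment.values())
        if len(sizes) != 1 or 0 in sizes or has_dupes:
            return "InvalidArgumentReplicaAssignment", EMPTY, EMPTY
        return None, len(assignment), sizes.pop()
    # Rule 2: the server generates the assignment, so both fields are required.
    if partitions <= 0:
        return "InvalidArgumentPartitions", EMPTY, EMPTY
    if replicas <= 0:
        return "InvalidArgumentReplicationFactor", EMPTY, EMPTY
    return None, partitions, replicas
```

Treating -1 and other non-positive values alike keeps "not defined when needed" and "invalid (e.g. negative)" under the same error, as the table describes.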

...

 

CreateTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

CreateTopicResponse maps each topic to the error code of its creation result (see New Protocol Errors).
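CreateTopicResponse, AlterTopicResponse and DeleteTopicResponse all share the `[TopicName ErrorCode]` shape, so a single decoder could serve all three. This sketch assumes the buffer holds only the response body (no response header or correlation id); `decode_topic_error_codes` is a hypothetical name.

```python
import struct
from typing import Dict


def decode_topic_error_codes(buf: bytes) -> Dict[str, int]:
    """Decode [TopicName ErrorCode] into {topic: error_code}."""
    offset = 0

    def read(fmt: str) -> int:
        nonlocal offset
        (value,) = struct.unpack_from(fmt, buf, offset)
        offset += struct.calcsize(fmt)
        return value

    result: Dict[str, int] = {}
    for _ in range(read(">i")):        # int32 array length
        name_len = read(">h")          # int16 string length
        name = buf[offset:offset + name_len].decode("utf-8")
        offset += name_len
        result[name] = read(">h")      # int16 ErrorCode, 0 = no error
    return result
```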

Alter Topic Request

 

AlterTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
TopicName => string
Replicas => int32
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AddedConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
DeletedConfig => string
AlterTopicRequest is similar to CreateTopicRequest. To specify topic-level settings that should be removed, the user can provide a DeletedConfig array (just the setting keys). The user can provide a new Partitions value, a replica assignment, or both.
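One plausible way to apply the config part of an AlterTopicRequest to a topic's overridden settings is sketched below. Several details are assumptions not stated in the proposal: a key that is both added and deleted in one request is rejected, and deleting a key that has no override is a no-op. `apply_config_changes` is a hypothetical name.

```python
from typing import Dict, List


def apply_config_changes(
    current: Dict[str, str],
    added: Dict[str, str],
    deleted: List[str],
) -> Dict[str, str]:
    """Return the topic-level overrides after an AlterTopicRequest.

    added   -- AddedConfigEntry pairs (ConfigKey -> ConfigValue), set or overwritten
    deleted -- DeletedConfig keys, removed so the default value applies again
    """
    conflicting = set(added) & set(deleted)
    if conflicting:
        # Assumption: adding and deleting the same key in one request is invalid.
        raise ValueError(f"keys both added and deleted: {sorted(conflicting)}")
    result = dict(current)  # do not mutate the caller's map
    result.update(added)
    for key in deleted:
        result.pop(key, None)  # assumption: deleting an absent override is a no-op
    return result
```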

AlterTopicRequest contains an optional field Partitions. A special value -1 should be used to denote an empty value. The Partitions/ReplicaAssignment semantics are as follows:

1) Partitions is used only to increase the number of topic partitions.

2) If the Partitions value is empty (-1), ReplicaAssignment is not taken into account and topic partitions are not increased.

3) If Partitions does not increase the existing number of partitions, the error code DecreasePartitionsNotAllowed is returned.

4) If the Partitions value is not empty and increases the number of existing partitions, a new replica assignment for the topic partitions is either generated automatically or defined by ReplicaAssignment (if non-empty).
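The Partitions/ReplicaAssignment rules above can be sketched as a small decision function. The action labels returned here are hypothetical strings chosen for illustration; only the error name DecreasePartitionsNotAllowed comes from the proposal.

```python
from typing import Dict, List, Optional, Tuple

EMPTY = -1  # "not set" value for Partitions


def plan_partition_change(
    current_partitions: int,
    partitions: int,
    assignment: Dict[int, List[int]],
) -> Tuple[Optional[str], str]:
    """Return (error name or None, action) for the Partitions field of AlterTopicRequest."""
    if partitions == EMPTY:
        # Empty Partitions: nothing to increase, ReplicaAssignment is ignored.
        return None, "no-op"
    if partitions <= current_partitions:
        # Anything that does not increase the partition count is rejected.
        return "DecreasePartitionsNotAllowed", "no-op"
    # Increase: use the supplied assignment if present, otherwise generate one.
    return None, ("use-given-assignment" if assignment else "generate-assignment")
```

Note that keeping the same partition count is also rejected, since it "doesn't increase" the existing number of partitions.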

Alter Topic Response

 

AlterTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request

 

DeleteTopicRequest => [TopicName]
TopicName => string
DeleteTopicRequest requires only the names of the topics that should be deleted.
Delete Topic Response

 

DeleteTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

DeleteTopicResponse is similar to CreateTopicResponse.

Topic Metadata Request V1

TopicMetadataRequest_V1 is an evolved version of TopicMetadataRequest. This request is intended to support two admin operations: describing topic information, and checking whether a particular admin command (admin commands are designed to be asynchronous) has completed. In addition, the new version of TopicMetadataResponse will include the topic-level configuration for each topic and the replica fetch lag per partition, i.e. how far each partition replica is behind the leader broker.

TopicMetadataRequest_V1 => [TopicName]
TopicName => string
TopicMetadataRequest_V1 requires only topic names. As with the first version, an empty topic name set results in returning information for all existing topics.
Topic Metadata Response V1

 

TopicMetadataResponse_V1 => [Broker][TopicMetadata]
Broker => NodeId Host Port  (any number of brokers may be returned)
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] [ConfigEntry]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader ReplicasLag Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
ReplicasLag => [int32 int32]
Isr => [int32]

 

Besides the error code, which is used in the same way as in the previous messages, TopicMetadataResponse_V1 holds an optional TopicDescription structure per topic (non-empty if execution was successful). See the table below for details:

Field | Description
TopicConfigDetails | A structure that holds basic replication details.
ConfigEntry | Topic-level setting and value which was overridden.
TopicPartitionDetails | List describing replication details for each partition.
PartitionId | Id of the partition.
Leader | Broker-leader id for the described partition (or -1 if not defined).
ReplicasLag | List of broker ids serving a replica's role for the partition and the fetch lag for each replica.
ISR | Same as replicas but includes only brokers that are known to be "in-sync".

In case of an error, the TopicDescription field will be returned in the response with default values.
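Since the new response carries per-replica fetch lag, a client checking whether a command such as a reassignment has caught up could look for replicas that are out of the ISR or lagging. This sketch assumes each ReplicasLag pair is (ReplicaId, lag) and leaves the lag unit and the threshold to the caller; `PartitionMetadata` here is a hypothetical parsed form, not a class from any Kafka library.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class PartitionMetadata:
    """Hypothetical parsed form of PartitionMetadata from TopicMetadataResponse_V1."""
    partition_id: int
    leader: int                           # -1 if no leader
    replicas_lag: List[Tuple[int, int]]   # assumed (ReplicaId, fetch lag) pairs
    isr: List[int] = field(default_factory=list)


def lagging_replicas(
    partitions: List[PartitionMetadata], max_lag: int
) -> Dict[int, List[int]]:
    """Map partition id -> replicas that are out of the ISR or lag by more than max_lag."""
    result: Dict[int, List[int]] = {}
    for p in partitions:
        behind = [
            replica
            for replica, lag in p.replicas_lag
            if lag > max_lag or replica not in p.isr
        ]
        if behind:
            result[p.partition_id] = behind
    return result
```

An empty result would then suggest that every replica of the described partitions is in sync within the chosen threshold.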


Replication Commands Schema

Preferred Replica Leader Election Request

 

PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]]
Topic => string
PartitionId => int32
PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Similar to ReassignPartitionsRequest, this request is intended to be non-blocking. The schema consists of one field: an array of partitions for which the preferred replica should be elected leader.

To start the preferred replica leader election procedure for all existing partitions, an empty partitions array should be sent.
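The sketch below encodes the `[Topic [PartitionId]]` body with the standard Kafka primitive encodings (int32 array counts, int16-length-prefixed strings), showing that "all partitions" is simply an empty top-level array. Request headers are omitted and `encode_preferred_replica_election` is a hypothetical name.

```python
import struct
from typing import Dict, List


def encode_preferred_replica_election(partitions: Dict[str, List[int]]) -> bytes:
    """Encode [Topic [PartitionId]]; an empty dict means "all partitions"."""
    out = struct.pack(">i", len(partitions))            # int32 topic count
    for topic, ids in sorted(partitions.items()):
        name = topic.encode("utf-8")
        out += struct.pack(">h", len(name)) + name      # int16-prefixed string
        out += struct.pack(">i", len(ids))              # int32 partition count
        out += b"".join(struct.pack(">i", pid) for pid in ids)
    return out
```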

Preferred Replica Leader Election Response

 

PreferredReplicaLeaderElectionResponse => [Topic ErrorCode]
Topic => string
ErrorCode => int16

 

PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.

The status of the procedure may be checked with TopicMetadataRequest: once the election has completed, the head of the replicas list and the leader broker should be the same.
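That completion check can be sketched as follows: a partition is still pending while its leader differs from the first entry of its replicas list. The input shape, a map from (topic, partition id) to (leader id, ordered replica ids) read from a TopicMetadataRequest response, is an assumption, as is the name `election_pending`.

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


def election_pending(
    partitions: Dict[TopicPartition, Tuple[int, List[int]]]
) -> List[TopicPartition]:
    """Return partitions whose leader is not yet the preferred (first) replica."""
    return sorted(
        tp
        for tp, (leader, replicas) in partitions.items()
        if not replicas or leader != replicas[0]
    )
```

Polling until this list is empty gives a client-side way to wait for the asynchronous election to finish.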

...