Versions Compared

...

1. Wire Protocol Extensions

Overview

For each type of Admin Request a separate type of Wire protocol message is created.

It is proposed to add / modify these 3 types of requests:

  • Topic commands, which include CreateTopic(Request | Response), AlterTopic, DeleteTopic, DescribeTopic, ListTopics.
  • Replication tools: ReassignPartitions, PreferredReplicaLeaderElection.
  • Extend TopicMetadataRequest to include topic configuration and partition fetch lag per replica.

Details are given under each specific request/response (RQ/RP) schema proposal.

...

 

CreateTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

CreateTopicResponse contains a map between topic and topic creation result error code (see New Protocol Errors). 
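As a rough illustration, a response of the form [TopicName ErrorCode] could be serialized as sketched here. The sketch assumes the usual Kafka wire conventions (big-endian integers, arrays prefixed with an int32 element count, strings prefixed with an int16 byte length). The function names and the sample error code values are hypothetical and not part of the proposal.

```python
import struct
from typing import Dict


def encode_string(value: str) -> bytes:
    """Encode a string as int16 length followed by UTF-8 bytes (assumed convention)."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_topic_errors(results: Dict[str, int]) -> bytes:
    """Encode [TopicName ErrorCode]: int32 count, then one (string, int16) pair per topic."""
    out = [struct.pack(">i", len(results))]
    for topic, error_code in results.items():
        out.append(encode_string(topic))
        out.append(struct.pack(">h", error_code))
    return b"".join(out)


def decode_topic_errors(buf: bytes) -> Dict[str, int]:
    """Decode the same layout back into a topic -> error code map."""
    (count,) = struct.unpack_from(">i", buf, 0)
    offset = 4
    results: Dict[str, int] = {}
    for _ in range(count):
        (length,) = struct.unpack_from(">h", buf, offset)
        offset += 2
        topic = buf[offset:offset + length].decode("utf-8")
        offset += length
        (error_code,) = struct.unpack_from(">h", buf, offset)
        offset += 2
        results[topic] = error_code
    return results
```

The same layout would apply to any response in this section that is declared as [TopicName ErrorCode].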

Alter Topic Request

 

AlterTopicRequest => [TopicName Partitions ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
TopicName => string
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AddedConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
DeletedConfig => string

AlterTopicRequest is similar to the previous request. To remove topic-level settings, list them in the DeletedConfig array (setting keys only). The user can provide a new Partitions value, a replica assignment, or both.

AlterTopicRequest contains an optional field Partitions. The special value -1 denotes an empty value. The Partitions/ReplicaAssignment semantics are as follows:

1) Partitions is used only to increase the number of topic partitions.

2) If the Partitions value is empty (-1), ReplicaAssignment is not taken into account and the number of topic partitions is not increased.

3) If Partitions does not increase the existing number of partitions, the error code DecreasePartitionsNotAllowed is returned.

4) If the Partitions value is not empty and increases the existing number of partitions, a new replica assignment for the topic partitions is either generated automatically or defined by ReplicaAssignment (if non-empty).
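The rules above can be sketched as a small decision function. This is only an illustration of the stated semantics: the function name, the returned dictionary shape, and the "ignored" / "automatic" / "explicit" labels are assumptions made for the example, not part of the proposal.

```python
from typing import Dict, List, Optional

NO_PARTITIONS = -1  # special value denoting an empty Partitions field


def plan_partition_change(
    current: int,
    requested: int,
    replica_assignment: Optional[Dict[int, List[int]]] = None,
) -> dict:
    """Return the outcome of the Partitions/ReplicaAssignment rules (hypothetical helper)."""
    # Rule 2: empty Partitions means ReplicaAssignment is ignored and nothing changes.
    if requested == NO_PARTITIONS:
        return {"error": None, "partitions": current, "assignment": "ignored"}
    # Rule 3: a value that does not increase the partition count is rejected.
    if requested <= current:
        return {
            "error": "DecreasePartitionsNotAllowed",
            "partitions": current,
            "assignment": "ignored",
        }
    # Rule 4: use the supplied assignment if non-empty, otherwise generate one.
    source = "explicit" if replica_assignment else "automatic"
    return {"error": None, "partitions": requested, "assignment": source}
```

For example, a topic with 4 partitions and a request for 6 partitions without a ReplicaAssignment would fall under rule 4 with an automatically generated assignment.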

Alter Topic Response

 

AlterTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

AlterTopicResponse is similar to CreateTopicResponse.

Delete Topic Request

 

DeleteTopicRequest => [TopicName]
TopicName => string

DeleteTopicRequest requires only the names of the topics which should be deleted.

Delete Topic Response

 

DeleteTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

DeleteTopicResponse is similar to CreateTopicResponse.

Topic Metadata Request V1

TopicMetadataRequest_V1 is an evolved version of TopicMetadataRequest. This request is intended to support two admin operations: describing topic information, and checking whether a particular admin command (all of which are designed to be asynchronous) has completed. In addition, the new version of TopicMetadataResponse will include the topic-level configuration for each topic and the replica fetch lag per partition, i.e. how far each partition replica is behind the leader broker.

TopicMetadataRequest_V1 => [TopicName]
TopicName => string

TopicMetadataRequest_V1 requires only topic names. As with the first version, an empty topic name set results in returning information for all existing topics.

Topic Metadata Response V1

 

TopicMetadataResponse_V1 => [Broker][TopicMetadata]
Broker => NodeId Host Port  (any number of brokers may be returned)
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] [ConfigEntry]
ConfigEntry => string string
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader ReplicasLag Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
ReplicasLag => [int32 int32]
Isr => [int32]

 

TopicMetadataResponse_V1, besides the error code, which is used in the same way as in previous messages, holds an optional (non-empty if execution was successful) TopicDescription structure per topic. See the table below for details:

Field                  Description
TopicConfigDetails     A structure that holds basic replication details.
ConfigEntry            Topic-level setting and value which was overridden.
TopicPartitionDetails  List describing replication details for each partition.
PartitionId            Id of the partition.
Leader                 Broker-leader id for the described partition (or -1 if not defined).
ReplicasLag            List of broker ids serving a replica's role for the partition and fetch lag for the replica.
ISR                    Same as replicas but includes only brokers that are known to be "in-sync".

In case of error TopicDescription field will be returned in response with default values.

List Topics Request

ListTopicsRequest =>

ListTopicsRequest is a request with no arguments.

List Topics Response

 

ListTopicsResponse => ErrorCode [TopicName]
ErrorCode => int16
TopicName => string

ListTopicsResponse, besides the error code, holds a list of the topics in the Kafka cluster.

Replication Commands Schema

Reassign Partitions
Reassign Partitions Request

 

ReassignPartitionRequest => [Topic [PartitionId [ReplicaId]]]
Topic => string
PartitionId => int32
ReplicaId => int32

ReassignPartitionsRequest requires a partition assignment, i.e. a mapping from each partition to an array of replicas. Validation of partition / replica existence is done on the server.
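To make the nested array structure concrete, the sketch below serializes only the request body [Topic [PartitionId [ReplicaId]]], without any request header. It assumes the usual Kafka wire conventions (big-endian integers, int32 array counts, int16-length-prefixed strings). The function name and the dictionary input shape are hypothetical.

```python
import struct
from typing import Dict, List


def encode_reassignment(assignment: Dict[str, Dict[int, List[int]]]) -> bytes:
    """Encode {topic: {partition_id: [replica_id, ...]}} as nested length-prefixed arrays."""
    out = [struct.pack(">i", len(assignment))]  # number of topics
    for topic, partitions in assignment.items():
        name = topic.encode("utf-8")
        out.append(struct.pack(">h", len(name)) + name)  # Topic => string
        out.append(struct.pack(">i", len(partitions)))  # number of partitions
        for partition_id, replicas in partitions.items():
            out.append(struct.pack(">i", partition_id))  # PartitionId => int32
            out.append(struct.pack(">i", len(replicas)))  # number of replicas
            out.append(struct.pack(f">{len(replicas)}i", *replicas))  # ReplicaId => int32
    return b"".join(out)
```

Since validation is done on the server, the encoder deliberately does not check whether the partitions or replica brokers exist.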

Reassign Partitions Response

 

ReassignPartitionResponse => [Topic ErrorCode]
Topic => string
ErrorCode => int16

ReassignPartitionResponse holds an error code per topic, which is non-empty if the reassignment could not be started (e.g. due to a validation error).

Reassignment status can be checked with DescribeTopicRequest field AssignedReplicas.

Preferred Replica Leader Election

Preferred Replica Leader Election Request

 

PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]]
Topic => string
PartitionId => int32

PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Like ReassignPartitionsRequest, this request is intended to be non-blocking. The schema consists of one field: an array of partitions for which the preferred replica leader should be elected.

To start the preferred replica leader election procedure for all existing partitions, an empty partitions array should be sent.

Preferred Replica Leader Election Response

 

PreferredReplicaLeaderElectionResponse => [Topic ErrorCode]
Topic => string
ErrorCode => int16

 

PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.

Status of the procedure may be checked with TopicMetadataRequest: the head of the replicas list field and the leader broker should be the same.
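The completion check described above amounts to comparing the leader with the first replica in the list. A minimal sketch, assuming the metadata response has already been decoded into a leader id and an ordered list of replica ids per partition (the function names and the tuple-keyed dictionary are hypothetical):

```python
from typing import Dict, List, Tuple


def preferred_leader_elected(leader: int, replicas: List[int]) -> bool:
    """True if the leader is the head of the replicas list (the preferred replica)."""
    return bool(replicas) and replicas[0] == leader


def election_completed(
    partitions: Dict[Tuple[str, int], Tuple[int, List[int]]]
) -> bool:
    """Check every (topic, partition) -> (leader, replicas) entry."""
    return all(
        preferred_leader_elected(leader, replicas)
        for leader, replicas in partitions.values()
    )
```

A client could poll the metadata and stop once election_completed returns True for the partitions it requested.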

...