...

  • Topic commands, which include CreateTopic(Request | Response), AlterTopic, DeleteTopic, DescribeTopic, ListTopics.
  • Replication tools: ReassignPartitions, VerifyReassignPartitions, PreferredReplicaLeaderElection.
  • A special type of request to support Admin commands: an enriched TopicMetadataRequest (to add the controller).

Details are given under each request/response (RQ/RP) schema proposal.

...

Protocol Errors

It is proposed to add these error codes to the protocol.

Error | Code | Description | Requests
NotControllerReceivedAdminRequest | 1001 | Target broker is not serving a controller's role. | All Admin requests
TopicAlreadyExists | 1002 | Topic with this name already exists. | CreateTopicRequest
InvalidArgumentPartitions | 1003 | Either the partitions field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest, AlterTopicRequest
DecreasePartitionsNotAllowed | 1004 | Invalid partitions argument: decreasing partitions is prohibited. | AlterTopicRequest
InvalidArgumentReplicationFactor | 1005 | Either the replication-factor field is invalid (e.g. negative), or not defined when needed. | CreateTopicRequest
InvalidArgumentReplicaAssignment | 1006 | Either the replica-assignment field is invalid (e.g. contains duplicates), or not defined when needed. | CreateTopicRequest, AlterTopicRequest, ReassignPartitionsRequest, VerifyReassignPartitionsRequest
InvalidTopicConfig | 1007 | Either a topic-level config setting or its value is incorrect. | CreateTopicRequest, AlterTopicRequest
PreferredReplicaLeaderElectionInProgress | 1008 | Preferred replica leader election procedure has already been started. | PreferredReplicaLeaderElectionRequest
InvalidArgumentPreferredReplicaElectionData | 1009 | Preferred replica leader election data is invalid (bad JSON, duplicates, etc.). | PreferredReplicaLeaderElectionRequest
ReassignPartitionsInProgress | 1010 | Reassign partitions procedure has already been started. | ReassignPartitionsRequest

Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide a descriptive error message.

E.g. on receiving InvalidArgumentPartitions, the client will be able to determine the cause:

a) upon AlterTopicRequest: the user provided an incorrect partitions argument (e.g. negative)

b) upon CreateTopicRequest: the user provided replication-factor but did not provide the partitions argument
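
The idea of combining an error code with the request type can be illustrated with a small lookup. This is only a sketch: the `CONTEXT_MESSAGES` table and the `describe_error` helper are hypothetical names, not part of the proposal, and the message texts simply restate cases a) and b) above.

```python
# Error code taken from the Protocol Errors table in this proposal.
INVALID_ARGUMENT_PARTITIONS = 1003

# Hypothetical lookup: (error code, request type) -> user-facing message.
CONTEXT_MESSAGES = {
    (INVALID_ARGUMENT_PARTITIONS, "AlterTopicRequest"):
        "The partitions argument is invalid (e.g. negative).",
    (INVALID_ARGUMENT_PARTITIONS, "CreateTopicRequest"):
        "replication-factor was provided but the partitions argument was not.",
}


def describe_error(error_code: int, request_type: str) -> str:
    """Return a descriptive message, using the request type as context."""
    if error_code == 0:
        return "No error."
    # Fall back to a generic message when no context-specific text is known.
    fallback = f"{request_type} failed with error code {error_code}."
    return CONTEXT_MESSAGES.get((error_code, request_type), fallback)
```

A dispatcher would call `describe_error(code, "AlterTopicRequest")` with the type of the request it has just sent, which is exactly the context the broker-side error code alone does not carry.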

ClusterMetadata Schema

Cluster Metadata Request

 

ClusterMetadataRequest =>

 

Cluster Metadata Response

 

ClusterMetadataResponse => ErrorCode [Broker] ?(Controller)
  ErrorCode => int16
  Broker => NodeId Host Port
    NodeId => int32
    Host => string
    Port => int32
  Controller => Broker

ClusterMetadataRequest is a request with no arguments.

ClusterMetadataResponse holds an error code (0 on success), the list of brokers in the cluster and, optionally, the broker serving the controller's role. An empty Controller most likely means either an error during request processing or that the cluster is in some intermediate state.

ClusterMetadataRequest is required for admin clients to discover the Kafka brokers, and specifically the controller's location, since only the controller may execute admin commands.
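
A minimal sketch of how a client might use an already decoded ClusterMetadataResponse to find the controller is shown below. The dataclasses and the `controller_address` helper are hypothetical; they mirror the schema fields but say nothing about how the response is decoded from the wire.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Broker:
    node_id: int
    host: str
    port: int


@dataclass(frozen=True)
class ClusterMetadataResponse:
    error_code: int
    brokers: List[Broker]
    controller: Optional[Broker] = None  # ?(Controller): may be absent


def controller_address(response: ClusterMetadataResponse) -> Optional[str]:
    """Return "host:port" of the controller, or None if the caller should retry."""
    if response.error_code != 0:
        return None  # request-level failure
    if response.controller is None:
        return None  # cluster is probably in an intermediate state
    return f"{response.controller.host}:{response.controller.port}"
```

Returning `None` in both failure cases leaves the retry policy (which broker to ask next, how long to wait) to the caller.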

TopicMetadata_V1 Schema

[1] On startup, the admin client is provided with a list of Kafka cluster brokers. It is then the client's responsibility to locate the controller so that Admin requests are sent to the correct node.

Currently there is no way to locate the controller with the existing Wire Protocol messages. It is proposed to extend TopicMetadataRequest to version 1 for this purpose, since its response already contains the broker list.

Topic Metadata Request

TopicMetadataRequest remains unchanged compared to version V0.

 

TopicMetadataRequest_V1 => [TopicName]
  TopicName => string
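
For illustration, the `[TopicName]` body can be serialized with the usual Kafka wire primitives: big-endian integers, an int32 element count for arrays, and an int16 byte length for strings. The sketch below covers only the request body; the common request header (API key, version, correlation id, client id) is deliberately left out, and the function names are hypothetical.

```python
import struct
from typing import List


def encode_string(value: str) -> bytes:
    """Encode a string as an int16 byte length followed by UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_topic_metadata_request_body(topic_names: List[str]) -> bytes:
    """Encode [TopicName] as an int32 count followed by each string."""
    body = struct.pack(">i", len(topic_names))
    for name in topic_names:
        body += encode_string(name)
    return body
```

For example, a request for the single topic `foo` yields a 4-byte count, a 2-byte length and the three bytes of the name.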

 

Topic Metadata Response

TopicMetadataResponse will be enriched to include a controller field.

 

TopicMetadataResponse_V1 => Controller [Broker] [TopicMetadata]
  Controller => Broker
  Broker => NodeId Host Port  (any number of brokers may be returned)
    NodeId => int32
    Host => string
    Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
    TopicErrorCode => int16
    PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
      PartitionErrorCode => int16
      PartitionId => int32
      Leader => int32
      Replicas => [int32]
      Isr => [int32]

 

Topic Admin Schema

The idea is to introduce Wire Protocol messages that cover all topic commands (create, alter, delete, list, describe). The motivation behind the proposed schema is the following:

1) Topic commands must inherit their options from the TopicCommand tool.

2) If an option is not used in a particular command (e.g. ReplicaAssignment in CreateTopicRequest), a special marker value is sent instead (e.g. an empty string for ReplicaAssignment).

3) Topic commands must support batching and report the command execution result per topic.

4) Topic commands can be executed only on the broker serving the controller's role. If a request is sent to an ordinary broker, a request-level error should reflect that.

Create Topic Request

 

CreateTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [ConfigEntry]]
  TopicName => string
  Partitions => int32
  Replicas => int32
  ReplicaAssignment => string
  ConfigEntry => ConfigKey ConfigValue
    ConfigKey => string
    ConfigValue => string

CreateTopicRequest requires a topic name and either (partitions + replicas) or a replica assignment to create a topic (validation is done on the server side). You can also specify topic-level configs for the new topic (pass an empty array to use the defaults).

This table defines values that will be used to denote "no value":

Field | Value
Partitions | -1
Replicas | -1
ReplicaAssignment | "" (empty string)
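
The "no value" markers can be sketched as defaults on a client-side structure. The class and method names below are hypothetical, and the check is only an optional client-side pre-check of the "either (partitions + replicas) or replica assignment" rule; the authoritative validation remains on the server.

```python
from dataclasses import dataclass, field
from typing import Dict

# Marker values from the table above.
NO_PARTITIONS = -1
NO_REPLICAS = -1
NO_REPLICA_ASSIGNMENT = ""


@dataclass
class CreateTopicArguments:
    topic_name: str
    partitions: int = NO_PARTITIONS
    replicas: int = NO_REPLICAS
    replica_assignment: str = NO_REPLICA_ASSIGNMENT
    configs: Dict[str, str] = field(default_factory=dict)  # empty = defaults

    def is_well_formed(self) -> bool:
        """Client-side pre-check; the broker still performs the real validation."""
        has_counts = (self.partitions != NO_PARTITIONS
                      and self.replicas != NO_REPLICAS)
        has_assignment = self.replica_assignment != NO_REPLICA_ASSIGNMENT
        return bool(self.topic_name) and (has_counts or has_assignment)
```

With this shape, `CreateTopicArguments("events", partitions=3)` is not well formed because the replicas field still carries its marker value.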

 


Create Topic Response

 

CreateTopicResponse => ErrorCode [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string

CreateTopicResponse contains a "global" error code (0, as always, denotes NO_ERROR; a non-zero value reports a request-level failure, e.g. NotControllerReceivedAdminRequest, see Protocol Errors) and a map from each topic to its creation result.
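
As a sketch, a response body laid out as `ErrorCode [TopicName ErrorCode]` could be decoded as follows. It assumes the usual Kafka wire primitives (big-endian integers, int32 array count, int16 string length), takes only the body without the response header, and uses a hypothetical function name.

```python
import struct
from typing import Dict, Tuple


def decode_create_topic_response(payload: bytes) -> Tuple[int, Dict[str, int]]:
    """Decode ErrorCode [TopicName ErrorCode] into (global_error, per_topic_errors)."""
    (global_error,) = struct.unpack_from(">h", payload, 0)
    (count,) = struct.unpack_from(">i", payload, 2)
    offset = 6
    results: Dict[str, int] = {}
    for _ in range(count):
        # TopicName: int16 length followed by UTF-8 bytes.
        (name_len,) = struct.unpack_from(">h", payload, offset)
        offset += 2
        name = payload[offset:offset + name_len].decode("utf-8")
        offset += name_len
        # Per-topic ErrorCode.
        (topic_error,) = struct.unpack_from(">h", payload, offset)
        offset += 2
        results[name] = topic_error
    return global_error, results
```

A caller would first check the global error code and only then inspect the per-topic map, since a request-level failure means no topic was processed.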

Alter Topic Request

 

AlterTopicRequest => [TopicName Partitions ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
  TopicName => string
  Partitions => int32
  ReplicaAssignment => string
  AddedConfigEntry => ConfigKey ConfigValue
    ConfigKey => string
    ConfigValue => string
  DeletedConfig => string

AlterTopicRequest is similar to CreateTopicRequest. To specify topic-level settings that should be removed, use the DeletedConfig array (setting keys only). The user can provide a new partitions value, a replica assignment, or both.

This table defines values that will be used to denote "no value":

Field | Value
Partitions | -1
ReplicaAssignment | "" (empty string)

 

Alter Topic Response

 

AlterTopicResponse => ErrorCode [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

AlterTopicResponse is similar to CreateTopicResponse.

Delete Topic Request

 

DeleteTopicRequest => [TopicName]
  TopicName => string

DeleteTopicRequest requires only the names of the topics that should be deleted.

Delete Topic Response

 

DeleteTopicResponse => ErrorCode [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string

DeleteTopicResponse is similar to CreateTopicResponse.

Describe Topic Request

 

DescribeTopicRequest => [TopicName]
  TopicName => string

DescribeTopicRequest requires only topic names.

Describe Topic Response

 

DescribeTopicResponse => ErrorCode [TopicName ErrorCode TopicDescription]
  ErrorCode => int16
  TopicName => string
  TopicDescription => TopicName TopicConfigDetails [TopicPartitionDetails]
    TopicConfigDetails => Partitions ReplicationFactor [ConfigEntry]
      Partitions => int32
      ReplicationFactor => int32
      ConfigEntry => string string
    TopicPartitionDetails => PartitionId ?(Leader) [Replica] [ISR]
      PartitionId => int32
      Leader => int32
      Replica => int32
      ISR => int32

 

Besides the error code, which is used in the same way as in previous messages, DescribeTopicResponse holds a TopicDescription structure per topic (non-empty if execution was successful). See the table below for details:

Field | Description
TopicName | The name of the topic for which the description is provided.
TopicConfigDetails | A structure that holds basic replication details.
Partitions | Number of partitions in the given topic.
Config | Topic-level setting and value which was overridden.
TopicPartitionDetails | List describing replication details for each partition.
PartitionId | Id of the partition.
Leader | Optional id of the leader broker for the described partition.
Replicas | List of broker ids serving a replica's role for the partition.
ISR | Same as Replicas, but includes only brokers that are known to be "in-sync".

In case of an error, the TopicDescription field is returned in the response with default values.

List Topics Request

 

ListTopicsRequest =>

 

ListTopicsRequest is a request with no arguments.

List Topics Response

 

ListTopicsResponse => ErrorCode [TopicName]
  ErrorCode => int16
  TopicName => string

Besides the error code, which is used in the same way as in previous messages, ListTopicsResponse holds a list of the topics in the Kafka cluster.

Replication Commands Schema

Reassign Partitions
Reassign Partitions Request

 

ReassignPartitionsRequest => ManualAssignment
  ManualAssignment => string

ReassignPartitionsRequest requires a manual partition assignment string. Parsing and validation are done on the server. This request only initiates partition reassignment and returns immediately. It is the client's responsibility to block the user while repeatedly sending VerifyReassignPartitionsRequest to check the reassignment status. The format is the following:

{
  "partitions": [
    {"topic": "foo", "partition": 1, "replicas": [1, 2, 3]}
  ],
  "version": 1
}
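
A client could produce the ManualAssignment string from an in-memory mapping along these lines. The helper name and the input shape are hypothetical; the duplicate-replica check is only a convenience that anticipates the InvalidArgumentReplicaAssignment error, since the server performs the actual validation.

```python
import json
from typing import Dict, List, Tuple


def build_manual_assignment(assignment: Dict[Tuple[str, int], List[int]]) -> str:
    """Serialize {(topic, partition): [replica broker ids]} into the JSON format above."""
    partitions = []
    for (topic, partition), replicas in sorted(assignment.items()):
        if len(set(replicas)) != len(replicas):
            raise ValueError(f"duplicate replicas for {topic}-{partition}")
        partitions.append(
            {"topic": topic, "partition": partition, "replicas": replicas}
        )
    return json.dumps({"partitions": partitions, "version": 1})
```

Calling `build_manual_assignment({("foo", 1): [1, 2, 3]})` produces a string that parses back to the example document shown above.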

Reassign Partitions Response

 

ReassignPartitionsResponse => ErrorCode
  ErrorCode => int16

ReassignPartitionsResponse holds just an error code.

Verify Reassign Partitions Request

 

VerifyReassignPartitionsRequest => ManualAssignment
  ManualAssignment => string

VerifyReassignPartitionsRequest takes the same manual partition assignment string as ReassignPartitionsRequest; this request verifies the status of that reassignment.

Verify Reassign Partitions Response

 

VerifyReassignPartitionsResponse => ErrorCode [ReassignmentResult]
  ErrorCode => int16
  ReassignmentResult => TopicAndPartition ResultCode
    TopicAndPartition => string int32
    ResultCode => int16

As with other Admin requests, VerifyReassignPartitionsResponse returns an error code in case of failure. Otherwise, a reassignment result map is returned. It holds the reassignment status per topic and partition (-1: reassignment failed, 0: in progress, 1: completed successfully).
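
Since the reassign request returns immediately, a blocking client has to poll. The loop below is a sketch under stated assumptions: `verify` is a hypothetical callable that stands in for sending the verify request and decoding its result map, and the polling interval and attempt limit are arbitrary illustrative defaults.

```python
import time
from typing import Callable, Dict, Tuple

# Status codes from the description above.
FAILED, IN_PROGRESS, COMPLETED = -1, 0, 1
TopicAndPartition = Tuple[str, int]


def wait_for_reassignment(
    verify: Callable[[str], Dict[TopicAndPartition, int]],
    manual_assignment: str,
    poll_interval_s: float = 1.0,
    max_polls: int = 60,
) -> Dict[TopicAndPartition, int]:
    """Poll until no partition is IN_PROGRESS, or until max_polls is reached."""
    statuses: Dict[TopicAndPartition, int] = {}
    for _ in range(max_polls):
        statuses = verify(manual_assignment)
        if IN_PROGRESS not in statuses.values():
            return statuses  # every partition either completed or failed
        time.sleep(poll_interval_s)
    return statuses  # last observed statuses; some may still be in progress
```

The caller inspects the returned map to distinguish completed partitions from failed ones, or from partitions still in progress when the attempt limit was hit.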

Preferred Replica Leader Election
Preferred Replica Leader Election Request

 

PreferredReplicaLeaderElectionRequest => PartitionsSerialized
  PartitionsSerialized => string

PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Similar to ReassignPartitionsRequest, this request is intended to be non-blocking. The schema consists of one optional field: the partitions, in serialized form (JSON), for which the procedure should be started. The format is the following:

{
  "partitions": [
    {"topic": "foo", "partition": 1},
    {"topic": "foobar", "partition": 2}
  ]
}

To start the preferred replica leader election procedure for all partitions, an empty string should be sent in the request.

Preferred Replica Leader Election Response

 

PreferredReplicaLeaderElectionResponse => ErrorCode
ErrorCode => int16

 

PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.

The status of the procedure may be checked with DescribeTopicRequest: for each partition, the head of the replicas list and the leader broker should be the same.
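
The check described above amounts to comparing the leader with the first replica. A minimal sketch, assuming the leader and replicas of a partition have already been extracted from a decoded DescribeTopicResponse (the function name is hypothetical):

```python
from typing import List, Optional


def preferred_leader_elected(leader: Optional[int], replicas: List[int]) -> bool:
    """True when the partition's leader is the head of its replicas list."""
    if leader is None or not replicas:
        return False  # no leader reported, or nothing to compare against
    return leader == replicas[0]
```

A client that wants to wait for the procedure to finish would evaluate this for every partition it requested and repeat the DescribeTopicRequest until all of them return true.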

...