Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Overall the idea is to extend Wire Protocol to cover all existing topic commands - create-topic, alter-topic, delete-topic, describe-topic, list-topics, reassign-partitions. At the same time, since Wire Protocol is a public API to Kafka cluster, it was agreed that the new Admin schema needs to be "orthogonal", i.e. new messages shouldn't duplicate each other or existing messagesrequests, if those already cover particular use cases.

...

 

CreateTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

CreateTopicResponse contains a map between topic and topic creation result error code (see New Protocol Errors). 

Alter Topic Request

 

AlterTopicRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
TopicName => string
Replicas => int32
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AddedConfigEntry => ConfigKey ConfigValue
 ConfigKey => string
 ConfigValue => string
 DeletedConfig => string
AlterTopicRequest is a batch asynchronous request to initiate topic alteration: replication parameters, replica assingment assignment or add/remove topic level configuration.
Request semantics:

1. If ReplicaAssignment is defined

    ReplicationFactor and Partitions arguments are ignored in this case.

    For each partition in ReplicaAssignment:

    1.1 If such partition exists and assignment is different from the current replica assignment

        It's a "reassign partition" request - add it reassign-partitions json

    1.2 If such partition doesn't exist

        It's an "add partition" request - change topic metadata in zookeeper to trigger increase partition logic

2. Else if ReplicationFactor is defined

    2.1 If Partitions is defined    

        Regenerate replica assignment for all existing and newly added partitions, goto 1.

    2.2 If Partitions is not defined     

        Regenerate replica assignment only for existing partitions, goto 1.

3. Else if Partitions is defined (ReplicaAssignment and ReplicationFactor are not defined):

    3.1 If Partitions is less than current number of partitions return error code IncreasePartitionsNotAllowed

    3.2 Otherwise, generate automatically replica assignment for newly added partitions, goto 1.

 

Multiple instructions for the same topic in one request will be silently ignored, only the last from the list will be executed.

   

Alter Topic Response

 

AlterTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request

 

DeleteTopicRequest => [TopicName]
TopicName => string

 

DeleteTopicRequest requires only topic names which should be deleted.
Multiple instructions for the same topic in one request will be silently ignored, only the last from the list will be executed.
Delete Topic Response

 

DeleteTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

DeleteTopicResponse is similar to CreateTopicResponse.

Topic Metadata Request V1

 

TopicMetadataReqeust_V1 => [TopicName]
TopicName => string
TopicMetadataRequest_V1 is an evolved version of TopicMetadataRequest. This request is intended to get actual topic metadata information and verify whether particular admin request (Create, Alter, Delete) has been completed by checking the expected and actual state of the topic.
TopicMetadataRequest_V1 requires only topic names. As with the first version, an empty topic name set results in returning information for all existing topics.
Also TopicMetadataRequest_V1 comparing to the previous version doesn't trigger topic creation automatically if the topic with the given name doesn't exist.
Topic Metadata Response V1

 

TopicMetadataResponse_V1 => [Broker][TopicMetadata]
Broker => NodeId Host Port
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] [ConfigEntry]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
ReplicasLag => [int32 int32]
ConfigEntry => string string

The new version of TopicMetadataResponse in addition to TopicMetadataResponse_V0 will include topic level configuration for each topic. ISR data is removed because only the leading broker manages this metadata accurately.

2. Server-side Admin Request handlers

All incoming request requests will be handled by a specific helper classes class called from KafkaApis - TopicCommandHelper for topic admin commands, ReassignPartitionsCommandHelper and PreferredReplicaLeaderElectionCommandHelper.

All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Unfortunately most Most of command classes are strongly coupled with CLI logic and can hardly be refactored, so for now (before we remove standalone CLI commands)  most most likely the logic from those classes will be extracted and copied  to to separate classes class (as proposed - TopicCommandHelper etc).

3. Admin Client

This component is intended to be a Kafka out-of-box client implementation for Admin commands.

...