...

  1. Wire Protocol extensions - to add new Admin messages
  2. Server-side Admin commands handlers (TopicCommand-like)
  3. Admin Client - an out-of-box client for performing administrative commands
  4. Interactive Shell / CLI tool supporting administrative commands

 

Some open questions and items under discussion are marked with *[x]. See the Open Questions section for details.

 

1. Wire Protocol Extensions

...

Details are given under each specific RQ/RP schema proposal.

Schema

The same notation as in A Guide To The Kafka Protocol is used here. The only difference is a new Kafka Protocol metatype, MaybeOf ("?" in notation), which marks a value as optional in the message. To indicate whether the value is present, a special control byte is prepended before each such value (0 means the field is absent; otherwise the value is read normally) [1].
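As a rough illustration of the MaybeOf encoding described above, the following Python sketch writes and reads an optional value. The helper names (encode_maybe, decode_maybe, encode_int32, decode_int32) are hypothetical, and using the byte value 1 as the "present" marker is an assumption: the text only states that 0 means absent.

```python
import struct

def encode_int32(value):
    # Kafka protocol integers are big-endian (network byte order).
    return struct.pack(">i", value)

def decode_int32(buf, offset):
    (value,) = struct.unpack_from(">i", buf, offset)
    return value, offset + 4

def encode_maybe(value, encode_value):
    """Prepend the control byte: 0 = absent, 1 (assumed) = present."""
    if value is None:
        return b"\x00"
    return b"\x01" + encode_value(value)

def decode_maybe(buf, offset, decode_value):
    """Return (value_or_None, new_offset)."""
    control = buf[offset]
    offset += 1
    if control == 0:
        return None, offset
    # Any non-zero control byte means the value follows and is read normally.
    return decode_value(buf, offset)
```

An absent value therefore costs a single byte on the wire, while a present int32 costs five.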

 


ClusterMetadata Schema

Cluster Metadata Request

 

ClusterMetadataRequest =>

 

Cluster Metadata Response

 

ClusterMetadataResponse => ErrorCode [Broker] ?(Controller)
  ErrorCode => int16
  Broker => NodeId Host Port
    NodeId => int32
    Host => string
    Port => int32
  Controller => Broker

ClusterMetadataRequest is a request with no arguments.

ClusterMetadataResponse holds an error code (0 on success), the list of brokers in the cluster and, optionally, the broker serving the Controller's role. An empty Controller most likely means either an error during request processing or a cluster in some intermediate state.
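To make the response layout concrete, here is a minimal Python decoding sketch for the ClusterMetadataResponse body. It assumes the standard Kafka protocol encodings (a string is an int16 length followed by UTF-8 bytes, an array is an int32 element count followed by the elements) and the MaybeOf control byte from the Schema section. The Broker tuple and function names are hypothetical, and null strings and request framing are deliberately ignored.

```python
import struct
from typing import NamedTuple

class Broker(NamedTuple):
    node_id: int
    host: str
    port: int

def read_int16(buf, pos):
    return struct.unpack_from(">h", buf, pos)[0], pos + 2

def read_int32(buf, pos):
    return struct.unpack_from(">i", buf, pos)[0], pos + 4

def read_string(buf, pos):
    # Kafka protocol string: int16 length followed by UTF-8 bytes.
    length, pos = read_int16(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length

def read_broker(buf, pos):
    node_id, pos = read_int32(buf, pos)
    host, pos = read_string(buf, pos)
    port, pos = read_int32(buf, pos)
    return Broker(node_id, host, port), pos

def decode_cluster_metadata_response(buf):
    """Return (error_code, brokers, controller_or_None)."""
    pos = 0
    error_code, pos = read_int16(buf, pos)
    count, pos = read_int32(buf, pos)  # array: int32 element count
    brokers = []
    for _ in range(count):
        broker, pos = read_broker(buf, pos)
        brokers.append(broker)
    controller = None
    if buf[pos] != 0:  # MaybeOf control byte: 0 means Controller is absent
        controller, pos = read_broker(buf, pos + 1)
    return error_code, brokers, controller
```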

Admin RQ/RP format

All admin messages listed below must be sent to the Controller broker; only the controller processes such messages. If an Admin message is sent to an ordinary broker, a special error code is returned (code 22). If any other failure occurs while processing the message, AdminRequestFailed is returned.

Error                  Code   Description
AdminRequestFailed     21     Unexpected error occurred while processing Admin request.
InvalidRequestTarget   22     Target broker (id=<this_broker_id>) is not serving a controller's role.

 

Admin clients need ClusterMetadataRequest to discover the Kafka brokers, and in particular the controller's location, since only the controller may execute admin commands [2].
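The discovery step described above could be sketched on the client side as follows. This is an illustration under assumptions, not part of the proposal: the function name, the two injected callables and the retry limit are all hypothetical, and retrying after error code 22 is one possible client policy rather than something the text prescribes.

```python
NO_ERROR = 0
NOT_CONTROLLER = 22  # code 22 from the error table above

def send_admin_request(request, fetch_cluster_metadata, send_to_broker, max_attempts=3):
    """Locate the controller, then send the admin request to it.

    fetch_cluster_metadata() -> (error_code, brokers, controller_or_None)
    send_to_broker(broker, request) -> error code of the admin response
    Both callables are hypothetical stand-ins for real network calls.
    """
    for _ in range(max_attempts):
        error_code, _brokers, controller = fetch_cluster_metadata()
        if error_code != NO_ERROR or controller is None:
            # The cluster may be in an intermediate state; ask again.
            continue
        result = send_to_broker(controller, request)
        if result != NOT_CONTROLLER:
            return result
        # The controller moved between discovery and send; rediscover it.
    raise RuntimeError("could not reach the controller")
```

Passing the network operations in as callables keeps the sketch independent of any particular client implementation.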

Topic Admin Schema

Create Topic Request

 

CreateTopicRequest => TopicName ?(Partitions) ?(Replicas) ?(ReplicaAssignment) [ConfigEntry]
  TopicName => string
  Partitions => int32
  Replicas => int32
  ReplicaAssignment => string
  ConfigEntry => ConfigKey ConfigValue
    ConfigKey => string
    ConfigValue => string
 

 

Create Topic Response

 

CreateTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string

CreateTopicRequest requires a topic name and either partitions plus replicas or a replica assignment to create a topic (validation is done on the server side). You can also specify topic-level configs for the new topic, in key=value format; to use the defaults, pass an empty array.

CreateTopicResponse is fairly simple: you receive an error code (0, as always, means NO_ERROR) and optionally an error description. Usually the description holds the higher-level exception that occurred during command execution.
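The argument rule for CreateTopicRequest can be illustrated with a small check. The text places validation on the server side, so this Python sketch only mirrors the stated rule; the class and function names are hypothetical, and accepting a request that carries both the counts and an assignment is an assumption, since the text does not say how that case is handled.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional

@dataclass
class CreateTopicRequest:
    topic_name: str
    partitions: Optional[int] = None
    replicas: Optional[int] = None
    replica_assignment: Optional[str] = None
    # An empty mapping means "use the default topic-level configs".
    config: Dict[str, str] = field(default_factory=dict)

def validate_create_topic(req):
    """Return None if the request is acceptable, else a problem description."""
    if not req.topic_name:
        return "topic name is required"
    has_counts = req.partitions is not None and req.replicas is not None
    has_assignment = req.replica_assignment is not None
    if not (has_counts or has_assignment):
        return "either partitions and replicas, or a replica assignment, is required"
    return None
```

A description returned here plays the same role as the optional ErrorDescription in the response: a human-readable explanation that accompanies a non-zero error code.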

Alter Topic Request

 

AlterTopicRequest => TopicName ?(Partitions) ?(ReplicaAssignment) [AddedConfigEntry] [DeletedConfig]
  TopicName => string
  Partitions => int32
  ReplicaAssignment => string
  AddedConfigEntry => ConfigKey ConfigValue
    ConfigKey => string
    ConfigValue => string
  DeletedConfig => string

 

Alter Topic Response

 

AlterTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string

AlterTopicRequest is similar to CreateTopicRequest. To remove topic-level settings, list their keys in the DeletedConfig array. The user can provide a new partitions value, a replica assignment, or both.

AlterTopicResponse is similar to CreateTopicResponse.
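The effect of the two config arrays in AlterTopicRequest can be pictured with a short Python sketch. The function name is hypothetical, and the order of application (deletions first, then additions, so an added key wins if it appears in both arrays) is an assumption that the text does not specify.

```python
def apply_config_changes(current, added, deleted):
    """Return a new topic-level config mapping after an alter request.

    current: existing overridden configs (key -> value)
    added:   AddedConfigEntry pairs as a mapping (key -> value)
    deleted: DeletedConfig keys to remove
    """
    updated = dict(current)          # leave the caller's mapping untouched
    for key in deleted:
        updated.pop(key, None)       # unknown keys are ignored (assumed)
    updated.update(added)            # new or changed settings
    return updated
```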

Delete Topic Request

 

DeleteTopicRequest => TopicName
  
TopicName => string

 

Delete Topic Response

 

DeleteTopicResponse => ErrorCode ?(ErrorDescription)
  ErrorCode => int16
  ErrorDescription => string

DeleteTopicRequest requires only the name of the topic to delete.

DeleteTopicResponse is similar to CreateTopicResponse.

Describe Topic Request

 

DescribeTopicRequest => TopicName
  
TopicName => string

 

Describe Topic Response

 

DescribeTopicResponse => ErrorCode ?(ErrorDescription) ?(TopicDescription)
  ErrorCode => int16
  ErrorDescription => string
  TopicDescription => TopicName TopicConfigDetails [TopicPartitionDetails]
    TopicName => string
    TopicConfigDetails => Partitions ReplicationFactor [ConfigEntry]
      Partitions => int32
      ReplicationFactor => int32
      ConfigEntry => string string
    TopicPartitionDetails => PartitionId ?(Leader) [Replica] [ISR]
      PartitionId => int32
      Leader => int32
      Replica => int32
      ISR => int32

DescribeTopicRequest requires only the topic name.

Besides errorCode and the optional errorDescription, which are used in the same way as in the previous messages, DescribeTopicResponse holds an optional TopicDescription structure (non-empty if execution was successful). See the table below for details:

Field                   Description
TopicName               The name of the topic for which the description is provided.
TopicConfigDetails      A structure that holds basic replication details.
Partitions              Number of partitions in the given topic.
Config                  Topic-level setting and value which was overridden.
TopicPartitionDetails   List describing replication details for each partition.
PartitionId             Id of the partition.
Leader                  Optional id of the leader broker for the described partition.
Replicas                List of broker ids serving a replica's role for the partition.
ISR                     Same as Replicas, but includes only brokers that are known to be "in-sync".
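The fields described above map naturally onto a small set of record types. The Python sketch below is one possible in-memory shape for a decoded TopicDescription, with a helper that picks out partitions lacking a leader or having replicas outside the ISR. All class, field and function names are hypothetical, and the helper is only an example of how a client might use the structure.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

@dataclass
class TopicPartitionDetails:
    partition_id: int
    leader: Optional[int]      # ?(Leader): None when no leader is reported
    replicas: List[int]        # broker ids holding a replica
    isr: List[int]             # subset of replicas known to be in sync

@dataclass
class TopicDescription:
    topic_name: str
    partitions: int
    replication_factor: int
    config: Dict[str, str]     # overridden topic-level settings
    partition_details: List[TopicPartitionDetails]

def partitions_needing_attention(desc):
    """Ids of partitions with no leader or with replicas missing from the ISR."""
    return [
        p.partition_id
        for p in desc.partition_details
        if p.leader is None or set(p.isr) != set(p.replicas)
    ]
```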

 

List Topics Request

 

ListTopicsRequest =>

 

List Topics Response

 

ListTopicsResponse => ErrorCode ?(ErrorDescription) ?(TopicsList)
  ErrorCode => int16
  ErrorDescription => string
  TopicsList => [TopicMarkedForDeletion] [AliveTopic]
    TopicMarkedForDeletion => string
    AliveTopic => string

ListTopicsRequest is a request with no arguments.

Besides errorCode and the optional errorDescription, which are used in the same way as in the previous messages, ListTopicsResponse optionally holds two lists of topic names (non-empty if execution was successful): one for topics marked for deletion and one for ordinary, alive topics.

...