Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Wire Protocol extensions - to add new Admin messages
  2. Server-side Admin commands handlers (TopicCommand-like)
  3. Admin Client - an out-of-box client for performing administrative commands
  4. Interactive Shell / CLI tool supporting administrative commands

Some open questions and items under discussion are marked with [x]. Please find Open Questions section for more details.

(You can try out some of the changes from this KIP. Please follow this link to get the code and start the CLI tool).

...

  • Topic commands which include CreateTopic(Request | Response)AlterTopicDeleteTopicDescribeTopicListTopics.
  • Replication tools - ReassingPartition, VerifyReassingPartitions; PreferredReplicaLeaderElection
  • A special type of request to support Admin commands - enriched TopicMetadataRequest (to add controller)

Please find details under specific RQ/RP schema proposal.

...

New Protocol Errors

It is proposed to add these error codes to the protocol.

Error

Code

Description

Requests
TopicAlreadyExists1001Topic with this name already exists.CreateTopicRequest
InvalidArgumentPartitions1002Either partition field is invalid (e.g. negative), or not defined when needed.CreateTopicRequest, AlterTopicRequest
DecreasePartitionsNotAllowed1003Invalid partitions argument: decreasing partitions is prohibited.AlterTopicRequest
InvalidArgumentReplicationFactor1004Either replication-factor field is invalid (e.g. negative), or not defined when needed.CreateTopicRequest
InvalidArgumentReplicaAssignment1005Either replication-factor field is invalid (e.g. contains duplicates), or not defined when needed.

CreateTopicRequest, AlterTopicRequest,

ReassignPartitionsRequest, VerifyReassignPartitionsRequest

InvalidTopicConfig1006

Either topic-level config setting or value is incorrect.

CreateTopicRequest, AlterTopicRequest
PreferredReplicaLeaderElectionInProgress1007Preferred replica leader election procedure has been already started.PreferredReplicaLeaderElectionRequest
InvalidArgumentPreferredReplicaElectionData1008Preferred replica leader election data is in invalid (bad json, duplicates etc).PreferredReplicaLeaderElectionRequest
ReassignPartitionsInProgress1009Reassign partitions procedure has been already started.ReassignPartitionsRequest

Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide descriptive error message.

TopicMetadata_V1 Schema

[1] On start up admin client will be provided with a list of Kafka cluster brokers. It's a client responsibility to define a controller location to send Admin requests to a correct node then.

Currently there is no way to locate controller with existing Wire Protocol messages. It's proposed to extend TopicMetadataRequest to version 1 for these purposes, since it already contains broker list.

Topic Metadata Request

TopicMetadataRequest remains unchanged comparing to version V0.

 

TopicMetadataRequest_V1 => [TopicName]
  TopicName => string

 

Topic Metadata Response
TopicMetadataResponse will be enriched to include controller field.

 

TopicMetadataResponse_V1 => Controller [Broker][TopicMetadata]
Controller => Broker
Broker => NodeId Host Port  (any number of brokers may be returned)
    NodeId => int32
    Host => string
    Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
    TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
    PartitionErrorCode => int16
    PartitionId => int32
    Leader => int32
    Replicas => [int32]
    Isr => [int32]

 

Topic Admin Schema
Topic Admin Schema

The idea is to introduce Wire protocol messages that cover all topic commands

The idea is to introduce Wire protocol messages that cover all topic commands (create, alter, delete, list, describe). The motivation behind the proposed schema is the following:

1) Topic commands must inherit options from TopicCommand tool

2) If some of the options are not used in particular command (e.g. ReplicaAssignment in CreateTopicRequest) - the special marker value is used instead (e.g. in case of ReplicaAssignment - empty string)

3) Topic commands must support batching and provide command execution result per-topic

4) Topic commands are asynchronous - the request to create/alter/delete just initiates the corresponding commands and returns immediately

5) Topic commands can be executed only on a broker serving a controller's role - in case request is sent to an ordinary broker - a request-level error should reflect that

...

 

CreateTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [ConfigEntry]]
TopicName => string
Partitions => int32
Replicas => int32
ReplicaAssignment => [Topic [PartitionId [ReplicaId]]]
ConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
CreateTopicRequest requires topic name and either (Partitions+Replicas) or ReplicasAssignment to create a topic. A special value -1 should be used to denote an empty value for Partitions and ReplicasAlso user will be able to specify topic-level configs to create topic with (to use default an empty array should be provided).

The (Partitions, Replicas)/ReplicaAssignment semantics is the following:

1) If ReplicaAssignment is specified - Partitions and Replicas are not taken into account, topic is created with provided replica assignment and number of topics and replication factor are defined from ReplicaAssignment
2) If ReplicaAssignment is empty - number of topic partitions and replication factor must be defined with Partitions and Replicas, the replica assignment for topic is automatically generated on server

...

 

CreateTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

CreateTopicResponse contains a map between topic and topic creating creation result error code (see New Protocol Errors). 

Alter Topic Request

 

AlterTopicRequest => [TopicName Partitions ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
TopicName => string
Partitions => int32
ReplicaAssignment => [Topic [PartitionId [ReplicaId]]]
AddedConfigEntry => ConfigKey ConfigValue
 ConfigKey => string
 ConfigValue => string
 DeletedConfig => string
AlterTopicRequest is similar to previous, to specify topic level settings that should be removed, use DeletedConfig array (just setting keys). User can provide new partitions value, replica assignment or both.

AlterTopicRequest contains an optional field Partitions. A special value -1 should be used to denote an empty value. The Partitions/ReplicaAssignment semantics is the following:

1) Partitions is used only to increase number of topic partitions

2) If Partitions value is empty (-1) ReplicaAssignment is not taken into account, topic partitions are not increased

3) If Partitions doesn't increase existing number of partitions an error code DecreasePartitionsNotAllowed is returned

3) If Partitions value is not empty and increases number of existing partitions, a new replica assignment for topic partitions is either automatically generated or defined by ReplicaAssignment (if nonempty)

Alter Topic Response

 

AlterTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request

 

DeleteTopicRequest => [TopicName]
TopicName => string
DeleteTopicRequest requires only topic names which should be deleted.
Delete Topic Response

 

DeleteTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

DeleteTopicResponse is similar to CreateTopicResponse.

Describe Topic Request

 

DescribeTopicRequest => [TopicName]
TopicName => string
DescribeTopicRequest requires only topic names.
Describe Topic Response

 

DescribeTopicResponse => [TopicName ErrorCode TopicDescription]
ErrorCode => int16
TopicName => string
TopicDescription => TopicConfigDetails [TopicPartitionDetails]
TopicConfigDetails => [ConfigEntry]
ConfigEntry => string string
TopicPartitionsDetails => PartitionId Leader [Replica] [ISR]
PartitionId => int32
Leader => int32
Replica => int32
ISR => int32

 

DescribeTopicResponse besides errorCode which is used in the same way as in previous messages, holds optional (non empty if execution was successful) TopicDescription structure per topic. See table below for details:

Field

Description

TopicConfigDetails

A structure that holds basic replication details.

Config

Topic-level setting and value which was overridden.

TopicPartitionDetails

List describing replication details for each partition.

PartitionId

Id of the partition.

LeaderBroker-leader id for the described partition (or -1 if not defined).
ReplicasList of broker ids serving a replica's role for the partition.
ISRSame as replicas but includes only brokers that are known to be "in-sync"

In case of error TopicDescription field will be returned in response with default values.

List Topics Request

 

ListTopicsRequest =>
ListTopicsRequest is a request with no arguments.
List Topics Response

 

ListTopicsResponse => ErrorCode [TopicName]
ErrorCode => int16
TopicName => string

ListTopicsResponse besides errorCode holds a list of topics in Kafka cluster.

Replication Commands Schema

Reassign Partitions
Reassign Partitions Request

 

ReassignPartitionRequest => [Topic [PartitionId [ReplicaId]]]
Topic => string
PartitionId => int32
ReplicaId => int32

ReassignPartitionsRequest requires partition assignment - partition to array of replicas mapping. Validation for partition / replicas existence is done on server. Status of the ReassignPartitionRequest can be checked with VerifyReassignParitionRequest.

Reassign Partitions Response

 

ReassignPartitionResponse => [Topic ErrorCode]
Topic => string
ErrorCode => int32

ReassignPartitionResponse holds just an error code per topic, non-empty if reassignment may not be started (e.g. due to validation error).

Verify Reassign Partitions Request

 

VerifyReassignPartitionRequest => [Topic [PartitionId [ReplicaId]]]
Topic => string
PartitionId => int32
ReplicaId => int32
VerifyReassignPartitionsRequest requires partition assignment - partition to array of replicas mapping - the same as for ReassignPartitionsRequest, to check whether reassignment is completed and new assignment matches requested.
Verify Reassign Partitions Response

 

VerifyReassignPartitionResponse => [ReasignmnetResult]
ReasignmnetResult => Topic [PartitionId ResultCode]
Topic => string
PartitionId => int32
ResultCode => int16

VerifyReassignPartitionResponse returns a reassignment result map. It holds reassignment status (-1 - reassignment failed, 0 - in progress, 1 - completed successfully) per topic and partition.


Preferred Replica Leader Election
Preferred Replica Leader Election Request

 

PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]]
Topic => string
PartitionId => int32
PreferredReplicaLeaderEleactionRequest initiates preferred replica leader election procedure, similar to ReassignPartitionsRequest this request in intended to be non-blocking. The schema consist of one optional field - array of partitions for which preferred replica leader should be elected.

To start preferred replica leader election procedure for all existing partition an empty partitions array should be sent.

Preferred Replica Leader Election Response

 

PreferredReplicaLeaderElectionResponse => [Topic ErrorCode]
Topic => string
ErrorCode => int16

 

PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.

Status of the procedure may be checked with DescribeTopicRequest  - the head of replicas list field and leader broker should be the same.

...

# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host1 : port1>
Connected to Kafka Controller at <host2 : port2>.

kafka> create-topic --topic my_topic --partitions 5 --replication-factor 3
Topic "my_topic" is created.

kafka> alter-topic --topic my_topic --partitions 10
Topic "my_topic" is changed.
# Switch topic context
kafka> topic my_topic
Switched to "my_topic" topic.
# Execute topic command for switched topic
kafka my_topic> describe-topic
"my-topic" details:
Topic: my_topic Partitions: 10 ...
# Switch off topic context
kafka my_topic> topic
kafka>

Open questions:

...

-partitions 10
Topic "my_topic" is changed.
# Switch topic context
kafka> topic my_topic
Switched to "my_topic" topic.

# Execute topic command for switched topic
kafka my_topic> describe-topic
"my-topic" details:
Topic: my_topic Partitions: 10 ...

# Switch off topic context
kafka my_topic> topic
kafka>

...


Compatibility, Deprecation, and Migration Plan

The current tools will still be available in the 0.8.3 release but will be removed for 0.9 (tracked here https://issues.apache.org/jira/browse/KAFKA-1776 ).

...