...
Public Interfaces
- Changes to Wire Protocol:
Extending - Extending protocol with these messages:
- CreateTopicCreateTopic(Request | Response), AlterTopic, DeleteTopic, ListTopics- PreferredLeaderReplicaElection- Evolving TopicMetadataRequest to V1
- Evolving TopicMetadaRequest_V0 to the next version
- New client: AdminClient - a Wire Protocol client for administrative operations
- New CLI tool: Kafka Shell - a single unified command line interface for administrative operations
...
Proposed Changes
Proposed changes include 4 parts:
...
It is proposed to add / modify these 3 4 types of requests:
- Topic commands which include
CreateTopic(Request | Response)
,AlterTopic
,DeleteTopic
Replication tools -PreferredReplicaLeaderElection
- Extend
TopicMetadataRequest
to include topic configuration , and partition fetch lag per replicaand remove ISR field
Please find details under specific RQ/RP schema proposal.
...
Command | Wire Protocol Message | Note |
---|---|---|
create-topic | CreateTopicRequest | |
alter-topic | AlterTopicRequest | |
delete-topic | DeleteTopicRequest | |
describe-topic | TopicMetadataRequest_V1 | Using new version on TopicMetadataRequest which will include topic-level config |
list-topics | TopicMetadataRequest | Using an empty list as input for request, which results in returning metadata for all existing topcis |
reassign-partitions | AlterTopicReqeust | Using batch AlterTopicRequest with replica assignment specified |
preferred-replica-leader-election | PreferredReplicaLeaderElectionRequest |
Finally, Topic Admin schema requests are likely to be used not only in CLI tool, where the common use case is create/change/delete a single entity. Since Kafka is able to maintain a huge number of topics it is important that vital user can efficiently issue request many requests commands at one time. That's why all Topic Admin messages essentially are batch requests, i.e. it is possible to group commands of one type for many topics in one batch reducing network callsto group commands of one type for many topics in one batch reducing network calls. At the same time to make Schema usage transparent and compliant with existing requests (such as Produce and Fetch) if batch request includes more than one instruction for a specific topic only the last from the list will be executed, others will be silently ignored.
New Protocol Errors
It is proposed to add these error codes to the protocol.
Error | Description | Requests |
---|---|---|
TopicAlreadyExists | Topic with this name already exists. | CreateTopicRequest |
InvalidArgumentTopicName | Topic name contains invalid characters. | CreateTopicRequest |
InvalidArgumentPartitions | Either partition field is invalid (e.g. negative) | CreateTopicRequest , AlterTopicRequest |
InvalidArgumentReplicationFactor | Either replication-factor field is invalid (e.g. negative) | CreateTopicRequest, |
InvalidArgumentReplicaAssignment | Either replication assignment field is invalid (e.g. contains duplicates) |
|
InvalidArgumentTopicConfig | Either topic-level config setting or value is incorrect. | CreateTopicRequest , AlterTopicRequest |
DecreasePartitionsNotAllowed | Invalid partitions argument: decreasing partitions is prohibited when altering topic. | AlterTopicRequest |
PreferredReplicaLeaderElectionInProgress | Preferred replica leader election procedure has been already started. | PreferredReplicaLeaderElectionRequest |
ReassignPartitionsInProgress | Reassign partitions procedure has been already started. | AlterTopicRequest |
MultipleInstructionsForOneTopic | Only one mutation is allowed at once: e.g. change topic replication factor or change topic config. | CreateTopic, AlterTopicRequest |
MultipleTopicInstructionsInOneBatch | Multiple topic instructions for the same topic in one batch request | CreateTopicRequest, AlterTopicRequest, DeleteTopicRequest |
Generally, the Admin Client (see section 3) or another request dispatcher should have enough context to provide descriptive error message.
The same notation as in A Guide To The Kafka Protocol is used here.
...
CreateTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [ConfigEntry]] TopicName => string Partitions => int32 Replicas => int32 ReplicaAssignment => [PartitionId [ReplicaId]] ConfigEntry => ConfigKey ConfigValue ConfigKey => string ConfigValue => ReplicaAssignment => [PartitionId [ReplicaId]] string |
CreateTopicRequest
is a batch asynchronous request to initiate topic creation with either predefined or automatic replica assignment and optionally topic configuration.- Only one from (
Partitions
+Replicas
),ReplicaAssignment
can be defined in one instruction. (Note: there is a special use case - automatic topic creation forTopicMetadataRequest
, to trigger it user should set client_id=consumer and define only topic name).
MultipleInstructionsForOneTopic
is returned- If both parameters are specified -
ReplicaAssignment
takes precedence. - In case
ReplicaAssignment
is defined number of partitions and replicas will be calculated from the
- supplied
ReplicaAssignment
. In case of defined (Partitions
+Replicas
) replica assignment will be automatically generated by the server.
CreateTopicRequest
may include only one topic creation command - Multiple instructions for the same topic
- in one
MultipleTopicInstructionsInOneBatch
is returned- request will be silently ignored, only the last from the list will be executed.
Create Topic Response
CreateTopicResponse => [TopicName ErrorCode] ErrorCode => int16 TopicName => string |
CreateTopicResponse
contains a map between topic and topic creation result error code (see New Protocol Errors).
Alter Topic Request
AlterTopicRequest => [TopicName Partitions Replicas ReplicaAssignment [AddedConfigEntry] [DeletedConfig]] TopicName => string Replicas => int32 Partitions => int32 ReplicaAssignment => [PartitionId [ReplicaId]] AddedConfigEntry => ConfigKey ConfigValue ConfigKey => string ConfigValue => string Deleted Config => string |
AlterTopicRequest
will be used to change topic's replication characteristics, alter topic-level config or reassign partitions is a batch asynchronous request to initiate topic alteration: replication parameters, replication factor or add/remove topic level configuration.User can define only one from Partitions,
Replicas
, ReplicaAssignment
, AddedConfigEntry
, DeletedConfig
. Otherwise, MultipleInstructionsForOneTopic
is returned.
One AlterTopicRequest
may include only one topic alteration command for the topic with the given name in one batch, otherwise MultipleTopicInstructionsInOneBatch
is returned.
Alter Topic Response
[TopicName ErrorCode]ErrorCode => int16 TopicName => string
|
AlterTopicResponse
is similar to CreateTopicResponse
.Delete Topic Request
DeleteTopicRequest => [TopicName] TopicName => string |
DeleteTopicRequest
requires only topic names which should be deleted.
DeleteTopicRequest
may include only one topic deletion command for the topic with the given name in one batch, otherwise MultipleTopicInstructionsInOneBatch
is returned.Delete Topic Response
ErrorCode => int16 TopicName => string
|
DeleteTopicResponse
is similar to CreateTopicResponse
.
Topic Metadata Request V1
TopicMetadataReqeust_V1 => [ TopicName]TopicName => string |
TopicMetadataRequest_V1
is an evolved version of TopicMetadataRequest
. This request is intended to support two admin operations - get topic metadata, and check whether some particular admin command (which are all asynchronous) has been completed.TopicMetadataRequest_V1
requires only topic names. As with the first version, an empty topic name set results in returning information for all existing topics.TopicMetadataRequest_V1
comparing to the previous version won't trigger topic creation automatically if the topic with the given name doesn't exist.Topic Metadata Response V1
TopicMetadataResponse_V1 => [Broker][TopicMetadata] Broker => NodeId Host Port (any number of brokers may be returned) NodeId => int32 Host => string Port => int32 TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] [ConfigEntry] TopicErrorCode => int16 PartitionMetadata => PartitionErrorCode PartitionId Leader ReplicasLag Isr PartitionErrorCode => int16 PartitionId => int32 Leader => int32 ReplicasLag => [int32 int32] Isr => [int32] ConfigEntry => string string |
The new version of TopicMetadataResponse
in addition to TopicMetadataResponse_V0
will include topic level configuration for each topic and replica fetch lag per partition - how far partition replica is behind from the leader replica.
Replication Commands Schema
Preferred Replica Leader Election Request
PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]] Topic => string PartitionId => int32 |
PreferredReplicaLeaderEleactionRequest
initiates preferred replica leader election procedure. Similar to Topic Admin requests this request in intended to be non-blocking. The schema consist of one field - array of partitions for which preferred replica leader should be elected.To start preferred replica leader election procedure for all existing partition an empty partitions array should be sent.
Preferred Replica Leader Election Response
PreferredReplicaLeaderElectionResponse => [Topic ErrorCode] Topic => string ErrorCode => int16 |
PreferredReplicaLeaderElectionResponse
is similar to ReassignPartitionsResponse
.
Status of the procedure may be checked with TopicMetadataRequest
- the head of replicas
list field and leader
broker should be the same.
...