Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Topic commands which include CreateTopic(Request | Response)AlterTopicDeleteTopicDescribeTopicListTopics.
  • Replication tools - ReassingPartition, VerifyReassingPartitions; PreferredReplicaLeaderElection
  • A special type of request to support Admin commands - ClusterMetadata ClusterMetadata [1]

Please find details under specific RQ/RP schema proposal.

...

The same notation as in  A Guide To The Kafka Protocol is used here. The only difference - new Kafka Protocol metatype - MaybeOf ("?" in notation), when used means value is optional in message. To define value existence special control byte is prepended before each value (0 - field is absent, otherwise - read value normally) [12].

 

All admin messages listed below are required to be sent only to Controller broker. Only controller will process such messages. If Admin message is sent to an ordinary broker a special error code is returned (code 22). In case of other failure during processing message AdminRequestFailedError is returned [23].

Error

Code

Description

AdminRequestFailed

21

Unexpected error occurred while processing Admin request.

NotControllerForAdminRequest
22Target broker (id=<this_broker_id>) is not serving a controller's role.

ClusterMetadata

Schema [3]

Schema

Cluster Metadata Request

 

ClusterMetadataRequest =>

 

Cluster Metadata Response

 

ClusterMetadataResponse => ErrorCode [Broker] ?(Controller)
ErrorCode => int16
Broker => NodeId Host Port
NodeId => int32
Host => string
Port => int32
 Controller => Broker

ClusteMetadataRequest is a request with no arguments.

ClusterMetadataResponse holds error code (0 in case of successful result), list of brokers in cluster and optionally broker serving a Controller's role (returning empty Controller most likely means either error during request processing or cluster being in some intermediate state).

ClusterMetadataRequest is required for admin clients to get the Kafka brokers, specifically the controller's location, as only controller may execute admin command [2].

Topic Admin Schema

...

# Start kafka.sh in shell mode
bin/kafka.sh --shell --broker-list <host1 : port1>
Connected to Kafka Controller at <host2 : port2>.
kafka> create-topic --topic my_topic --partitions 5 --replication-factor 3
Topic "my_topic" is created.
kafka> alter-topic --topic my_topic --partitions 10
Topic "my_topic" is changed.
# Switch topic context
kafka> topic my_topic
Switched to "my_topic" topic.
# Execute topic command for switched topic
kafka my_topic> describe-topic
"my-topic" details:
Topic: my_topic Partitions: 10 ...
# Switch off topic context
kafka my_topic> topic
kafka>

Open questions:

  1. ClusterMetadata duplicates TopicMetadata - we can extend TopicMetadata with controllerId information and probably something else. Other alternative - is a generic server-side re-routing facility (see KAFKA-1912 for details).
  2. People expressed concerns about optional MaybeOf type because it is inconsistent with the way we currently handle empty values in the Wire Protocol.
  3. We might extend error codes to fulfill all possible failures and give up using outcome / errorDescription field as a generic result description.ClusterMetadata duplicates TopicMetadata - we can extend TopicMetadata with controllerId information and probably something else. Other alternative - is a generic server-side re-routing facility (see KAFKA-1912 for details).
  4. It is proposed to create a separate ticket to rework topic command to execute commands directly by the controller instead of using zookeeper admin path to notify controller about the change.
  5. AdminClient may need to support batching admin operations. It is considered whether we can cover it with allowing user to supply a regexp for topic name in AlterTopic, DeleteTopic, DescribeTopic requests (similarly to TopicCommand.scala)

...