...

The same notation as in A Guide To The Kafka Protocol is used here. The only difference is a new Kafka Protocol metatype, MaybeOf ("?" in notation), which marks a value as optional in a message. To indicate whether the value exists, a special control byte is prepended to each such value (0 means the field is absent; otherwise the value is read normally) [2].
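The control-byte rule can be illustrated with a minimal sketch for an optional int32 field. The class name `MaybeOfInt32` and the choice of 1 as the "present" marker are assumptions made for illustration; the proposal only states that 0 means absent and any other value means the field follows.

```java
import java.nio.ByteBuffer;
import java.util.OptionalInt;

/** Sketch of the proposed MaybeOf ("?") metatype applied to an int32 field. */
final class MaybeOfInt32 {
    private static final byte ABSENT = 0;   // control byte 0: field is absent
    private static final byte PRESENT = 1;  // assumption: writer uses 1; reader accepts any non-zero byte

    /** Writes the control byte, followed by the value only when it is present. */
    static void write(ByteBuffer buffer, OptionalInt value) {
        if (value.isPresent()) {
            buffer.put(PRESENT);
            buffer.putInt(value.getAsInt());
        } else {
            buffer.put(ABSENT);
        }
    }

    /** Reads the control byte and, if it is non-zero, the int32 that follows it. */
    static OptionalInt read(ByteBuffer buffer) {
        byte control = buffer.get();
        return control == ABSENT ? OptionalInt.empty() : OptionalInt.of(buffer.getInt());
    }
}
```

With this layout an absent value costs one byte on the wire, and a present int32 costs five.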

 

All admin messages listed below must be sent only to the Controller broker; only the controller will process them. If an Admin message is sent to an ordinary broker, a special error code is returned (code 22). If any other failure occurs while processing the message, AdminRequestFailedError is returned [32].

Error                          Code   Description
AdminRequestFailed             21     Unexpected error occurred while processing Admin request.
NotControllerForAdminRequest   22     Target broker (id=<this_broker_id>) is not serving a controller's role.
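A client could map the two admin error codes (plus 0 for success) to an enum and use it to decide how to react. This is only a sketch: the enum name, its constants and the `shouldRefreshControllerAndRetry` policy are hypothetical and not part of the proposal.

```java
/** Sketch: the two proposed admin error codes, plus 0 for a successful result. */
enum AdminErrorCode {
    NONE((short) 0),
    ADMIN_REQUEST_FAILED((short) 21),
    NOT_CONTROLLER_FOR_ADMIN_REQUEST((short) 22);

    final short code;

    AdminErrorCode(short code) {
        this.code = code;
    }

    /** Looks up the constant for a wire-level error code. */
    static AdminErrorCode forCode(short code) {
        for (AdminErrorCode error : values()) {
            if (error.code == code) {
                return error;
            }
        }
        throw new IllegalArgumentException("Unknown admin error code: " + code);
    }

    /**
     * Hypothetical client policy: code 22 means the request reached a broker that is
     * not the controller, so refreshing cluster metadata and retrying may succeed.
     */
    boolean shouldRefreshControllerAndRetry() {
        return this == NOT_CONTROLLER_FOR_ADMIN_REQUEST;
    }
}
```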

ClusterMetadata Schema

Cluster Metadata Request

 

ClusterMetadataRequest =>

 

Cluster Metadata Response

 

ClusterMetadataResponse => ErrorCode [Broker] ?(Controller)
ErrorCode => int16
Broker => NodeId Host Port
NodeId => int32
Host => string
Port => int32
Controller => Broker

ClusterMetadataRequest is a request with no arguments.

ClusterMetadataResponse holds an error code (0 on success), the list of brokers in the cluster and, optionally, the broker serving the Controller's role (an empty Controller most likely means either an error during request processing or a cluster in some intermediate state).

Admin clients need ClusterMetadataRequest to discover the Kafka brokers, and specifically the controller's location, because only the controller may execute admin commands.
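The response grammar can be read as the following parsing sketch. It assumes the standard Kafka wire encodings (big-endian integers, a string as an int16 length followed by UTF-8 bytes, an array as an int32 count followed by its elements) and the MaybeOf control byte described earlier. The class and field names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Sketch of a parser for: ClusterMetadataResponse => ErrorCode [Broker] ?(Controller). */
final class ClusterMetadataResponseSketch {
    static final class Broker {
        final int nodeId;
        final String host;
        final int port;

        Broker(int nodeId, String host, int port) {
            this.nodeId = nodeId;
            this.host = host;
            this.port = port;
        }
    }

    final short errorCode;
    final List<Broker> brokers;
    final Optional<Broker> controller;

    private ClusterMetadataResponseSketch(short errorCode, List<Broker> brokers, Optional<Broker> controller) {
        this.errorCode = errorCode;
        this.brokers = brokers;
        this.controller = controller;
    }

    static ClusterMetadataResponseSketch parse(ByteBuffer buffer) {
        short errorCode = buffer.getShort();              // ErrorCode => int16
        int brokerCount = buffer.getInt();                // array: int32 element count
        List<Broker> brokers = new ArrayList<>();
        for (int i = 0; i < brokerCount; i++) {
            brokers.add(readBroker(buffer));
        }
        byte control = buffer.get();                      // MaybeOf control byte, 0 = absent
        Optional<Broker> controller =
                control == 0 ? Optional.empty() : Optional.of(readBroker(buffer));
        return new ClusterMetadataResponseSketch(errorCode, brokers, controller);
    }

    private static Broker readBroker(ByteBuffer buffer) {
        int nodeId = buffer.getInt();                     // NodeId => int32
        String host = readString(buffer);                 // Host => string
        int port = buffer.getInt();                       // Port => int32
        return new Broker(nodeId, host, port);
    }

    private static String readString(ByteBuffer buffer) {
        short length = buffer.getShort();
        if (length < 0) {
            return null;                                  // length -1 encodes a null string
        }
        byte[] bytes = new byte[length];
        buffer.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

A client would typically treat an empty `controller` as a signal to retry the metadata request later, since the text above notes that it usually indicates an error or an intermediate cluster state.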

Topic Admin Schema

...

All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Unfortunately, most of the command classes are strongly coupled with CLI logic and can hardly be refactored, so for now (until the standalone CLI commands are removed) the logic from those classes will most likely be extracted and copied to separate classes (as proposed: TopicCommandHelper [43], etc.).

3. Admin Client

This component is intended to be Kafka's out-of-the-box client implementation for Admin commands.

The Admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, the client will maintain a cluster metadata cache and will give users a convenient way to handle long-running commands (e.g. reassign partitions).

Proposed API [54]:

public class AdminClient {
    
    /**
     * A client is instantiated by providing a set of key-value pairs as configuration. Most
     * of the settings will be related to NetworkClient
     *
     * @param properties settings related to Network client and at least one broker from KafkaCluster to connect to
     */
    public AdminClient(Properties properties);
    
    /**
     * Create topic with given number of partitions and replication factor, replica assignment will be handled by Kafka cluster
     *
     * @throws ApiException
     */
    public void createTopic(String topicName, int partitions, int replicationFactor, List<ConfigEntry> configs) throws ApiException;
    
    /**
     * Create topic with specified replica assignment (number of partitions and replication factor will be taken
     * from replica assignment string)
     *
     * @throws ApiException
     */
    public void createTopic(String topicName, String replicaAssignment, List<ConfigEntry> configs) throws ApiException;
 
    /**
     * Alter existing topic partitions and/or replica assignment among Kafka brokers
     *
     * @throws ApiException
     */
    public void alterTopic(String topicName, Integer partitions, String replicaAssignment,
                                    List<ConfigEntry> addedConfigs, List<String> deletedConfigs) throws ApiException;
    /**
     * Delete Kafka topic by name
     *
     * @throws ApiException
     */
    public void deleteTopic(String topicName) throws ApiException;
    
    /**
     * List all existing topics in Kafka cluster
     *
     * @throws ApiException
     */
    public List<String> listTopics() throws ApiException;
    
    /**
     * Request replication information about Kafka topic
     *
     * @throws ApiException
     */
    public DescribeTopicOutput describeTopic(String topicName) throws ApiException;
    
    /**
     * Initiate long-running reassign partitions procedure
     *
     * @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
     * @return future of the reassignment result which is completed once server-side partitions reassignment has succeeded or
     * an error occurred so that partitions reassignment cannot be started
     * @throws ApiException
     */
    public Future<ReassignPartitionsResponse> reassignPartitions(String partitionsReassignment) throws ApiException;

    /**
     * Check the interim status of the partitions reassignment
     *
     * @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
     * @return partition to reassignment result code (completed, in-progress, failed)
     * @throws ApiException
     */
    public Map<TopicPartition, Short> verifyReassignPartitions(String partitionsReassignment) throws ApiException;
    
    /**
     * Initiate long-running preferred replica leader election procedure
     *
     * @param partitions serialized partitions for which preferred replica leader election will be started
     *                   (according to PreferredReplicaLeaderElectionCommand)
     * @return future of the election result which is completed once server-side preferred replica is elected for provided partitions or
     * an error has occurred
     * @throws ApiException
     */
    public Future<PreferredReplicaLeaderElectionResponse> preferredReplicaLeaderElection(String partitions) throws ApiException;

    /**
     * Check the interim status of the preferred replica leader election
     *
     * @param partitions for which preferred replica leader election was started (according to PreferredReplicaLeaderElectionCommand)
     * @return response holding the election result code per partition (completed, in-progress, failed)
     * @throws ApiException
     */
    public VerifyPreferredReplicaLeaderElectionResponse verifyPreferredReplicaLeaderElection(String partitions)
            throws ApiException;
    
    /**
     * A generic facility to send Admin request and return response counterpart
     *
     * @param adminRequest AdminRequest message
     * @param <T>          concrete AdminRequest type
     * @return response counterpart
     * @throws ApiException
     */
    private <T extends AbstractAdminResponse> T sendAdminRequest(AbstractAdminRequest<T> adminRequest) throws ApiException;

 
    /**
     * Refreshes cluster metadata cache - list of brokers and controller
     * 
     * @throws ApiException
     */
    private void updateClusterMetadata() throws ApiException;

}
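One way a client could turn a long-running command into a future is to poll a status check (in the spirit of verifyReassignPartitions) until it stops reporting "in progress". The sketch below is self-contained and does not use the proposed AdminClient; `LongRunningCommandPoller`, its `Status` enum and the `statusCheck` supplier are hypothetical stand-ins.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Sketch: completes a future once a polled status leaves IN_PROGRESS. */
final class LongRunningCommandPoller {
    enum Status { IN_PROGRESS, COMPLETED, FAILED }

    /**
     * @param statusCheck hypothetical stand-in for a "verify" admin request
     * @param intervalMs  delay between polls in milliseconds, must be positive
     */
    static CompletableFuture<Status> pollUntilDone(Supplier<Status> statusCheck, long intervalMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<Status> result = new CompletableFuture<>();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                Status status = statusCheck.get();
                if (status != Status.IN_PROGRESS) {
                    result.complete(status);
                }
            } catch (RuntimeException e) {
                // A failing status check ends the polling and surfaces the error to the caller.
                result.completeExceptionally(e);
            }
        }, 0, intervalMs, TimeUnit.MILLISECONDS);
        // Stop polling as soon as the future is done, including when the caller cancels it.
        result.whenComplete((status, error) -> scheduler.shutdown());
        return result;
    }
}
```

A caller can then block with `join()`, attach callbacks, or cancel the future to stop polling. A production client would more likely share one scheduler across commands than create one per call.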

...

Open questions:

  1. ClusterMetadata duplicates TopicMetadata: we could extend TopicMetadata with controllerId information and possibly other fields. Another alternative is a generic server-side re-routing facility (see KAFKA-1912 for details). People have expressed concerns about the optional MaybeOf type because it is inconsistent with the way empty values are currently handled in the Wire Protocol.
  2. We might extend the set of error codes to cover all possible failures and stop using the outcome / errorDescription field as a generic result description.
  3. It is proposed to create a separate ticket to rework the topic command so that commands are executed directly by the controller instead of using the ZooKeeper admin path to notify the controller about the change.
  4. AdminClient may need to support batching of admin operations. One option under consideration is to let the user supply a regexp for the topic name in AlterTopic, DeleteTopic and DescribeTopic requests (similarly to TopicCommand.scala).
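The regexp idea from question 4 could look roughly like the sketch below, which expands a topic-name pattern into the list of matching topics. Whole-name matching is an assumption made here and is not taken from TopicCommand.scala; `TopicRegexMatcher` is a hypothetical helper.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Sketch: selects the topics a regexp-based admin request would apply to. */
final class TopicRegexMatcher {
    /** Returns the topics whose whole name matches the given regular expression. */
    static List<String> matchingTopics(List<String> allTopics, String topicRegex) {
        Pattern pattern = Pattern.compile(topicRegex);
        return allTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }
}
```

Whether the expansion happens on the client (after listing topics) or on the controller is left open by the proposal; the helper works the same in either place.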

...