Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Admin client will use Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, client will handle cluster metadata cache and will provide user with a convenient way of handling long running commands (e.g. reassign partitions).

Since Topic commands will support batching (and so will AdminClient) user besides Admin API will be provided with request builders which will help to create requests correctly.

Proposed API:

Expand
titleAdminClient API
public class AdminClient {
    
    /**
     * A client is instantiated by providing a set of key-value pairs as configuration. Most
	 * of the settings will be related to NetworkClient
     *
     * @param properties settings related to Network client and at least one broker from KafkaCluster to connect to
     */
    public AdminClient(Properties properties) 
    
    /**
     * Create topictopics with givena numberbuilt of(via partitionsdedicated andbuilder) replicationWire factor, replica assignment will be handled by Kafka clusterProtocol request
     *
     *
 @returns a mapping between *topic @throwsname and java.lang.Exception with descriptive error message or null in case
     * topic was created successfully
     *ApiException
     */
    public void createTopic(String topicName, int partitions, int replicationFactor, List<ConfigEntry> configs) throws ApiException;
    
    / **
 @throws ApiException in case of *global Createerror, topicwhich withmeans specifiedtopic replicacreation assignmentwas (numbernot of partitions and replication factor will be takeneven started
     */
    public *Map<String, fromException> replica assignment string)
  createTopics(CreateTopicRequest createTopicRequest) throws ApiException;
   *
 
    /* @throws ApiException*
     */
 Alter topics with publica voidbuilt createTopic(Stringvia topicName,dedicated Stringbuilder) replicaAssignment,Wire List<ConfigEntry>Protocol configs) throws ApiException;
request
     *
     * @returns a mapping between topic name and java.lang.Exception with descriptive error message or null in case
     * topic was altered successfully
     * 
     * @throws ApiException in case of global error, which means topic alteration was not even started
     */
    public Map<String, Exception> alterTopic(AlterTopicRequest alterTopicRequest) throws ApiException;
    
    /**
     * Delete Kafka topics by name
     * Alter existing
     * @returns a mapping between topic partitionsname and java.lang.Exception with descriptive error message or null in case
     * topic was deleted successfully
/or replica assignment among Kafka brokers
     *
     *
 @throws ApiException
     */
 @throws ApiException in publiccase voidof alterTopic(Stringglobal topicNameerror, Integerwhich partitions,means Stringtopic replicaAssignment,
deletion was not even started
     */
    public void deleteTopic(List<String> topicName) throws ApiException;
    
    /**
     * List all existing topics List<ConfigEntry> addedConfigs, List<String> deletedConfigs) throws ApiException;in Kafka cluster
    /* *
     * @returns Deletelist Kafkaof topic names
by name
     *
     * @throws ApiException
     */
    public voidList<String> deleteTopiclistTopics(String topicName) throws ApiException;
 
    
    /**
     * Check Listwhether alltopic existingwith topicsthe ingiven Kafkaname clusterexists
     *
     * @throws ApiException
     */
    public List<String>boolean listTopicstopicExists(String topicName) throws ApiException;

    
    /**
     * Request replication information about Kafka topictopics
     *
     * @throws ApiException 
     */
    public DescribeTopicOutputMap<String, DescribeTopicOutput> describeTopic(StringList<String> topicNametopicNames) throws ApiException;
    
    /**
     * Initiate long-running reassign partitions procedure
     *
     * @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
     * @return future of the reassignment result which is completed once server-side partitions reassignment has succeeded or
     * an error occurred so that partitions reassignment cannot be started
     * @throws ApiException
     */
    public Future<ReassignPartitionsResponse> reassignPartitions(String partitionsReassignment) throws ApiException;

    /**
     * Check the interim status of the partitions reassignment
     *
     * @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
     * @return partition to reassignment result code (completed, in-progress, failed)
     * @throws ApiException
     */
    public Map<TopicPartition, Short> verifyReassignPartitions(String partitionsReassignment) throws ApiException;
    
    /**
     * Initiate long-running preferred replica leader election procedure
     *
     * @param partitions serialized partitions for which preferred replica leader election will be started
     *                   (according to PreferredReplicaLeaderElectionCommand)
     * @return future of the election result which is completed once server-side preferred replica is elected for provided partitions or
     * an error has occurred
     * @throws ApiException
     */
    public Future<PreferredReplicaLeaderElectionResponse> preferredReplicaLeaderElection(String partitions) throws ApiException;

    /**
     * Check the interim status of the preferred replica leader election
     *
     * @param partitions for which preferred replica leader election was started (according to PreferredReplicaLeaderElectionCommand)
     * @return partition to reassignment result code (completed, in-progress, failed)
     * @throws ApiException
     */
    public VerifyPreferredReplicaLeaderElectionResponse verifyPreferredReplicaLeaderElection(String partitions)
            throws ApiException;
    
    /**
     * A generic facility to send Admin request and return response counterpart
     *
     * @param adminRequest AdminRequest message
     * @param <T>          concrete AdminRequest type
     * @return response counterpart
     * @throws ApiException
     */
    private <T extends AbstractAdminResponse> T sendAdminRequest(AbstractAdminRequest<T> adminRequest) throws ApiException;

 
    /**
     * Refreshes cluster metadata cache - list of brokers and controller
     * 
     * @throws ApiException
     */
    private void updateClusterMetadata() throws Exception;

}

...