Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-3265

Released: 0.11.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Systems that interface with Kafka, such as management systems and proxies, often need to perform administrative actions.  For example, they might need to be able to create or delete topics.  Currently, they can't do this without relying on internal Kafka classes, or shell scripts distributed with Kafka.  We would like to add a public, stable AdminClient API that exposes this functionality to JVM-based clients in a well-supported way.

...

The AdminClient will be distributed as part of kafka-clients.jar.  It will provide a Java API for managing Kafka.

The AdminClient interface will be in the org.apache.kafka.clients.admin namespace.  The implementation will be in the KafkaAdminClient class, in the same namespace.  The separation between interface and implementation is intended to make the difference between public API and private implementation clearer, and make developing mocks in unit tests easier.  This is similar to the divide between Producer and KafkaProducer, and Consumer and KafkaConsumer.
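To illustrate why the interface/implementation split helps with testing, here is a minimal sketch. TopicAdmin, TopicProvisioner, and MockTopicAdmin are hypothetical names invented for this example; TopicAdmin is a trimmed-down stand-in for the AdminClient interface, not the proposed API itself.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, trimmed-down stand-in for the AdminClient interface.
interface TopicAdmin extends AutoCloseable {
    void createTopic(String name);
    Collection<String> listTopics();
    @Override
    void close();
}

// Application code depends only on the interface, never on the implementation.
class TopicProvisioner {
    private final TopicAdmin admin;

    TopicProvisioner(TopicAdmin admin) {
        this.admin = admin;
    }

    // Creates each wanted topic that does not exist yet and returns the names created.
    List<String> ensureTopics(Collection<String> wanted) {
        Collection<String> existing = admin.listTopics();
        List<String> created = new ArrayList<>();
        for (String name : wanted) {
            if (!existing.contains(name) && !created.contains(name)) {
                admin.createTopic(name);
                created.add(name);
            }
        }
        return created;
    }
}

// In-memory mock for unit tests; no broker is required.
class MockTopicAdmin implements TopicAdmin {
    final Set<String> topics = new TreeSet<>();

    @Override
    public void createTopic(String name) {
        topics.add(name);
    }

    @Override
    public Collection<String> listTopics() {
        return new ArrayList<>(topics);
    }

    @Override
    public void close() {
        // Nothing to release in the mock.
    }
}
```

A test can construct TopicProvisioner with a MockTopicAdmin and assert on the mock's topics set, while production code would pass the real implementation.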

Users will configure the AdminClient the same way they configure the Producer and Consumer: by supplying a map of keys to values to its constructor.  As much as possible, we should reuse the same configuration key names, such as bootstrap.servers, client.id, etc.  We should also offer the ability to configure timeouts, buffer sizes, and other networking settings.
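As a rough sketch of what such a configuration map might look like: the helper name adminConfig is hypothetical, and the use of request.timeout.ms as the timeout key is an assumption borrowed from the existing producer and consumer configuration, not something this proposal fixes.

```java
import java.util.HashMap;
import java.util.Map;

class AdminConfigExample {
    // Builds a configuration map of the kind that would be handed to the client.
    static Map<String, Object> adminConfig(String bootstrapServers, String clientId) {
        Map<String, Object> conf = new HashMap<>();
        // Key names reused from the Producer and Consumer configuration.
        conf.put("bootstrap.servers", bootstrapServers);
        conf.put("client.id", clientId);
        // Assumption: the timeout key mirrors the producer/consumer key of the same name.
        conf.put("request.timeout.ms", 30000);
        return conf;
    }
}
```

For example, adminConfig("localhost:9092", "my-admin") yields a map with three entries; the broker address and client id here are placeholders.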

The AdminClient will provide future-based APIs that closely reflect the requests which the brokers can handle.  The client will be multi-threaded; multiple threads will be able to safely make calls using the same AdminClient object.  When a future fails, its get() method will throw an ExecutionException which wraps the underlying exception.
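The wrapping behaviour of get() can be sketched with the JDK alone. The example below uses java.util.concurrent.CompletableFuture as a stand-in for whatever future type the client ends up returning; FutureFailureExample, describeOutcome, and failed are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class FutureFailureExample {
    // Waits for the future and returns a short description of the outcome.
    static String describeOutcome(CompletableFuture<Void> future) {
        try {
            future.get();
            return "ok";
        } catch (ExecutionException e) {
            // get() wraps the real failure; getCause() returns the underlying exception.
            Throwable cause = e.getCause();
            return "failed: " + cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // Helper that builds an already-failed future, standing in for a failed admin call.
    static CompletableFuture<Void> failed(Throwable t) {
        CompletableFuture<Void> f = new CompletableFuture<>();
        f.completeExceptionally(t);
        return f;
    }
}
```

Callers that care about the specific failure would typically inspect or rethrow the cause rather than the ExecutionException itself.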

...

In general, we want to avoid using internal Kafka classes in the AdminClient interface.  For example, most RPC classes should be considered internal, such as MetadataRequest or MetadataResponse.  We should be able to change those classes in the future without worrying about breaking users of AdminClient.  Inner classes such as MetadataResponse#TopicMetadata should also be considered internal, and not exposed in the API of AdminClient.

Implementation

As mentioned earlier, the AdminClient will use the KIP-4 wire protocol.  This mainly means using NetworkClient and related RPC classes for the implementation.

...

Code Block
languagejava
public interface AdminClient extends AutoCloseable {
    static AdminClient create(Map<String, Object> conf);

    /**
     * Close the AdminClient and release all associated resources.
     */
    void close();

    /**
     * Create a batch of new topics with the default options.
     *
     * @param newTopics         The new topics to create.
     * @return                  The CreateTopicsResults.
     */
    CreateTopicResults createTopics(Collection<NewTopic> newTopics);

    /**
     * Create a batch of new topics.
     *
     * @param newTopics         The new topics to create.
     * @param options           The options to use when creating the new topics, or null to use defaults.
     * @return                  The CreateTopicsResults.
     */
    CreateTopicResults createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options);

    /**
     * Similar to {@link AdminClient#deleteTopics(Collection, DeleteTopicsOptions)},
     * but uses the default options.
     *
     * @param topics            The topic names to delete.
     * @return                  The DeleteTopicsResults.
     */
    DeleteTopicResults deleteTopics(Collection<String> topics);

    /**
     * Delete a batch of topics.
     *
     * It may take several seconds after AdminClient#deleteTopics returns
     * success for all the brokers to become aware that the topics are gone.
     * During this time, AdminClient#listTopics and AdminClient#describeTopic
     * may continue to return information about the deleted topics.
     *
     * If delete.topic.enable is false on the brokers, deleteTopics will mark
     * the topics for deletion, but not actually delete them.  The futures will
     * return successfully in this case.
     *
     * @param topics            The topic names to delete.
     * @param options           The options to use when deleting the topics.
     * @return                  The DeleteTopicsResults.
     */
    DeleteTopicResults deleteTopics(Collection<String> topics, DeleteTopicsOptions options);

    /**
     * List the topics available in the cluster with the default options.
     * @return                  The ListTopicsResults.
     */
    ListTopicsResults listTopics();

    /**
     * List the topics available in the cluster.
     *
     * @param options           The options to use when listing the topics.
     * @return                  The ListTopicsResults.
     */
    ListTopicsResults listTopics(ListTopicsOptions options);

    /**
     * Similar to {@link AdminClient#describeTopic(String, DescribeTopicOptions)},
     * but uses the default options.
     *
     * @param topicName         The topic to describe.
     * @return                  The DescribeTopicResults.
     */
    DescribeTopicResults describeTopic(String topicName);

    /**
     * Describe an individual topic in the cluster.
     *
     * Note that if auto.create.topics.enable is true on the brokers,
     * AdminClient#describeTopic(topicName) may create a topic named topicName.
     * There are two workarounds: either use AdminClient#listTopics and ensure
     * that the topic is present before describing, or disable
     * auto.create.topics.enable.
     *
     * @param topicName         The topic to describe.
     * @param options           The options to use when describing the topic.
     * @return                  The DescribeTopicResults.
     */
    DescribeTopicResults describeTopic(String topicName, DescribeTopicOptions options);

    /**
     * Describe the cluster information, using the default options.
     *
     * @return                  The DescribeClusterResults.
     */
    DescribeClusterResults describeCluster();

    /**
     * Describe the cluster information.
     *
     * @param options           The options to use when describing the cluster.
     * @return                  The DescribeClusterResults.
     */
    DescribeClusterResults describeCluster(DescribeClusterOptions options);

    /**
     * Get information about the api versions of nodes in the cluster with the default options.
     *
     * @param nodes             The nodes to get information about.
     * @return                  The ApiVersionsResults.
     */
    ApiVersionsResults apiVersions(Collection<Node> nodes);

    /**
     * Get information about the api versions of nodes in the cluster.
     *
     * @param nodes             The nodes to get information about.
     * @param options           The options to use when getting api versions of the nodes.
     * @return                  The ApiVersionsResults.
     */
    ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options);
}

/**
 * The base class for a request to create a new topic.
 */
abstract class NewTopic {
    public NewTopic(String name, int numPartitions, short replicationFactor);
    public NewTopic(String name, Map<Integer, List<Integer>> replicasAssignments);

    public String name();

    /**
      * Set the configuration to use on the new topic.
      *
      * @param configs               The configuration map.
      * @return                      This NewTopic object.
      */
    public NewTopic setConfigs(Map<String, String> configs);
}

/**
  * Options for the createTopics call.
  */
class CreateTopicsOptions {
    private Integer timeoutMs = null;
    private boolean validateOnly = false;
    public CreateTopicsOptions setTimeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
    public CreateTopicsOptions setValidateOnly(boolean validateOnly);
    public boolean validateOnly();
}

/**
  * The result of the createTopics call.
  */
class CreateTopicResults {
    /**
      * Return a map from topic names to futures, which can be used to check the status of individual
      * topic creations.
      */
    public Map<String, KafkaFuture<Void>> results();
 
    /**
      * Return a future which succeeds if all the topic creations succeed.
      */
    public KafkaFuture<Void> all();
}

/**
  * Options for the deleteTopics call.
  */
class DeleteTopicsOptions {
    private Integer timeoutMs = null;
    public DeleteTopicsOptions setTimeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}
 
/**
  * The result of the deleteTopics call.
  */
class DeleteTopicResults {
    /**
      * Return a map from topic names to futures which can be used to check the status of
      * individual deletions.
      */
    public Map<String, KafkaFuture<Void>> results();

    /**
      * Return a future which succeeds only if all the topic deletions succeed.
      */
    public KafkaFuture<Void> all();
}

class ListTopicsOptions {
    private Integer timeoutMs = null;
    private boolean listInternal = true;
    public ListTopicsOptions setTimeoutMs(Integer timeoutMs);

    public Integer timeoutMs();

    /**
      * Set whether we should list internal topics.
      *
      * @param listInternal  Whether we should list internal topics.
      * @return              This ListTopicsOptions object.
      */
    public ListTopicsOptions setListInternal(boolean listInternal);

    public boolean listInternal();
}
 
/**
  * The result of the listTopics call.
  */
class ListTopicsResults {
    /**
      * Return a future which yields a map of topic names to TopicListing objects.
      */
    public KafkaFuture<Map<String, TopicListing>> namesToDescriptions();

    /**
      * Return a future which yields a collection of TopicListing objects.
      */
    public KafkaFuture<Collection<TopicListing>> descriptions();

    /**
      * Return a future which yields a collection of topic names.
      */
    public KafkaFuture<Collection<String>> names();
}

class TopicListing {
    public String name();
    public boolean internal();
}
 
/**
  * A detailed description of a single topic in the cluster.
  */
class TopicDescription {
    public String name();
    public boolean internal();
    public Map<Integer, TopicPartitionInfo> partitions();
    public Map<Integer, KafkaException> partitionErrors();
    public String toString();
}

class TopicPartitionInfo {
    public int partition();
    public Node leader();
    public List<Node> replicas();
    public List<Node> isr();
    public String toString();
}
 
class DescribeTopicOptions {
    public DescribeTopicOptions setTimeoutMs(int timeoutMs);
    public int timeoutMs();
}

/**
  * The results of the describeTopic call.
  */
class DescribeTopicResults {
    /**
      * Return a future which yields a map of topic names to TopicDescription objects.
      */
    public CompletableFuture<Map<String, TopicDescription>> namesToDescriptions();

    /**
      * Return a future which yields a collection of TopicDescription objects.
      */
    public CompletableFuture<Collection<TopicDescription>> descriptions();
}

/**
  * Options for the describeCluster call.
  */
class DescribeClusterOptions {
    public DescribeClusterOptions setTimeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}

/**
  * The results of the describeCluster call.
  */
class DescribeClusterResults {
    public CompletableFuture<Collection<Node>> nodes();
    public CompletableFuture<Node> controller();
    public CompletableFuture<String> clusterId();
}

/**
  * Options for the apiVersions call.
  */
class ApiVersionsOptions {
    public ApiVersionsOptions setTimeoutMs(Integer timeoutMs);
    public Integer timeoutMs();
}

/**
  * Results of the apiVersions call.
  */
class ApiVersionsResults {
    ApiVersionsResults(Map<Node, KafkaFuture<NodeApiVersions>> futures);
    public Map<Node, KafkaFuture<NodeApiVersions>> results();
    public KafkaFuture<Map<Node, NodeApiVersions>> all();
}
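
The result classes above pair a per-item map of futures (results()) with an aggregate future (all()). A minimal sketch of that pattern follows, using java.util.concurrent.CompletableFuture as a stand-in future type; BatchResults is a hypothetical class written for illustration and is not the proposed CreateTopicResults implementation.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a batch result holder such as CreateTopicResults.
class BatchResults {
    private final Map<String, CompletableFuture<Void>> futures;

    BatchResults(Map<String, CompletableFuture<Void>> futures) {
        this.futures = futures;
    }

    // One future per topic name, so callers can check each outcome individually.
    Map<String, CompletableFuture<Void>> results() {
        return futures;
    }

    // Completes normally only if every per-topic future completes normally;
    // completes exceptionally if any of them fails.
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
    }
}
```

A caller that only needs a yes/no answer for the whole batch can wait on all(), while a caller that wants to report which topics failed can iterate over results().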


...

We would like to add more functionality to the AdminClient as soon as it becomes available on the brokers.  For example, we would like to add a way of altering topics that already exist.  We would also like to add management APIs for ACLs, and to expose the functionality of GetOffsetShell.