Status

Current state: under discussion
Discussion thread: here
JIRA: KAFKA-3265

Motivation

Systems that interface with Kafka, such as management systems and proxies, often need to perform administrative actions.  For example, they might need to be able to create or delete topics.  Currently, they can't do this without relying on internal Kafka classes, or shell scripts distributed with Kafka.  We would like to add a public, stable AdminClient API that exposes this functionality to JVM-based clients in a well-supported way.

The AdminClient will use the KIP-4 wire protocols to communicate with brokers.  Because we avoid direct communication with ZooKeeper, the client does not need a ZooKeeper dependency.  In fact, once this KIP is implemented, we will be able to lock down ZooKeeper security further, by enforcing the invariant that only brokers need to communicate with ZooKeeper.

By using the AdminClient API, clients will avoid being tightly coupled to the implementation details of a particular Kafka version.  They will not need to access internal Kafka classes, or parse the output of scripts.  They will also gain the benefits of cross-version client compatibility as implemented in KIP-97.

Proposed Changes

The AdminClient will be distributed as part of kafka-clients.jar.  It will provide a Java API for managing Kafka.

The AdminClient interface will be in the org.apache.kafka.clients.admin namespace.  The implementation will be in the AdminClientImpl class, in the same namespace.  The separation between interface and implementation is intended to make the difference between public API and private implementation clearer, and make developing mocks in unit tests easier.

Users will configure the AdminClient the same way they configure the Producer and Consumer: by supplying a map of keys to values to its constructor.  As much as possible, we should reuse the same configuration key names, such as bootstrap.servers, client.id, etc.  We should also offer the ability to configure timeouts, buffer sizes, and other networking settings.

The AdminClient will provide future-based APIs that closely reflect the requests which the brokers can handle.  The client will be multi-threaded; multiple threads will be able to safely make calls using the same AdminClient object.  When a future fails, its get() method will throw an ExecutionException which wraps the underlying exception.
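
The error-wrapping behavior described above can be illustrated with a minimal sketch that uses only java.util.concurrent, not the AdminClient itself.  The class name FutureErrorSketch, the helper describeOutcome, and the exception used as the underlying failure are hypothetical and chosen only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class FutureErrorSketch {
    /**
     * Blocks on the future and returns "ok" on success, or a short
     * description of the underlying exception on failure.
     */
    static String describeOutcome(CompletableFuture<Void> future) {
        try {
            future.get();
            return "ok";
        } catch (ExecutionException e) {
            // get() wraps the real failure; getCause() recovers it.
            Throwable cause = e.getCause();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a failed admin operation (hypothetical failure).
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("topic already exists"));
        System.out.println(describeOutcome(failed));
        System.out.println(describeOutcome(CompletableFuture.<Void>completedFuture(null)));
    }
}
```

Callers that care about the specific failure would typically inspect the type of the cause rather than its message.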

Each area of functionality in the AdminClient is accessed by creating a context object.  For example, functionality related to topics is accessed by calling client.topics() to create a TopicsContext object.  Context objects are lightweight and only contain minimal state such as the lengths of timeouts.  Context objects are not intended to be shared between multiple threads.

In general, we want to avoid using internal Kafka classes in the AdminClient interface.  For example, most RPC classes should be considered internal, such as MetadataRequest or MetadataResponse.  We should be able to change those classes in the future without worrying about breaking users of AdminClient.  Inner classes such as MetadataResponse#TopicMetadata should also be considered internal, and not exposed in the API of AdminClient.

We want to handle errors fully and cleanly in AdminClient.  APIs that require communication with multiple brokers should allow for the possibility that some brokers will respond and others will not.  Any possible return value from the API should be handled.
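
One way a caller might cope with partial responses is sketched below, assuming results arrive as a map from a broker identifier to a future.  The class PartialResultsSketch, the summarize helper, the string broker keys, and the status strings are all hypothetical; only java.util.concurrent is used.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PartialResultsSketch {
    /**
     * Waits up to timeoutMs for each future in turn and records whether the
     * broker answered, failed, or did not respond in time.
     */
    static Map<String, String> summarize(Map<String, CompletableFuture<String>> perBroker,
                                         long timeoutMs) {
        Map<String, String> summary = new TreeMap<>();
        for (Map.Entry<String, CompletableFuture<String>> entry : perBroker.entrySet()) {
            try {
                String value = entry.getValue().get(timeoutMs, TimeUnit.MILLISECONDS);
                summary.put(entry.getKey(), "ok: " + value);
            } catch (ExecutionException e) {
                // The broker answered, but with an error.
                summary.put(entry.getKey(), "failed: " + e.getCause().getMessage());
            } catch (TimeoutException e) {
                // The broker never answered within the timeout.
                summary.put(entry.getKey(), "no response");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                summary.put(entry.getKey(), "interrupted");
            }
        }
        return summary;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> perBroker = new TreeMap<>();
        perBroker.put("broker-0", CompletableFuture.completedFuture("responded"));
        perBroker.put("broker-1", new CompletableFuture<String>()); // never completes
        System.out.println(summarize(perBroker, 50L));
    }
}
```

Because the timeout is applied to each future sequentially, the total wait in this sketch can approach the timeout multiplied by the number of unresponsive brokers.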

Implementation

As mentioned earlier, the AdminClient will use the KIP-4 wire protocol.  This mainly means using NetworkClient and related RPC classes for the implementation.

This KIP will add only APIs that can be implemented with the existing server-side RPCs.  See "New or Changed Public Interfaces" for details.  The intention is that we will continue to extend AdminClient with further KIPs that also add the appropriate server-side functionality (such as ACL management).

New or Changed Public Interfaces

Clients use the administrative client by creating an instance of class AdminClient.  Just like the producer and consumer, it is configured by supplying a configuration map to the constructor.  When the user is done with the AdminClient, they must call close to release the network sockets and other associated resources of the client.

/**
 * The Kafka AdminClient interface.
 */
public interface KafkaAdminClient {
    class Factory {
        /**
         * Create a new KafkaAdminClient with the given configuration.
         *
         * @param conf          The configuration.
         * @return              The new KafkaAdminClient.
         */
        KafkaAdminClient create(Map<String, Object> conf) {
            return new AdminClient(conf);
        }
    }

    /**
     * Close the AdminClient and release all associated resources.
     */
    void close();

    /**
     * The base class for a request to create a new topic.
     */
    abstract class NewTopic {
        private final String name;
        protected Map<String, String> configs = null;

        public NewTopic(String name) {
            this.name = name;
        }

        public String name() {
            return name;
        }

        /**
         * Set the configuration to use on the new topic.
         *
         * @param configs               The configuration map.
         * @return                      This NewTopic object.
         */
        public NewTopic setConfigs(Map<String, String> configs) {
            this.configs = configs;
            return this;
        }

        protected abstract TopicDetails convertToTopicDetails();
    }

    /**
     * A request to create a new topic with a fixed replication factor and number of partitions.
     */
    class NewTopicWithReplication extends NewTopic {
        private final int numPartitions;
        private final short replicationFactor;

        public NewTopicWithReplication(String name, int numPartitions, short replicationFactor) {
            super(name);
            this.numPartitions = numPartitions;
            this.replicationFactor = replicationFactor;
        }

        public int numPartitions() {
            return numPartitions;
        }

        public short replicationFactor() {
            return replicationFactor;
        }

        @Override
        protected TopicDetails convertToTopicDetails() {
            if (configs != null) {
                return new TopicDetails(numPartitions, replicationFactor, configs);
            } else {
                return new TopicDetails(numPartitions, replicationFactor);
            }
        }
    }
 
    /**
     * A request to create a new topic with a specific replica assignment configuration.
     */
    class NewTopicWithReplicaAssignments extends NewTopic {
        private final Map<Integer, List<Integer>> replicasAssignments;
 
        public NewTopicWithReplicaAssignments(String name, Map<Integer, List<Integer>> replicasAssignments) {
            super(name);
            this.replicasAssignments = replicasAssignments;
        }
 
        public Map<Integer, List<Integer>> replicasAssignments() {
            return replicasAssignments;
        }
 
        @Override
        protected TopicDetails convertToTopicDetails() {
            if (configs != null) {
                return new TopicDetails(replicasAssignments, configs);
            } else {
                return new TopicDetails(replicasAssignments);
            }
        }
    }
 
    /**
     * Options for the createTopics call.
     */
    class CreateTopicsOptions {
        private int clientTimeoutMs = -1;
        private int serverTimeoutMs = -1;
        private boolean validateOnly = false;
 
        public CreateTopicsOptions setClientTimeoutMs(int clientTimeoutMs) {
            this.clientTimeoutMs = clientTimeoutMs;
            return this;
        }
 
        public int clientTimeoutMs() {
            return clientTimeoutMs;
        }
 
        public CreateTopicsOptions setServerTimeoutMs(int serverTimeoutMs) {
            this.serverTimeoutMs = serverTimeoutMs;
            return this;
        }
 
        public int serverTimeoutMs() {
            return serverTimeoutMs;
        }
 
        public CreateTopicsOptions setValidateOnly(boolean validateOnly) {
            this.validateOnly = validateOnly;
            return this;
        }
 
        public boolean validateOnly() {
            return validateOnly;
        }
    }
 
    /**
     * The result of the createTopics call.
     */
    class CreateTopicResults {
        private final Map<String, CompletableFuture<Void>> futures;
 
        CreateTopicResults(Map<String, CompletableFuture<Void>> futures) {
            this.futures = futures;
        }
 
        /**
         * Return a map from topic names to futures, which can be used to check the status of individual
         * topic creations.
         */
        public Map<String, CompletableFuture<Void>> results() {
            return futures;
        }
 
        /**
         * Return a future which succeeds if all the topic creations succeed.
         */
        public CompletableFuture<Void> all() {
            return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
        }
    }
 
    /**
     * Create a batch of new topics.
     *
     * @param newTopics         The new topics to create.
     * @param options           The options to use when creating the new topics, or null to use defaults.
     * @return                  The CreateTopicResults.
     */
    CreateTopicResults createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options);
 
    /**
     * Create a single new topic.
     *
     * @param newTopic          The new topic to create.
     * @param options           The options to use when creating the new topic, or null to use defaults.
     * @return                  The CreateTopicResults.
     */
    CreateTopicResults createTopic(NewTopic newTopic, CreateTopicsOptions options);
 
    /**
     * Options for the deleteTopics call.
     */
    class DeleteTopicsOptions {
        private int clientTimeoutMs = -1;
        private int serverTimeoutMs = -1;
 
        public DeleteTopicsOptions setClientTimeoutMs(int clientTimeoutMs) {
            this.clientTimeoutMs = clientTimeoutMs;
            return this;
        }
 
        public int clientTimeoutMs() {
            return clientTimeoutMs;
        }
 
        public DeleteTopicsOptions setServerTimeoutMs(int serverTimeoutMs) {
            this.serverTimeoutMs = serverTimeoutMs;
            return this;
        }
 
        public int serverTimeoutMs() {
            return serverTimeoutMs;
        }
    }
 
    /**
     * The result of the deleteTopics call.
     */
    class DeleteTopicResults {
        final Map<String, CompletableFuture<Void>> futures;
 
        DeleteTopicResults(Map<String, CompletableFuture<Void>> futures) {
            this.futures = futures;
        }
 
        /**
         * Return a map from topic names to futures which can be used to check the status of
         * individual deletions.
         */
        public Map<String, CompletableFuture<Void>> results() {
            return futures;
        }
 
 
 
        /**
         * Return a future which succeeds only if all the topic deletions succeed.
         */
        public CompletableFuture<Void> all() {
            return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
        }
    }
 
    /**
     * Delete a batch of topics.
     *
     * @param topics            The topic names to delete.
     * @param options           The options to use when deleting the topics, or null to use defaults.
     * @return                  The DeleteTopicResults.
     */
    DeleteTopicResults deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
 
    /**
     * Delete a topic.
     *
     * @param topic             The topic name to delete.
     * @param options           The options to use when deleting the topic, or null to use defaults.
     * @return                  The DeleteTopicResults.
     */
    DeleteTopicResults deleteTopic(String topic, DeleteTopicsOptions options);
 
    class ListTopicsOptions {
        private int clientTimeoutMs = -1;
        private Boolean listInternal = null;
 
        public ListTopicsOptions setClientTimeoutMs(int clientTimeoutMs) {
            this.clientTimeoutMs = clientTimeoutMs;
            return this;
        }
 
        public int clientTimeoutMs() {
            return clientTimeoutMs;
        }
 
        /**
         * Set whether we should list internal topics.
         *
         * @param listInternal  Whether we should list internal topics.  null means to use
         *                      the default.
         * @return              This ListTopicsOptions object.
         */
        public ListTopicsOptions setListInternal(Boolean listInternal) {
            this.listInternal = listInternal;
            return this;
        }
 
        public Boolean listInternal() {
            return listInternal;
        }
    }
 
    /**
     * The result of the listTopics call.
     */
    class ListTopicsResults {
        final CompletableFuture<Map<String, TopicDescription>> future;
 
        ListTopicsResults(CompletableFuture<Map<String, TopicDescription>> future) {
            this.future = future;
        }
 
        /**
         * Return a future which yields a map of topic names to TopicDescription objects.
         */
        public CompletableFuture<Map<String, TopicDescription>> namesToDescriptions() {
            return future;
        }
 
        /**
         * Return a future which yields a collection of TopicDescription objects.
         */
        public CompletableFuture<Collection<TopicDescription>> descriptions() {
            return future.thenApply(new Function<Map<String, TopicDescription>, Collection<TopicDescription>>() {
                @Override
                public Collection<TopicDescription> apply(Map<String, TopicDescription> namesToDescriptions) {
                    return namesToDescriptions.values();
                }
            });
        }

        /**
         * Return a future which yields a collection of topic names.
         */
        public CompletableFuture<Collection<String>> names() {
            return future.thenApply(new Function<Map<String, TopicDescription>, Collection<String>>() {
                @Override
                public Collection<String> apply(Map<String, TopicDescription> namesToDescriptions) {
                    return namesToDescriptions.keySet();
                }
            });
        }
    }

    /**
     * A detailed description of a single topic in the cluster.
     */
    class TopicDescription {
        private final String name;
        private final boolean internal;
        private final Map<Integer, TopicPartitionInfo> partitions;
        private final Map<Integer, KafkaException> partitionErrors;

        TopicDescription(String name, boolean internal,
                        Map<Integer, TopicPartitionInfo> partitions,
                        Map<Integer, KafkaException> partitionErrors) {
            this.name = name;
            this.internal = internal;
            this.partitions = partitions;
            this.partitionErrors = partitionErrors;
        }

        public String name() {
            return name;
        }

        public boolean internal() {
            return internal;
        }

        public Map<Integer, TopicPartitionInfo> partitions() {
            return partitions;
        }

        public Map<Integer, KafkaException> partitionErrors() {
            return partitionErrors;
        }

        @Override
        public String toString() {
            return "(name=" + name + ", internal=" + internal + ", partitions=" +
                Utils.mkString(partitions) + ")";
        }
    }

    class TopicPartitionInfo {
        private final int partition;
        private final Node leader;
        private final List<Node> replicas;
        private final List<Node> isr;

        TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) {
            this.partition = partition;
            this.leader = leader;
            this.replicas = replicas;
            this.isr = isr;
        }
 
        public int partition() {
            return partition;
        }
 
        public Node leader() {
            return leader;
        }
 
        public List<Node> replicas() {
            return replicas;
        }
 
        public List<Node> isr() {
            return isr;
        }
 
        @Override
        public String toString() {
            return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
                Utils.mkString(replicas, ", ") + ", isr=" + Utils.mkString(isr, ", ") + ")";
        }
    }
 
    /**
     * List the topics available in the cluster.
     *
     * @param options           The options to use when listing the topics, or null to use defaults.
     * @return                  The ListTopicsResults.
     */
    ListTopicsResults listTopics(ListTopicsOptions options);
 
    class DescribeTopicOptions {
        private int clientTimeoutMs = -1;
 
        public DescribeTopicOptions setClientTimeoutMs(int clientTimeoutMs) {
            this.clientTimeoutMs = clientTimeoutMs;
            return this;
        }
 
        public int clientTimeoutMs() {
            return clientTimeoutMs;
        }
    }
 
    /**
     * The results of the describeTopic call.
     */
    class DescribeTopicResults {
        private final CompletableFuture<TopicDescription> future;
 
        DescribeTopicResults(CompletableFuture<TopicDescription> future) {
            this.future = future;
        }
 
        /**
         * Return a future which yields a topic description.
         */
        public CompletableFuture<TopicDescription> description() {
            return future;
        }
    }
 
    /**
     * Describe an individual topic in the cluster.
     *
     * @param topicName         The name of the topic to describe.
     * @param options           The options to use when describing the topic, or null to use defaults.
     * @return                  The DescribeTopicResults.
     */
    DescribeTopicResults describeTopic(String topicName, DescribeTopicOptions options);
 
    /**
     * Options for the listNodes call.
     */
    class ListNodesOptions {
        private int clientTimeoutMs = -1;
 
        public ListNodesOptions setClientTimeoutMs(int clientTimeoutMs) {
            this.clientTimeoutMs = clientTimeoutMs;
            return this;
        }
 
        public int clientTimeoutMs() {
            return clientTimeoutMs;
        }
    }
 
    /**
     * The results of the listNodes call.
     */
    class ListNodesResults {
        private final CompletableFuture<Collection<Node>> future;
 
        ListNodesResults(CompletableFuture<Collection<Node>> future) {
            this.future = future;
        }
 
        /**
         * Returns a future which yields a collection of nodes.
         */
        public CompletableFuture<Collection<Node>> nodes() {
            return future;
        }
    }
 
    /**
     * List the nodes in the cluster.
     *
     * @param options           The options to use when listing the nodes, or null to use defaults.
     * @return                  The ListNodesResults.
     */
    ListNodesResults listNodes(ListNodesOptions options);
 
    /**
     * Options for the apiVersions call.
     */
    class ApiVersionsOptions {
        private int clientTimeoutMs = -1;
 
        public ApiVersionsOptions setClientTimeoutMs(int clientTimeoutMs) {
            this.clientTimeoutMs = clientTimeoutMs;
            return this;
        }
 
        public int clientTimeoutMs() {
            return clientTimeoutMs;
        }
    }
 
    /**
     * Results of the apiVersions call.
     */
    class ApiVersionsResults {
        private final Map<Node, CompletableFuture<NodeApiVersions>> futures;
 
        ApiVersionsResults(Map<Node, CompletableFuture<NodeApiVersions>> futures) {
            this.futures = futures;
        }
 
        public Map<Node, CompletableFuture<NodeApiVersions>> results() {
            return futures;
        }
 
        public CompletableFuture<Map<Node, NodeApiVersions>> all() {
            return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).
                thenApply(new Function<Void, Map<Node, NodeApiVersions>>() {
                    @Override
                    public Map<Node, NodeApiVersions> apply(Void v) {
                        Map<Node, NodeApiVersions> versions = new HashMap<>(futures.size());
                        for (Map.Entry<Node, CompletableFuture<NodeApiVersions>> entry : futures.entrySet()) {
                            try {
                                versions.put(entry.getKey(), entry.getValue().get());
                            } catch (InterruptedException | ExecutionException e) {
                                // This should be unreachable, because allOf ensured that all the futures
                                // completed successfully.
                                throw new RuntimeException(e);
                            }
                        }
                        return versions;
                    }
                });
        }
    }
 
    /**
     * Get information about the api versions of nodes in the cluster.
     *
     * @param nodes             The nodes to get information about, or null to get information about all nodes.
     * @param options           The options to use when getting api versions of the nodes, or null to use defaults.
     * @return                  The ApiVersionsResults.
     */
    ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options);
 
    /**
     * Get information about the api versions of a node in the cluster.
     *
     * @param node              The node to get information about.
     * @param options           The options to use when getting api versions of the node, or null to use defaults.
     * @return                  The ApiVersionsResults.
     */
    ApiVersionsResults apiVersion(Node node, ApiVersionsOptions options);
}
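
The options classes above share a fluent-setter pattern, and each call accepts null to mean "use defaults".  The following minimal sketch shows one way an implementation might resolve the effective timeout.  It assumes, which the listing above does not state, that a negative clientTimeoutMs means "unset"; the names OptionsSketch, TimeoutOptions, and effectiveTimeoutMs are hypothetical.

```java
final class OptionsSketch {
    /** Trimmed-down stand-in for the options classes: one fluent setter, -1 as the initial value. */
    static final class TimeoutOptions {
        private int clientTimeoutMs = -1;

        TimeoutOptions setClientTimeoutMs(int clientTimeoutMs) {
            this.clientTimeoutMs = clientTimeoutMs;
            return this;
        }

        int clientTimeoutMs() {
            return clientTimeoutMs;
        }
    }

    /**
     * Assumption: null options or a negative timeout fall back to the
     * client-wide default; any other value overrides it.
     */
    static int effectiveTimeoutMs(TimeoutOptions options, int defaultTimeoutMs) {
        if (options == null || options.clientTimeoutMs() < 0) {
            return defaultTimeoutMs;
        }
        return options.clientTimeoutMs();
    }

    public static void main(String[] args) {
        // Hypothetical client-wide default of 30 seconds.
        System.out.println(effectiveTimeoutMs(null, 30000));
        System.out.println(effectiveTimeoutMs(new TimeoutOptions().setClientTimeoutMs(500), 30000));
    }
}
```

Returning the options object from each setter is what allows calls to be chained in a single expression.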

 

Configuration

Just like the consumer and the producer, the admin client will have its own AdminClientConfig configuration class which extends AbstractConfig.

Initially, the supported configuration keys will be:

  • bootstrap.servers
    • The bootstrap servers as a list of host:port pairs.
  • client.id
    • An ID string to pass to the server when making requests.
  • metadata.max.age.ms
    • The period of time in milliseconds after which we force a refresh of metadata, even if we haven't seen any partition leadership changes, to proactively discover any new brokers or partitions.
  • send.buffer.bytes
    • The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.
  • receive.buffer.bytes
    • The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
  • reconnect.backoff.ms
    • The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the client to the broker.
  • retry.backoff.ms
    • The amount of time to wait before attempting to retry a failed request.
  • request.timeout.ms
    • This configuration controls the maximum amount of time the client will wait for the response to a request. If the response is not received before the timeout elapses, the client will resend the request if necessary, or fail the request if retries are exhausted.
  • connections.max.idle.ms
    • Close idle connections after the number of milliseconds specified by this config.
  • security.protocol
    • The security protocol used to communicate with brokers.
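
As a rough illustration of the configuration map a user might pass, the sketch below fills in a few of the keys listed above.  The class name AdminConfigSketch, the helper exampleConfig, and all values shown are illustrative assumptions, not proposed defaults.

```java
import java.util.HashMap;
import java.util.Map;

final class AdminConfigSketch {
    /** Builds an example configuration map; every value here is illustrative only. */
    static Map<String, Object> exampleConfig(String bootstrapServers) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("bootstrap.servers", bootstrapServers);
        conf.put("client.id", "example-admin");
        conf.put("request.timeout.ms", 30000);
        conf.put("reconnect.backoff.ms", 50);
        conf.put("security.protocol", "PLAINTEXT");
        return conf;
    }

    public static void main(String[] args) {
        // The resulting map has the Map<String, Object> shape that the
        // factory method in the interface listing accepts.
        System.out.println(exampleConfig("localhost:9092"));
    }
}
```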

Migration Plan and Compatibility

The AdminClient will use KIP-97 API version negotiation to communicate with older or newer brokers.  In cases where an API is not available on an older or newer broker, we will throw an UnsupportedVersionException.

We should avoid making incompatible changes to the AdminClient function and class signatures.  So for example, if we need to add an additional argument to an API, we should add a new function rather than changing an existing function signature.

Test Plan

We should have an AdminClientTest integration test which tests creating topics, deleting topics, and listing topics through the API.  We can use the KafkaServerTestHarness to test this efficiently with multiple brokers.  For methods which support batches, we should test passing a batch of items to be processed.  We should test error conditions as well.  We should test the node listing and version getting APIs as well.

Rejected Alternatives

Synchronous API

Instead of having a futures-based API, we could have a synchronous API.  In this API, each function would block rather than returning a Future.  However, the Futures-based API can easily be used as a blocking API, simply by calling get() on the Futures which get returned.
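
To make the point concrete, here is a minimal sketch of wrapping a future-returning call in a blocking one.  The method countTopicsAsync is a hypothetical stand-in for an asynchronous admin call, and the class name BlockingSketch is likewise an assumption; only java.util.concurrent is used.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class BlockingSketch {
    /** Hypothetical stand-in for an asynchronous admin call. */
    static CompletableFuture<Integer> countTopicsAsync() {
        return CompletableFuture.supplyAsync(() -> 3);
    }

    /** Blocking facade: waits for the future and unwraps any failure. */
    static int countTopics() {
        try {
            return countTopicsAsync().get();
        } catch (ExecutionException e) {
            // Surface the underlying failure instead of the wrapper.
            throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(countTopics());
    }
}
```

The reverse direction is harder: a purely blocking API would require callers to manage their own threads to get asynchronous behavior.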

Batch APIs

We could have a batch API that allows people to create or delete multiple topics at once.  However, batch APIs are awkward.  We can also do batching behind the scenes by waiting for a millisecond or two before sending requests, to see if more requests arrive.

Future Work

We would like to add more functionality to the AdminClient as soon as it becomes available on the brokers.  For example, we would like to add a way of altering topics that already exist.  We would also like to add management APIs for ACLs, or the functionality of GetOffsetShell.
