
Status

Current state: under discussion
Discussion thread: here
JIRA: KAFKA-3265

Motivation

Systems that interface with Kafka, such as management systems and proxies, often need to perform administrative actions.  For example, they might need to be able to create or delete topics.  Currently, they can't do this without relying on internal Kafka classes, or shell scripts distributed with Kafka.  We would like to add a public, stable AdminClient API that exposes this functionality to JVM-based clients in a well-supported way.

The AdminClient will use the KIP-4 wire protocols to communicate with brokers.  Because we avoid direct communication with ZooKeeper, the client does not need a ZooKeeper dependency.  In fact, once this KIP is implemented, we will be able to lock down ZooKeeper security further, by enforcing the invariant that only brokers need to communicate with ZooKeeper.

By using the AdminClient API, clients will avoid being tightly coupled to the implementation details of a particular Kafka version.  They will not need to access internal Kafka classes, or parse the output of scripts.  They will also gain the benefits of cross-version client compatibility as implemented in KIP-97.

Proposed Changes

The AdminClient will be distributed as part of kafka-clients.jar.  It will provide a Java API for managing Kafka.

The AdminClient interface will be in the org.apache.kafka.clients.admin namespace.  The implementation will be in the AdminClientImpl class, in the same namespace.  The separation between interface and implementation is intended to make the difference between public API and private implementation clearer, and make developing mocks in unit tests easier.

Users will configure the AdminClient the same way they configure the Producer and Consumer: by supplying a map of keys to values to its constructor.  As much as possible, we should reuse the same configuration key names, such as bootstrap.servers, client.id, etc.  We should also offer the ability to configure timeouts, buffer sizes, and other networking settings.
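For illustration, constructing the client might look like the following sketch.  The static AdminClient.create(Map) factory shown here is an assumption of this example; only the configuration keys themselves are proposed by this KIP, and the interface itself appears under "New or Changed Public Interfaces" below.

// Hypothetical construction of an AdminClient.  The static create()
// factory is assumed for illustration.
Map<String, Object> config = new HashMap<>();
config.put("bootstrap.servers", "broker1:9092,broker2:9092");  // same key as the Producer and Consumer
config.put("client.id", "my-admin-client");
AdminClient adminClient = AdminClient.create(config);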

The AdminClient will provide blocking APIs that closely reflect the requests which the brokers can handle.  The client will be multi-threaded; multiple threads will be able to safely make calls using the same AdminClient object.

In general, we want to avoid using internal Kafka classes in the AdminClient interface.  For example, most RPC classes should be considered internal, such as MetadataRequest or MetadataResponse.  We should be able to change those classes in the future without worrying about breaking users of AdminClient.  Inner classes such as MetadataResponse#TopicMetadata should also be considered internal, and not exposed in the API of AdminClient.

We want to handle errors fully and cleanly in AdminClient.  APIs that require communication with multiple brokers should allow for the possibility that some brokers will respond and others will not.  Batch APIs that perform multiple independent operations should allow for the possibility that some elements of a batch fail while others succeed.
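For example, describe() (proposed under "New or Changed Public Interfaces" below) returns per-partition results wrapped in Try<>, so a caller can observe which partitions succeeded and which failed.  A minimal sketch, assuming hypothetical isSuccess(), get(), and exception() accessors on Try<>:

// Sketch of consuming a partial-failure result.  The Try<> accessor
// names are illustrative; the exact Try<> API is not fixed by this KIP.
TopicDescription description = adminClient.topics().describe("my-topic").get();
for (Map.Entry<Integer, Try<TopicPartitionInfo>> entry : description.partitions().entrySet()) {
    if (entry.getValue().isSuccess()) {
        System.out.println("partition " + entry.getKey() + ": " + entry.getValue().get());
    } else {
        System.out.println("partition " + entry.getKey() + " failed: " + entry.getValue().exception());
    }
}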

Implementation

As mentioned earlier, the AdminClient will use the KIP-4 wire protocol.  This mainly means using NetworkClient and related RPC classes for the implementation.

This KIP will add only APIs that can be implemented with the existing server-side RPCs.  See "New or Changed Public Interfaces" for details.  The intention is that we will continue to extend AdminClient with further KIPs as the appropriate server-side functionality is added (such as ACL management).

New or Changed Public Interfaces

Clients use the administrative client by creating an instance of AdminClient.  Just like the producer and consumer, it is configured by supplying a configuration map to the constructor.  When the user is done with the AdminClient, they must call close() to release the network sockets and other associated resources of the client.

public interface AdminClient {
    void close();

    /**
     * Creates a new TopicsContext that can be used to perform operations on Kafka topics.
     */
    TopicsContext topics();

    /**
     * Creates a new NodesContext that can be used to get information about nodes.
     */
    NodesContext nodes();

    abstract class TopicsContext {
        protected int clientTimeoutMs;
        protected int serverTimeoutMs;
        protected Map<String, String> creationConfig;

        protected TopicsContext(int clientTimeoutMs, int serverTimeoutMs) {
            this.clientTimeoutMs = clientTimeoutMs;
            this.serverTimeoutMs = serverTimeoutMs;
            this.creationConfig = null;
        }

        /**
         * Set the client timeout for operations.  This is how long the client will block
         * on the operation.
         *
         * @param clientTimeoutMs         The client timeout in ms.
         * @return                      The context object.
         */
        public TopicsContext setClientTimeout(int clientTimeoutMs) {
            if (clientTimeoutMs <= 0)
                throw new InvalidConfigurationException("Cannot set a client timeout less than or equal to 0.");
            this.clientTimeoutMs = clientTimeoutMs;
            return this;
        }

        /**
         * Set the server timeout for operations.  This is how long the server will block
         * on the operation.
         *
         * @param serverTimeoutMs       The server timeout in ms.  0 for non-blocking.
         * @return                      The context object.
         */
        public TopicsContext setServerTimeout(int serverTimeoutMs) {
            if (serverTimeoutMs < 0)
                throw new InvalidConfigurationException("Cannot set a server timeout less than 0.");
            this.serverTimeoutMs = serverTimeoutMs;
            return this;
        }

        /**
         * Set the configuration that will be used when creating new topics.
         *
         * @param creationConfig        The configuration.
         * @return                      The context object.
         */
        public TopicsContext setCreationConfig(Map<String, String> creationConfig) {
            this.creationConfig = creationConfig;
            return this;
        }

        /**
         * Create a new topic.
         *
         * @param topicName             The name of the topic to create.
         * @param numPartitions         The number of partitions to use in the new topic.
         * @param replicationFactor     The replication factor of the new topic.
         * @param validateOnly          If true, we will validate that the topic can be created,
         *                              but not create it.
         * @return                      A Future which will throw an exception if there is an error.
         */
        public abstract Future<Void> create(String topicName, int numPartitions, short replicationFactor,
                                            boolean validateOnly);

        /**
         * Create a new topic.
         *
         * @param topicName             The name of the topic to create.
         * @param replicasAssignments   The replica assignments to use for the new topic.
         * @param validateOnly          If true, we will validate that the topic can be created,
         *                              but not create it.
         * @return                      A Future which will throw an exception if there is an error.
         */
        public abstract Future<Void> create(String topicName, Map<Integer, List<Integer>> replicasAssignments,
                                            boolean validateOnly);

        /**
         * Delete a topic.<p/>
         *
         * Note that topic deletion operations are asynchronous, so even after the future
         * completes, the topic may still be visible for a while.
         *
         * @param topicName             The name of the topic to delete.
         *
         * @return                      A Future which will throw an exception if there is an error.
         */
        public abstract Future<Void> delete(String topicName);

        /**
         * List the topics.<p/>
         *
         * @param includeInternal       True if we should include internal topics in the listing.
         *
         * @return                      A Future which contains a list of topic names.
         */
        public abstract Future<List<String>> list(boolean includeInternal);

        /**
         * Get detailed information about a specific topic.<p/>
         *
         * @param topicName             The name of the topic to describe.
         *
         * @return                      A Future containing a topic description.
         */
        public abstract Future<TopicDescription> describe(String topicName);
    }

    class TopicDescription {
        final String name;
        final boolean internal;
        final Map<Integer, Try<TopicPartitionInfo>> partitions;

        TopicDescription(String name, boolean internal, Map<Integer, Try<TopicPartitionInfo>> partitions) {
            this.name = name;
            this.internal = internal;
            this.partitions = partitions;
        }

        public String name() {
            return name;
        }

        public boolean internal() {
            return internal;
        }

        public Map<Integer, Try<TopicPartitionInfo>> partitions() {
            return partitions;
        }

        @Override
        public String toString() {
            return "(name=" + name + ", internal=" + internal + ", partitions=" +
                Utils.mkString(partitions) + ")";
        }
    }

    class TopicPartitionInfo {
        final int partition;
        final Node leader;
        final List<Node> replicas;
        final List<Node> isr;

        TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) {
            this.partition = partition;
            this.leader = leader;
            this.replicas = replicas;
            this.isr = isr;
        }

        public int partition() {
            return partition;
        }

        public Node leader() {
            return leader;
        }

        public List<Node> replicas() {
            return replicas;
        }

        public List<Node> isr() {
            return isr;
        }

        @Override
        public String toString() {
            return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
                Utils.mkString(replicas, ", ") + ", isr=" + Utils.mkString(isr, ", ") + ")";
        }
    }

    abstract class NodesContext {
        protected int clientTimeoutMs;

        protected NodesContext(int clientTimeoutMs) {
            this.clientTimeoutMs = clientTimeoutMs;
        }

        /**
         * Set the client timeout for operations.  This is how long the client will block
         * on the operation.
         *
         * @param clientTimeoutMs         The client timeout in ms.
         * @return                      The context object.
         */
        public NodesContext setClientTimeout(int clientTimeoutMs) {
            if (clientTimeoutMs <= 0)
                throw new InvalidConfigurationException("Cannot set a client timeout less than or equal to 0.");
            this.clientTimeoutMs = clientTimeoutMs;
            return this;
        }

        public abstract Future<Set<Node>> list();

        public abstract Future<NodeApiVersions> getVersion(int broker);
    }
}
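
For illustration, a typical create/list/delete sequence against this interface might look like the sketch below.  The AdminClient.create(Map) factory is an assumption of the example, and the checked exceptions thrown by Future#get are omitted for brevity.

// Sketch of typical usage of the proposed interface.
AdminClient adminClient = AdminClient.create(config);
try {
    // Create a compacted topic with 3 partitions and replication factor 2.
    adminClient.topics()
        .setCreationConfig(Collections.singletonMap("cleanup.policy", "compact"))
        .create("my-topic", 3, (short) 2, false)
        .get();

    // List all non-internal topics.
    List<String> topicNames = adminClient.topics().list(false).get();

    // Delete the topic.  Deletion is asynchronous on the broker side, so
    // the topic may remain visible briefly after the future completes.
    adminClient.topics().delete("my-topic").get();
} finally {
    adminClient.close();
}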

Configuration

Just like the consumer and the producer, the admin client will have its own AdminClientConfig configuration class which extends AbstractConfig.

Initially, the supported configuration keys will be:

  • bootstrap.servers
    • The bootstrap servers as a list of host:port pairs.
  • client.id
    • An ID string to pass to the server when making requests.
  • metadata.max.age.ms
    • The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.
  • send.buffer.bytes
    • The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default will be used.
  • receive.buffer.bytes
    • The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used.
  • reconnect.backoff.ms
    • The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the client to the broker.
  • retry.backoff.ms
    • The amount of time to wait before attempting to retry a failed request.

  • request.timeout.ms
    • The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.
  • connections.max.idle.ms
    • Close idle connections after the number of milliseconds specified by this config.
  • security.protocol
    • The security protocol used to communicate with brokers.
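
A minimal sketch of what AdminClientConfig might look like, assuming the same ConfigDef machinery used by ProducerConfig and ConsumerConfig; only two of the keys listed above are shown, and the defaults and documentation strings are abbreviated.

import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class AdminClientConfig extends AbstractConfig {
    private static final ConfigDef CONFIG = new ConfigDef()
        .define("bootstrap.servers", ConfigDef.Type.LIST,
                ConfigDef.Importance.HIGH,
                "The bootstrap servers as a list of host:port pairs.")
        .define("client.id", ConfigDef.Type.STRING, "",
                ConfigDef.Importance.MEDIUM,
                "An id string to pass to the server when making requests.");

    public AdminClientConfig(Map<?, ?> props) {
        super(CONFIG, props);
    }
}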

Migration Plan and Compatibility

The AdminClient will use KIP-97 API version negotiation to communicate with older or newer brokers.  In cases where an API is not available on an older or newer broker, we will throw an UnsupportedVersionException.
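
A caller might probe for support as in the following sketch; whether UnsupportedVersionException surfaces directly or wrapped in the ExecutionException thrown by Future#get is an assumption here.

// Sketch: probing for CreateTopics support using validateOnly.
boolean supportsCreateTopics(AdminClient adminClient) throws InterruptedException {
    try {
        adminClient.topics().create("probe-topic", 1, (short) 1, true).get();
        return true;
    } catch (ExecutionException e) {
        if (e.getCause() instanceof UnsupportedVersionException)
            return false;
        throw new RuntimeException(e.getCause());
    }
}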

We should avoid making incompatible changes to the AdminClient function and class signatures.  For example, if we need to add an additional argument to an API, we should add a new overload rather than change an existing function signature.
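
For example (hypothetical), if delete later needed a server-side timeout argument, we would add a new overload rather than change the existing method:

// Existing method keeps its signature.
public abstract Future<Void> delete(String topicName);

// Hypothetical future addition: a new overload, not a signature change.
public abstract Future<Void> delete(String topicName, int serverTimeoutMs);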

Test Plan

We should have an AdminClientTest integration test which tests creating topics, deleting topics, and listing topics through the API.  We can use the KafkaServerTestHarness to test this efficiently with multiple brokers.  For methods which support batches, we should test passing a batch of items to be processed.  We should test error conditions as well.  We should also test the NodesContext list and getVersion methods.
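
A sketch of one such test case, assuming a hypothetical bootstrapServers() helper supplied by the test harness and the AdminClient.create(Map) factory assumed earlier:

@Test
public void testCreateListDeleteTopic() throws Exception {
    Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", bootstrapServers());  // hypothetical harness helper
    AdminClient adminClient = AdminClient.create(config);
    try {
        adminClient.topics().create("test-topic", 1, (short) 1, false).get();
        assertTrue(adminClient.topics().list(false).get().contains("test-topic"));
        adminClient.topics().delete("test-topic").get();
    } finally {
        adminClient.close();
    }
}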

Rejected Alternatives

Futures-Based API

Instead of having a blocking API, we could have an asynchronous API.  In this API, each function would immediately return a Future even before any network I/O had been initiated.  Then, clients could use Future.get() and other methods of the Future class to wait for and handle results.

In general, Future-based APIs are helpful when you expect to have a very large number of requests in flight at a time, and you don't want to devote a thread to each one.  However, in the AdminClient, we don't expect there to be a very large number of requests in flight at once.  Some AdminClient methods, such as getAllBrokerVersions or getAllGroups, talk to all brokers, so there is no need to have multiple such requests going on at once.  Others, such as CreateTopics or DeleteTopics, already support batching at the RPC layer, obviating the need for multiple topic creation or deletion requests to be sent at the same time.

Synchronous APIs are easier to understand and use, so it makes sense to implement them first.  If we decide we need future-based APIs later, we can always add them.

MultipleKafkaException instead of Try<>

Batch APIs can be awkward, because some elements of the batch might succeed even while others fail.  The API proposed here manages this issue by introducing the Try<> class, which can represent either a value or a failure exception, and by using constructs such as maps from nodes to Try<Result>.  Another approach would be to throw an exception in these cases instead of returning a value in the map.  Because multiple exceptions could be thrown for each element in the batch, we would have to introduce a new exception which can contain multiple exceptions: call it MultipleKafkaException.

Ultimately, we came to the conclusion that MultipleKafkaException would be more awkward to work with than Try.  In the case of APIs such as getAllBrokerVersions, if one broker succeeded and another failed, how does the developer get access to the successful return value when an exception is thrown?  Functions in Java can return a value or throw an exception, but not both.  The return value would have to be inside the exception, which is unintuitive.
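
The exact shape of Try<> is not pinned down by this KIP; a minimal sketch of the intended idea, with illustrative accessor names, might look like this:

// A minimal Try<T> holding either a value or the exception that
// prevented the value from being computed.
public class Try<T> {
    private final T value;
    private final Throwable exception;

    public static <T> Try<T> success(T value) {
        return new Try<>(value, null);
    }

    public static <T> Try<T> failure(Throwable exception) {
        return new Try<>(null, exception);
    }

    private Try(T value, Throwable exception) {
        this.value = value;
        this.exception = exception;
    }

    public boolean isSuccess() {
        return exception == null;
    }

    public T get() {
        if (exception != null)
            throw new RuntimeException(exception);
        return value;
    }

    public Throwable exception() {
        return exception;
    }
}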

Future Work

We would like to add more functionality to the AdminClient as soon as it becomes available on the brokers.  For example, we would like to add a way of altering topics that already exist.  We would also like to add management APIs for ACLs, as well as the functionality of GetOffsetShell.
