Status
Current state: under discussion
Discussion thread: add
JIRA: KAFKA-3265
Motivation
Systems that interface with Kafka, such as management systems and proxies, often need to perform administrative actions. For example, they might need to be able to create or delete topics. Currently, they can't do this without relying on internal Kafka classes, or shell scripts distributed with Kafka. We would like to add a public, stable AdministrativeClient API that exposes this functionality to JVM-based clients in a well-supported way.
The AdministrativeClient will use the KIP-4 wire protocols to communicate with brokers. Because it avoids direct communication with ZooKeeper, the client does not need a ZooKeeper dependency. In fact, once this KIP is implemented, we will be able to lock down ZooKeeper security further by enforcing the invariant that only brokers need to communicate with ZooKeeper.
By using the AdministrativeClient API, clients will avoid being tightly coupled to the implementation details of a particular Kafka version. They will not need to access internal Kafka classes, or parse the output of scripts. They will also gain the benefits of cross-version client compatibility as implemented in KIP-97.
Proposed Changes
The AdministrativeClient class will be distributed as part of kafka-clients.jar. It will provide a Java API for managing Kafka.
Users will configure the AdministrativeClient the same way they configure the Producer and Consumer: by supplying a map of keys to values to its constructor. As much as possible, we should reuse the same configuration key names, such as bootstrap.servers, client.id, etc. We should also offer the ability to configure timeouts, buffer sizes, and other networking settings.
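As an illustration, the configuration map could be assembled as below. The bootstrap.servers and client.id keys mirror the existing producer/consumer key names as the text proposes; the request.timeout.ms key is shown as an example of a networking setting and is an assumption, not a finalized name:

```java
import java.util.HashMap;
import java.util.Map;

public class AdminClientConfigSketch {
    public static void main(String[] args) {
        // Reuse the same key names as the producer and consumer where possible.
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "broker1:9092,broker2:9092");
        config.put("client.id", "my-admin-client");
        config.put("request.timeout.ms", 30000); // assumed timeout key, for illustration

        // The real constructor is assumed to take such a map:
        //   AdministrativeClient client = new AdministrativeClient(config);
        System.out.println(config.get("bootstrap.servers"));
        System.out.println(config.get("client.id"));
    }
}
```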
The AdministrativeClient will provide blocking APIs that closely reflect the requests which the brokers can handle. The client will be multi-threaded; multiple threads will be able to safely make calls using the same AdministrativeClient object.
In general, we want to avoid using internal Kafka classes in the AdministrativeClient interface. For example, most RPC classes, such as MetadataRequest or MetadataResponse, should be considered internal. We should be able to change those classes in the future without worrying about breaking users of the AdministrativeClient. Inner classes such as MetadataResponse#TopicMetadata should also be considered internal, and not exposed in the API of the AdministrativeClient.
We want to handle errors fully and cleanly in the AdministrativeClient. APIs that require communication with multiple brokers should allow for the possibility that some brokers will respond and others will not. Batch APIs that perform multiple independent operations should allow for the possibility that some elements of a batch fail while others succeed.
Implementation
As mentioned earlier, the AdministrativeClient will use the KIP-4 wire protocol. This mainly means using NetworkClient and related RPC classes for the implementation.
This KIP will add only APIs that can be implemented with the existing server-side RPCs. See "New or Changed Public Interfaces" for details. The intention is that we will continue to extend the AdministrativeClient in further KIPs as the appropriate server-side functionality is added (such as ACL management).
New or Changed Public Interfaces
Clients use the administrative client by creating an instance of class AdministrativeClient. Just like the producer and consumer, it is configured by supplying a configuration map to the constructor. When the user is done with the AdministrativeClient, they must call close to release the network sockets and other associated resources of the client.
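Because close must always be called, callers could lean on try-with-resources, assuming the AdministrativeClient implements AutoCloseable (not yet specified in this KIP). The sketch below uses a minimal stand-in class, since the real client does not exist yet:

```java
import java.util.HashMap;
import java.util.Map;

public class AdminLifecycleSketch {
    // Hypothetical stand-in for the proposed client; the real class
    // will manage network sockets and other resources.
    static class AdministrativeClient implements AutoCloseable {
        private boolean closed = false;
        AdministrativeClient(Map<String, Object> config) {}
        @Override
        public void close() { closed = true; }
        boolean isClosed() { return closed; }
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "broker1:9092");
        AdministrativeClient ref;
        // try-with-resources guarantees close() runs even if a call throws.
        try (AdministrativeClient client = new AdministrativeClient(config)) {
            ref = client;
            // ... administrative calls would go here ...
        }
        System.out.println(ref.isClosed());
    }
}
```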
public enum CreateTopicsFlags {
    /**
     * Return immediately without waiting for the topic to be created.
     */
    NONBLOCKING,

    /**
     * Perform validation that the proposed topics could be created, but do not actually create them.
     */
    VALIDATE_ONLY;
}
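Callers would combine these flags via java.util.EnumSet. A small self-contained sketch, with the enum copied from the declaration above so it compiles on its own:

```java
import java.util.EnumSet;

public class CreateTopicsFlagsSketch {
    // Copied from the proposed API so the sketch is self-contained.
    public enum CreateTopicsFlags {
        NONBLOCKING,
        VALIDATE_ONLY;
    }

    public static void main(String[] args) {
        // No flags set: block until the topics are actually created.
        EnumSet<CreateTopicsFlags> defaults = EnumSet.noneOf(CreateTopicsFlags.class);
        // Dry run: validate the request without creating anything.
        EnumSet<CreateTopicsFlags> dryRun = EnumSet.of(CreateTopicsFlags.VALIDATE_ONLY);

        System.out.println(defaults.isEmpty());
        System.out.println(dryRun.contains(CreateTopicsFlags.VALIDATE_ONLY));
    }
}
```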
/**
* Create a batch of topics.
*
* @param newTopics A list of new topics to create.
* @param flags The flags to use on the requests.
* @return A map from topic names to exceptions. If the entry in the map
* is null, the topic has been successfully created.
* @throws InterruptedException If the thread receives an InterruptedException before finishing.
* @throws TimeoutException If the thread times out before learning which node is the controller.
* Individual topic creations may also time out, causing exceptions to
* be stored in the map.
*/
public Map<String, KafkaException> createTopics(final List<NewTopic> newTopics,
EnumSet<CreateTopicsFlags> flags) throws InterruptedException
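The returned map lets callers distinguish per-topic outcomes in a batch. The sketch below simulates such a result map rather than talking to a real broker, with plain RuntimeException values standing in for KafkaException and a hypothetical "topic already exists" failure:

```java
import java.util.Map;
import java.util.TreeMap;

public class CreateTopicsResultSketch {
    public static void main(String[] args) {
        // Simulated result of createTopics: a null value means the topic was
        // created; a non-null exception means that topic's creation failed.
        Map<String, RuntimeException> results = new TreeMap<>();
        results.put("events", null);
        results.put("metrics", new RuntimeException("topic already exists (simulated)"));

        for (Map.Entry<String, RuntimeException> e : results.entrySet()) {
            if (e.getValue() == null) {
                System.out.println(e.getKey() + ": created");
            } else {
                System.out.println(e.getKey() + ": failed: " + e.getValue().getMessage());
            }
        }
    }
}
```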
/**
* Create a single topic.
*
* @param newTopic The new topic to create.
* @param flags The flags to use on the requests.
* @throws InterruptedException If the thread receives an InterruptedException before finishing.
* @throws TimeoutException If the thread times out before receiving a result.
* @throws KafkaException If there was an error creating the new topic.
*/
public void createTopic(NewTopic newTopic, EnumSet<CreateTopicsFlags> flags) throws InterruptedException
public enum DeleteTopicsFlags {
    /**
     * Return immediately without waiting for the topic to be deleted.
     */
    NONBLOCKING;
}
/**
* Delete a batch of topics.
*
* @param topics A set of topics to delete.
* @param flags The flags to use on the requests.
* @return A map from topic names to exceptions. If the entry in the map is null,
* the topic has been successfully deleted.
* @throws InterruptedException If the thread receives an InterruptedException before finishing.
* @throws TimeoutException If the thread times out before learning which node is the controller.
* Individual topic deletions may also time out, causing exceptions to be
* stored in the map.
*/
public Map<String, KafkaException> deleteTopics(final Set<String> topics, EnumSet<DeleteTopicsFlags> flags)
throws InterruptedException
/**
* Delete a single topic.
*
* @param topicName The topic name to delete.
* @param flags The flags to use on the request.
* @throws InterruptedException If the thread receives an InterruptedException before finishing.
* @throws TimeoutException If the thread times out before receiving a result.
* @throws KafkaException If there was an error deleting the topic.
*/
public void deleteTopic(String topicName, EnumSet<DeleteTopicsFlags> flags) throws InterruptedException
/**
* A disjoint union of a result type and a failure exception.
* Used to represent something that could be either an error or a return value.
* Inspired by scala.util.Try and Rust's std::result.
*
* @param <T> The result type.
*/
public final class Try<T> {
    public static <T> Try<T> newError(RuntimeException exception);
    public static <T> Try<T> newValue(T value);
    public T get();
    public boolean isFailure();
    public boolean isSuccess();
    ...
}
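A minimal self-contained implementation of the Try<T> idea might look like the following. This is a sketch of the semantics described above, not the final implementation:

```java
public class TrySketch {
    // Minimal sketch of Try<T>: holds either a value or a failure exception.
    public static final class Try<T> {
        private final T value;
        private final RuntimeException exception;

        private Try(T value, RuntimeException exception) {
            this.value = value;
            this.exception = exception;
        }

        public static <T> Try<T> newError(RuntimeException exception) {
            return new Try<>(null, exception);
        }

        public static <T> Try<T> newValue(T value) {
            return new Try<>(value, null);
        }

        // Return the value on success, or rethrow the stored exception.
        public T get() {
            if (exception != null)
                throw exception;
            return value;
        }

        public boolean isFailure() { return exception != null; }
        public boolean isSuccess() { return exception == null; }
    }

    public static void main(String[] args) {
        Try<Integer> ok = Try.newValue(42);
        Try<Integer> bad = Try.newError(new RuntimeException("broker unreachable"));

        System.out.println(ok.isSuccess() + " " + ok.get());
        System.out.println(bad.isFailure());
    }
}
```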
public enum ListTopicsFlags {
    /**
     * List internal topics.
     */
    INCLUDE_INTERNAL;
}
/**
* List topics in the cluster.
*
* @param flags The flags to use on the request.
* @return A map from topic names to TopicInfo objects or errors.
* An error is returned when we are not authorized to describe the given topic.
* @throws InterruptedException If the thread receives an InterruptedException before finishing.
* @throws TimeoutException If the thread times out before receiving cluster metadata.
*/
public Map<String, Try<TopicInfo>> listTopics(EnumSet<ListTopicsFlags> flags) throws InterruptedException
/**
* Get version information about all brokers.
*
* @return A map from broker nodes to ApiVersion information.
*
* @throws InterruptedException If the thread receives an InterruptedException before finishing.
* @throws TimeoutException If the thread times out before receiving cluster metadata.
*/
public Map<Node, Try<NodeApiVersions>> getAllBrokerVersions() throws InterruptedException
/**
* Get information about all groups.
*
* @return A map from broker nodes to lists of group overviews, or errors.
*
* @throws InterruptedException If the thread receives an InterruptedException before finishing.
* @throws TimeoutException If the thread times out before receiving a result.
* @throws KafkaException If there was an error getting the group information.
*/
public Map<Node, Try<List<GroupOverview>>> getAllGroups() throws InterruptedException
Migration Plan and Compatibility
The AdministrativeClient will use KIP-97 API version negotiation to communicate with older or newer brokers. In cases where an API is not available on an older or newer broker, we will throw an UnsupportedVersionException.
We should avoid making incompatible changes to the AdministrativeClient function and class signatures. So for example, if we need to add an additional argument to an API, we should add a new function rather than changing an existing function signature.
Test Plan
We should have an AdministrativeClientTest integration test which exercises creating, deleting, and listing topics through the API. We can use the KafkaServerTestHarness to test this efficiently with multiple brokers. For methods which support batches, we should test passing a batch of items to be processed, and we should test error conditions as well. We should also test the getAllBrokerVersions and getAllGroups methods.
Rejected Alternatives
Futures-Based API
Instead of having a blocking API, we could have an asynchronous API. In this API, each function would immediately return a Future even before any network I/O had been initiated. Then, clients could use Future.get() and other methods of the Future class to wait for and handle results.
In general, Future-based APIs are helpful when you expect to have a very large number of requests in flight at a time, and you don't want to devote a thread to each one. However, in the AdministrativeClient, we don't expect there to be a very large number of requests in flight at once. Some AdministrativeClient methods, such as getAllBrokerVersions or getAllGroups, talk to all brokers, so there is no need to have multiple such requests going on at once. Others, such as createTopics or deleteTopics, already support batching at the RPC layer, obviating the need for multiple topic creation or deletion requests to be sent at the same time.
Synchronous APIs are easier to understand and use, so it makes sense to implement them first. If we decide we need future-based APIs later, we can always add them.
MultipleKafkaException instead of Try<>
Batch APIs can be awkward, because some elements of the batch might succeed even while others fail. The API proposed here manages this issue by introducing the Try<> class, which can represent either a value or a failure exception, and using constructs such as maps from nodes to Try<Result>. Another approach would be to throw an exception in these cases instead of returning a value in the map. Because multiple exceptions could be thrown for each element in the batch, we would have to introduce a new exception which can contain multiple exceptions; call it MultipleKafkaException.
Ultimately, we came to the conclusion that MultipleKafkaException would be more awkward to work with than Try. In the case of APIs such as getAllBrokerVersions, if one broker succeeded and another failed, how does the developer get access to the successful return value when an exception is thrown? Functions in Java can return a value or throw an exception, but not both. The return value would have to be inside the exception, which is unintuitive.
Future Work
We would like to add more functionality to the AdministrativeClient as soon as it becomes available on the brokers. For example, we would like to add a way of altering topics that already exist. We would also like to add management APIs for ACLs.