Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Motivation
MirrorMaker 2 (MM2) uses AdminClient directly to create topics and to sync configurations such as topic configs and ACLs when sync.topic.configs.enabled and sync.topic.acls.enabled are enabled. While this approach simplifies resource management for MM2, it creates the following problems for users who have their own systems or tooling to manage Kafka resources:
- An external Kafka resource management tool or system may override the same configurations that MM2 synced previously.
- Creating or modifying resources directly through AdminClient leads to a lack of auditing and tracing for those resources.
In use cases where users have their own centralized system to manage Kafka topics and ACLs, they should be able to integrate MirrorMaker 2 easily with their internal ecosystem.
Currently, there are a couple of options to deal with these problems:
- Drop mirrored topics from a centralized system and accept the risk of not tracing these topics. This direction will avoid conflict between topics monitored by MM2 and the centralized system for resource management.
- Create mirrored topics up front on destination clusters and disable features like sync.topic.configs.enabled and sync.topic.acls.enabled.
The current approach also assumes that the user running MM2 has admin rights to create topics, which is only valid if the user who runs MM2 also manages both the source and destination clusters.
This KIP proposes a way for users to run MM2 with a custom implementation of the Kafka resource manager.
Public Interfaces
The KIP proposes adding a new interface called KafkaResourceManager that defines how MM2 will create, modify, and list any Kafka topics and ACLs. MirrorMaker2’s original behaviour will be kept in DefaultResourceManager.
The implementation class can be overridden using the following configurations:
- resource.manager.class: the default value will be set to DefaultResourceManager
- or can be configured based on cluster aliases using <cluster_alias>.resource.manager.class
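The two configuration keys above could be resolved roughly as follows. This is a minimal sketch and not part of the proposal: the helper class ResourceManagerClassResolver, the com.example class names, and the lookup order (per-cluster key first, then the global key, then the default) are all assumptions, since the KIP does not state how the two keys take precedence.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not defined by the KIP: picks the resource manager
// class name that applies to a given cluster alias.
final class ResourceManagerClassResolver {
    static final String CONFIG = "resource.manager.class";
    // The KIP names DefaultResourceManager but not its package, so the bare
    // class name is used here as a placeholder.
    static final String DEFAULT = "DefaultResourceManager";

    // Assumed precedence: "<cluster_alias>.resource.manager.class" wins over
    // "resource.manager.class", which wins over the default.
    static String resolve(Map<String, ?> props, String clusterAlias) {
        Object perCluster = props.get(clusterAlias + "." + CONFIG);
        if (perCluster != null) {
            return perCluster.toString();
        }
        Object global = props.get(CONFIG);
        return global != null ? global.toString() : DEFAULT;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // Example values only; these classes do not exist.
        props.put("resource.manager.class", "com.example.CentralResourceManager");
        props.put("backup.resource.manager.class", "com.example.BackupResourceManager");
        System.out.println(resolve(props, "primary"));
        System.out.println(resolve(props, "backup"));
    }
}
```

With no keys set, the resolver falls back to DefaultResourceManager, which matches the backward-compatible default described in this KIP.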
Proposed Changes
The KafkaResourceManager interface will define how to list, create, and update topics, topic configs, and ACLs.
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclBinding;

public interface KafkaResourceManager extends Configurable, AutoCloseable {

    /**
     * Method to create kafka resource manager
     * @param props map of configuration needed to setup resource manager
     * @return KafkaResourceManager
     */
    KafkaResourceManager create(Map<String, Object> props);

    /**
     * Method to close the manager
     */
    void close();

    /**
     * list topics for this cluster
     * @return Set of topics name as strings
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Set<String> listTopics() throws InterruptedException, ExecutionException;

    /**
     * describe a collection of topics.
     * @param topics collection of topic names
     * @return collection of TopicDescription
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Collection<TopicDescription> describeTopics(Collection<String> topics) throws InterruptedException, ExecutionException;

    /**
     * describe config for given set of topics.
     * @param topics set of topic names
     * @return map of topic name and config object
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Map<String, Config> describeTopicConfigs(Set<String> topics) throws InterruptedException, ExecutionException;

    /**
     * Create new topics and new partitions.
     * @param partitionCounts map of new partitions counts for each topic
     * @param newTopics list of new topics
     * @param newPartitions map of new partitions for each topic
     */
    void createTopicPartitions(Map<String, Long> partitionCounts, List<NewTopic> newTopics, Map<String, NewPartitions> newPartitions);

    /**
     * create compacted topic, mostly used to create internal topics.
     * @param topicName topic name
     * @param partitions number of partitions
     * @param replicationFactor number of replicas
     */
    void createCompactedTopic(String topicName, short partitions, short replicationFactor);

    /**
     * update config for given topics.
     * @param topicConfigs map of topics to topic config
     * @throws InterruptedException
     * @throws ExecutionException
     */
    void updateTopicConfigs(Map<String, Config> topicConfigs) throws InterruptedException, ExecutionException;

    /**
     * list acl binding for all topics.
     * @return collection AclBinding
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Collection<AclBinding> listTopicAclBindings() throws InterruptedException, ExecutionException;

    /**
     * update Acls.
     * @param bindings list of acl bindings
     * @throws InterruptedException
     * @throws ExecutionException
     */
    void updateAcls(List<AclBinding> bindings) throws InterruptedException, ExecutionException;
}
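To illustrate how a custom implementation might address the auditing concern from the Motivation section, here is a minimal sketch. So that it compiles without kafka-clients on the classpath, it uses a trimmed-down stand-in interface (TopicManagerSketch) that mirrors only two methods of KafkaResourceManager and drops the checked exceptions. Both TopicManagerSketch and AuditingTopicManager are hypothetical names, and the in-memory topic set stands in for whatever centralized system a user would actually call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Stand-in for two of the KafkaResourceManager methods; the proposed interface
// also extends Configurable and uses Kafka client types.
interface TopicManagerSketch extends AutoCloseable {
    Set<String> listTopics();

    void createCompactedTopic(String topicName, short partitions, short replicationFactor);

    @Override
    void close();
}

// Hypothetical implementation that records every request in an audit log
// instead of calling AdminClient directly.
final class AuditingTopicManager implements TopicManagerSketch {
    private final Set<String> topics = new TreeSet<>();
    private final List<String> auditLog = new ArrayList<>();

    @Override
    public Set<String> listTopics() {
        auditLog.add("LIST_TOPICS");
        // Return a read-only copy so callers cannot modify internal state.
        return Collections.unmodifiableSet(new TreeSet<>(topics));
    }

    @Override
    public void createCompactedTopic(String topicName, short partitions, short replicationFactor) {
        // A real implementation would forward this request to the central
        // resource management system; here it is only recorded in memory.
        auditLog.add("CREATE_COMPACTED " + topicName
                + " partitions=" + partitions + " rf=" + replicationFactor);
        topics.add(topicName);
    }

    @Override
    public void close() {
        auditLog.add("CLOSE");
    }

    // Read-only view of the recorded requests, in call order.
    List<String> auditLog() {
        return Collections.unmodifiableList(auditLog);
    }
}
```

The point of the sketch is that every topic operation MM2 performs passes through one user-controlled class, so it can be logged, validated, or redirected before any cluster is touched.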
DefaultResourceManager will use AdminClient to manage the resources, and the default value for resource.manager.class will be DefaultResourceManager for backward compatibility.
Compatibility, Deprecation, and Migration Plan
- When users upgrade an existing MM2 cluster they don’t need to change any of their current configuration as this proposal maintains the default behaviour for MM2.
Rejected Alternatives
- Manage the creation and modification of Kafka topics and ACLs outside MM2 by building a separate tool that monitors the same set of topics as MirrorMaker 2 and creates or modifies topics and ACLs once it detects configuration changes. The problem is that this would duplicate effort, as MM2 already has all this logic implemented; it only needs to use a different client than AdminClient.
- Allow MM2 users to provide a custom implementation of the Admin interface instead of adding a new interface. The problem with this one is that the Admin interface has 86 methods, and MM2 depends on 13 of them; this means any custom implementation of the Admin interface will be humongous. The user will need to return dummy data for 73 methods that MM2 does not use.