Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
MirrorMaker 2 (MM2) uses AdminClient directly to create topics and, when sync.topic.configs.enabled and sync.topic.acls.enabled are set, to sync topic configs and ACLs. While this approach simplifies resource management for MM2, it creates the following problems for users who have their own systems or tooling to manage Kafka resources:
- An external Kafka resource management tool or system may override the configurations that MM2 synced previously.
- Creating or modifying resources directly through AdminClient bypasses that tooling, so the changes are not audited and are hard to trace.
In use cases where users have their own centralized system to manage Kafka topics and ACLs, they should be able to integrate MirrorMaker 2 easily with their internal ecosystem.
Currently, there are a couple of options to deal with these problems:
- Drop mirrored topics from the centralized system and accept the risk of not tracing these topics. This avoids conflicts between MM2 and the centralized resource management system over the topics that MM2 monitors.
- Create topics up front and disable features such as sync.topic.configs.enabled and sync.topic.acls.enabled.
The current approach also assumes that the user running MM2 has admin rights to create topics, which is only true if that user also manages both the source and destination clusters.
This KIP proposes a way for users to run MM2 with a custom implementation of the Kafka resource manager.
Public Interfaces
The KIP proposes adding a new interface called KafkaResourceManager that defines how MM2 creates, modifies, and lists Kafka topics and ACLs. MirrorMaker 2's original behaviour will be kept in DefaultResourceManager.
The implementation class can be overridden using the following configurations:
- resource.manager.class, whose default value will be set to DefaultResourceManager
- <cluster_alias>.resource.manager.class, which configures the implementation for a specific cluster alias
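The lookup implied by these two settings could be sketched as follows. This is only an illustration, not code from MM2 or from this KIP: the class name ResourceManagerConfigSketch is hypothetical, the bare name DefaultResourceManager stands in for a fully qualified class name the KIP does not specify, and the precedence (a per-cluster setting wins over the global one) is an assumption.

```java
import java.util.Map;

/** Hypothetical helper, not part of MM2 or of this KIP. */
final class ResourceManagerConfigSketch {
    // Config key proposed by the KIP.
    static final String RESOURCE_MANAGER_CLASS = "resource.manager.class";
    // Placeholder: the KIP does not give a fully qualified name for the default class.
    static final String DEFAULT_CLASS = "DefaultResourceManager";

    private ResourceManagerConfigSketch() { }

    /**
     * Resolves the implementation class name for one cluster alias.
     * Assumed order: "<alias>.resource.manager.class", then
     * "resource.manager.class", then the default.
     */
    static String resolve(Map<String, String> props, String clusterAlias) {
        String perCluster = props.get(clusterAlias + "." + RESOURCE_MANAGER_CLASS);
        if (perCluster != null) {
            return perCluster;
        }
        return props.getOrDefault(RESOURCE_MANAGER_CLASS, DEFAULT_CLASS);
    }
}
```

Under these assumptions, a deployment that sets nothing keeps the default behaviour, while one that sets only primary.resource.manager.class changes the implementation for the cluster aliased primary and leaves the other clusters on the default.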
Proposed Changes
The KafkaResourceManager interface will define how to list, create, and update topics, topic configs, and ACLs.
public interface KafkaResourceManager extends Configurable, AutoCloseable {

    /**
     * Method to create kafka resource manager
     * @param props map of configuration needed to setup resource manager
     * @return KafkaResourceManager
     */
    KafkaResourceManager create(Map<String, Object> props);

    /**
     * list topics for this cluster
     * @return Set of topics name as strings
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Set<String> listTopics() throws InterruptedException, ExecutionException;

    /**
     * describe a collection of topics.
     * @param topics collection of topic names
     * @return collection of TopicDescription
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Collection<TopicDescription> describeTopics(Collection<String> topics) throws InterruptedException, ExecutionException;

    /**
     * describe config for given set of topics.
     * @param topics set of topic names
     * @return map of topic name and config object
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Map<String, Config> describeTopicConfigs(Set<String> topics) throws InterruptedException, ExecutionException;

    /**
     * Create new topics and new partitions.
     * @param partitionCounts map of new partitions counts for each topic
     * @param newTopics list of new topics
     * @param newPartitions map of new partitions for each topic
     */
    void createTopicPartitions(Map<String, Long> partitionCounts, List<NewTopic> newTopics, Map<String, NewPartitions> newPartitions);

    /**
     * create compacted topic, mostly used to create internal topics.
     * @param topicName topic name
     * @param partitions number of partitions
     * @param replicationFactor number of replicas
     */
    void createCompactedTopic(String topicName, short partitions, short replicationFactor);

    /**
     * update config for given topics.
     * @param topicConfigs map of topics to topic config
     * @throws InterruptedException
     * @throws ExecutionException
     */
    void updateTopicConfigs(Map<String, Config> topicConfigs) throws InterruptedException, ExecutionException;

    /**
     * list acl binding for all topics.
     * @return collection AclBinding
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Collection<AclBinding> listTopicAclBindings() throws InterruptedException, ExecutionException;

    /**
     * update Acls.
     * @param bindings list of acl bindings
     * @throws InterruptedException
     * @throws ExecutionException
     */
    void updateAcls(List<AclBinding> bindings) throws InterruptedException, ExecutionException;
}
DefaultResourceManager will use AdminClient to manage the resources. The default value for resource.manager.class will be DefaultResourceManager for backward compatibility.
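To illustrate what a non-default implementation might look like, here is a minimal sketch of a resource manager that records an audit entry for every change instead of calling AdminClient. Everything in it is hypothetical: SimpleResourceManager is a trimmed-down stand-in for the proposed interface that uses only JDK types so the example stays self-contained, and the in-memory map stands in for an organization's centralized resource management system.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical, trimmed-down stand-in for the proposed interface (JDK types only). */
interface SimpleResourceManager {
    Set<String> listTopics();
    void createTopic(String topicName, int partitions);
    void updateTopicConfigs(Map<String, Map<String, String>> topicConfigs);
}

/** Hypothetical implementation that audits every change it applies. */
final class AuditedResourceManager implements SimpleResourceManager {
    // Stand-in for state owned by a centralized resource management system.
    private final Map<String, Map<String, String>> topics = new TreeMap<>();
    private final List<String> auditLog = new ArrayList<>();
    private final String requester;

    AuditedResourceManager(String requester) {
        this.requester = requester;
    }

    @Override
    public Set<String> listTopics() {
        return Collections.unmodifiableSet(topics.keySet());
    }

    @Override
    public void createTopic(String topicName, int partitions) {
        // Creating an existing topic is a no-op, so repeated sync passes stay idempotent.
        if (topics.containsKey(topicName)) {
            return;
        }
        topics.put(topicName, new TreeMap<>());
        auditLog.add(requester + " CREATE " + topicName + " partitions=" + partitions);
    }

    @Override
    public void updateTopicConfigs(Map<String, Map<String, String>> topicConfigs) {
        for (Map.Entry<String, Map<String, String>> entry : topicConfigs.entrySet()) {
            Map<String, String> current = topics.get(entry.getKey());
            if (current == null) {
                continue; // Unknown topic: this sketch simply skips it.
            }
            current.putAll(entry.getValue());
            auditLog.add(requester + " UPDATE_CONFIG " + entry.getKey()
                    + " " + new TreeMap<>(entry.getValue()));
        }
    }

    /** Read-only view of the recorded changes. */
    List<String> auditLog() {
        return Collections.unmodifiableList(auditLog);
    }
}
```

A real implementation would implement KafkaResourceManager itself and forward each call to the organization's own API; the point of the sketch is only that every create or update passes through one place where it can be logged and attributed.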
Compatibility, Deprecation, and Migration Plan
- Users who upgrade an existing MM2 cluster do not need to change any of their current configuration, because this proposal keeps the default behaviour of MM2.
Rejected Alternatives
- Manage the creation and modification of Kafka topics and ACLs outside MM2 by building a separate tool that monitors the same set of topics as MirrorMaker 2 and creates or modifies topics and ACLs once it detects configuration changes. The problem is that this would duplicate effort, as MM2 already has all this logic implemented; it only needs to use a different client than AdminClient.