Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Motivation
MirrorMaker 2 uses `AdminClient` directly to create topics and new topic partitions, and to sync topic configurations and ACLs when `sync.topic.configs.enabled` and `sync.topic.acls.enabled` are enabled. The current approach also assumes that the user running MM2 has Admin rights to create and update topics, which is only valid if the user who runs MM2 also manages both the source and destination clusters.
While this approach simplifies resource management for MM2, it creates problems for users who rely on IaC (Infrastructure as Code), federated solutions, or a capacity/budget planning system for their destination Kafka clusters. Because MM2 uses `AdminClient` directly, outside of such an ecosystem, it causes two main issues:
- Capacity/Budgeting Planning:
  - MM2 automatically creates topics (breaking the rule of `auto.create.topics.enable=false`) and creates topic partitions on destination clusters when the number of partitions increases on the source for any topic that matches the regex/list in the `topics` config.
  - MM2 syncs all topic configs, including configurations that affect capacity such as `retention.ms` and `retention.bytes`.
  These two behaviours increase disk usage on the destination cluster. The team that runs the cluster only notices the capacity issue when disk usage hits their alert threshold.
- Provisioning conflict:
  MM2 uses `AdminClient` directly to:
  - Create Kafka topics (there is no way to disable this).
  - Create new partitions for Kafka topics (there is no way to disable this).
  - Sync Kafka topic configurations (this can be disabled, but doing so reduces MM2's value for users).
  - Sync Kafka topic ACLs (this can be disabled, but doing so reduces MM2's value for users). Disabling this feature also means that users must ensure they have the right ACLs on the mirrored topics in the destination cluster before switching their consumers, especially when MM2 is used for disaster recovery. Otherwise, they may face extra downtime.
Using `AdminClient` directly causes problems for teams that:
- Manage their Kafka resources using tools like Strimzi or custom federated solutions. For example, Strimzi's User Operator doesn't sync the topic ACLs when MM2 is syncing topic ACLs, and the Strimzi documentation says that users must disable MM2's `sync.topic.acls.enabled` if they use the User Operator.
- Run MM2 but don't own the destination cluster. These teams don't have Admin access, but they may have Kafka management solutions such as yahoo/CMAK or an in-house tool. With CMAK, for example, they can create and update resources through the CMAK REST API.
In use cases where users have their own centralized system to manage Kafka topics and ACLs, they should be able to integrate MirrorMaker2 easily with their internal ecosystem.
Currently, there are a few options to deal with these problems:
- Drop mirrored topics from the centralized system and accept the risk of not tracking these topics. This avoids conflicts between topics monitored by MM2 and the centralized resource-management system.
- Create mirrored topics up front on destination clusters and disable features like `sync.topic.configs.enabled` and `sync.topic.acls.enabled`.
- Run a watcher that syncs Kafka cluster resources into the in-house ecosystem.
This KIP proposes a way for users to run MM2 with a custom implementation of the Kafka resource manager.
Public Interfaces
The KIP proposes adding a new interface called KafkaResourceManager that defines how MM2 will create, modify, and list any Kafka topics and ACLs. MirrorMaker2’s original behaviour will be kept in DefaultResourceManager.
The implementation class can be overridden using the following configurations:
- `resource.manager.class`: the default value is `DefaultResourceManager`.
- `<cluster_alias>.resource.manager.class`: sets the implementation for a specific cluster alias.
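The KIP does not spell out how the global and per-cluster settings interact. A minimal sketch, assuming that a cluster-prefixed `<cluster_alias>.resource.manager.class` takes precedence over `resource.manager.class` (in the way other cluster-prefixed MM2 properties, such as `<cluster_alias>.bootstrap.servers`, apply to a single cluster), could resolve the class name as follows. `ResourceManagerConfigResolver` is a hypothetical helper, not part of MM2.

```java
import java.util.Map;

// Hypothetical helper (not part of MM2) showing one plausible precedence for the
// resource manager settings proposed by this KIP.
final class ResourceManagerConfigResolver {
    static final String RESOURCE_MANAGER_CLASS = "resource.manager.class";
    // The KIP names the default "DefaultResourceManager"; its package is not specified.
    static final String DEFAULT_RESOURCE_MANAGER = "DefaultResourceManager";

    private ResourceManagerConfigResolver() {}

    /** Returns the resource manager class name to use for the given cluster alias. */
    static String resolve(Map<String, String> props, String clusterAlias) {
        // Assumption: a cluster-prefixed override such as
        // "backup.resource.manager.class" wins over the global setting.
        String perCluster = props.get(clusterAlias + "." + RESOURCE_MANAGER_CLASS);
        if (perCluster != null) {
            return perCluster;
        }
        // Otherwise use the global setting, or the backward-compatible default.
        return props.getOrDefault(RESOURCE_MANAGER_CLASS, DEFAULT_RESOURCE_MANAGER);
    }
}
```

For example, with `resource.manager.class=com.example.CmakResourceManager` and `backup.resource.manager.class=com.example.GitOpsResourceManager` (both hypothetical class names), `resolve(props, "primary")` returns the first class and `resolve(props, "backup")` returns the second; with neither key set, it falls back to `DefaultResourceManager`.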
Proposed Changes
The `KafkaResourceManager` interface defines how to list, create, and update topics, topic configs, and ACLs.
```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclBinding;

public interface KafkaResourceManager extends Configurable, AutoCloseable {

    /**
     * Method to create a Kafka resource manager.
     * @param props map of configuration needed to set up the resource manager
     * @return KafkaResourceManager
     */
    KafkaResourceManager create(Map<String, Object> props);

    /**
     * Method to close the manager.
     */
    @Override
    void close();

    /**
     * List topics for this cluster.
     * @return set of topic names
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Set<String> listTopics() throws InterruptedException, ExecutionException;

    /**
     * Describe a collection of topics.
     * @param topics collection of topic names
     * @return collection of TopicDescription
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Collection<TopicDescription> describeTopics(Collection<String> topics) throws InterruptedException, ExecutionException;

    /**
     * Describe configs for the given set of topics.
     * @param topics set of topic names
     * @return map of topic name to config object
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Map<String, Config> describeTopicConfigs(Set<String> topics) throws InterruptedException, ExecutionException;

    /**
     * Create new topics and new partitions.
     * @param newTopics list of new topics
     * @param newPartitions map of new partitions for each topic
     */
    void createTopicPartitions(List<NewTopic> newTopics, Map<String, NewPartitions> newPartitions);

    /**
     * Create a compacted topic, mostly used to create internal topics.
     * @param topicName topic name
     * @param partitions number of partitions
     * @param replicationFactor number of replicas
     */
    void createCompactedTopic(String topicName, short partitions, short replicationFactor);

    /**
     * Update configs for the given topics.
     * @param topicConfigs map of topic name to topic config
     * @throws InterruptedException
     * @throws ExecutionException
     */
    void updateTopicConfigs(Map<String, Config> topicConfigs) throws InterruptedException, ExecutionException;

    /**
     * List ACL bindings for all topics.
     * @return collection of AclBinding
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Collection<AclBinding> listTopicAclBindings() throws InterruptedException, ExecutionException;

    /**
     * Update ACLs.
     * @param bindings list of ACL bindings
     * @throws InterruptedException
     * @throws ExecutionException
     */
    void updateAcls(List<AclBinding> bindings) throws InterruptedException, ExecutionException;
}
```
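To make the capacity concern concrete, the following Kafka-free sketch approximates the decision MM2 makes before it asks for topics and partitions to be created: remote topics missing on the destination are created, and existing remote topics with fewer partitions than their source are grown to match. It assumes the default `<source_alias>.<topic>` remote-topic naming of `DefaultReplicationPolicy` and uses plain integers in place of `NewTopic` and `NewPartitions`; `PartitionPlan` is a hypothetical name.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified sketch of the inputs a resource manager would receive
// through createTopicPartitions. Plain integers stand in for NewTopic / NewPartitions.
final class PartitionPlan {
    final Map<String, Integer> topicsToCreate = new TreeMap<>();  // remote topic -> partition count
    final Map<String, Integer> partitionsToAdd = new TreeMap<>(); // remote topic -> new total count

    static PartitionPlan compute(String sourceAlias,
                                 Map<String, Integer> sourcePartitions,
                                 Map<String, Integer> targetPartitions) {
        PartitionPlan plan = new PartitionPlan();
        for (Map.Entry<String, Integer> source : sourcePartitions.entrySet()) {
            // Assumes DefaultReplicationPolicy naming: "<sourceAlias>.<topic>".
            String remoteTopic = sourceAlias + "." + source.getKey();
            Integer targetCount = targetPartitions.get(remoteTopic);
            if (targetCount == null) {
                // Topic does not exist on the destination yet: it would be created.
                plan.topicsToCreate.put(remoteTopic, source.getValue());
            } else if (source.getValue() > targetCount) {
                // Source grew: the destination topic would be expanded to match.
                plan.partitionsToAdd.put(remoteTopic, source.getValue());
            }
        }
        return plan;
    }
}
```

With source counts `{orders=12, payments=6, audit=3}`, destination counts `{primary.orders=6, primary.payments=6}`, and alias `primary`, the plan creates `primary.audit` with 3 partitions and grows `primary.orders` to 12. A custom `KafkaResourceManager` is the point where such requests could be checked against a capacity budget instead of being applied directly.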
`DefaultResourceManager` will use `AdminClient` to manage the resources, and the default value of `resource.manager.class` will be `DefaultResourceManager` for backward compatibility.
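By contrast with `DefaultResourceManager`, a custom implementation can route MM2's requests into an existing provisioning workflow. The sketch below is hypothetical throughout: `ProvisioningBackend` stands in for an in-house system (for example, an IaC pipeline or a CMAK-style REST API), `BudgetedResourceManager` and its retention cap are illustrative, and it takes plain maps instead of `NewTopic`/`Config` so it compiles without `kafka-clients`. It covers only topic creation, not the full `KafkaResourceManager` interface.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an in-house provisioning system (IaC pipeline,
// CMAK-style REST API, ...). Not a real Kafka or MM2 API.
interface ProvisioningBackend {
    void submitTopicRequest(String topic, int partitions, Map<String, String> configs);
}

// Hypothetical, partial resource manager: instead of creating topics through
// AdminClient, it forwards each request to the provisioning backend after
// capping retention.ms at a budgeted maximum.
final class BudgetedResourceManager {
    private static final String RETENTION_MS = "retention.ms";

    private final ProvisioningBackend backend;
    private final long maxRetentionMs;

    BudgetedResourceManager(ProvisioningBackend backend, long maxRetentionMs) {
        this.backend = backend;
        this.maxRetentionMs = maxRetentionMs;
    }

    /**
     * Plain-type analogue of the new-topic half of createTopicPartitions.
     * @param newTopics    remote topic name -> partition count
     * @param topicConfigs remote topic name -> topic configs mirrored from the source
     */
    void createTopics(Map<String, Integer> newTopics, Map<String, Map<String, String>> topicConfigs) {
        for (Map.Entry<String, Integer> topic : newTopics.entrySet()) {
            // Copy so the caller's config map is never modified.
            Map<String, String> configs = new HashMap<>(topicConfigs.getOrDefault(topic.getKey(), Map.of()));
            String retention = configs.get(RETENTION_MS);
            if (retention != null) {
                long requested = Long.parseLong(retention);
                // -1 means "no time limit" in Kafka; treat it as over budget too.
                if (requested < 0 || requested > maxRetentionMs) {
                    configs.put(RETENTION_MS, Long.toString(maxRetentionMs));
                }
            }
            backend.submitTopicRequest(topic.getKey(), topic.getValue(), configs);
        }
    }
}
```

Routing requests through a backend like this lets the team that owns the destination cluster review capacity before MM2's changes land, which is the integration point the KIP asks for.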
Compatibility, Deprecation, and Migration Plan
- When users upgrade an existing MM2 cluster they don’t need to change any of their current configuration as this proposal maintains the default behaviour for MM2.
Rejected Alternatives
- Manage creating and modifying Kafka topics and ACLs outside MM2 by building a separate tool that monitors the same set of topics as MirrorMaker2 and creates/modifies topics and ACLs once it detects configuration changes. The problem is that this duplicates effort, as MM2 already implements all this logic; it only needs to use a different client than `AdminClient`.
- Allow MM2 users to provide a custom implementation of the `Admin` interface instead of adding a new interface. The problem is that the `Admin` interface has 86 methods and MM2 depends on only 13 of them, so any custom implementation of `Admin` would be very large: the user would need to return dummy data for the 73 methods that MM2 does not use.