
Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Motivation

MirrorMaker 2 uses AdminClient directly to create topics, create new topic partitions, and sync topic configs and ACLs when sync.topic.configs.enabled and sync.topic.acls.enabled are enabled. The current design rests on two main assumptions:

  1. The user running MM2 has the following ACLs:
    • `Create` ACLs for topics on the source clusters, to create heartbeat topics.
    • `Create` and `Alter/AlterConfigs` ACLs on the destination clusters, to create topics, create partitions, and update topic configs and topic ACLs.
  2. MM2 can bypass any existing Kafka resource management solution that the organization runs and create/update resources outside that system.

These assumptions do not hold for organizations that run any kind of resource management or federated solution. In such setups, that system is usually the only application allowed to initialize a client with `Create` and `Alter/AlterConfigs` ACLs, and no other team, group, or application is allowed to hold the same level of ACLs outside it.


While this approach simplifies resource management for MM2, it creates a lot of pain for organizations that try to integrate MM2 into their Kafka ecosystem. Currently there are three ways to deal with this issue:

  1. Drop mirrored topics from the centralized/federated system and accept the risk of not tracking them. This avoids conflicts between topics managed by MM2 and the centralized resource management system; however, it creates capacity and budget issues (see the downsides section below).
  2. Create mirrored topics up front on the destination clusters and disable features such as sync.topic.configs.enabled and sync.topic.acls.enabled. This approach needs a lot of work, especially when MM2 mirrors a large number of topics, for example when mirroring with topics=.* configured.
  3. Run a watcher that syncs the resources on the Kafka cluster back to the resource management system. This has its own drawbacks:
    1. Any downtime of this watcher breaks the sync between MM2 and the management tools.
    2. Data platform teams only learn about the capacity impact after MM2 starts mirroring data, so any capacity safeguards in these systems do not apply.

An example of such a watcher is the Strimzi Kubernetes Topic Operator, which reconciles topics and topic configs between ZooKeeper and KafkaTopic resources on Kubernetes. However, Strimzi's User Operator has no equivalent mechanism, so consumer groups cannot migrate until the required ACLs have been created first (see the downsides section below).


The downsides of MM2 using AdminClient directly

As mentioned above, using AdminClient directly within MM2 simplifies resource management for MM2. However, it creates the following problems for users who rely on IaC (Infrastructure as Code), federated solutions, or a capacity/budget planning system for their destination Kafka clusters. Here are the potential undesired impacts of MM2 bypassing the organization's ecosystem:

  • Capacity/budget planning:
    1. MM2 automatically creates topics (bypassing `auto.create.topics.enable=false`) and adds partitions on destination clusters whenever the partition count of a topic matching the `topics` regex/list config increases on the source.
    2. MM2 syncs all topic configs, including those that affect capacity, such as `retention.ms` and `retention.bytes`.

Together, these two behaviours increase disk usage on the destination cluster, and the team that runs the cluster only notices the capacity issue when disk usage hits their alert thresholds.
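To make the capacity impact concrete, the sketch below estimates the worst-case disk footprint of one mirrored topic on the destination cluster. It assumes size-based retention, where `retention.bytes` caps each partition and every replica stores its own copy. It ignores the active segment, index files, and `retention.ms`, so real usage can be somewhat higher. The class and method names are hypothetical.

```java
import java.util.OptionalLong;

public class MirroredTopicCapacity {

    /**
     * Hypothetical helper: rough upper bound on the bytes a topic can occupy across
     * all brokers under size-based retention (retention.bytes applies per partition,
     * and each replica holds a full copy). Segment and index overhead are ignored.
     *
     * @return empty when retentionBytes is negative (-1 means no size limit)
     */
    static OptionalLong worstCaseDiskBytes(int partitions, short replicationFactor, long retentionBytes) {
        if (retentionBytes < 0) {
            return OptionalLong.empty(); // no size cap: only time-based retention bounds growth
        }
        long onePerPartitionCopy = Math.multiplyExact((long) partitions, retentionBytes);
        return OptionalLong.of(Math.multiplyExact(onePerPartitionCopy, (long) replicationFactor));
    }

    public static void main(String[] args) {
        long oneGiB = 1L << 30;
        // Source topic with retention.bytes = 1 GiB, replication factor 3 on the destination.
        System.out.println(worstCaseDiskBytes(6, (short) 3, oneGiB).getAsLong());  // 19327352832 (18 GiB)
        // Same topic after the source grows to 12 partitions and MM2 mirrors the change.
        System.out.println(worstCaseDiskBytes(12, (short) 3, oneGiB).getAsLong()); // 38654705664 (36 GiB)
    }
}
```

Doubling the source partition count doubles the bound on the destination without any action from the team that owns that cluster.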

  • Provisioning conflict:

MM2 uses AdminClient directly to perform the following:

    • Create a Kafka topic (no way to disable this)
    • Create new Kafka partitions (no way to disable this)
    • Sync Kafka topic configurations (can be disabled, but doing so reduces MM2's value to users)
    • Sync Kafka topic ACLs (can be disabled, but doing so also reduces MM2's value to users). Disabling this feature also means that users must make sure the right ACLs exist for the mirrored topics on the destination cluster before switching their consumers over, especially when MM2 is used for disaster recovery. This may lead to extra downtime.

Using AdminClient directly causes issues for teams that:

    • Manage their Kafka resources with tools like Strimzi or custom federated solutions. For example, Strimzi's User Operator does not sync the topic ACLs that MM2 syncs, and the Strimzi documentation states that users must disable sync.topic.acls.enabled in MM2 if they use the User Operator.
    • Run MM2 but do not own the destination cluster and are not allowed to have Create/Alter/AlterConfigs ACLs. These teams do not have admin access, but they may have access to a Kafka management solution, such as the yahoo/CMAK REST API or an in-house tool, through which they can create and update resources.


This KIP proposes a way for users to run MM2 with a custom implementation of the Kafka resource manager so that they can easily integrate MM2 with their ecosystem.

Public Interfaces

The KIP proposes adding a new interface called KafkaResourceManager that defines how MM2 creates, modifies, and lists Kafka topics and ACLs. MirrorMaker 2's original behaviour will be kept in DefaultResourceManager.

The implementation class can be overridden using the following configurations:

  • resource.manager.class: defaults to DefaultResourceManager.
  • <cluster_alias>.resource.manager.class: configures the implementation for a specific cluster alias.
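The lookup implied by these two settings can be sketched as follows. This assumes the per-alias key takes precedence over the global key and that both fall back to DefaultResourceManager; the proposal does not spell out the precedence. `ResourceManagerConfigResolver`, `resolveClassName`, and the `com.example.*` class names are hypothetical, and the default is written as a bare class name because the KIP does not give its package.

```java
import java.util.Map;

public class ResourceManagerConfigResolver {

    static final String RESOURCE_MANAGER_CLASS = "resource.manager.class";
    // Placeholder: the KIP names DefaultResourceManager but not its package.
    static final String DEFAULT_RESOURCE_MANAGER = "DefaultResourceManager";

    /**
     * Hypothetical helper choosing the resource manager class for one cluster alias.
     * Assumed precedence: <alias>.resource.manager.class, then resource.manager.class,
     * then DefaultResourceManager.
     */
    static String resolveClassName(Map<String, String> props, String clusterAlias) {
        String perCluster = props.get(clusterAlias + "." + RESOURCE_MANAGER_CLASS);
        if (perCluster != null && !perCluster.isBlank()) {
            return perCluster;
        }
        return props.getOrDefault(RESOURCE_MANAGER_CLASS, DEFAULT_RESOURCE_MANAGER);
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of(
                "resource.manager.class", "com.example.CmakResourceManager",
                "backup.resource.manager.class", "com.example.StrimziResourceManager");

        System.out.println(resolveClassName(props, "backup"));     // com.example.StrimziResourceManager
        System.out.println(resolveClassName(props, "primary"));    // com.example.CmakResourceManager
        System.out.println(resolveClassName(Map.of(), "primary")); // DefaultResourceManager
    }
}
```

A per-alias override lets a single MM2 deployment keep the AdminClient-based default for clusters it administers while routing a cluster owned by another team through that team's tooling.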

Proposed Changes

The KafkaResourceManager interface defines how to list, create, and update topics, topic configs, and ACLs.

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclBinding;

public interface KafkaResourceManager extends Configurable, AutoCloseable {

    /**
     * Creates a Kafka resource manager.
     * @param props map of configuration needed to set up the resource manager
     * @return KafkaResourceManager
     */
    KafkaResourceManager create(Map<String, Object> props);

    /**
     * Closes the manager.
     */
    @Override
    void close();

    /**
     * Lists topics for this cluster.
     * @return set of topic names
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Set<String> listTopics() throws InterruptedException, ExecutionException;

    /**
     * Describes a collection of topics.
     * @param topics collection of topic names
     * @return collection of TopicDescription
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Collection<TopicDescription> describeTopics(Collection<String> topics) throws InterruptedException, ExecutionException;

    /**
     * Describes the configs of the given topics.
     * @param topics set of topic names
     * @return map of topic name to Config
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Map<String, Config> describeTopicConfigs(Set<String> topics) throws InterruptedException, ExecutionException;

    /**
     * Creates new topics and new partitions.
     * @param newTopics list of new topics
     * @param newPartitions map of topic name to new partitions for that topic
     */
    void createTopicPartitions(List<NewTopic> newTopics, Map<String, NewPartitions> newPartitions);

    /**
     * Creates a compacted topic; mostly used to create internal topics.
     * @param topicName topic name
     * @param partitions number of partitions
     * @param replicationFactor number of replicas
     */
    void createCompactedTopic(String topicName, short partitions, short replicationFactor);

    /**
     * Updates the configs of the given topics.
     * @param topicConfigs map of topic name to topic Config
     * @throws InterruptedException
     * @throws ExecutionException
     */
    void updateTopicConfigs(Map<String, Config> topicConfigs) throws InterruptedException, ExecutionException;

    /**
     * Lists ACL bindings for all topics.
     * @return collection of AclBinding
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Collection<AclBinding> listTopicAclBindings() throws InterruptedException, ExecutionException;

    /**
     * Updates ACLs.
     * @param bindings list of ACL bindings
     * @throws InterruptedException
     * @throws ExecutionException
     */
    void updateAcls(List<AclBinding> bindings) throws InterruptedException, ExecutionException;
}


DefaultResourceManager will use AdminClient to manage resources, and it will be the default value of resource.manager.class for backward compatibility.
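For teams that must go through a management tool (such as the CMAK REST API scenario in the motivation), a custom implementation would forward MM2's requests to that tool instead of calling AdminClient. The sketch below shows only this delegation idea for two operations, listTopics and createTopicPartitions. To stay runnable without Kafka on the classpath, it uses plain Java maps (topic name to partition count) in place of NewTopic and NewPartitions; `ProvisioningClient`, `InMemoryProvisioningClient`, and `ProvisioningResourceManager` are hypothetical names, not part of the proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class ProvisioningResourceManagerSketch {

    /** Hypothetical client for an organization's resource management system. */
    interface ProvisioningClient {
        Set<String> topics();
        void requestTopic(String topic, int partitions);
        void requestPartitions(String topic, int totalPartitions);
    }

    /** In-memory stand-in; a real client would call the management tool's API. */
    static class InMemoryProvisioningClient implements ProvisioningClient {
        final Map<String, Integer> partitionCounts = new HashMap<>();

        public Set<String> topics() {
            return new TreeSet<>(partitionCounts.keySet());
        }

        public void requestTopic(String topic, int partitions) {
            partitionCounts.putIfAbsent(topic, partitions); // existing topics are left alone
        }

        public void requestPartitions(String topic, int totalPartitions) {
            // Kafka never shrinks a topic's partition count, so keep the larger value.
            partitionCounts.merge(topic, totalPartitions, Integer::max);
        }
    }

    /**
     * Simplified counterpart of listTopics() and createTopicPartitions(...): every
     * request goes through the management system, so any safeguards it enforces apply.
     */
    static class ProvisioningResourceManager {
        private final ProvisioningClient client;

        ProvisioningResourceManager(ProvisioningClient client) {
            this.client = client;
        }

        Set<String> listTopics() {
            return client.topics();
        }

        void createTopicPartitions(Map<String, Integer> newTopics, Map<String, Integer> newPartitionTotals) {
            newTopics.forEach(client::requestTopic);
            newPartitionTotals.forEach(client::requestPartitions);
        }
    }

    public static void main(String[] args) {
        InMemoryProvisioningClient client = new InMemoryProvisioningClient();
        ProvisioningResourceManager manager = new ProvisioningResourceManager(client);

        manager.createTopicPartitions(Map.of("primary.orders", 6), Map.of());
        manager.createTopicPartitions(Map.of(), Map.of("primary.orders", 12));

        System.out.println(manager.listTopics());                         // [primary.orders]
        System.out.println(client.partitionCounts.get("primary.orders")); // 12
    }
}
```

The MM2 side stays unchanged; only the class named in resource.manager.class decides where topic and partition requests end up.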

Compatibility, Deprecation, and Migration Plan

  • When users upgrade an existing MM2 cluster they don’t need to change any of their current configuration as this proposal maintains the default behaviour for MM2.

Rejected Alternatives

  1. Manage creating and modifying Kafka topics and ACLs outside MM2 by building a separate tool that monitors the same set of topics as MirrorMaker 2 and creates/modifies topics and ACLs once it detects configuration changes. The downsides of this are:
    1. This duplicates effort, as MM2 already implements all this logic; it only needs to use a different client than AdminClient.
    2. MM2 still bypasses any capacity safeguards the ecosystem has in place.
    3. Any downtime of this tool causes conflicts between the resources mirrored by MM2 and the resource management system.
  2. Allow MM2 users to provide a custom implementation of the Admin interface instead of adding a new interface. The downsides of this are:
    1. The Admin interface has 86 methods, and MM2 depends on 13 of them, so any custom implementation of the Admin interface would be very large. The user would need to throw an exception such as UnsupportedOperationException from the 73 methods that MM2 does not use.
    2. MM2's interactions with AdminClient are not in one place, so it is hard to tell which methods an implementation must support for MM2.
    3. Implementations would have to keep up with changes to the Admin interface, which is very large.