Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Motivation

MirrorMaker 2 uses AdminClient  directly to create topics, create new topic partitions and sync configurations like topic configs and ACLs by enabling sync.topic.configs.enabled  and sync.topic.acls.enabled. The current design runs with 2 main assumptions:

...

An example for such a watcher is Strimzi K8S Topic Operator. Which recosiliate Topic and Topic's config between ZK and Topic Resources on K8S however it doesn't have the same solution for UserOperator which make consumer group can't migrate without these ACLs created first. (more details below in The downsides of MM2 use AdminClient directly in the motivation)


The downsides of MM2 use AdminClient directly

As mentioned before the usage of AdminClient directly within MM2 simplify the resource management for MM2. However, it does create the following problems for any users who use IaC (Infra-as-Code), federated solutions, or have a capacity/budget planning system for Kafka destination clusters. Here is a list of potential undesired impact of MM2 bypass the organization ecosystem:

...

This KIP proposes a way for users to run MM2 with custom implementation for the Kafka resource manager in order to easily integrate MM2 with their ecosystem. 

Public Interfaces

The KIP proposes adding flexibility to how MM2 manage Kafka resources.  

There're 2 potential solutions under discussion now: 

...

Adding a new interface called KafkaResourceManager ("the original proposal") that defines how MM2 will create, modify, and list any Kafka topics and ACLs. MirrorMaker2’s original behaviour will be kept in DefaultResourceManager.

The implemented class can be overridden using the following configurations.

  • resource.manager.class default value will be set to DefaultResourceManager
  • or can be configured based on cluster aliases using <cluster_alias>.resource.manager.class

...

By allowing MM2 to load different implementation of Admin interface.

To make it easier for users to provide their custom implementation of Admin, the KIP will introduce ForwardingAdmin class delegate to KafkaAdminClient. 

...

The implemented class can be overridden using the following configurations.

  • forwarding.admin.class default value will be set to
  • AdminClient
  • ForwardingAdmin with KafkaAdminClient as delegate.
  • or can be configured based on cluster aliases using <cluster_alias>.forwarding.admin.class

The downsides of this proposal are

      1.  The Admin interface has 86 methods, and MM2 depends on 13 of them; this means any custom implementation of the Admin interface will be humongous. The user will need to return "NotImplementedException" for 73 methods that MM2 does not use. 
      2. MM2 interactions with AdminClient is not listed in one place, so it's bit hard to list what interfaces you need to implement to MM2. 
      3. Keeping up with the AdminClient interface which is enormous. 

Proposed Changes

There're 2 potential solutions under discussion now: 

1. Add new interface for MM2 to manage resources KafkaResourceManager 

KafkaResourceManager interface will define how to list, create, update topics, topic configs, and ACLs.

Proposed Changes

  • ForwardingAdmin class 


Code Block
languagejava
titleForwardingAdmin.java
public class ForwardingAdminClient implements Admin 
Code Block
public interface KafkaResourceManager extends Configurable, AutoCloseable {
    /**private final Admin   
    * Method to create kafka resource managerdelegate;

    * @param props map of configuration needed to setup resource manager
    * @return KafkaResourceManager     
    */    
    KafkaResourceManager create(Map<String, Object> props);
    
    /** 
	* Method to close the manager
	*/
    void close();
    
     /**
     * list topics for this cluster
     * @return Set of topics name as strings
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Set<String> listTopics() throws InterruptedException, ExecutionExceptionpublic ForwardingAdminClient(final Admin delegate) {
        this.del
egate = delegate;
    
    /** }

    * describe a collection of topics. @Override
    *public @param topics collection of topic names 
    * @return collection of TopicDescription 
    * @throws InterruptedException 
    * @throws ExecutionException 
    */
    Collection<TopicDescription> describeTopics(Collection<String> topics) throws InterruptedException, ExecutionException;void close(Duration timeout) {
    
    /** 
    * describe config for given set of topics.
    * @param topics set of topic names 
    * @return map of topic name and config object 
    * @throws InterruptedException 
    * @throws ExecutionException 
    */
    Map<String, Config> describeTopicConfigs(Set<String> topics) throws InterruptedException, ExecutionExceptiondelegate.close();
    
    /**     }

    * Create new topics and new partitions.  @Override
    *public @param newTopics list of new topics     
    * @param newPartitions map of new partitions for each topic     
    */    
    void createTopicPartitions(List<NewTopic>CreateTopicsResult createTopics(Collection<NewTopic> newTopics, Map<String, NewPartitions> newPartitions);    
    
    /**CreateTopicsOptions options) {
       
    * create compacted topic, mostly used to create internal topics.
    * @param topicName topic name     
    * @param partitions number of partitions     
    * @param replicationFactor number of replicas     
    */    
    void createCompactedTopic(String topicName, short partitions, short replicationFactor);    
    
    /**     
    * update config for given topics.    
    * @param topicConfigs map of topics to topic config     
    * @throws InterruptedException     
    * @throws ExecutionException     return delegate.createTopics(newTopics, options);
    }
    
    */    
    void updateTopicConfigs(Map<String, Config> topicConfigs) throws InterruptedException, ExecutionException;    
    
    /**
 override rest of Admin *interface listto acl binding for all topics.
     * @return collection AclBinding
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Collection<AclBinding> listTopicAclBindings() throws InterruptedException, ExecutionException;
    
    /**     
    * update Acls.    
    * @param bindings list of acl bindings     
    * @throws InterruptedException     
    * @throws ExecutionException     
    */    
    void updateAcls(List<AclBinding> bindings) throws InterruptedException, ExecutionException;
}

DefaultResourceManager will use AdminClient to manage the resources. And default value for resource.manager.class will be DefaultResourceManager for backward compatibility.

2. Add configuration to MM2 to load different implementation of Admin interface

...

use delegate...
}

 

  • Add configuration forwarding.admin.class  to MirrorConnectorConfig
  • Update All MM2 connectors to use the forwarding.admin.class
    • Add MirrorConnectorConfig.forwardingAdminClient that will load forwarding.admin.class and return ForwardingAdminClient. 
    • All Connectors and Tasks will replace AdminClient.create by MirrorConnectorConfig.forwardingAdminClient 
      • MirrorCheckpointTask
      • MirrorCheckpointConnector
      • MirrorSourceConnector
    • Use ForwardingAdmin in MirrorUtils instead of TopicAdmin to create internal compacted topics

Compatibility, Deprecation, and Migration Plan

  • When users upgrade an existing MM2 cluster they don’t need to change any of their current configuration as this proposal maintains the default behaviour for MM2.

Rejected Alternatives

  • Adding a new interface called KafkaResourceManager ("was the original proposal") that defines how MM2 will create, modify, and list any Kafka topics and ACLs. MirrorMaker2’s original behaviour will be kept in DefaultResourceManager.

    And override the implementation using the following configurations.

    • resource.manager.class default value will be set to DefaultResourceManager
    • or can be configured based on cluster aliases using <cluster_alias>.resource.manager.class
    The downside with this is it may create confusing for users between Admin and ResourceManager 
  • Manage creating and modifying Kafka topics and ACLs outside MM2 by building a separate tool that monitors the same set of topics as MirrorMaker2 and create/modify topics and ACLs once it detects configuration changes. The downsides with this are
    1. This will be a duplicate effort as MM2 already has all this logic implemented; it only needs to use a different client than AdminClient.
    2. MM2 still bypass any capacity safeguards the ecosystem would have in place. 
    3. Any downtime with this tool will cause conflict between MM2 mirrored resources and management resource system.