...

MirrorMaker 2 uses `AdminClient` directly to create topics, create new topic partitions, and sync configurations such as topic configs and ACLs by enabling `sync.topic.configs.enabled` and `sync.topic.acls.enabled`. The current approach also assumes that the user running MM2 has Admin rights to create/update topics, which is only valid if the user who runs MM2 also manages both the source and destination clusters.

While this approach simplifies resource management for MM2, it creates the following problems for users who manage Kafka resources with IaC (Infrastructure as Code), federated solutions, or their own systems or tooling:

...

a capacity/budget planning system for Kafka destination clusters. 

Because MM2 uses `AdminClient` directly, outside of such an ecosystem, it causes two main issues:

  • Capacity/budget planning:
    1. MM2 automatically creates topics (bypassing `auto.create.topics.enable=false`) and creates new topic partitions on destination clusters whenever the partition count increases on the source for any topic that matches the configured topic regex/list.
    2. MM2 syncs all topic configs, including configurations that affect capacity, such as `retention.ms` and `retention.bytes`.

Together, these two behaviors increase disk usage on destination clusters. The team that runs a cluster only notices the capacity issue when disk usage crosses the threshold for their alerts.
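To make the capacity impact concrete, the sketch below mimics the partition-growth rule described above and turns it into a rough disk estimate. It is a minimal illustration, not MM2 code: `PartitionSyncImpact` and its methods are hypothetical, topic names are assumed to be already mapped to their target names, and the estimate assumes `retention.bytes` is set (its default of -1 means unlimited). Since `retention.bytes` limits each partition's log and every replica keeps its own copy, the ceiling scales with new partitions times replication factor.

```java
import java.util.Map;
import java.util.TreeMap;

public class PartitionSyncImpact {

    // For each mirrored topic, how many partitions would be added on the target so it matches
    // the source. A topic missing from the target counts as 0 partitions (it would be created).
    // Keys are assumed to be already mapped to target topic names.
    static Map<String, Integer> partitionsToAdd(Map<String, Integer> source, Map<String, Integer> target) {
        Map<String, Integer> toAdd = new TreeMap<>();
        for (Map.Entry<String, Integer> e : source.entrySet()) {
            int existing = target.getOrDefault(e.getKey(), 0);
            if (e.getValue() > existing) {
                toAdd.put(e.getKey(), e.getValue() - existing);
            }
        }
        return toAdd;
    }

    // Approximate ceiling on extra disk: retention.bytes bounds each partition log on each replica.
    // Ignores segment granularity and index files.
    static long extraDiskBytes(int newPartitions, int replicationFactor, long retentionBytes) {
        return (long) newPartitions * replicationFactor * retentionBytes;
    }

    public static void main(String[] args) {
        Map<String, Integer> source = Map.of("source.orders", 12, "source.payments", 6);
        Map<String, Integer> target = Map.of("source.orders", 8);
        Map<String, Integer> toAdd = partitionsToAdd(source, target);
        int total = toAdd.values().stream().mapToInt(Integer::intValue).sum();
        // Assumed values: replication factor 3, retention.bytes = 1 GiB.
        long bytes = extraDiskBytes(total, 3, 1L << 30);
        System.out.println(toAdd + " -> up to " + (bytes >> 30) + " GiB more on the target");
    }
}
```

With these assumed numbers, ten new partitions at replication factor 3 can add up to 30 GiB on the destination cluster without any capacity review.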

  • Provisioning conflict:

MM2 uses `AdminClient` directly to perform the following operations:

    • Create a Kafka topic (no way to disable this)
    • Create new Kafka partitions (no way to disable this)
    • Sync Kafka topic configurations (can be disabled, but doing so reduces MM2's value for users)
    • Sync Kafka topic ACLs (can be disabled, but doing so also reduces MM2's value). Disabling ACL sync means users must ensure the right ACLs exist for the mirrored topics on the destination cluster before switching their consumers over, especially when MM2 is used for disaster recovery. Missing ACLs at that point can mean extra downtime.
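The two sync features above are switched off through MM2 properties. As a sketch, assuming a dedicated MM2 cluster configured from a properties file (the cluster aliases, hosts, and topic pattern are placeholders, and `DisabledSyncConfig` is a hypothetical helper), the relevant settings look like this. Note that nothing here stops topic or partition creation, since MM2 offers no switch for those.

```java
import java.io.IOException;
import java.util.Properties;

public class DisabledSyncConfig {

    // Builds MM2 properties with config and ACL sync turned off.
    // Topic and partition creation remain active: there is no property to disable them.
    static Properties mm2Properties() {
        Properties props = new Properties();
        props.setProperty("clusters", "source, target");
        props.setProperty("source.bootstrap.servers", "source-kafka:9092"); // placeholder host
        props.setProperty("target.bootstrap.servers", "target-kafka:9092"); // placeholder host
        props.setProperty("source->target.enabled", "true");
        props.setProperty("source->target.topics", "orders.*");             // placeholder pattern
        props.setProperty("sync.topic.configs.enabled", "false");
        props.setProperty("sync.topic.acls.enabled", "false");
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Prints the settings in .properties format, e.g. for the file passed to connect-mirror-maker.sh.
        mm2Properties().store(System.out, "MM2 with topic config and ACL sync disabled");
    }
}
```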

Using `AdminClient` directly causes problems for teams that:

    • Manage their Kafka resources with tools like Strimzi or custom federated solutions. For example, Strimzi's User Operator doesn't sync topic ACLs when MM2 is syncing topic ACLs, and the Strimzi documentation states that users must disable MM2's `sync.topic.acls.enabled` if they use the User Operator.
    • Run MM2 but don't own the destination cluster. These teams don't have Admin access, but they may have a Kafka management solution such as yahoo/CMAK or an in-house tool. With a tool like CMAK, they can create/update resources through the CMAK REST API.

...



In use cases where users have their own centralized system to manage Kafka topics and ACLs, they should be able to integrate MirrorMaker 2 with their internal ecosystem easily.

Currently, there are a few options for dealing with these problems:

  1. Exclude mirrored topics from the centralized system and accept the risk of not tracking them. This avoids conflicts between topics managed by MM2 and the centralized resource-management system.
  2. Create mirrored topics up front on destination clusters and disable features like `sync.topic.configs.enabled` and `sync.topic.acls.enabled`.

...

  1. Run a watcher that syncs Kafka cluster resources to the in-house ecosystem.
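The watcher option boils down to periodically diffing what actually exists on the cluster against what the in-house inventory knows about. A minimal sketch, assuming the cluster snapshot comes from something like `Admin.listTopics()` and the inventory snapshot from the in-house system; both are passed in as plain sets here, and `InventoryWatcher` and `Drift` are hypothetical names.

```java
import java.util.Set;
import java.util.TreeSet;

public class InventoryWatcher {

    // untracked: topics on the cluster (e.g. created by MM2) that the inventory doesn't know about.
    // stale: topics the inventory still lists that no longer exist on the cluster.
    record Drift(Set<String> untracked, Set<String> stale) {}

    static Drift diff(Set<String> clusterTopics, Set<String> inventoryTopics) {
        Set<String> untracked = new TreeSet<>(clusterTopics);
        untracked.removeAll(inventoryTopics);
        Set<String> stale = new TreeSet<>(inventoryTopics);
        stale.removeAll(clusterTopics);
        return new Drift(untracked, stale);
    }

    public static void main(String[] args) {
        Set<String> cluster = Set.of("source.orders", "source.payments", "local.audit");
        Set<String> inventory = Set.of("local.audit", "source.orders", "legacy.events");
        Drift drift = diff(cluster, inventory);
        System.out.println("register in inventory: " + drift.untracked());
        System.out.println("mark as deleted:       " + drift.stale());
    }
}
```

The weakness of this option is visible in the design: the watcher only reacts after MM2 has already changed the cluster, so capacity and ownership checks happen after the fact.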


This KIP proposes a way for users to run MM2 with a custom implementation of the Kafka resource manager.

...

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclBinding;

public interface KafkaResourceManager extends Configurable, AutoCloseable {
    /**
     * Creates a Kafka resource manager.
     * @param props map of configuration needed to set up the resource manager
     * @return KafkaResourceManager
     */
    KafkaResourceManager create(Map<String, Object> props);

    /**
     * Closes the manager.
     */
    @Override
    void close();

    /**
     * Lists topics for this cluster.
     * @return set of topic names
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Set<String> listTopics() throws InterruptedException, ExecutionException;

    /**
     * Describes a collection of topics.
     * @param topics collection of topic names
     * @return collection of TopicDescription
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Collection<TopicDescription> describeTopics(Collection<String> topics) throws InterruptedException, ExecutionException;

    /**
     * Describes the config of the given set of topics.
     * @param topics set of topic names
     * @return map of topic name to config object
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Map<String, Config> describeTopicConfigs(Set<String> topics) throws InterruptedException, ExecutionException;

    /**
     * Creates new topics and new partitions.
     * @param partitionCounts map of new partition counts for each topic
     * @param newTopics list of new topics
     * @param newPartitions map of new partitions for each topic
     */
    void createTopicPartitions(Map<String, Long> partitionCounts, List<NewTopic> newTopics, Map<String, NewPartitions> newPartitions);

    /**
     * Creates a compacted topic; mostly used to create internal topics.
     * @param topicName topic name
     * @param partitions number of partitions
     * @param replicationFactor number of replicas
     */
    void createCompactedTopic(String topicName, short partitions, short replicationFactor);

    /**
     * Updates the config of the given topics.
     * @param topicConfigs map of topic name to topic config
     * @throws InterruptedException
     * @throws ExecutionException
     */
    void updateTopicConfigs(Map<String, Config> topicConfigs) throws InterruptedException, ExecutionException;

    /**
     * Lists ACL bindings for all topics.
     * @return collection of AclBinding
     * @throws InterruptedException
     * @throws ExecutionException
     */
    Collection<AclBinding> listTopicAclBindings() throws InterruptedException, ExecutionException;

    /**
     * Updates ACLs.
     * @param bindings list of ACL bindings
     * @throws InterruptedException
     * @throws ExecutionException
     */
    void updateAcls(List<AclBinding> bindings) throws InterruptedException, ExecutionException;
}
```
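The point of the interface is that MM2 calls it instead of `AdminClient`, so a team can route requests into its own provisioning flow. The sketch below illustrates that idea under loud assumptions: `SimpleResourceManager` is a hypothetical, cut-down stand-in for two of the methods above using plain JDK types (the real interface uses `NewTopic`, `Config`, and so on), `ChangeRequestManager` is a hypothetical implementation that files change requests rather than touching the cluster, and the `resource.manager.class` key is invented for illustration, not defined by this KIP. Loading a class by name from configuration is the usual way Kafka plug-ins are wired in.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ResourceManagerSketch {

    // Hypothetical, simplified stand-in for a subset of KafkaResourceManager.
    interface SimpleResourceManager {
        void createTopic(String name, int partitions, short replicationFactor);
        void updateTopicConfig(String topic, Map<String, String> config);
    }

    // Hypothetical implementation: records change requests for an in-house provisioning system
    // (where capacity owners can approve them) instead of calling the Kafka Admin API.
    public static class ChangeRequestManager implements SimpleResourceManager {
        final List<String> submittedRequests = new ArrayList<>();

        @Override
        public void createTopic(String name, int partitions, short replicationFactor) {
            submittedRequests.add("CREATE " + name + " partitions=" + partitions + " rf=" + replicationFactor);
        }

        @Override
        public void updateTopicConfig(String topic, Map<String, String> config) {
            // TreeMap gives a stable key order in the request text.
            submittedRequests.add("CONFIG " + topic + " " + new TreeMap<>(config));
        }
    }

    // Instantiates the implementation named in config via its no-arg constructor.
    // "resource.manager.class" is a hypothetical key used only for this sketch.
    static SimpleResourceManager load(Map<String, String> config) throws ReflectiveOperationException {
        String className = config.get("resource.manager.class");
        return (SimpleResourceManager) Class.forName(className).getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        Map<String, String> config = new HashMap<>();
        config.put("resource.manager.class", ChangeRequestManager.class.getName());
        SimpleResourceManager manager = load(config);
        manager.createTopic("source.orders", 12, (short) 3);
        manager.updateTopicConfig("source.orders", Map.of("retention.ms", "604800000"));
        ((ChangeRequestManager) manager).submittedRequests.forEach(System.out::println);
    }
}
```

A queue-based implementation like this trades immediacy for control: mirrored topics only appear once the request is approved, which is exactly the behavior teams with capacity planning or IaC workflows are asking for.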

...