Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: Under DiscussionAdopted  

Discussion thread: here

JIRA: here

...

  • forwarding.admin.class default value will be set to ForwardingAdmin with Map<String, Object> config to configure KafkaAdminClient as delegate.
  • or can be configured based on cluster aliases using <cluster_alias>.cluster.forwarding.admin.class
  • The provided implementation must have a contractor that accept Map<String, Object> config to configure KafkaAdminClient and any customised resource management clients

The configuration for custom resource management client and/or KafkaAdminClient can be passed using the following prefix

...

Proposed Changes

  • ForwardingAdmin class class: The class must initialize Admin delegate to avoid implementing every method. 


Code Block
languagejava
titleForwardingAdmin.java
public class ForwardingAdmin implements Admin {
    private final Admin delegate;
    
    public ForwardingAdmin(Map<String, Object> config) {
        this.delegate = AdminClient.create(config);
    }

    @Override
    public CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options) {
        return delegate.createTopics(newTopics, options);
    }
    
    // override rest of Admin interface to use delegate...
}

 


  • Add configuration forwarding.admin.class  to MirrorConnectorConfig
  • Update All MM2 connectors to use the forwarding.admin.class
    • Add MirrorConnectorConfig.forwardingAdmin that will load forwarding.admin.class and return ForwardingAdmin. 
    • All Connectors and Tasks will replace AdminClient.create by MirrorConnectorConfig.forwardingAdmin 
      • MirrorCheckpointTask
      • MirrorCheckpointConnector
      • MirrorSourceConnector
    • Use ForwardingAdmin in MirrorUtils instead of TopicAdmin to create internal compacted topics

Connector Configuration Properties

...

property

default value

description


forwarding.admin.class

org.apache.kafka.clients.admin.ForwardingAdminClient

The fully qualified name of class that extend ForwardingAdminClient. The class must have a contractor that accept configuration (Map<String, Object> config) to configure KafkaAdminClient and any other needed clients.
target.cluster.forwarding.admin.classorg.apache.kafka.clients.admin.ForwardingAdminClient

Override forwarding.admin.class only for target cluster


source.cluster.forwarding.admin.classorg.apache.kafka.clients.admin.ForwardingAdminClient

Override forwarding.admin.class only for source cluster


...

property

description

source.cluster.admin.*overrides for the source - cluster forwarding admin config
target.cluster.admin.*overrides for the target - cluster forwarding admin config

Example Configuration

A sample configuration file ./config/connect-mirror-source.properties is provided for use case where source cluster use default org.apache.kafka.clients.admin.ForwardingAdminClient however target cluster use custom class custom.package.admin.TargetForwardingAdminClient:

Code Block
source.cluster.alias = A
target.cluster.alias = B

source.cluster.bootstrap.servers = A_localhost:9092
target.cluster.bootstrap.servers = B_localhost:9092

target.cluster.forwarding.admin.class = custom.package.admin.TargetForwardingAdminClient

// Common config for KafkaAdminClient in any ForwardingAdminClient
admin.security.protocol = SASL_SSL
admin.security.protocol=SASL_SSL
admin.sasl.mechanism=PLAIN

// Configure Source org.apache.kafka.clients.admin.ForwardingAdminClient
source.cluster.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1";


// Configure Target KafkaAdminClient in custom.package.admin.TargetForwardingAdminClient
target.cluster.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2";

// Configure Target custom ResourceManagmentRESTClient in custom.mypackage.admin.TargetForwardingAdminClient
target.cluster.admin.resource.management.url = https://kafka.resource.manager.com
target.cluster.admin.resource.management.keystore.path = /path/keystore
target.cluster.admin.resource.management.truststore.path = /path/truststore/ca.pem

...

Code Block
languagejava
package custom.mypackage.admin;

class TargetForwardingAdminClient extends ForwardingAdminClient {
    ResourceManagmentRESTClient customResourceManager;
    public TargetForwardingAdminClient(Map<String, Object> configs) {
    	super(config)

	    customResourceManager = ResourceManagmentRESTClient.create(configs) //method that know how to create client.
    }

    @Override
    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) {
        final// Map<String,use KafkaFutureImpl<Void>>customResourceManager futures = new HashMap<>(newPartitions.size());
        newPartitions.entrySet().stream().forEach(entry -> {
            String topicName = entry.getKey();
            int partitionCount = entry.getValue().totalCount();
            customResourceManager.updateTopicPartition(topicName, partitionCount);
            futures.put(topicName, new KafkaFutureImpl<>());
        });
        return new CreatePartitionsResult(new HashMap<>(futures));
    }
   to updateTopicPartition
      }
   // ....
}

MirrorMaker Configuration Properties

...

property

default value

description

<cluster>.forwarding.admin.class

org.apache.kafka.clients.admin.ForwardingAdminClient

The fully qualified name of class that extend ForwardingAdminClient. The class must have a contractor that accept configuration (Map<String, Object> config) to configure needed clients.

In addition, forwarding admin class will be re-using the following existing configs:

property

description

<cluster>.admin.*overrides for the cluster forwarding admin config

Example Configuration

Code Block
clusters = primary, backup
primary.bootstrap.servers = A_localhost:9092
backup.bootstrap.servers = B_localhost:9092

// Common config for KafkaAdminClient in any ForwardingAdminClient
admin.security.protocol = SASL_SSL
admin.security.protocol=SASL_SSL
admin.sasl.mechanism=PLAIN


// Configure Primary org.apache.kafka.clients.admin.ForwardingAdminClient
primary.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1";


// Configure Target KafkaAdminClient in custom.package.admin.TargetForwardingAdminClient
backup.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2";

// Configure Target custom ResourceManagmentRESTClient in custom.mypackage.admin.TargetForwardingAdminClient
backup.admin.resource.management.url = https://kafka.resource.manager.com
backup.admin.resource.management.keystore.path = /path/keystore
backup.admin.resource.management.truststore.path = /path/truststore/ca.pem

...