Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: Under DiscussionAdopted  

Discussion thread: here

JIRA: here

...

Proposed Changes

  • ForwardingAdmin class class: The class must initialize Admin delegate to avoid implementing every method. 


Code Block
languagejava
titleForwardingAdmin.java
public class ForwardingAdmin implements Admin {
    private final Admin delegate;
    
    public ForwardingAdmin(Map<String, Object> config) {
        this.delegate = AdminClient.create(config);
    }

    @Override
    public CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options) {
        return delegate.createTopics(newTopics, options);
    }
    
    // override rest of Admin interface to use delegate...
}

...


  • Add configuration forwarding.admin.class  to MirrorConnectorConfig
  • Update All MM2 connectors to use the forwarding.admin.class
    • Add MirrorConnectorConfig.forwardingAdmin that will load forwarding.admin.class and return ForwardingAdmin. 
    • All Connectors and Tasks will replace AdminClient.create by MirrorConnectorConfig.forwardingAdmin 
      • MirrorCheckpointTask
      • MirrorCheckpointConnector
      • MirrorSourceConnector
    • Use ForwardingAdmin in MirrorUtils instead of TopicAdmin to create internal compacted topics

Connector Configuration Properties

...

property

default value

description


forwarding.admin.class

org.apache.kafka.clients.admin.ForwardingAdminClient

The fully qualified name of class that extend ForwardingAdminClient. The class must have a contractor that accept configuration (Map<String, Object> config) to configure KafkaAdminClient and any other needed clients.
target.cluster.forwarding.admin.classorg.apache.kafka.clients.admin.ForwardingAdminClient

Override forwarding.admin.class only for target cluster


source.cluster.forwarding.admin.classorg.apache.kafka.clients.admin.ForwardingAdminClient

Override forwarding.admin.class only for source cluster


...

property

description

source.cluster.admin.*overrides for the source - cluster forwarding admin config
target.cluster.admin.*overrides for the target - cluster forwarding admin config

Example Configuration

A sample configuration file ./config/connect-mirror-source.properties is provided for use case where source cluster use default org.apache.kafka.clients.admin.ForwardingAdminClient however target cluster use custom class custom.package.admin.TargetForwardingAdminClient:

Code Block
source.cluster.alias = A
target.cluster.alias = B

source.cluster.bootstrap.servers = A_localhost:9092
target.cluster.bootstrap.servers = B_localhost:9092

target.cluster.forwarding.admin.class = custom.package.admin.TargetForwardingAdminClient

// Common config for KafkaAdminClient in any ForwardingAdminClient
admin.security.protocol = SASL_SSL
admin.security.protocol=SASL_SSL
admin.sasl.mechanism=PLAIN

// Configure Source org.apache.kafka.clients.admin.ForwardingAdminClient
source.cluster.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1";


// Configure Target KafkaAdminClient in custom.package.admin.TargetForwardingAdminClient
target.cluster.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2";

// Configure Target custom ResourceManagmentRESTClient in custom.mypackage.admin.TargetForwardingAdminClient
target.cluster.admin.resource.management.url = https://kafka.resource.manager.com
target.cluster.admin.resource.management.keystore.path = /path/keystore
target.cluster.admin.resource.management.truststore.path = /path/truststore/ca.pem

...

property

default value

description

<cluster>.forwarding.admin.class

org.apache.kafka.clients.admin.ForwardingAdminClient

The fully qualified name of class that extend ForwardingAdminClient. The class must have a contractor that accept configuration (Map<String, Object> config) to configure needed clients.

In addition, forwarding admin class will be re-using the following existing configs:

property

description

<cluster>.admin.*overrides for the cluster forwarding admin config

Example Configuration

Code Block
clusters = primary, backup
primary.bootstrap.servers = A_localhost:9092
backup.bootstrap.servers = B_localhost:9092

// Common config for KafkaAdminClient in any ForwardingAdminClient
admin.security.protocol = SASL_SSL
admin.security.protocol=SASL_SSL
admin.sasl.mechanism=PLAIN


// Configure Primary org.apache.kafka.clients.admin.ForwardingAdminClient
primary.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1";


// Configure Target KafkaAdminClient in custom.package.admin.TargetForwardingAdminClient
backup.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2";

// Configure Target custom ResourceManagmentRESTClient in custom.mypackage.admin.TargetForwardingAdminClient
backup.admin.resource.management.url = https://kafka.resource.manager.com
backup.admin.resource.management.keystore.path = /path/keystore
backup.admin.resource.management.truststore.path = /path/truststore/ca.pem

...