...

  • forwarding.admin.class defaults to ForwardingAdmin, which uses KafkaAdminClient as its delegate.
  • It can also be configured per cluster alias using <cluster_alias>.cluster.forwarding.admin.class.

Configuration for the custom resource management client and/or KafkaAdminClient can be passed using the following prefixes:

  • admin.<any_config>*
  • <cluster_alias>.cluster.admin.<any_config>*

Proposed Changes

  • ForwardingAdmin class 

...

    • All Connectors and Tasks will replace AdminClient.create with MirrorConnectorConfig.forwardingAdmin
      • MirrorCheckpointTask
      • MirrorCheckpointConnector
      • MirrorSourceConnector
    • Use ForwardingAdmin in MirrorUtils instead of TopicAdmin to create internal compacted topics

Connector Configuration Properties

Properties common to the SourceConnector(s) and SinkConnector:

| property | default value | description |
|---|---|---|
| forwarding.admin.class | org.apache.kafka.clients.admin.ForwardingAdminClient | The fully qualified name of a class that extends ForwardingAdminClient. The class must have a constructor that accepts a configuration (Map<String, Object> config) to configure the clients it needs. |
| target.cluster.forwarding.admin.class | org.apache.kafka.clients.admin.ForwardingAdminClient | Overrides forwarding.admin.class for the target cluster only. |
| source.cluster.forwarding.admin.class | org.apache.kafka.clients.admin.ForwardingAdminClient | Overrides forwarding.admin.class for the source cluster only. |
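
The constructor requirement above exists so that the class can be created from its configured name. The following is a minimal sketch of such reflective loading, not the actual MirrorMaker code: BaseForwardingAdmin, CustomForwardingAdmin and ForwardingAdminLoader are hypothetical stand-ins, used so the example runs without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the ForwardingAdminClient base class.
class BaseForwardingAdmin {
    private final Map<String, Object> configs;

    public BaseForwardingAdmin(Map<String, Object> configs) {
        this.configs = new HashMap<>(configs);
    }

    public Map<String, Object> configs() {
        return configs;
    }
}

// Hypothetical custom implementation exposing the required constructor.
class CustomForwardingAdmin extends BaseForwardingAdmin {
    public CustomForwardingAdmin(Map<String, Object> configs) {
        super(configs);
    }
}

class ForwardingAdminLoader {
    // Loads the named class and calls its public (Map) constructor.
    static BaseForwardingAdmin load(String className, Map<String, Object> configs) {
        try {
            Class<? extends BaseForwardingAdmin> clazz =
                    Class.forName(className).asSubclass(BaseForwardingAdmin.class);
            return clazz.getConstructor(Map.class).newInstance(configs);
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Covers a missing class, a missing (Map) constructor, or a wrong base type.
            throw new IllegalArgumentException("Cannot create forwarding admin " + className, e);
        }
    }
}
```

A class without a public constructor taking a single Map fails at load time in this sketch, which is why the property description spells out the constructor signature.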


In addition, the forwarding admin class reuses the following existing configs:

| property | description |
|---|---|
| source.cluster.admin.* | overrides for the source-cluster forwarding admin |
| target.cluster.admin.* | overrides for the target-cluster forwarding admin |
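
To illustrate how the prefixed properties might combine, here is a small sketch that builds the config map handed to a forwarding admin. It assumes that shared admin.* entries are applied first and that cluster-specific entries such as source.cluster.admin.* override them; AdminConfigResolver and its methods are hypothetical names, not part of Kafka.

```java
import java.util.HashMap;
import java.util.Map;

class AdminConfigResolver {
    // Returns every entry whose key starts with the prefix, with the prefix stripped.
    static Map<String, Object> withPrefixStripped(Map<String, ?> props, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : props.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    // Assumed precedence: shared admin.* first, then <clusterPrefix>admin.* on top.
    static Map<String, Object> adminConfigFor(Map<String, ?> props, String clusterPrefix) {
        Map<String, Object> merged = withPrefixStripped(props, "admin.");
        merged.putAll(withPrefixStripped(props, clusterPrefix + "admin."));
        return merged;
    }
}
```

Calling adminConfigFor(props, "source.cluster.") would yield the map for the source-cluster admin, and "target.cluster." the one for the target-cluster admin.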

Example Configuration

A sample configuration file, ./config/connect-mirror-source.properties, is provided for the use case where the source cluster uses the default org.apache.kafka.clients.admin.ForwardingAdminClient while the target cluster uses the custom class custom.package.admin.TargetForwardingAdminClient:

Code Block
source.cluster.alias = A
target.cluster.alias = B

source.cluster.bootstrap.servers = A_localhost:9092
target.cluster.bootstrap.servers = B_localhost:9092

target.cluster.forwarding.admin.class = custom.package.admin.TargetForwardingAdminClient

# Common config for KafkaAdminClient in any ForwardingAdminClient
admin.security.protocol = SASL_SSL
admin.sasl.mechanism = PLAIN

# Configure source org.apache.kafka.clients.admin.ForwardingAdminClient
source.cluster.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";

# Configure target KafkaAdminClient in custom.package.admin.TargetForwardingAdminClient
target.cluster.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";

# Configure target custom ResourceManagmentRESTClient in custom.package.admin.TargetForwardingAdminClient
target.cluster.admin.resource.management.url = https://kafka.resource.manager.com
target.cluster.admin.resource.management.keystore.path = /path/keystore
target.cluster.admin.resource.management.truststore.path = /path/truststore/ca.pem

custom.package.admin.TargetForwardingAdminClient could look like this:

Code Block
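// Illustrative package name only: "package" is a reserved word in Java, so a real class needs a different name.
package custom.package.admin;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.CreatePartitionsOptions;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.ForwardingAdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;

public class TargetForwardingAdminClient extends ForwardingAdminClient {
    private final ResourceManagmentRESTClient customResourceManager;

    public TargetForwardingAdminClient(Map<String, Object> configs) {
        super(configs);
        // Factory method that knows how to create the client from the configs.
        customResourceManager = ResourceManagmentRESTClient.create(configs);
    }

    @Override
    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) {
        final Map<String, KafkaFuture<Void>> futures = new HashMap<>(newPartitions.size());
        newPartitions.forEach((topicName, partitions) -> {
            KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
            try {
                // Assumes updateTopicPartition is synchronous; the future must be
                // completed here, otherwise callers would wait on it forever.
                customResourceManager.updateTopicPartition(topicName, partitions.totalCount());
                future.complete(null);
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
            futures.put(topicName, future);
        });
        return new CreatePartitionsResult(futures);
    }
    // ....
}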

MirrorMaker Configuration Properties

The high-level configuration file required by the MirrorMaker driver supports the following properties:

| property | default value | description |
|---|---|---|
| <cluster>.forwarding.admin.class | org.apache.kafka.clients.admin.ForwardingAdminClient | The fully qualified name of a class that extends ForwardingAdminClient. The class must have a constructor that accepts a configuration (Map<String, Object> config) to configure the clients it needs. |
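
As a rough sketch of how the class name for a given cluster alias could be looked up, the snippet below checks <cluster>.forwarding.admin.class first and otherwise uses the documented default. The intermediate fallback to a top-level forwarding.admin.class entry is an assumption, and ForwardingAdminClassResolver is a hypothetical helper rather than MirrorMaker code.

```java
import java.util.Map;

class ForwardingAdminClassResolver {
    static final String KEY = "forwarding.admin.class";
    static final String DEFAULT_CLASS = "org.apache.kafka.clients.admin.ForwardingAdminClient";

    // Prefers the per-cluster setting, then the (assumed) top-level one, then the default.
    static String classNameFor(Map<String, String> props, String clusterAlias) {
        String perCluster = props.get(clusterAlias + "." + KEY);
        if (perCluster != null) {
            return perCluster.trim();
        }
        return props.getOrDefault(KEY, DEFAULT_CLASS).trim();
    }
}
```

With only backup.forwarding.admin.class set, the backup cluster would get the custom class and every other cluster would keep the default.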

Example Configuration

Code Block
clusters = primary, backup
primary.bootstrap.servers = A_localhost:9092
backup.bootstrap.servers = B_localhost:9092

backup.forwarding.admin.class = custom.package.admin.TargetForwardingAdminClient

# Common config for KafkaAdminClient in any ForwardingAdminClient
admin.security.protocol = SASL_SSL
admin.sasl.mechanism = PLAIN

# Configure primary org.apache.kafka.clients.admin.ForwardingAdminClient
primary.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";

# Configure backup KafkaAdminClient in custom.package.admin.TargetForwardingAdminClient
backup.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME" password="PASSWORD";

# Configure backup custom ResourceManagmentRESTClient in custom.package.admin.TargetForwardingAdminClient
backup.admin.resource.management.url = https://kafka.resource.manager.com
backup.admin.resource.management.keystore.path = /path/keystore
backup.admin.resource.management.truststore.path = /path/truststore/ca.pem


Compatibility, Deprecation, and Migration Plan

  • When users upgrade an existing MM2 cluster, they don’t need to change any of their current configuration, as this proposal maintains the default behaviour of MM2.

...