Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: Under DiscussionAdopted  

Discussion thread: here

JIRA: here

...

To make it easier for users to provide their custom implementation of Admin, the KIP will introduce ForwardingAdmin class delegate to KafkaAdminClient. 

The implemented class can be overridden using the following configurations.

  • forwarding.admin.class default value will be set to ForwardingAdmin with Map<String, Object> config to configure KafkaAdminClient as delegate.
  • or can be configured based on cluster aliases using <cluster_alias>.cluster.forwarding.admin.class
  • The provided implementation must have a contractor that accept Map<String, Object> config to configure KafkaAdminClient and any customised resource management clients

The configuration for custom resource management client and/or KafkaAdminClient can be passed using the following prefix

  • admin.*
  • <cluster_alias>.cluster.admin.*

Proposed Changes

  • ForwardingAdmin class class: The class must initialize Admin delegate to avoid implementing every method. 


Code Block
languagejava
titleForwardingAdmin.java
public class ForwardingAdmin implements Admin {
    private final Admin delegate;
    
    public ForwardingAdmin(Map<String, Object> config, Admin delegate) {
        this.delegate = delegateAdminClient.create(config);
    }

    
    @Override @Override
    public CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options) {
        return delegate.createTopics(newTopics, options);
    }
    
    // override rest of Admin interface to use delegate...
}

...


  • Add configuration forwarding.admin.class  to MirrorConnectorConfig
  • Update All MM2 connectors to use the forwarding.admin.class
    • Add MirrorConnectorConfig.forwardingAdmin that will load forwarding.admin.class and return ForwardingAdmin. 
    • All Connectors and Tasks will replace AdminClient.create by MirrorConnectorConfig.forwardingAdmin 
      • MirrorCheckpointTask
      • MirrorCheckpointConnector
      • MirrorSourceConnector
    • Use ForwardingAdmin in MirrorUtils instead of TopicAdmin to create internal compacted topics

Connector Configuration Properties

Properties common to the SourceConnector(s) and SinkConnector:

property

default value

description


forwarding.admin.class

org.apache.kafka.clients.admin.ForwardingAdminClient

The fully qualified name of class that extend ForwardingAdminClient. The class must have a contractor that accept configuration (Map<String, Object> config) to configure KafkaAdminClient and any other needed clients.
target.forwarding.admin.classorg.apache.kafka.clients.admin.ForwardingAdminClient

Override forwarding.admin.class only for target cluster


source.forwarding.admin.classorg.apache.kafka.clients.admin.ForwardingAdminClient

Override forwarding.admin.class only for source cluster


In addition, forwarding admin class will be re-using the following existing configs:

property

description

source.admin.*overrides for the source cluster forwarding admin config
target.admin.*overrides for the target cluster forwarding admin config

Example Configuration

A sample configuration file ./config/connect-mirror-source.properties is provided for use case where source cluster use default org.apache.kafka.clients.admin.ForwardingAdminClient however target cluster use custom class custom.package.admin.TargetForwardingAdminClient:

Code Block
source.cluster.alias = A
target.cluster.alias = B

source.cluster.bootstrap.servers = A_localhost:9092
target.cluster.bootstrap.servers = B_localhost:9092

target.cluster.forwarding.admin.class = custom.package.admin.TargetForwardingAdminClient

// Common config for KafkaAdminClient in any ForwardingAdminClient
admin.security.protocol = SASL_SSL
admin.security.protocol=SASL_SSL
admin.sasl.mechanism=PLAIN

// Configure Source org.apache.kafka.clients.admin.ForwardingAdminClient
source.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1";


// Configure Target KafkaAdminClient in custom.package.admin.TargetForwardingAdminClient
target.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2";

// Configure Target custom ResourceManagmentRESTClient in custom.mypackage.admin.TargetForwardingAdminClient
target.admin.resource.management.url = https://kafka.resource.manager.com
target.admin.resource.management.keystore.path = /path/keystore
target.admin.resource.management.truststore.path = /path/truststore/ca.pem

custom.package.admin.TargetForwardingAdminClient looks like this

Code Block
languagejava
package custom.mypackage.admin;

class TargetForwardingAdminClient extends ForwardingAdminClient {
    ResourceManagmentRESTClient customResourceManager;
    public TargetForwardingAdminClient(Map<String, Object> configs) {
    	super(config)

	    customResourceManager = ResourceManagmentRESTClient.create(configs) //method that know how to create client.
    }

    @Override
    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) {
        // use customResourceManager to updateTopicPartition
      }
   // ....
}

MirrorMaker Configuration Properties

The high-level configuration file required by the MirrorMaker driver supports the following properties:

property

default value

description

<cluster>.forwarding.admin.class

org.apache.kafka.clients.admin.ForwardingAdminClient

The fully qualified name of class that extend ForwardingAdminClient. The class must have a contractor that accept configuration (Map<String, Object> config) to configure needed clients.

In addition, forwarding admin class will be re-using the following existing configs:

property

description

<cluster>.admin.*overrides for the cluster forwarding admin config

Example Configuration

Code Block
clusters = primary, backup
primary.bootstrap.servers = A_localhost:9092
backup.bootstrap.servers = B_localhost:9092

// Common config for KafkaAdminClient in any ForwardingAdminClient
admin.security.protocol = SASL_SSL
admin.security.protocol=SASL_SSL
admin.sasl.mechanism=PLAIN


// Configure Primary org.apache.kafka.clients.admin.ForwardingAdminClient
primary.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1";


// Configure Target KafkaAdminClient in custom.package.admin.TargetForwardingAdminClient
backup.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2";

// Configure Target custom ResourceManagmentRESTClient in custom.mypackage.admin.TargetForwardingAdminClient
backup.admin.resource.management.url = https://kafka.resource.manager.com
backup.admin.resource.management.keystore.path = /path/keystore
backup.admin.resource.management.truststore.path = /path/truststore/ca.pem


Compatibility, Deprecation, and Migration Plan

  • When users upgrade an existing MM2 cluster they don’t need to change any of their current configuration as this proposal maintains the default behaviour for MM2.

...