Table of Contents |
---|
Status
Current state: Under DiscussionAdopted
Discussion thread: here
JIRA: here
...
forwarding.admin.class
default value will be set toForwardingAdmin with Map<String, Object> config to configure KafkaAdminClient as delegate.
- or can be configured based on cluster aliases using
<cluster_alias>.cluster.forwarding.admin.class
The provided implementation must have a contractor that accept Map<String, Object> config to configure KafkaAdminClient and any customised resource management clients
The configuration for custom resource management client and/or KafkaAdminClient can be passed using the following prefix
...
Proposed Changes
ForwardingAdmin class class:
The class must initialize Admin delegate to avoid implementing every method.
Code Block | ||||
---|---|---|---|---|
| ||||
public class ForwardingAdmin implements Admin { private final Admin delegate; public ForwardingAdmin(Map<String, Object> config) { this.delegate = AdminClient.create(config); } @Override public CreateTopicsResult createTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options) { return delegate.createTopics(newTopics, options); } // override rest of Admin interface to use delegate... } |
...
- Add configuration
forwarding.admin.class
toMirrorConnectorConfig
- Update All MM2 connectors to use the
forwarding.admin.class
Add MirrorConnectorConfig.forwardingAdmin
that will load forwarding.admin.class and return ForwardingAdmin.
All Connectors and Tasks will replace AdminClient.create by
MirrorConnectorConfig.forwardingAdmin
MirrorCheckpointTask
MirrorCheckpointConnector
MirrorSourceConnector
Use ForwardingAdmin in MirrorUtils instead of TopicAdmin to create internal compacted topics
Connector Configuration Properties
...
property | default value | description | |
---|---|---|---|
forwarding.admin.class | org.apache.kafka.clients.admin.ForwardingAdminClient | The fully qualified name of class that extend ForwardingAdminClient. The class must have a contractor that accept configuration (Map<String, Object> config) to configure KafkaAdminClient and any other needed clients. | |
target.cluster.forwarding.admin.class | org.apache.kafka.clients.admin.ForwardingAdminClient | Override | |
source.cluster.forwarding.admin.class | org.apache.kafka.clients.admin.ForwardingAdminClient | Override |
...
property | description |
source.cluster.admin.* | overrides for the source - cluster forwarding admin config |
target.cluster.admin.* | overrides for the target - cluster forwarding admin config |
Example Configuration
A sample configuration file ./config/connect-mirror-source.properties is provided for use case where source cluster use default org.apache.kafka.clients.admin.ForwardingAdminClient however target cluster use custom class custom.package.admin.TargetForwardingAdminClient:
Code Block |
---|
source.cluster.alias = A target.cluster.alias = B source.cluster.bootstrap.servers = A_localhost:9092 target.cluster.bootstrap.servers = B_localhost:9092 target.cluster.forwarding.admin.class = custom.package.admin.TargetForwardingAdminClient // Common config for KafkaAdminClient in any ForwardingAdminClient admin.security.protocol = SASL_SSL admin.security.protocol=SASL_SSL admin.sasl.mechanism=PLAIN // Configure Source org.apache.kafka.clients.admin.ForwardingAdminClient source.cluster.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1"; // Configure Target KafkaAdminClient in custom.package.admin.TargetForwardingAdminClient target.cluster.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2"; // Configure Target custom ResourceManagmentRESTClient in custom.mypackage.admin.TargetForwardingAdminClient target.cluster.admin.resource.management.url = https://kafka.resource.manager.com target.cluster.admin.resource.management.keystore.path = /path/keystore target.cluster.admin.resource.management.truststore.path = /path/truststore/ca.pem |
...
property | default value | description |
<cluster>. | org.apache.kafka.clients.admin.ForwardingAdminClient | The fully qualified name of class that extend ForwardingAdminClient. The class must have a contractor that accept configuration (Map<String, Object> config) to configure needed clients. |
In addition, forwarding admin class will be re-using the following existing configs:
property | description |
<cluster>.admin.* | overrides for the cluster forwarding admin config |
Example Configuration
Code Block |
---|
clusters = primary, backup primary.bootstrap.servers = A_localhost:9092 backup.bootstrap.servers = B_localhost:9092 // Common config for KafkaAdminClient in any ForwardingAdminClient admin.security.protocol = SASL_SSL admin.security.protocol=SASL_SSL admin.sasl.mechanism=PLAIN // Configure Primary org.apache.kafka.clients.admin.ForwardingAdminClient primary.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1"; // Configure Target KafkaAdminClient in custom.package.admin.TargetForwardingAdminClient backup.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2"; // Configure Target custom ResourceManagmentRESTClient in custom.mypackage.admin.TargetForwardingAdminClient backup.admin.resource.management.url = https://kafka.resource.manager.com backup.admin.resource.management.keystore.path = /path/keystore backup.admin.resource.management.truststore.path = /path/truststore/ca.pem |
...