Table of Contents |
---|
Status
Current state: Under DiscussionAdopted
Discussion thread: here
JIRA: here
Motivation
MirrorMaker 2 uses AdminClient
directly to create topics, create new topic partitions and sync configurations like topic configs and ACLs by enabling sync.topic.configs.enabled
and sync.topic.acls.enabled
. The current design runs with 2 main assumptions:
- The user running MM2 has the following ACLs
- `Create` ACLs for topics on the source clusters to create
heartbeat
topics. - `Create` and `Alter/AlterConfigs` ACLs to create topics, create partitions, update topics' config and topics' ACLs on the destination clusters.
- `Create` ACLs for topics on the source clusters to create
- MM2 can bypass any existing Kafka resource management solutions that organizations runs and create/update resources outside this system.
These assumption wouldn't work for any organization that runs any sort of resource management or federated solutions where these systems are usually the only application allowed to initializing a client with `Create` and`Alter/AlterConfigs` ACLs. And no other teams/groups/applications are allowed to have this same level of ACLs to create such a client outside these systems.
...
An example for such a watcher is Strimzi K8S Topic Operator. Which recosiliate Topic and Topic's config between ZK and Topic Resources on K8S however it doesn't have the same solution for UserOperator which make consumer group can't migrate without these ACLs created first. (more details below in The downsides of MM2 use AdminClient directly in the motivation)
The downsides of MM2 use AdminClient directly
As mentioned before the usage of AdminClient directly within MM2 simplify the resource management for MM2. However, it does create the following problems for any users who use IaC (Infra-as-Code), federated solutions, or have a capacity/budget planning system for Kafka destination clusters. Here is a list of potential undesired impact of MM2 bypass the organization ecosystem:
...
This KIP proposes a way for users to run MM2 with custom implementation for the Kafka resource manager in order to easily integrate MM2 with their ecosystem.
Public Interfaces
The KIP proposes adding a new interface called KafkaResourceManager that defines how MM2 will create, modify, and list any Kafka topics and ACLs. MirrorMaker2’s original behaviour will be kept in DefaultResourceManagerflexibility to how MM2 manage Kafka resources. By allowing MM2 to load custom implementation of Admin interface.
To make it easier for users to provide their custom implementation of Admin, the KIP will introduce ForwardingAdmin
class delegate to KafkaAdminClient.
The implemented class can be overridden using the following configurations.
...
forwarding.
...
admin.class
default value will be set to
...
-
ForwardingAdmin with Map<String, Object> config to configure KafkaAdminClient as delegate.
- or can be configured based on cluster aliases using
<cluster_alias>.cluster.forwarding.admin.class
The provided implementation must have a contractor that accept Map<String, Object> config to configure KafkaAdminClient and any customised resource management clients
The configuration for custom resource
...
management client and/or KafkaAdminClient can be passed using the following prefix
admin.*
<cluster_alias>.cluster.admin.*
Proposed Changes
KafkaResourceManager
interface will define how to list, create, update topics, topic configs, and ACLs.
ForwardingAdmin class:
The class must initialize Admin delegate to avoid implementing every method.
Code Block | ||||
---|---|---|---|---|
| ||||
public class ForwardingAdmin implements Admin | ||||
Code Block | ||||
public interface KafkaResourceManager extends Configurable, AutoCloseable { /**private final Admin delegate; * Method to create kafka resource manager * @param props map of configuration needed to setup resource manager * @return KafkaResourceManager */ KafkaResourceManager createpublic ForwardingAdmin(Map<String, Object> propsconfig); { /** * Method to closethis.delegate the manager */ void close(= AdminClient.create(config); /**} * list topics for this cluster @Override public *CreateTopicsResult @return Set of topics name as stringscreateTopics(Collection<NewTopic> newTopics, CreateTopicsOptions options) { * @throws InterruptedException * @throws ExecutionException */ Set<String> listTopics() throws InterruptedException, ExecutionExceptionreturn delegate.createTopics(newTopics, options); } /** *// describeoverride a collectionrest of topics.Admin interface to * @param topics collection of topic names * @return collection of TopicDescription * @throws InterruptedException * @throws ExecutionException */ Collection<TopicDescription> describeTopics(Collection<String> topics) throws InterruptedException, ExecutionException; /** * describe config for given set of topics. * @param topics set of topic names * @return map of topic name and config object * @throws InterruptedException * @throws ExecutionException */ Map<String, Config> describeTopicConfigs(Set<String> topics) throws InterruptedException, ExecutionException; /** * Create new topics and new partitions. * @param newTopics list of new topics * @param newPartitions map of new partitions for each topic */ void createTopicPartitions(List<NewTopic> newTopics, Map<String, NewPartitions> newPartitions); /** * create compacted topic, mostly used to create internal topics. * @param topicName topic name * @param partitions number of partitions * @param replicationFactor number of replicas */ void createCompactedTopic(String topicName, short partitions, short replicationFactor); /** * update config for given topics. * @param topicConfigs map of topics to topic config * @throws InterruptedException * @throws ExecutionException */ void updateTopicConfigs(Map<String, Config> topicConfigs) throws InterruptedException, ExecutionException; /** * list acl binding for all topics. * @return collection AclBinding * @throws InterruptedException * @throws ExecutionException */ Collection<AclBinding> listTopicAclBindings() throws InterruptedException, ExecutionException; /** * update Acls. * @param bindings list of acl bindings * @throws InterruptedException * @throws ExecutionException */ void updateAcls(List<AclBinding> bindings) throws InterruptedException, ExecutionException; } |
...
use delegate...
} |
- Add configuration
forwarding.admin.class
toMirrorConnectorConfig
- Update All MM2 connectors to use the
forwarding.admin.class
Add MirrorConnectorConfig.forwardingAdmin
that will load forwarding.admin.class and return ForwardingAdmin.
All Connectors and Tasks will replace AdminClient.create by
MirrorConnectorConfig.forwardingAdmin
MirrorCheckpointTask
MirrorCheckpointConnector
MirrorSourceConnector
Use ForwardingAdmin in MirrorUtils instead of TopicAdmin to create internal compacted topics
Connector Configuration Properties
Properties common to the SourceConnector(s) and SinkConnector:
property | default value | description | |
---|---|---|---|
forwarding.admin.class | org.apache.kafka.clients.admin.ForwardingAdminClient | The fully qualified name of class that extend ForwardingAdminClient. The class must have a contractor that accept configuration (Map<String, Object> config) to configure KafkaAdminClient and any other needed clients. | |
target.forwarding.admin.class | org.apache.kafka.clients.admin.ForwardingAdminClient | Override | |
source.forwarding.admin.class | org.apache.kafka.clients.admin.ForwardingAdminClient | Override |
In addition, forwarding admin class will be re-using the following existing configs:
property | description |
source.admin.* | overrides for the source cluster forwarding admin config |
target.admin.* | overrides for the target cluster forwarding admin config |
Example Configuration
A sample configuration file ./config/connect-mirror-source.properties is provided for use case where source cluster use default org.apache.kafka.clients.admin.ForwardingAdminClient however target cluster use custom class custom.package.admin.TargetForwardingAdminClient:
Code Block |
---|
source.cluster.alias = A
target.cluster.alias = B
source.cluster.bootstrap.servers = A_localhost:9092
target.cluster.bootstrap.servers = B_localhost:9092
target.cluster.forwarding.admin.class = custom.package.admin.TargetForwardingAdminClient
// Common config for KafkaAdminClient in any ForwardingAdminClient
admin.security.protocol = SASL_SSL
admin.security.protocol=SASL_SSL
admin.sasl.mechanism=PLAIN
// Configure Source org.apache.kafka.clients.admin.ForwardingAdminClient
source.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1";
// Configure Target KafkaAdminClient in custom.package.admin.TargetForwardingAdminClient
target.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2";
// Configure Target custom ResourceManagmentRESTClient in custom.mypackage.admin.TargetForwardingAdminClient
target.admin.resource.management.url = https://kafka.resource.manager.com
target.admin.resource.management.keystore.path = /path/keystore
target.admin.resource.management.truststore.path = /path/truststore/ca.pem |
custom.package.admin.TargetForwardingAdminClient looks like this
Code Block | ||
---|---|---|
| ||
package custom.mypackage.admin;
class TargetForwardingAdminClient extends ForwardingAdminClient {
ResourceManagmentRESTClient customResourceManager;
public TargetForwardingAdminClient(Map<String, Object> configs) {
super(config)
customResourceManager = ResourceManagmentRESTClient.create(configs) //method that know how to create client.
}
@Override
public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, CreatePartitionsOptions options) {
// use customResourceManager to updateTopicPartition
}
// ....
} |
MirrorMaker Configuration Properties
The high-level configuration file required by the MirrorMaker driver supports the following properties:
property | default value | description |
<cluster>. | org.apache.kafka.clients.admin.ForwardingAdminClient | The fully qualified name of class that extend ForwardingAdminClient. The class must have a contractor that accept configuration (Map<String, Object> config) to configure needed clients. |
In addition, forwarding admin class will be re-using the following existing configs:
property | description |
<cluster>.admin.* | overrides for the cluster forwarding admin config |
Example Configuration
Code Block |
---|
clusters = primary, backup
primary.bootstrap.servers = A_localhost:9092
backup.bootstrap.servers = B_localhost:9092
// Common config for KafkaAdminClient in any ForwardingAdminClient
admin.security.protocol = SASL_SSL
admin.security.protocol=SASL_SSL
admin.sasl.mechanism=PLAIN
// Configure Primary org.apache.kafka.clients.admin.ForwardingAdminClient
primary.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME1" password="PASSWORD1";
// Configure Target KafkaAdminClient in custom.package.admin.TargetForwardingAdminClient
backup.admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USERNAME2" password="PASSWORD2";
// Configure Target custom ResourceManagmentRESTClient in custom.mypackage.admin.TargetForwardingAdminClient
backup.admin.resource.management.url = https://kafka.resource.manager.com
backup.admin.resource.management.keystore.path = /path/keystore
backup.admin.resource.management.truststore.path = /path/truststore/ca.pem |
Compatibility, Deprecation, and Migration Plan
- When users upgrade an existing MM2 cluster they don’t need to change any of their current configuration as this proposal maintains the default behaviour for MM2.
Rejected Alternatives
Adding a new interface called KafkaResourceManager ("was the original proposal") that defines how MM2 will create, modify, and list any Kafka topics and ACLs. MirrorMaker2’s original behaviour will be kept in DefaultResourceManager.
And override the implementation using the following configurations.
resource.manager.class
default value will be set toDefaultResourceManager
- or can be configured based on cluster aliases using
<cluster_alias>.resource.manager.class
- Manage creating and modifying Kafka topics and ACLs outside MM2 by building a separate tool that monitors the same set of topics as MirrorMaker2 and create/modify topics and ACLs once it detects configuration changes. The downsides with this are
- This will be a duplicate effort as MM2 already has all this logic implemented; it only needs to use a different client than AdminClient.
- MM2 still bypass any capacity safeguards the ecosystem would have in place.
- Any downtime with this tool will cause conflict between MM2 mirrored resources and management resource system.
- This will be a duplicate effort as MM2 already has all this logic implemented; it only needs to use a different client than AdminClient.
...