Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
MM2 internal topic names (heartbeats, checkpoints and offset-syncs) are hardcoded in the source code. This makes it hard to run MM2 against a Kafka cluster that enforces topic naming rules and doesn't allow topic auto-creation. On such a cluster, these internal topics have to be created manually up front, following the cluster's rules and guidance for topic creation. MM2 should therefore let users override the internal topic names so they match the topics that were created.
Public Interfaces and Proposed Changes
This KIP proposes adding a new interface, InternalTopicNamingPolicy, that defines how MM2 internal topics are named and whose implementation is selected through new configuration.
```java
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.annotation.InterfaceStability;

/** Defines the naming convention for MM2 internal topics: checkpoints, offset-syncs and heartbeats. */
@InterfaceStability.Evolving
public interface InternalTopicNamingPolicy extends Configurable {

    /** Returns the heartbeats topic name. */
    String heartbeatsTopic();

    /** Returns the offset-syncs topic name for the given target cluster alias. */
    String offsetSyncTopic(String targetAlias);

    /** Returns the checkpoints topic name for the given cluster alias. */
    String checkpointTopic(String clusterAlias);

    /** Checks whether the given topic is a checkpoints topic. */
    boolean isCheckpointTopic(String topic);

    /** Checks whether the given topic is an MM2 internal topic; used to make sure internal topics are not replicated. */
    boolean isMM2InternalTopic(String topic);
}
```
Users can plug in a custom implementation of this interface by setting the following configuration property:
property | default value | description |
---|---|---|
internal.topics.policy.class | org.apache.kafka.connect.mirror.DefaultInternalTopicNamingPolicy | Class which defines the internal topic naming convention. |
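As a sketch of how a cluster with stricter naming rules might use this, the hypothetical policy below places every internal topic under a team prefix. The class name, the default `team-a.` prefix and the `team.prefix` property are assumptions for illustration only. To keep the block compilable without `kafka-clients`, it redeclares the proposed interface locally and inlines `Configurable.configure(Map<String, ?>)` instead of extending it.

```java
import java.util.Map;

// Local stand-in for the proposed InternalTopicNamingPolicy. The real proposal
// extends org.apache.kafka.common.Configurable; its configure method is inlined here.
interface InternalTopicNamingPolicy {
    void configure(Map<String, ?> configs);
    String heartbeatsTopic();
    String offsetSyncTopic(String targetAlias);
    String checkpointTopic(String clusterAlias);
    boolean isCheckpointTopic(String topic);
    boolean isMM2InternalTopic(String topic);
}

// Hypothetical policy for a cluster that requires every topic name to start
// with a team prefix. The class and the "team.prefix" property are illustrative.
class PrefixedInternalTopicNamingPolicy implements InternalTopicNamingPolicy {
    private String prefix = "team-a.";

    @Override
    public void configure(Map<String, ?> configs) {
        // Override the default prefix only when the (hypothetical) property is set.
        Object value = configs.get("team.prefix");
        if (value != null) {
            prefix = value.toString();
        }
    }

    @Override
    public String heartbeatsTopic() {
        return prefix + "mm2.heartbeats";
    }

    @Override
    public String offsetSyncTopic(String targetAlias) {
        return prefix + "mm2.offset-syncs." + targetAlias;
    }

    @Override
    public String checkpointTopic(String clusterAlias) {
        return prefix + "mm2.checkpoints." + clusterAlias;
    }

    @Override
    public boolean isCheckpointTopic(String topic) {
        return topic.startsWith(prefix + "mm2.checkpoints.");
    }

    @Override
    public boolean isMM2InternalTopic(String topic) {
        // Heartbeats are deliberately excluded because MM2 still replicates them.
        return isCheckpointTopic(topic) || topic.startsWith(prefix + "mm2.offset-syncs.");
    }
}
```

Such a policy would then be enabled by setting `internal.topics.policy.class` to its fully qualified class name (for example, a hypothetical `com.example.PrefixedInternalTopicNamingPolicy`), assuming MM2 passes its configuration properties to `configure()`.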
The DefaultInternalTopicNamingPolicy class keeps the current MM2 naming convention and adds the ability to override the separator used in these topic names through the following configuration property:
property | default value | description |
---|---|---|
internal.topic.policy.separator | . | Separator used in internal topic naming convention. |
With DefaultInternalTopicNamingPolicy and the default separator, the topic names are:
method | topic name |
---|---|
heartbeatsTopic() | heartbeats |
checkpointTopic(clusterAlias) | clusterAlias.checkpoints.internal |
offsetSyncTopic(targetCluster) | mm2-offset-syncs.targetCluster.internal |
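A minimal sketch of how DefaultInternalTopicNamingPolicy could derive these names from `internal.topic.policy.separator`, matching MM2's existing `heartbeats`, `.checkpoints.internal` and `mm2-offset-syncs.<alias>.internal` names when the separator is `.`. It assumes the separator replaces each `.` between name components while the `heartbeats` and `mm2-offset-syncs` literals stay fixed; the proposal does not spell this out. The interface is redeclared locally, with `configure` inlined, so the block compiles without Kafka on the classpath.

```java
import java.util.Map;

// Local stand-in for the proposed interface (the real one extends
// org.apache.kafka.common.Configurable; configure() is inlined here).
interface InternalTopicNamingPolicy {
    void configure(Map<String, ?> configs);
    String heartbeatsTopic();
    String offsetSyncTopic(String targetAlias);
    String checkpointTopic(String clusterAlias);
    boolean isCheckpointTopic(String topic);
    boolean isMM2InternalTopic(String topic);
}

// Sketch: reproduces MM2's current names and only lets the separator vary.
class DefaultInternalTopicNamingPolicy implements InternalTopicNamingPolicy {
    static final String SEPARATOR_CONFIG = "internal.topic.policy.separator";
    static final String SEPARATOR_DEFAULT = ".";

    private String separator = SEPARATOR_DEFAULT;

    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get(SEPARATOR_CONFIG);
        if (value != null) {
            separator = value.toString();
        }
    }

    @Override
    public String heartbeatsTopic() {
        // The heartbeats name has no components to separate.
        return "heartbeats";
    }

    @Override
    public String offsetSyncTopic(String targetAlias) {
        return "mm2-offset-syncs" + separator + targetAlias + separator + "internal";
    }

    @Override
    public String checkpointTopic(String clusterAlias) {
        return clusterAlias + separator + "checkpoints" + separator + "internal";
    }

    @Override
    public boolean isCheckpointTopic(String topic) {
        return topic.endsWith(separator + "checkpoints" + separator + "internal");
    }

    @Override
    public boolean isMM2InternalTopic(String topic) {
        // Offset-syncs and checkpoints topics are internal; heartbeats are not,
        // because MM2 replicates them.
        boolean offsetSyncs = topic.startsWith("mm2-offset-syncs" + separator)
            && topic.endsWith(separator + "internal");
        return offsetSyncs || isCheckpointTopic(topic);
    }
}
```

For example, with the separator set to `_`, `checkpointTopic("A")` would return `A_checkpoints_internal` and `offsetSyncTopic("B")` would return `mm2-offset-syncs_B_internal`.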
Source Connector and Checkpoint Task
In line with this proposal’s motivation, MirrorSourceConnector and MirrorCheckpointTask also need to be updated so that they neither replicate internal topics nor store checkpoints for them.
```java
public class MirrorCheckpointTask extends SourceTask {
    // ...

    boolean shouldCheckpointTopic(String topic) {
        // Skip topics rejected by the filter and all MM2 internal topics.
        return topicFilter.shouldReplicateTopic(topic)
            && !internalTopicNamingPolicy.isMM2InternalTopic(topic);
    }
}

public class MirrorSourceConnector extends SourceConnector {
    // ...

    boolean shouldReplicateTopic(String topic) {
        // Heartbeats bypass the topic filter; MM2 internal topics and cycles are never replicated.
        return (topicFilter.shouldReplicateTopic(topic) || isHeartbeatTopic(topic))
            && !internalTopicNamingPolicy.isMM2InternalTopic(topic)
            && !isCycle(topic);
    }
}
```
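To make the combined effect of these two predicates concrete, here is a self-contained model in which each dependency is a `java.util.function.Predicate`. The class `ReplicationFilterSketch` and its constructor are hypothetical; the predicates stand in for the topic filter, the heartbeat check, the internal-topic check and the cycle check used above.

```java
import java.util.function.Predicate;

// Hypothetical model of the replication and checkpoint filters; each
// Predicate replaces one of the collaborators used by the real classes.
final class ReplicationFilterSketch {
    private final Predicate<String> topicFilter;
    private final Predicate<String> isHeartbeatTopic;
    private final Predicate<String> isMM2InternalTopic;
    private final Predicate<String> isCycle;

    ReplicationFilterSketch(Predicate<String> topicFilter,
                            Predicate<String> isHeartbeatTopic,
                            Predicate<String> isMM2InternalTopic,
                            Predicate<String> isCycle) {
        this.topicFilter = topicFilter;
        this.isHeartbeatTopic = isHeartbeatTopic;
        this.isMM2InternalTopic = isMM2InternalTopic;
        this.isCycle = isCycle;
    }

    // Mirrors MirrorSourceConnector.shouldReplicateTopic.
    boolean shouldReplicateTopic(String topic) {
        return (topicFilter.test(topic) || isHeartbeatTopic.test(topic))
            && !isMM2InternalTopic.test(topic)
            && !isCycle.test(topic);
    }

    // Mirrors MirrorCheckpointTask.shouldCheckpointTopic.
    boolean shouldCheckpointTopic(String topic) {
        return topicFilter.test(topic) && !isMM2InternalTopic.test(topic);
    }
}
```

For example, with a filter that accepts every topic, an internal-topic check matching names that end in `.internal`, and `B` as the target alias for the cycle check, `heartbeats` and `orders` are replicated, while `A.checkpoints.internal` and `mm2-offset-syncs.B.internal` are neither replicated nor checkpointed.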
Compatibility, Deprecation, and Migration Plan
Users upgrading an existing MM2 cluster don’t need to change any of their configuration, because the default policy preserves MM2's current internal topic names and behaviour.
Rejected Alternatives
Use the ReplicationPolicy and replication.policy.separator to control the naming convention for internal topics. Reason for rejection: not all internal topics are replicated (checkpoints and offset-syncs topics are not), so it makes more sense to control their names through a separate interface.