...
This KIP proposes adding a new interface named InternalTopicNamingPolicy
InternalTopicPolicy
that defines how MM2 internal topics are named based on some new configuration.
Code Block | ||
---|---|---|
| ||
/** Defines naming convention for internal topics, checkpoints, offset-syncs and heartbeats. */ @InterfaceStability.Evolving /** Defines naming convention for internal topics, checkpoints, offset-syncs and heartbeats. */ public interface InternalTopicNamingPolicyInternalTopicPolicy extends Configurable { /** Returns heartbeats topic name.*/ String heartbeatsTopic(); /** Returns the offset-syncs topic for given cluster alias. */ String offsetSyncTopic(String targetAlias); /** Returns the name checkpoint topic for given cluster alias. */ String checkpointTopic(String clusterAlias); /** check if topic is a checkpoint topic. */ boolean isCheckpointTopic(String topic); /** Check topic is MM2 internal topic, this is used to make sure the topic doesn't need to be replicated.*/ boolean isMM2InternalTopic(String topic); } |
...
property | default value | description |
---|---|---|
internal.topics.policy.class | org.apache.kafka.connect.mirror.DefaultInternalTopicNamingPolicyDefaultInternalTopicPolicy | Class which defines the internal topic naming convention. |
The DefaultInternalTopicNamingPolicy
DefaultInternalTopicPolicy
class maintains the current naming convention for MM2 and adds the ability to override the separator for these topics using the following configurations
...
property | default value | description |
---|---|---|
internal.topic.policy.separator | . | Separator used in internal topic naming convention. |
By using the DefaultInternalTopicNamingPolicy
DefaultInternalTopicPolicy
the values will be
heartbeatsTopic()
→heartbeats
checkpointTopic(clusterAlias)
→clusterAlias
.checkpoint.internal
offsetSyncTopic(targetCluster)
→mm2-offset-syncs.targetCluste.internal
...
Code Block | ||
---|---|---|
| ||
public class MirrorCheckpointTask extends SourceTask { .. boolean shouldCheckpointTopic(String topic) { return topicFilter.shouldReplicateTopic(topic) && !internalTopicNamingPolicyinternalTopicPolicy.isInternalTopic(topic); } } public class MirrorSourceConnector extends SourceConnector { ... boolean shouldReplicateTopic(String topic) { return (topicFilter.shouldReplicateTopic(topic) || isHeartbeatTopic(topic)) && !internalTopicNamingPolicyinternalTopicPolicy.isInternalTopic(topic) && !isCycle(topic); } } |
...