Comment: change name of interface to allow others to move other functionality related to internal topic

...

This KIP proposes adding a new interface named InternalTopicPolicy that defines how MM2 internal topics are named, based on new configuration.

Code Block (Java)
/** Defines the naming convention for MM2 internal topics: checkpoints, offset-syncs and heartbeats. */
@InterfaceStability.Evolving
public interface InternalTopicPolicy extends Configurable {
    /** Returns the heartbeats topic name. */
    String heartbeatsTopic();

    /** Returns the offset-syncs topic name for the given cluster alias. */
    String offsetSyncTopic(String targetAlias);

    /** Returns the checkpoint topic name for the given cluster alias. */
    String checkpointTopic(String clusterAlias);

    /** Checks whether the topic is a checkpoint topic. */
    boolean isCheckpointTopic(String topic);

    /** Checks whether the topic is an MM2 internal topic. Used to make sure internal topics are not replicated. */
    boolean isMM2InternalTopic(String topic);
}

...

property | default value | description
internal.topics.policy.class | org.apache.kafka.connect.mirror.DefaultInternalTopicPolicy | Class which defines the internal topic naming convention.

The DefaultInternalTopicPolicy class keeps the current MM2 naming convention and adds the ability to override the separator used in these topic names through the following configuration:

...

property | default value | description
internal.topic.policy.separator | . | Separator used in internal topic naming convention.

With DefaultInternalTopicPolicy and the default separator, the topic names are:

  • heartbeatsTopic() → heartbeats
  • checkpointTopic(clusterAlias) → clusterAlias.checkpoint.internal
  • offsetSyncTopic(targetCluster) → mm2-offset-syncs.targetCluster.internal
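
The naming rules listed above can be illustrated with a minimal sketch. It is not the KIP's implementation: SketchInternalTopicPolicy and SketchDefaultInternalTopicPolicy are hypothetical stand-ins so the example compiles without Kafka on the classpath, and detecting internal topics by their suffix is an assumption made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed interface (the real one extends Kafka's Configurable).
interface SketchInternalTopicPolicy {
    void configure(Map<String, ?> configs);
    String heartbeatsTopic();
    String offsetSyncTopic(String targetAlias);
    String checkpointTopic(String clusterAlias);
    boolean isCheckpointTopic(String topic);
    boolean isMM2InternalTopic(String topic);
}

// Assumed default behavior: keep the names listed above, with a configurable separator.
class SketchDefaultInternalTopicPolicy implements SketchInternalTopicPolicy {
    static final String SEPARATOR_CONFIG = "internal.topic.policy.separator";
    private String separator = "."; // default separator

    @Override
    public void configure(Map<String, ?> configs) {
        Object value = configs.get(SEPARATOR_CONFIG);
        if (value != null) {
            separator = value.toString();
        }
    }

    @Override
    public String heartbeatsTopic() {
        return "heartbeats";
    }

    @Override
    public String offsetSyncTopic(String targetAlias) {
        return "mm2-offset-syncs" + separator + targetAlias + separator + "internal";
    }

    @Override
    public String checkpointTopic(String clusterAlias) {
        return clusterAlias + separator + "checkpoint" + separator + "internal";
    }

    // Assumption: a checkpoint topic is recognized purely by its suffix.
    @Override
    public boolean isCheckpointTopic(String topic) {
        return topic.endsWith(separator + "checkpoint" + separator + "internal");
    }

    // Assumption: any topic ending in <separator>internal counts as MM2 internal.
    @Override
    public boolean isMM2InternalTopic(String topic) {
        return topic.endsWith(separator + "internal");
    }
}

class SketchInternalTopicPolicyDemo {
    public static void main(String[] args) {
        SketchDefaultInternalTopicPolicy policy = new SketchDefaultInternalTopicPolicy();
        policy.configure(new HashMap<String, String>());
        System.out.println(policy.checkpointTopic("backup")); // backup.checkpoint.internal
        System.out.println(policy.offsetSyncTopic("backup")); // mm2-offset-syncs.backup.internal

        Map<String, String> overrides = new HashMap<>();
        overrides.put(SketchDefaultInternalTopicPolicy.SEPARATOR_CONFIG, "_");
        policy.configure(overrides);
        System.out.println(policy.checkpointTopic("backup")); // backup_checkpoint_internal
    }
}
```

Note that under these assumptions heartbeats is not classified as an internal topic, which matches the intent that heartbeat topics can still be replicated.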

...

Code Block (Java)
public class MirrorCheckpointTask extends SourceTask {
    ...
    boolean shouldCheckpointTopic(String topic) {
        // Skip MM2 internal topics even when the topic filter accepts them.
        return topicFilter.shouldReplicateTopic(topic) && !internalTopicPolicy.isMM2InternalTopic(topic);
    }
}

public class MirrorSourceConnector extends SourceConnector {
    ...
    boolean shouldReplicateTopic(String topic) {
        // Heartbeat topics bypass the topic filter; MM2 internal topics and cycles are never replicated.
        return (topicFilter.shouldReplicateTopic(topic) || isHeartbeatTopic(topic))
            && !internalTopicPolicy.isMM2InternalTopic(topic) && !isCycle(topic);
    }
}
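
To make the effect of these checks concrete, here is a small standalone sketch of the same filtering logic. InternalTopicFilterSketch is a hypothetical helper, not an MM2 class: plain predicates stand in for the topic filter and for the policy's internal-topic check, the example filter and suffix rule are assumptions, and cycle detection is left out.

```java
import java.util.function.Predicate;

// Hypothetical helper mirroring the two checks above; not part of MM2.
class InternalTopicFilterSketch {
    private final Predicate<String> topicFilter;        // stands in for topicFilter.shouldReplicateTopic
    private final Predicate<String> internalTopicCheck; // stands in for the policy's internal-topic check
    private final String heartbeatsTopic;

    InternalTopicFilterSketch(Predicate<String> topicFilter,
                              Predicate<String> internalTopicCheck,
                              String heartbeatsTopic) {
        this.topicFilter = topicFilter;
        this.internalTopicCheck = internalTopicCheck;
        this.heartbeatsTopic = heartbeatsTopic;
    }

    // Checkpoint task: only topics accepted by the filter that are not internal.
    boolean shouldCheckpointTopic(String topic) {
        return topicFilter.test(topic) && !internalTopicCheck.test(topic);
    }

    // Source connector: heartbeats bypass the filter, internal topics are always skipped.
    // Cycle detection is omitted from this sketch.
    boolean shouldReplicateTopic(String topic) {
        return (topicFilter.test(topic) || heartbeatsTopic.equals(topic))
            && !internalTopicCheck.test(topic);
    }
}

class InternalTopicFilterDemo {
    public static void main(String[] args) {
        InternalTopicFilterSketch filter = new InternalTopicFilterSketch(
            topic -> topic.startsWith("orders"),   // assumed topic filter
            topic -> topic.endsWith(".internal"),  // assumed internal-topic rule
            "heartbeats");

        System.out.println(filter.shouldReplicateTopic("orders.eu"));                  // true
        System.out.println(filter.shouldReplicateTopic("heartbeats"));                 // true
        System.out.println(filter.shouldReplicateTopic("orders.checkpoint.internal")); // false
        System.out.println(filter.shouldCheckpointTopic("heartbeats"));                // false
    }
}
```

The third call shows why the internal-topic check is needed in addition to the topic filter: a topic can match the user's filter and still be an MM2 internal topic that must not be replicated.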

...