Status

Current state: Approved

Discussion thread: here

Voting thread: here

...

@InterfaceStability.Evolving
public interface ReplicationPolicy extends Configurable {
    ....
    /** Returns the heartbeats topic name. */
    default String heartbeatsTopic() { return "heartbeats"; }

    /** Returns the offset-syncs topic for the given cluster alias. */
    default String offsetSyncsTopic(String clusterAlias) { return "mm2-offset-syncs." + clusterAlias + ".internal"; }

    /** Returns the name of the checkpoints topic for the given cluster alias. */
    default String checkpointsTopic(String clusterAlias) { return clusterAlias + ".checkpoints.internal"; }
    
    /** Checks whether the topic is a heartbeats topic, e.g. heartbeats, us-west.heartbeats. */
    default boolean isHeartbeatsTopic(String topic) {
        return heartbeatsTopic().equals(originalTopic(topic));
    }

    /** Checks whether the topic is a checkpoints topic. */
    default boolean isCheckpointsTopic(String topic) { return topic.endsWith(".checkpoints.internal"); }

    /** Checks whether the topic is one of the MM2 internal topics; used to make sure such topics are not replicated. */
    default boolean isMM2InternalTopic(String topic) { return topic.endsWith(".internal"); }

    /** Internal topics are never replicated. */
    default boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".") || topic.endsWith("-internal");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
    }
}
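
To illustrate how the two internal-topic checks classify topic names, here is a minimal standalone sketch. The interface InternalTopicChecks is a hypothetical stand-in that copies only the two predicates so the example compiles without Kafka on the classpath; the topic names other than __consumer_offsets are made up for illustration.

```java
// Hypothetical stand-in for the two predicates on ReplicationPolicy.
// It is NOT the real interface; it only copies the logic shown in the listing.
interface InternalTopicChecks {
    default boolean isMM2InternalTopic(String topic) {
        return topic.endsWith(".internal");
    }

    default boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic =
            topic.startsWith("__") || topic.startsWith(".") || topic.endsWith("-internal");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
    }
}

class InternalTopicChecksDemo {
    public static void main(String[] args) {
        InternalTopicChecks checks = new InternalTopicChecks() { };
        // Example names; "orders" and "us-west.orders" are illustrative user topics.
        String[] topics = {
            "__consumer_offsets",
            "mm2-offset-syncs.us-west.internal",
            "us-west.checkpoints.internal",
            "orders",
            "us-west.orders"
        };
        for (String topic : topics) {
            System.out.println(topic + " -> internal: " + checks.isInternalTopic(topic));
        }
    }
}
```

Only the first three names are reported as internal, so only the two user topics would be candidates for replication.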

...

  • heartbeatsTopic() → heartbeats
  • checkpointsTopic(clusterAlias) → clusterAlias.checkpoints.internal
  • offsetSyncsTopic(clusterAlias) → mm2-offset-syncs.clusterAlias.internal
  • isMM2InternalTopic(topic) → returns true if the topic name has the .internal suffix
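
As a quick check of the default names listed above, the following sketch evaluates them for an example alias. InternalTopicNames is a hypothetical stand-in that copies the default methods (it is not the real ReplicationPolicy interface), and the alias us-west is only an illustration.

```java
// Hypothetical stand-in that copies the naming defaults so the example
// runs without the Kafka mirror client on the classpath.
interface InternalTopicNames {
    default String heartbeatsTopic() {
        return "heartbeats";
    }

    default String offsetSyncsTopic(String clusterAlias) {
        return "mm2-offset-syncs." + clusterAlias + ".internal";
    }

    default String checkpointsTopic(String clusterAlias) {
        return clusterAlias + ".checkpoints.internal";
    }

    default boolean isMM2InternalTopic(String topic) {
        return topic.endsWith(".internal");
    }
}

class DefaultNamesDemo {
    public static void main(String[] args) {
        InternalTopicNames names = new InternalTopicNames() { };
        String alias = "us-west"; // illustrative cluster alias

        System.out.println(names.heartbeatsTopic());        // heartbeats
        System.out.println(names.offsetSyncsTopic(alias));  // mm2-offset-syncs.us-west.internal
        System.out.println(names.checkpointsTopic(alias));  // us-west.checkpoints.internal
        // Both generated names carry the .internal suffix.
        System.out.println(names.isMM2InternalTopic(names.offsetSyncsTopic(alias))); // true
    }
}
```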

...

  • Users who upgrade an existing MM2 cluster and use the default replication.policy.separator do not need to change any of their current configuration, because this proposal keeps the default MM2 behaviour under the default configs.
  • Users who upgrade an existing MM2 cluster to 3.1.x, 3.2.x, 3.3.x, 3.4.x, or 3.5.x with a customised replication.policy.separator, and who still want to use the old internal topics, need to provide a new ReplicationPolicy implementation (which can optionally subclass DefaultReplicationPolicy) that overrides ReplicationPolicy.offsetSyncsTopic and ReplicationPolicy.checkpointsTopic to return the old topic names. The related issue is discussed in KAFKA-15102.
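
The override described in the second bullet might look roughly like the sketch below. It is self-contained: PolicyDefaults is a hypothetical stand-in for the two relevant ReplicationPolicy methods so that the example compiles without Kafka, and LegacyInternalTopicsPolicy is an illustrative name. The legacy topic names returned here (built with a custom separator such as _) are an assumption about what the existing topics are called; check the actual topic names on the cluster before relying on them.

```java
// Hypothetical stand-in for the two ReplicationPolicy defaults being overridden.
interface PolicyDefaults {
    default String offsetSyncsTopic(String clusterAlias) {
        return "mm2-offset-syncs." + clusterAlias + ".internal";
    }

    default String checkpointsTopic(String clusterAlias) {
        return clusterAlias + ".checkpoints.internal";
    }
}

// Illustrative policy that keeps pointing at previously created internal topics.
// ASSUMPTION: the old topics were named with the custom separator in place of ".".
class LegacyInternalTopicsPolicy implements PolicyDefaults {
    private final String separator;

    LegacyInternalTopicsPolicy(String separator) {
        this.separator = separator;
    }

    @Override
    public String offsetSyncsTopic(String clusterAlias) {
        return "mm2-offset-syncs" + separator + clusterAlias + separator + "internal";
    }

    @Override
    public String checkpointsTopic(String clusterAlias) {
        return clusterAlias + separator + "checkpoints" + separator + "internal";
    }
}

class LegacyPolicyDemo {
    public static void main(String[] args) {
        PolicyDefaults policy = new LegacyInternalTopicsPolicy("_");
        System.out.println(policy.offsetSyncsTopic("primary")); // mm2-offset-syncs_primary_internal
        System.out.println(policy.checkpointsTopic("primary")); // primary_checkpoints_internal
    }
}
```

In a real deployment the class would implement ReplicationPolicy (or extend DefaultReplicationPolicy) instead of the stand-in and would be selected through the replication.policy.class setting.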


Rejected Alternatives

1- Add a new interface for internal topic policies. This was rejected in order to minimise the number of customised MM2 classes.

...