Status

Current state: Under Discussion

Discussion thread: here

Voting thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

MM2's internal topic names (heartbeats, checkpoints and offset-syncs) are hardcoded in the source code. This makes it hard to run MM2 against any Kafka cluster that enforces topic naming conventions and doesn't allow topic auto-creation. On such a cluster these internal topics have to be created manually up front, following the cluster's naming rules, so MM2 needs the flexibility to use the names of the topics you created instead of its hardcoded defaults.

Public Interfaces and Proposed Changes

This KIP proposes adding new methods to ReplicationPolicy that define how MM2 internal topics are named and identified, so that their names can be controlled through configuration.

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.annotation.InterfaceStability;

@InterfaceStability.Evolving
public interface ReplicationPolicy extends Configurable {
    // ... existing methods ...

    /** Returns the heartbeats topic name. */
    default String heartbeatsTopic() { return "heartbeats"; }

    /** Returns the offset-syncs topic name for the given target cluster alias. */
    default String offsetSyncTopic(String targetAlias) { return "mm2-offset-syncs." + targetAlias + ".internal"; }

    /** Returns the checkpoints topic name for the given cluster alias. */
    default String checkpointTopic(String clusterAlias) { return clusterAlias + ".checkpoints.internal"; }

    /** Checks whether the topic is a checkpoints topic. */
    default boolean isCheckpointTopic(String topic) { return topic.endsWith(".checkpoints.internal"); }

    /** Checks whether the topic is one of MM2's internal topics; used to make sure it is never replicated. */
    default boolean isMM2InternalTopic(String topic) { return topic.endsWith(".internal"); }

    /** Internal topics are never replicated. */
    default boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".") || topic.endsWith("-internal");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
    }
}
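To show how these defaults combine when MM2 decides what to replicate, the following self-contained sketch copies them into a stand-in interface and classifies a few topic names the way isInternalTopic would. InternalTopicNamingSketch and InternalTopicFilterDemo are hypothetical names used only so the example compiles without Kafka on the classpath; this is not the real ReplicationPolicy. The sketch assumes the suffixes MM2 uses today (.checkpoints.internal for checkpoints and .internal for MM2 internal topics).

```java
import java.util.List;

// Hypothetical stand-in mirroring the proposed ReplicationPolicy defaults,
// so the example compiles without Kafka on the classpath.
interface InternalTopicNamingSketch {
    default String heartbeatsTopic() { return "heartbeats"; }

    default String offsetSyncTopic(String targetAlias) {
        return "mm2-offset-syncs." + targetAlias + ".internal";
    }

    default String checkpointTopic(String clusterAlias) {
        return clusterAlias + ".checkpoints.internal";
    }

    default boolean isCheckpointTopic(String topic) {
        return topic.endsWith(".checkpoints.internal");
    }

    default boolean isMM2InternalTopic(String topic) {
        return topic.endsWith(".internal");
    }

    default boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".") || topic.endsWith("-internal");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
    }
}

class InternalTopicFilterDemo {
    public static void main(String[] args) {
        // Every method has a default, so an empty anonymous class is enough.
        InternalTopicNamingSketch policy = new InternalTopicNamingSketch() { };
        List<String> topics = List.of(
                "orders",
                policy.heartbeatsTopic(),
                policy.checkpointTopic("primary"),
                policy.offsetSyncTopic("backup"),
                "__consumer_offsets",
                "connect-configs-internal");
        for (String topic : topics) {
            // Internal topics are skipped; everything else is a replication candidate.
            System.out.println(topic + " -> " + (policy.isInternalTopic(topic) ? "skip" : "replicate"));
        }
    }
}
```

Note that under these defaults the heartbeats topic does not match any internal-topic rule, so it is treated as a replication candidate like any ordinary topic.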


The default implementations of these methods keep the current behaviour. In addition, DefaultReplicationPolicy will let users override the separator used in these internal topic names through replication.policy.separator, which is '.' by default.

Using DefaultReplicationPolicy with the default '.' separator, the values are:

  • heartbeatsTopic() → heartbeats
  • checkpointTopic(clusterAlias) → clusterAlias.checkpoints.internal
  • offsetSyncTopic(targetCluster) → mm2-offset-syncs.targetCluster.internal
  • isMM2InternalTopic(topic) → true if the topic name ends with the .internal suffix
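A minimal sketch of how a separator-aware policy could derive these names from replication.policy.separator. SeparatorNamingSketch is a hypothetical class, not DefaultReplicationPolicy itself. Two assumptions are this sketch's own: that the separator replaces every '.' in the internal names (including the one before internal), and that the checkpoints segment follows the name MM2 uses today (clusterAlias.checkpoints.internal).

```java
// Hypothetical sketch: derives MM2 internal topic names from a configurable
// separator, following the DefaultReplicationPolicy values listed above.
class SeparatorNamingSketch {
    private final String separator;

    SeparatorNamingSketch(String separator) {
        this.separator = separator; // value of replication.policy.separator, "." by default
    }

    String internalSuffix() {
        return separator + "internal";
    }

    String heartbeatsTopic() {
        return "heartbeats"; // no separator involved
    }

    String checkpointTopic(String clusterAlias) {
        return clusterAlias + separator + "checkpoints" + internalSuffix();
    }

    String offsetSyncTopic(String targetCluster) {
        return "mm2-offset-syncs" + separator + targetCluster + internalSuffix();
    }

    boolean isCheckpointTopic(String topic) {
        return topic.endsWith(separator + "checkpoints" + internalSuffix());
    }

    boolean isMM2InternalTopic(String topic) {
        return topic.endsWith(internalSuffix());
    }

    public static void main(String[] args) {
        SeparatorNamingSketch dotted = new SeparatorNamingSketch(".");
        SeparatorNamingSketch underscored = new SeparatorNamingSketch("_");
        System.out.println(dotted.offsetSyncTopic("backup"));       // mm2-offset-syncs.backup.internal
        System.out.println(underscored.offsetSyncTopic("backup"));  // mm2-offset-syncs_backup_internal
        System.out.println(underscored.checkpointTopic("primary")); // primary_checkpoints_internal
    }
}
```

With a non-default separator, this sketch no longer recognises names built with '.' (for example mm2-offset-syncs.backup.internal) as MM2 internal topics, so the generating and recognising methods must always use the same separator.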

Users who run MM2 with a custom ReplicationPolicy will not automatically be able to control the internal topic suffix through replication.policy.separator. To customise these names, they will need to override the new methods in their implementation, for example by reading `replication.policy.separator` themselves.
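For users with a custom policy, a sketch of overriding the new methods to satisfy a hypothetical cluster rule: every topic must start with "ops." and internal topics must end with ".int". The rule, the prefix, the suffix and the class name OpsPrefixedNaming are all invented for illustration. A standalone class is used so the sketch compiles without Kafka; in a real deployment these would be overrides of the methods proposed above.

```java
// Hypothetical custom naming for a cluster whose (invented) rule is that every
// topic starts with "ops." and internal topics end with ".int".
// In a real policy these would override the proposed ReplicationPolicy methods.
class OpsPrefixedNaming {
    private static final String PREFIX = "ops.";
    private static final String INTERNAL_SUFFIX = ".int";

    String heartbeatsTopic() {
        return PREFIX + "heartbeats";
    }

    String offsetSyncTopic(String targetAlias) {
        return PREFIX + "mm2-offset-syncs." + targetAlias + INTERNAL_SUFFIX;
    }

    String checkpointTopic(String clusterAlias) {
        return PREFIX + clusterAlias + ".checkpoints" + INTERNAL_SUFFIX;
    }

    // Must recognise exactly what checkpointTopic() produces.
    boolean isCheckpointTopic(String topic) {
        return topic.startsWith(PREFIX) && topic.endsWith(".checkpoints" + INTERNAL_SUFFIX);
    }

    // Must recognise every internal name produced above, otherwise MM2 would replicate them.
    boolean isMM2InternalTopic(String topic) {
        return topic.startsWith(PREFIX) && topic.endsWith(INTERNAL_SUFFIX);
    }

    public static void main(String[] args) {
        OpsPrefixedNaming naming = new OpsPrefixedNaming();
        String checkpoints = naming.checkpointTopic("primary");
        String offsetSyncs = naming.offsetSyncTopic("backup");
        System.out.println(checkpoints + " internal=" + naming.isMM2InternalTopic(checkpoints)); // ops.primary.checkpoints.int internal=true
        System.out.println(offsetSyncs + " internal=" + naming.isMM2InternalTopic(offsetSyncs)); // ops.mm2-offset-syncs.backup.int internal=true
    }
}
```

The design point is that the name-producing methods and the name-recognising methods are overridden together: if checkpointTopic() changed but isCheckpointTopic() or isMM2InternalTopic() did not, MM2 would fail to find its checkpoints or would treat its own internal topics as ordinary topics.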


Compatibility, Deprecation, and Migration Plan

  • When users upgrade an existing MM2 cluster they don’t need to change any of their current configuration as this proposal maintains the default behaviour for MM2.

Rejected Alternatives

1- Add a new interface for internal topic naming policies. This was rejected to minimise the number of customised classes MM2 users need to provide.

2- Add a configuration for each internal topic name. This would add more fields to the MirrorMaker 2 configuration, and the more complex the replication topology between clusters, the more configuration would be needed whenever the clusters don't share the same topic naming convention. It would also complicate the implementation: MirrorMaker utils would need a way to identify checkpoints topics, and MirrorCheckpointTask and MirrorSourceConnector would need a way to identify internal topics so that they are neither tracked for checkpoints nor replicated.
