Public Interfaces and Proposed Changes

This KIP proposes adding new methods to ReplicationPolicy that define how MM2 internal topics are named, based on some new configuration.

/** Defines naming convention for internal topics, checkpoints, offset-syncs and heartbeats. */
@InterfaceStability.Evolving
public interface ReplicationPolicy extends Configurable {
     ....
    /** Returns the heartbeats topic name. */
    String heartbeatsTopic();

    /** Returns the offset-syncs topic for the given cluster alias. */
    String offsetSyncTopic(String targetAlias);

    /** Returns the name of the checkpoints topic for the given cluster alias. */
    String checkpointTopic(String clusterAlias);

    /** Returns true if the topic is a checkpoints topic. */
    boolean isCheckpointTopic(String topic);

    /** Returns true if the topic is one of the MM2 internal topics; used to make sure the topic is not replicated. */
    boolean isMM2InternalTopic(String topic);
}

We can provide a custom implementation for this interface by overriding the value of the following configuration

...


    /** Internal topics are never replicated. */
    default boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".") || topic.endsWith("-internal");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
    }
}
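
As a rough illustration of how a custom policy could plug into these methods, the sketch below uses a hypothetical local interface, InternalTopicNaming, that mirrors the methods listed above (it is not the actual Kafka interface), and a hypothetical implementation that places every MM2 internal topic under a single prefix. The prefix and all class names are assumptions made for this example only.

```java
/** Hypothetical local mirror of the proposed methods; NOT the real Kafka interface. */
interface InternalTopicNaming {
    String heartbeatsTopic();
    String offsetSyncTopic(String targetAlias);
    String checkpointTopic(String clusterAlias);
    boolean isCheckpointTopic(String topic);
    boolean isMM2InternalTopic(String topic);

    /** Same default logic as in the proposal: internal topics are never replicated. */
    default boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic =
            topic.startsWith("__") || topic.startsWith(".") || topic.endsWith("-internal");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
    }
}

/** Illustrative custom policy: all MM2 internal topics share one (assumed) prefix. */
final class PrefixedInternalTopicNaming implements InternalTopicNaming {
    private static final String PREFIX = "mm2_internal_"; // assumption for this example

    @Override
    public String heartbeatsTopic() {
        return PREFIX + "heartbeats";
    }

    @Override
    public String offsetSyncTopic(String targetAlias) {
        return PREFIX + "offset_syncs_" + targetAlias;
    }

    @Override
    public String checkpointTopic(String clusterAlias) {
        return PREFIX + "checkpoints_" + clusterAlias;
    }

    @Override
    public boolean isCheckpointTopic(String topic) {
        return topic.startsWith(PREFIX + "checkpoints_");
    }

    @Override
    public boolean isMM2InternalTopic(String topic) {
        return topic.startsWith(PREFIX);
    }
}
```

With this sketch, isInternalTopic returns true both for topics created by the custom policy and for Kafka's own internal topics such as __consumer_offsets, while an ordinary topic name is still eligible for replication.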


...

The DefaultReplicationPolicy class will maintain the current naming convention for MM2, with the ability to override the separator for these topics

...

using replication.policy.separator

...

which is '.'.

With DefaultReplicationPolicy, the values will be:

  • heartbeatsTopic() → heartbeats
  • checkpointTopic(clusterAlias) → clusterAlias.checkpoints.internal
  • offsetSyncTopic(targetCluster) → mm2-offset-syncs.targetCluster.internal
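
The default names above could be derived from the separator roughly as follows. This is a minimal sketch, not the actual DefaultReplicationPolicy code: the class name DefaultInternalTopicNames is hypothetical, the checkpoints suffix follows MM2's existing alias.checkpoints.internal naming, and it is an assumption that the separator also replaces the '.' in front of "internal".

```java
/** Hypothetical stand-in showing the default internal topic names; NOT the actual Kafka class. */
final class DefaultInternalTopicNames {
    // Would come from replication.policy.separator.
    private final String separator;

    DefaultInternalTopicNames(String separator) {
        this.separator = separator;
    }

    /** The heartbeats topic name does not depend on the separator. */
    String heartbeatsTopic() {
        return "heartbeats";
    }

    /** For example "backup.checkpoints.internal" with the default separator. */
    String checkpointTopic(String clusterAlias) {
        return clusterAlias + separator + "checkpoints" + separator + "internal";
    }

    /** For example "mm2-offset-syncs.backup.internal" with the default separator. */
    String offsetSyncTopic(String targetAlias) {
        return "mm2-offset-syncs" + separator + targetAlias + separator + "internal";
    }

    /** Assumed check: MM2 internal topics end with the separator followed by "internal". */
    boolean isMM2InternalTopic(String topic) {
        return topic.endsWith(separator + "internal");
    }
}
```

Under these assumptions, constructing the class with "_" instead of "." would yield names such as backup_checkpoints_internal, which is the kind of override the separator configuration is meant to allow.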

Source Connector and Checkpoint Task

In line with this proposal’s motivation, we will also need to update MirrorSourceConnector and MirrorCheckpointTask so that they neither replicate internal topics nor store checkpoints for them.

...


  • isMM2InternalTopic(topic.internal) → returns true, since the topic name has the internal suffix
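
To make the intended effect on the connector and the checkpoint task concrete, the sketch below filters a list of candidate topics with the internal-topic check. It is only an illustration: InternalTopicFilter and topicsToReplicate are hypothetical names, the suffix check assumes the default '.' separator, and the real MirrorSourceConnector and MirrorCheckpointTask code is not reproduced here.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper illustrating how internal topics could be skipped; NOT MM2 code. */
final class InternalTopicFilter {

    /** Assumed default check: MM2 internal topics carry the ".internal" suffix. */
    static boolean isMM2InternalTopic(String topic) {
        return topic.endsWith(".internal");
    }

    /** Mirrors the default isInternalTopic logic from the proposal. */
    static boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic =
            topic.startsWith("__") || topic.startsWith(".") || topic.endsWith("-internal");
        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
    }

    /** Keeps only the topics that are not internal, preserving their order. */
    static List<String> topicsToReplicate(List<String> candidates) {
        return candidates.stream()
            .filter(topic -> !isInternalTopic(topic))
            .collect(Collectors.toList());
    }
}
```

A checkpoint task could apply the same predicate before storing checkpoints, so that both components agree on what counts as an internal topic.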

Compatibility, Deprecation, and Migration Plan

  • When users upgrade an existing MM2 cluster they don’t need to change any of their current configuration as this proposal maintains the default behaviour for MM2.
  • Customers who have a customised ReplicationPolicy will need to add definitions for their internal topic naming convention.

Rejected Alternatives

1- Add a new interface for internal topic policies. Reason to reject: to minimise the number of MM2 customised classes.

2- Add a configuration for each topic. However, this would add more fields to the MirrorMaker 2 config, and the more advanced and complicated the replication design between clusters is, the more configuration will be needed if the clusters don't share the same naming convention for topics. It would also add complexity to the implementation, as we would need a way to identify whether a topic is a checkpoints topic for the MirrorMaker utils, and a way to identify whether a topic is an internal topic for MirrorCheckpointTask and MirrorSourceConnector, to make sure we neither track checkpoints for these topics nor replicate them.