...
Current state: "Under Discussion"
Discussion thread: here
JIRA: KAFKA-14903
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
To address this, MM2 should support two plugins, a TopicListener (nested in MirrorSourceConnector) and a GroupListener (nested in MirrorCheckpointConnector), and notify them when the list of replicated topics or groups changes. This will allow users to follow the current set of replicated topics and groups, even with the IdentityReplicationPolicy.
Public Interfaces
New interfaces in :connect:mirror - TopicListener and GroupListener
interface TopicListener extends Configurable, AutoCloseable {
void topicsChanged(Map<String, String> upstreamToDownstreamTopics);
}
interface GroupListener extends Configurable, AutoCloseable {
void groupsChanged(List<String> replicatedGroups);
}
New default implementations:
- DefaultTopicListener (NOP implementation of TopicListener)
- DefaultGroupListener (NOP implementation of GroupListener)
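As an illustration of the TopicListener contract above, the following is a minimal sketch of a custom listener. It assumes the interface ends up shaped exactly as proposed here. The `Configurable` interface is a local stand-in for `org.apache.kafka.common.Configurable` so the example compiles on its own, and `RecordingTopicListener` and the `recording.listener.prefix` key are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Local stand-in for org.apache.kafka.common.Configurable (assumption: same shape).
interface Configurable {
    void configure(Map<String, ?> configs);
}

// The interface as proposed in this KIP, repeated so the sketch is self-contained.
interface TopicListener extends Configurable, AutoCloseable {
    void topicsChanged(Map<String, String> upstreamToDownstreamTopics);
}

// Hypothetical listener: keeps the latest mapping and records every notification.
class RecordingTopicListener implements TopicListener {
    private String prefix = "";
    private Map<String, String> current = new TreeMap<>();
    final List<String> events = new ArrayList<>();

    @Override
    public void configure(Map<String, ?> configs) {
        // "recording.listener.prefix" is a made-up key, not part of the proposal.
        Object value = configs.get("recording.listener.prefix");
        if (value != null) {
            prefix = value.toString();
        }
    }

    @Override
    public void topicsChanged(Map<String, String> upstreamToDownstreamTopics) {
        // Copy defensively; the caller may reuse or mutate its map afterwards.
        current = new TreeMap<>(upstreamToDownstreamTopics);
        events.add(prefix + current);
    }

    Map<String, String> currentTopics() {
        return current;
    }

    @Override
    public void close() {
        events.add(prefix + "closed");
    }
}
```

With the IdentityReplicationPolicy, both sides of each map entry would carry the same topic name, which is the case where a listener like this is most useful for telling replicated topics apart from local ones.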
New configurations:
- MirrorSourceConnector - topic.listener.class (default: DefaultTopicListener) - Specifies which class to use as the TopicListener.
- MirrorCheckpointConnector - group.listener.class (default: DefaultGroupListener) - Specifies which class to use as the GroupListener.
Proposed Changes
MirrorSourceConnector
- At start, instantiate the class defined by the topic.listener.class config (topicListener).
- In the refreshTopicPartitions method, when the list of replicated topics is computed, notify topicListener with the map of upstream topic names to replica topic names.
- In the stop method, close topicListener.
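The three MirrorSourceConnector steps above can be sketched as follows. This is not the actual connector code: `SourceConnectorSketch` is a simplified, hypothetical stand-in, the `UnaryOperator` plays the role of the replication policy, and plain reflection is used where the real connector would presumably rely on Kafka's config machinery. `InMemoryTopicListener` is likewise a made-up class for demonstration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Local stand-in for org.apache.kafka.common.Configurable (assumption: same shape).
interface Configurable {
    void configure(Map<String, ?> configs);
}

interface TopicListener extends Configurable, AutoCloseable {
    void topicsChanged(Map<String, String> upstreamToDownstreamTopics);
}

// NOP default, as proposed: every callback does nothing.
class DefaultTopicListener implements TopicListener {
    @Override public void configure(Map<String, ?> configs) { }
    @Override public void topicsChanged(Map<String, String> upstreamToDownstreamTopics) { }
    @Override public void close() { }
}

// Hypothetical listener that keeps the last mapping in memory.
class InMemoryTopicListener implements TopicListener {
    Map<String, String> latest = Map.of();
    boolean closed = false;
    @Override public void configure(Map<String, ?> configs) { }
    @Override public void topicsChanged(Map<String, String> upstreamToDownstreamTopics) {
        latest = new LinkedHashMap<>(upstreamToDownstreamTopics);
    }
    @Override public void close() { closed = true; }
}

// Simplified stand-in for MirrorSourceConnector; only the listener wiring is shown.
class SourceConnectorSketch {
    static final String TOPIC_LISTENER_CLASS = "topic.listener.class";

    private final UnaryOperator<String> replicaTopicName; // stand-in for the replication policy
    private TopicListener topicListener;

    SourceConnectorSketch(UnaryOperator<String> replicaTopicName) {
        this.replicaTopicName = replicaTopicName;
    }

    // Step 1: instantiate the configured class, falling back to the NOP default.
    void start(Map<String, String> props) throws ReflectiveOperationException {
        String className = props.getOrDefault(TOPIC_LISTENER_CLASS, DefaultTopicListener.class.getName());
        topicListener = Class.forName(className)
                .asSubclass(TopicListener.class)
                .getDeclaredConstructor()
                .newInstance();
        topicListener.configure(props);
    }

    // Step 2: once the replicated topics are known, pass the upstream -> replica mapping on.
    void refreshTopicPartitions(List<String> upstreamTopics) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String topic : upstreamTopics) {
            mapping.put(topic, replicaTopicName.apply(topic));
        }
        topicListener.topicsChanged(mapping);
    }

    // Step 3: release the listener's resources.
    void stop() throws Exception {
        topicListener.close();
    }

    TopicListener topicListener() {
        return topicListener;
    }
}
```

Because the default is a NOP implementation, the sketch never needs a null check around the listener. This version notifies on every refresh; notifying only when the mapping actually differs from the previous one would be an equally plausible reading of the proposal.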
MirrorCheckpointConnector (almost the same changes as in MirrorSourceConnector, but with groups)
- At start, instantiate the class defined by the group.listener.class config (groupListener).
- In the refreshConsumerGroups method, when the list of groups to checkpoint is computed, notify groupListener with the list of group names.
- In the stop method, close groupListener.
Compatibility, Deprecation, and Migration Plan
No deprecation or migration is needed: the change only adds new capabilities, and the defaults of the new configurations do not change the current behavior. Existing users will not be impacted.
...