Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Adding default NOP implementations

...

interface TopicListener extends Configurable, AutoCloseable {
   void topicsChanged(Map<String, String> upstreamToDownstreamTopics);
}
interface GroupListener extends Configurable, AutoCloseable {
void groupsChanged(List<String> replicatedGroups);
}

New default implementations:

  1. DefaultTopicListener (NOP implementation of TopicListener)
  2. DefaultGroupListener (NOP implementation of GroupListener)

New configurations:

  1. MirrorSourceConnector - topic.listener.class (default: nullDefaultTopicListener) - Specifies which class to use as the TopicListener. When null, no listener will be instantiated.
  2. MirrorCheckpointConnector - group.listener.class (default: nullDefaultGroupListener) - Specifies which class to use as the GroupListener. When null, no listener will be instantiated.

Proposed Changes

MirrorSourceConnector

  1. At start, check the instantiate class defined by topic.listener.class config and if defined, instantiate a TopicListener(topicListener).
  2. In the refreshTopicPartitions method, when the list of replicated topics is computed, if a TopicListener is available, notify topicListener with the list map of upstream topic replica topic names.
  3. In the stop method, if a TopicListener is available, close itclose topicListener.

MirrorCheckpointConnector (almost the same changes as in MirrorSourceConnector, but with groups)

  1. At start, check the instantiate class defined by group.listener.class config and if defined, instantiate a GroupListener(groupListener).
  2. In the refreshConsumerGroups method, when the list of groups to checkpoint is computed, if a GroupListener is available, notify groupListener with the list of group names.
  3. In the stop method, if a GroupListener is available, close itclose groupListener.

Compatibility, Deprecation, and Migration Plan

No need for deprecation or migration, as this a feature flagged improvement, adding new capabilities.it adds new capabilities, and the defaults of the new configurations do not change the current behavior. Existing users will not be impacted.

...