...

A ConsumerRebalanceListener interface is added and can be wired into ZookeeperConsumerConnector to avoid duplicate messages when a consumer rebalance occurs in mirror maker.

import java.util.Map;
import java.util.Set;

import kafka.consumer.ConsumerThreadId;

/**
 * This listener is used to execute user-defined tasks when a consumer rebalance
 * occurs in {@link kafka.consumer.ZookeeperConsumerConnector}.
 */
public interface ConsumerRebalanceListener {
    /**
     * This method is called after all the fetcher threads are stopped but before the
     * ownership of partitions is released. Depending on whether auto offset commit is
     * enabled or not, offsets may or may not have been committed.
     * This listener was initially added to prevent duplicate messages on consumer rebalance
     * in mirror maker, where offset auto commit is disabled to prevent data loss. It could
     * also be used in more general cases.
     */
    public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership);

    /**
     * This method is called after the new partition assignment is finished but before the
     * fetcher threads start. A map of the new global partition assignment is passed in as
     * a parameter.
     */
    public void beforeStartingFetchers(Map<String, Map<Integer, ConsumerThreadId>> partitionAssignment);
}

 

For Mirror Maker, an internal rebalance listener is wired in by default to avoid duplicates on consumer rebalance. Users can still specify a custom listener class through a command line argument. The internal rebalance listener calls that custom listener after it finishes its default logic.
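The delegation order described above can be sketched as follows. This is only an illustration under stated assumptions, not the mirror maker implementation: `RebalanceListenerSketch`, `InternalRebalanceListener`, and the `Runnable` standing in for the default logic are hypothetical names, the nested `ConsumerThreadId` is an empty stand-in for `kafka.consumer.ConsumerThreadId` so the example compiles on its own, and running the default logic only in `beforeReleasingPartitions` is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

class RebalanceListenerSketch {
    /** Hypothetical stand-in for kafka.consumer.ConsumerThreadId. */
    static final class ConsumerThreadId {}

    /** Local copy of the listener interface so the sketch is self-contained. */
    interface ConsumerRebalanceListener {
        void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership);
        void beforeStartingFetchers(Map<String, Map<Integer, ConsumerThreadId>> partitionAssignment);
    }

    /** Hypothetical internal listener: default logic first, then the custom listener. */
    static final class InternalRebalanceListener implements ConsumerRebalanceListener {
        private final Runnable defaultLogic;                     // placeholder for the built-in behavior
        private final ConsumerRebalanceListener customListener;  // may be null if none was configured

        InternalRebalanceListener(Runnable defaultLogic, ConsumerRebalanceListener customListener) {
            this.defaultLogic = defaultLogic;
            this.customListener = customListener;
        }

        @Override
        public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership) {
            defaultLogic.run();
            if (customListener != null) {
                customListener.beforeReleasingPartitions(partitionOwnership);
            }
        }

        @Override
        public void beforeStartingFetchers(Map<String, Map<Integer, ConsumerThreadId>> partitionAssignment) {
            // Assumption: no default logic is needed here, so only the custom listener runs.
            if (customListener != null) {
                customListener.beforeStartingFetchers(partitionAssignment);
            }
        }
    }

    /** Simulates one rebalance callback and returns the order in which the pieces ran. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        ConsumerRebalanceListener custom = new ConsumerRebalanceListener() {
            @Override
            public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership) {
                events.add("custom");
            }

            @Override
            public void beforeStartingFetchers(Map<String, Map<Integer, ConsumerThreadId>> partitionAssignment) {
                // Nothing to do in this sketch.
            }
        };
        ConsumerRebalanceListener internal =
            new InternalRebalanceListener(() -> events.add("default"), custom);
        internal.beforeReleasingPartitions(Collections.emptyMap());
        return events;
    }
}
```

Calling `RebalanceListenerSketch.demo()` records the default logic before the custom listener, which is the ordering the paragraph above describes.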

...

import java.util.List;

public interface MirrorMakerMessageHandler {
    /**
     * The message handler is executed in the mirror maker consumer thread to handle
     * messages before they go into the data channel. The message handler should
     * not block and should handle all exceptions by itself.
     */
    public List<MirrorMakerRecord> handle(MirrorMakerRecord record);
}
The default message handler simply returns a list containing the record that was passed in.
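A minimal sketch of the default handler, next to one illustrative custom handler, might look like this. The `MirrorMakerRecord` class here is a hypothetical stand-in with assumed fields (`topic`, `value`), since its real definition is not shown in this section; `DefaultMessageHandler` and `DropEmptyHandler` are illustrative names, and treating an empty returned list as "drop the record" is an assumption based on the `List` return type.

```java
import java.util.Collections;
import java.util.List;

class MessageHandlerSketch {
    /** Hypothetical stand-in for MirrorMakerRecord; the field names are assumptions. */
    static final class MirrorMakerRecord {
        final String topic;
        final byte[] value;

        MirrorMakerRecord(String topic, byte[] value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /** Local copy of the handler interface so the sketch is self-contained. */
    interface MirrorMakerMessageHandler {
        List<MirrorMakerRecord> handle(MirrorMakerRecord record);
    }

    /** Default behavior: pass the record through unchanged as a single-element list. */
    static final class DefaultMessageHandler implements MirrorMakerMessageHandler {
        @Override
        public List<MirrorMakerRecord> handle(MirrorMakerRecord record) {
            return Collections.singletonList(record);
        }
    }

    /** Illustrative custom handler: returns no records for an empty payload (assumed to mean "drop"). */
    static final class DropEmptyHandler implements MirrorMakerMessageHandler {
        @Override
        public List<MirrorMakerRecord> handle(MirrorMakerRecord record) {
            if (record.value == null || record.value.length == 0) {
                return Collections.emptyList();
            }
            return Collections.singletonList(record);
        }
    }
}
```

Both handlers return immediately and do no I/O, in line with the requirement that a handler must not block.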

Changes to make with new consumer

The new consumer is still under development. Once it is available, the following changes need to be made:

  1. Change the message handler interface to take ConsumerRecord for both input and output.
  2. Change the consumer thread according to the new consumer API. The options are:
    1. Use a single consumer thread. (Preferred unless there is an apparent performance issue.)
    2. Use multiple consumer threads by decoupling consumption and processing.
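One possible reading of the second option, decoupling consumption from processing, is a single consuming thread that hands messages to several processing threads through a bounded queue. The sketch below illustrates only that threading pattern with plain `java.util.concurrent` classes. It does not use the new consumer API, and every name in it (`DecoupledConsumptionSketch`, `run`, the upper-casing "processing" step) is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class DecoupledConsumptionSketch {
    /** Feeds 'input' through one consuming thread and 'workers' processing threads. */
    static List<String> run(List<String> input, int workers) {
        // Bounded queue: the consuming thread blocks when processing falls behind.
        BlockingQueue<Optional<String>> queue = new ArrayBlockingQueue<>(4);
        List<String> processed = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers + 1);

        // Consumption: in a real mirror maker this would poll the consumer instead.
        Callable<Void> consumer = () -> {
            for (String message : input) {
                queue.put(Optional.of(message));
            }
            for (int i = 0; i < workers; i++) {
                queue.put(Optional.empty()); // one end-of-stream marker per worker
            }
            return null;
        };

        // Processing: placeholder work (upper-casing) stands in for the message handler.
        Callable<Void> worker = () -> {
            while (true) {
                Optional<String> message = queue.take();
                if (!message.isPresent()) {
                    return null;
                }
                processed.add(message.get().toUpperCase());
            }
        };

        List<Future<Void>> futures = new ArrayList<>();
        futures.add(pool.submit(consumer));
        for (int i = 0; i < workers; i++) {
            futures.add(pool.submit(worker));
        }
        try {
            for (Future<Void> future : futures) {
                future.get(); // surface any failure from the threads
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return processed;
    }
}
```

With more than one worker the output order is not guaranteed to match the input order, which is one reason the single-threaded option may be preferable when ordering matters.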

Compatibility, Deprecation, and Migration Plan

...