...
A ConsumerRebalanceListener interface is added and can be wired into ZookeeperConsumerConnector to avoid duplicate messages when a consumer rebalance occurs in mirror maker.
/**
 * This listener is used to execute user-defined tasks when a consumer rebalance
 * occurs in {@link kafka.consumer.ZookeeperConsumerConnector}.
 */
public interface ConsumerRebalanceListener {
    /**
     * This method is called after all the fetcher threads are stopped but before the
     * ownership of partitions is released. Depending on whether auto offset commit is
     * enabled or not, offsets may or may not have been committed.
     * This listener was initially added to prevent duplicate messages on consumer rebalance
     * in mirror maker, where offset auto commit is disabled to prevent data loss. It could
     * also be used in more general cases.
     */
    public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership);

    /**
     * This method is called after the new partition assignment is finished but before fetcher
     * threads start. A map of the new global partition assignment is passed in as a parameter.
     */
    public void beforeStartingFetchers(Map<String, Map<Integer, ConsumerThreadId>> partitionAssignment);
}
For Mirror Maker, an internal rebalance listener is wired in by default to avoid duplicates on consumer rebalance. Users can still specify a custom listener class as a command-line argument; the internal rebalance listener calls that custom listener after it finishes its default logic.
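The delegation order described above can be sketched as follows. This is only an illustration, not the actual mirror maker code: `InternalRebalanceListener` and its `defaultLogic` callback are hypothetical names, `ConsumerThreadId` is replaced by a local stand-in so the example compiles on its own, and the default logic is left as an opaque `Runnable` because this section does not spell out what it does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for kafka.consumer.ConsumerThreadId (assumption, for self-containment).
class ConsumerThreadId {
    final String consumer;
    final int threadId;

    ConsumerThreadId(String consumer, int threadId) {
        this.consumer = consumer;
        this.threadId = threadId;
    }
}

// Local copy of the interface proposed above.
interface ConsumerRebalanceListener {
    void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership);

    void beforeStartingFetchers(Map<String, Map<Integer, ConsumerThreadId>> partitionAssignment);
}

// Hypothetical internal listener: runs its default logic first, then calls the
// optional user-supplied listener.
class InternalRebalanceListener implements ConsumerRebalanceListener {
    private final Runnable defaultLogic;             // assumption: whatever mirror maker does by default
    private final ConsumerRebalanceListener custom;  // may be null when no custom listener is configured

    InternalRebalanceListener(Runnable defaultLogic, ConsumerRebalanceListener custom) {
        this.defaultLogic = defaultLogic;
        this.custom = custom;
    }

    @Override
    public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership) {
        defaultLogic.run();
        if (custom != null) {
            custom.beforeReleasingPartitions(partitionOwnership);
        }
    }

    @Override
    public void beforeStartingFetchers(Map<String, Map<Integer, ConsumerThreadId>> partitionAssignment) {
        // This sketch has no default logic for this callback; it only forwards the call.
        if (custom != null) {
            custom.beforeStartingFetchers(partitionAssignment);
        }
    }
}
```

Passing `null` as the custom listener keeps only the default behavior, which matches the case where no listener class is given on the command line.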
...
public interface MirrorMakerMessageHandler {
    /**
     * The message handler will be executed to handle messages in mirror maker
     * consumer thread before the messages go into data channel. The message handler should
     * not block and should handle all the exceptions by itself.
     */
    public List<MirrorMakerRecord> handle(MirrorMakerRecord record);
}
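As an illustration of the contract above (non-blocking, and handling its own exceptions), a handler might look like the sketch below. The `MirrorMakerRecord` class here is a hypothetical stand-in with assumed fields, since the real class is not shown in this section, and `DropEmptyValueHandler` is an invented example name.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for MirrorMakerRecord; the field names are assumptions.
class MirrorMakerRecord {
    final String sourceTopic;
    final byte[] key;
    final byte[] value;

    MirrorMakerRecord(String sourceTopic, byte[] key, byte[] value) {
        this.sourceTopic = sourceTopic;
        this.key = key;
        this.value = value;
    }
}

// Local copy of the interface shown above.
interface MirrorMakerMessageHandler {
    List<MirrorMakerRecord> handle(MirrorMakerRecord record);
}

// Example handler: drops records with a null or empty value and passes all others through.
class DropEmptyValueHandler implements MirrorMakerMessageHandler {
    @Override
    public List<MirrorMakerRecord> handle(MirrorMakerRecord record) {
        try {
            if (record.value == null || record.value.length == 0) {
                return Collections.emptyList();
            }
            return Collections.singletonList(record);
        } catch (RuntimeException e) {
            // The handler must deal with its own exceptions; this sketch chooses to
            // drop the record rather than let the exception reach the consumer thread.
            return Collections.emptyList();
        }
    }
}
```

Returning a list lets a handler emit zero, one, or several records per input, so filtering and fan-out both fit the same signature.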
Changes to make with new consumer
As of now, the new consumer is still under development. After the new consumer is available, the following changes need to be made:
- Change the message handler interface to take ConsumerRecord for both input and output.
- Change the consumer thread according to the new consumer API. The options are:
  - Use a single consumer thread (preferred unless there is an apparent performance issue).
  - Use multiple consumer threads by decoupling consumption and processing.
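The second option, decoupling consumption from processing, can be sketched with plain `java.util.concurrent` primitives. This sketch does not use the new consumer API, which was not final at the time of writing; `DecoupledPipeline`, its `run` method, and the queue capacity are all hypothetical choices made for illustration.

```java
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class DecoupledPipeline {
    // The calling thread plays the role of the single consuming thread and feeds a
    // bounded queue; `workers` threads take records from the queue and process them.
    static <T> void run(Iterable<T> source, int workers, Consumer<T> processor) {
        BlockingQueue<Optional<T>> queue = new ArrayBlockingQueue<>(16);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                try {
                    while (true) {
                        Optional<T> item = queue.take();
                        if (!item.isPresent()) {
                            return; // empty Optional is the end-of-input marker
                        }
                        try {
                            processor.accept(item.get());
                        } catch (RuntimeException e) {
                            // Sketch only: report and continue so the worker keeps draining the queue.
                            System.err.println("processing failed: " + e);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            for (T record : source) {
                queue.put(Optional.of(record)); // blocks when the workers fall behind
            }
            for (int i = 0; i < workers; i++) {
                queue.put(Optional.empty());    // one end marker per worker
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```

With more than one worker, records are no longer processed in the order they were consumed, which is one reason the single-threaded option may be preferable when ordering matters.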
Compatibility, Deprecation, and Migration Plan
...