...
A new ConsumerRebalanceListener needs to be added to make sure there are no duplicate messages when a consumer rebalance occurs.
Code Block | ||
---|---|---|
| ||
import java.util.Map;
import java.util.Set;

import kafka.consumer.ConsumerThreadId;

/**
 * This listener is used to execute user-defined tasks when a consumer rebalance
 * occurs in {@link kafka.consumer.ZookeeperConsumerConnector}.
 */
public interface ConsumerRebalanceListener {

    /**
     * This method is called after all the fetcher threads are stopped but before the
     * ownership of partitions is released. Depending on whether auto offset commit is
     * enabled or not, offsets may or may not have been committed.
     * This listener was initially added to prevent duplicate messages on consumer rebalance
     * in mirror maker, where offset auto commit is disabled to prevent data loss. It could
     * also be used in more general cases.
     * @param partitionOwnership The partitions this consumer currently owns, keyed by topic
     */
    public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership);

    /**
     * This method is called after the new partition assignment is finished but before the
     * fetcher threads start. A map of the new global partition assignment is passed in as
     * a parameter.
     * @param consumerIdString The consumer instance invoking this rebalance callback
     * @param partitionAssignment The partition assignment result of the current rebalance
     */
    public void beforeStartingFetchers(String consumerIdString, Map<String, Map<Integer, ConsumerThreadId>> partitionAssignment);
} |
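As an illustration of how the first callback could be used to avoid duplicates, here is a minimal sketch. It is self-contained and does not depend on Kafka: `RebalanceCallback` is a local stand-in for the `beforeReleasingPartitions` half of the interface above, and `FlushAndCommitListener` with its two `Runnable` hooks (`flushProducer`, `commitOffsets`) is a hypothetical name introduced only for this example. The ordering shown (flush pending sends first, then commit offsets) is an assumption about how a mirror-maker-style application might use the callback when auto commit is disabled, not a description of the actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Local stand-in for the beforeReleasingPartitions callback, so the sketch compiles without Kafka.
interface RebalanceCallback {
    void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership);
}

// Hypothetical listener: before partitions are handed over, make sure everything
// consumed so far has been sent downstream, then commit the consumed offsets.
class FlushAndCommitListener implements RebalanceCallback {
    private final Runnable flushProducer;  // assumed hook: blocks until pending sends complete
    private final Runnable commitOffsets;  // assumed hook: commits the consumer's current offsets

    FlushAndCommitListener(Runnable flushProducer, Runnable commitOffsets) {
        this.flushProducer = flushProducer;
        this.commitOffsets = commitOffsets;
    }

    @Override
    public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership) {
        // Flush first, so that committed offsets never run ahead of delivered messages.
        flushProducer.run();
        // Commit second, so that the next owner of these partitions does not re-consume them.
        commitOffsets.run();
    }
}

class RebalanceDemo {
    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        RebalanceCallback listener = new FlushAndCommitListener(
                () -> events.add("flush"),
                () -> events.add("commit"));

        Map<String, Set<Integer>> owned = new HashMap<>();
        owned.put("topic-a", new HashSet<>(Arrays.asList(0, 1)));

        listener.beforeReleasingPartitions(owned);
        System.out.println(events);
    }
}
```

In a real deployment the two hooks would wrap the application's own producer flush and offset commit calls; they are left as plain `Runnable`s here so that no specific client API is implied.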
The callback beforeReleasingPartitions will be invoked on consumer rebalance. It should
...