...

A new ConsumerRebalanceListener needs to be added to make sure there are no duplicates when a consumer rebalance occurs.

import java.util.Map;
import java.util.Set;
import kafka.consumer.ConsumerThreadId;

/**
 * This listener is used for execution of tasks defined by user when a consumer rebalance
 * occurs in {@link kafka.consumer.ZookeeperConsumerConnector}
 */
public interface ConsumerRebalanceListener {
    /**
     * This method is called after all the fetcher threads are stopped but before the
     * ownership of partitions is released. Depending on whether auto offset commit is
     * enabled or not, offsets may or may not have been committed.
     * This listener is initially added to prevent duplicate messages on consumer rebalance
     * in mirror maker, where offset auto commit is disabled to prevent data loss. It could
     * also be used in more general cases.
     * @param partitionOwnership The partitions this consumer currently owns
     */
    public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership);

    /**
     * This method is called after the new partition assignment is finished but before fetcher
     * threads start. A map of the new global partition assignment is passed in as a parameter.
     * @param consumerIdString The consumer instance invoking this rebalance callback
     * @param partitionAssignment The partition assignment result of the current rebalance
     */
    public void beforeStartingFetchers(String consumerIdString, Map<String, Map<Integer, ConsumerThreadId>> partitionAssignment);
}
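As an illustration of how a mirror-maker-style application might use this interface, here is a minimal sketch of a listener that flushes pending output and then commits offsets before partition ownership is released. The flush-then-commit order is an assumption about how duplicates would be avoided, not something this page specifies. The class `FlushAndCommitListener` and its two `Runnable` hooks are hypothetical. `ConsumerThreadId` and `ConsumerRebalanceListener` are redeclared locally as stand-ins so the example compiles without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Stand-in for kafka.consumer.ConsumerThreadId (assumption: only used as a value type here).
final class ConsumerThreadId {
    final String consumer;
    final int threadId;

    ConsumerThreadId(String consumer, int threadId) {
        this.consumer = consumer;
        this.threadId = threadId;
    }
}

// Local copy of the interface proposed above, so the sketch is self-contained.
interface ConsumerRebalanceListener {
    void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership);

    void beforeStartingFetchers(String consumerIdString,
                                Map<String, Map<Integer, ConsumerThreadId>> partitionAssignment);
}

// Hypothetical listener: flush in-flight messages, then commit offsets,
// so that the next owner of the partitions does not re-read already mirrored data.
final class FlushAndCommitListener implements ConsumerRebalanceListener {
    private final Runnable flushPendingMessages; // hypothetical hook, e.g. wait for producer acks
    private final Runnable commitOffsets;        // hypothetical hook, e.g. commit consumer offsets

    FlushAndCommitListener(Runnable flushPendingMessages, Runnable commitOffsets) {
        this.flushPendingMessages = flushPendingMessages;
        this.commitOffsets = commitOffsets;
    }

    @Override
    public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership) {
        // Fetcher threads are already stopped at this point, so no new messages arrive.
        flushPendingMessages.run();
        commitOffsets.run();
    }

    @Override
    public void beforeStartingFetchers(String consumerIdString,
                                       Map<String, Map<Integer, ConsumerThreadId>> partitionAssignment) {
        // Nothing to do in this sketch; a real listener could log the new assignment.
    }
}

final class ListenerDemo {
    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        ConsumerRebalanceListener listener = new FlushAndCommitListener(
                () -> calls.add("flush"),
                () -> calls.add("commit"));

        Set<Integer> partitions = Collections.singleton(0);
        Map<String, Set<Integer>> owned = Collections.singletonMap("topic", partitions);
        listener.beforeReleasingPartitions(owned);

        System.out.println(calls); // prints [flush, commit]
    }
}
```

Passing the flush and commit steps in as `Runnable` hooks keeps the listener independent of any particular producer or consumer API; a real implementation would wire them to whatever the application actually uses.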

 

The callback beforeReleasingPartitions will be invoked on consumer rebalance. It should

...