...

1. Mirror Maker consumer consumes some messages and calls producer.send(). The messages sit in the producer accumulator and haven't been sent yet.

2. Mirror Maker consumer commits the offsets

3. Mirror Maker somehow dies with some messages left in the producer accumulator, not yet successfully delivered by the producer.

4. When Mirror Maker restarts, all the messages that have been consumed and committed by the consumer but not yet successfully delivered by the producer are lost.

...

For Mirror Maker, an internal rebalance listener which implements ConsumerRebalanceCallback is wired in by default to avoid duplicates on consumer rebalance. Users can still specify a custom listener class as a command line argument. The internal rebalance listener will call that custom listener after it finishes the default logic.

A --no.data.loss option will be added to the mirror maker command line arguments to enforce the settings needed to run mirror maker without data loss.

Proposed Changes

  • Each mirror maker thread consumes messages, processes them and then sends them.
  • Scale by creating more mirror maker threads.

Each mirror maker thread has a consumer instance associated with it. The thread is responsible for decompression, message handling, compression and offset commit. All mirror maker threads share a producer.
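The threading model described above can be sketched roughly as follows. This is an illustration only and does not use the Kafka client API: SharedProducer, MirrorThread and mirror are hypothetical names, an Iterator stands in for the per-thread consumer, and messages are plain strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

final class MirrorMakerThreadSketch {
    /** Hypothetical stand-in for the single producer shared by all threads. */
    static final class SharedProducer {
        private final List<String> sent = Collections.synchronizedList(new ArrayList<>());

        void send(String message) {
            sent.add(message);
        }

        List<String> sent() {
            synchronized (sent) {
                return new ArrayList<>(sent);
            }
        }
    }

    /** One mirror maker thread: it owns its consumer and shares the producer. */
    static final class MirrorThread extends Thread {
        private final Iterator<String> consumer;                // assumption: stands in for a consumer instance
        private final Function<String, List<String>> handler;   // assumption: stands in for the message handler
        private final SharedProducer producer;

        MirrorThread(Iterator<String> consumer,
                     Function<String, List<String>> handler,
                     SharedProducer producer) {
            this.consumer = consumer;
            this.handler = handler;
            this.producer = producer;
        }

        @Override
        public void run() {
            // Consume, handle, then send: there is no intermediate data channel.
            while (consumer.hasNext()) {
                String message = consumer.next();
                for (String out : handler.apply(message)) {
                    producer.send(out);
                }
            }
        }
    }

    /** Starts one thread per source and returns everything the shared producer received. */
    static List<String> mirror(List<List<String>> sources) {
        SharedProducer producer = new SharedProducer();
        List<MirrorThread> threads = new ArrayList<>();
        for (List<String> source : sources) {
            MirrorThread t = new MirrorThread(source.iterator(), Collections::singletonList, producer);
            threads.add(t);
            t.start();
        }
        try {
            for (MirrorThread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for mirror threads", e);
        }
        return producer.sent();
    }
}
```

Scaling in this sketch means passing more sources to mirror, which starts more threads while the producer stays shared.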

...

A new consumerRebalanceListener needs to be added to make sure there are no duplicates when a consumer rebalance occurs. The callback beforeReleasingPartitions will be invoked on consumer rebalance. It should

1. wait until the producer drains all remaining messages

2. commit the offsets
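A minimal sketch of the ordering inside the rebalance callback: drain the producer first, commit offsets second, then hand over to an optional user-supplied listener. The interfaces DrainableProducer, OffsetCommitter and RebalanceListener are hypothetical stand-ins introduced for illustration and are not the actual Kafka types.

```java
import java.util.ArrayList;
import java.util.List;

final class RebalanceListenerSketch {
    // Hypothetical stand-ins for the producer, the consumer and the callback type.
    interface DrainableProducer { void flush(); }
    interface OffsetCommitter { void commitOffsets(); }
    interface RebalanceListener { void beforeReleasingPartitions(); }

    static final class InternalRebalanceListener implements RebalanceListener {
        private final DrainableProducer producer;
        private final OffsetCommitter consumer;
        private final RebalanceListener customListener; // may be null

        InternalRebalanceListener(DrainableProducer producer,
                                  OffsetCommitter consumer,
                                  RebalanceListener customListener) {
            this.producer = producer;
            this.consumer = consumer;
            this.customListener = customListener;
        }

        @Override
        public void beforeReleasingPartitions() {
            // Wait until everything handed to the producer has been delivered.
            producer.flush();
            // Only then commit, so committed offsets never run ahead of delivered messages.
            consumer.commitOffsets();
            // The custom listener runs after the default logic.
            if (customListener != null) {
                customListener.beforeReleasingPartitions();
            }
        }
    }

    /** Records the order in which the three steps are invoked. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        RebalanceListener listener = new InternalRebalanceListener(
                () -> calls.add("flush"),
                () -> calls.add("commit"),
                () -> calls.add("custom"));
        listener.beforeReleasingPartitions();
        return calls;
    }
}
```

Committing before the flush would reopen the data loss window, because offsets could be committed for messages that are still waiting in the producer.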

Ensure same partition number in source and target topics

...

public interface MirrorMakerMessageHandler {
    /**
     * The message handler will be executed to handle messages in the mirror maker
     * consumer thread before the messages are handed to the producer. The message
     * handler should not block and should handle all exceptions by itself.
     */
    public List<MirrorMakerRecord> handle(MirrorMakerRecord record);
}
The default message handler just returns a list containing the passed-in record.
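The default behaviour could look roughly like the sketch below. The fields of MirrorMakerRecord and the class name DefaultMirrorMakerMessageHandler are assumptions made for illustration, since neither is shown here. The interface is repeated only so the example compiles on its own.

```java
import java.util.Collections;
import java.util.List;

final class DefaultHandlerSketch {
    /** Hypothetical record type; the real fields are not shown in this section. */
    static final class MirrorMakerRecord {
        final String topic;
        final byte[] value;

        MirrorMakerRecord(String topic, byte[] value) {
            this.topic = topic;
            this.value = value;
        }
    }

    interface MirrorMakerMessageHandler {
        List<MirrorMakerRecord> handle(MirrorMakerRecord record);
    }

    /** Pass-through handler: wraps the incoming record in a single-element list. */
    static final class DefaultMirrorMakerMessageHandler implements MirrorMakerMessageHandler {
        @Override
        public List<MirrorMakerRecord> handle(MirrorMakerRecord record) {
            return Collections.singletonList(record);
        }
    }
}
```

Because the return type is a list, a custom handler can also drop a record by returning an empty list, or fan one record out into several.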

...