...

The offset commit thread should run periodically. Because consumer offsets are committed per SourcePartition/ConsumerGroup, a Map[TopicPartition, UnackedOffsetList] is needed to track the offsets that are ready to commit.

In theory, we could just keep a Map[SourceTopicPartition/TargetTopicPartition, AckedOffset], but in some scenarios that would not work well. For example, suppose we have messages 1,2,3,4,5 from a source partition; messages 1,2,4,5 are produced to destination partition 1, and message 3 is produced to destination partition 2. If we only use a Map[SourceTopicPartition/TargetTopicPartition, AckedOffset], it would hold { [(SourceTopicPartition=1, TargetTopicPartition=1),5], [(SourceTopicPartition=1, TargetTopicPartition=2),3] }. From the map itself, we cannot tell whether message 4 has been acked or not. And if no more messages are sent to target partition 2, the offset for source partition 1 will block on 3 forever.

The UnackedOffsetList is a raw linked list that keeps the message offsets per source partition in the same order as they are sent by the producer. An offset is removed from the list when its ack is received. The offset commit thread always commits the smallest offset in the list, that is, the smallest offset that has not been acked yet.
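The bookkeeping described above can be sketched roughly as follows. This is only an illustration, not the mirror maker's actual code: the class name UnackedOffsets and its methods are hypothetical, a LinkedHashSet stands in for the raw linked list (it keeps insertion order and removes by value in constant time), and the behaviour when every offset has been acked (commit the highest sent offset plus one) is an assumption that the text does not spell out.

```java
import java.util.LinkedHashSet;
import java.util.OptionalLong;

/** Hypothetical sketch: sent-but-unacked offsets of ONE source partition. */
class UnackedOffsets {
    // Insertion-ordered set: iterates like a linked list, removes by value in O(1).
    private final LinkedHashSet<Long> unacked = new LinkedHashSet<>();
    private long highestSent = -1L;

    /** Record an offset when its message is handed to the producer. */
    synchronized void sent(long offset) {
        unacked.add(offset);
        highestSent = Math.max(highestSent, offset);
    }

    /** Drop an offset when the producer reports its ack. */
    synchronized void acked(long offset) {
        unacked.remove(offset);
    }

    /**
     * Offset the commit thread would commit: the smallest unacked offset.
     * Assumption: if everything sent so far is acked, use highestSent + 1;
     * if nothing was ever sent, there is nothing to commit.
     */
    synchronized OptionalLong offsetToCommit() {
        if (!unacked.isEmpty()) {
            return OptionalLong.of(unacked.iterator().next());
        }
        return highestSent < 0 ? OptionalLong.empty() : OptionalLong.of(highestSent + 1);
    }
}
```

With the example from the previous paragraph (messages 1 to 5 sent, acks received for 1, 2, 4 and 5), this sketch keeps only offset 3 in the list, so the commit thread commits 3 regardless of which target partition each message went to.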

No data loss option

A no data loss option is provided with the following settings:

  1. For consumer: auto.commit.enabled=false
  2. For producer:
    1. max.in.flight.requests.per.connection=1
    2. retries=Int.MaxValue
    3. acks=-1
    4. block.on.buffer.full=true
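As a rough illustration, the producer settings in the list above could be assembled as a java.util.Properties object like this. The class and method names are hypothetical; the keys and values are copied from the list, with Int.MaxValue written as Java's Integer.MAX_VALUE. The single consumer setting is left out of the sketch.

```java
import java.util.Properties;

/** Hypothetical helper that collects the producer settings listed above. */
class NoDataLossProducerConfig {
    static Properties producerProps() {
        Properties props = new Properties();
        // At most one outstanding request per broker connection.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        // Retry retriable errors (effectively) forever.
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        // Wait for all in-sync replicas to acknowledge.
        props.setProperty("acks", "-1");
        // Block the caller instead of failing when the buffer is full.
        props.setProperty("block.on.buffer.full", "true");
        return props;
    }
}
```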

The following actions will be taken by mirror maker:

  • Mirror maker will only send one request to a broker at any given point.
  • If any exception is caught in a producer/consumer thread, or an OutOfMemory exception is caught in the offset commit thread, mirror maker will try to commit the acked offsets and then exit immediately.
  • For a RetriableException in the producer, the producer will retry indefinitely. If the retries keep failing, the entire mirror maker will eventually block because the producer buffer is full.
  • For a non-retriable exception, the producer callback will record the message that was not successfully sent but let the mirror maker move on. In this case, that message will be lost in the target cluster.
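The last bullet can be sketched as a small callback handler. This is a simplified illustration under stated assumptions, not the real producer API: the class SendResultHandler and the signature onCompletion(long, Exception) are hypothetical, and treating a failed offset as no longer unacked is one reading of "let the mirror maker move on". Because retriable errors are retried indefinitely by the producer, the sketch assumes the callback only ever sees success or a non-retriable failure.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

/** Hypothetical sketch of the producer-callback bookkeeping for one source partition. */
class SendResultHandler {
    private final LinkedHashSet<Long> unacked = new LinkedHashSet<>();
    private final List<Long> lost = new ArrayList<>();

    /** Track the source offset before the message is handed to the producer. */
    synchronized void beforeSend(long sourceOffset) {
        unacked.add(sourceOffset);
    }

    /** Invoked once per message; error is null on success. */
    synchronized void onCompletion(long sourceOffset, Exception error) {
        if (error != null) {
            // Non-retriable failure: remember the message as lost in the target cluster.
            lost.add(sourceOffset);
        }
        // Assumption: in both cases the offset stops blocking the offset commit.
        unacked.remove(sourceOffset);
    }

    /** Copy of the source offsets that were never delivered. */
    synchronized List<Long> lostOffsets() {
        return new ArrayList<>(lost);
    }

    /** True while at least one sent message has no result yet. */
    synchronized boolean hasUnacked() {
        return !unacked.isEmpty();
    }
}
```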

On consumer rebalance

A new consumerRebalanceListener needs to be added to make sure there are no duplicates when a consumer rebalance occurs. The callback beforeReleasingPartitions will be invoked on consumer rebalance. It should

...