...

Discussion thread: here

JIRA: KAFKA-1650, KAFKA-1839, KAFKA-1840 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

3. Mirror Maker dies for some reason while messages remain in the data channel or in the producer cache, not yet successfully delivered by the producer.

4. When Mirror Maker restarts, all the messages that have been consumed and committed by the consumer but not yet delivered by the producer will be lost.

 

The new mirror maker design aims to solve the above issue and to enhance mirror maker with the following features.

1. Currently Mirror Maker does not ensure that topics in the source and target clusters have the same number of partitions, which is required in many cases.

2. Sometimes it is useful for Mirror Maker to do some simple processing on messages (filtering, message format changes, etc.).

 

The new mirror maker is designed based on the new consumer.

Public Interfaces

The no-data-loss solution uses the callback of the new producer to determine the offsets to be committed.

A ConsumerRebalanceListener class is created and can be wired into ZookeeperConsumerConnector to avoid duplicate messages when a consumer rebalance occurs in mirror maker.

...

ConsumerRebalanceListener (Java)

...

committed.

...

 

For Mirror Maker, an internal rebalance listener which implements ConsumerRebalanceCallback is wired in by default to avoid duplicates on consumer rebalance. Users can still specify a custom listener class as a command line argument. The internal rebalance listener will call that custom listener after it finishes the default logic.

Proposed Changes

...

Proposal 1

  • A single consumer thread distributes messages into different data channel queues based on source target partition hash.
  • Multiple producer threads are responsible for handling messages and producing them to the target cluster.
  • Scale by creating more mirror maker instances or processing threads.

Pros: Decouples consuming and processing of messages. Allows the ratio of processing threads to consuming threads to be adjusted.

Cons: More complicated structure than Proposal 2.

Proposal 2

  • No data channel. Each mirror maker thread consumes messages, processes them and then sends them.
  • Scale by creating more mirror maker threads.

Pros: Simple structure.

Cons: Consumer and producer are coupled, which is less flexible. More TCP connections and a larger memory footprint.

Common in both proposals

The only source of truth for offsets in the consume-then-send pattern is the target cluster. Offsets should only be committed after confirmation has been received from the target cluster. For the new producer, the confirmation is the ACK from the target cluster.

In that sense the consumer's offset auto-commit should be turned off, and offsets will be committed after the future associated with a message has completed.

Maintaining message order (KAFKA-1650)

For both keyed and non-keyed messages, the TopicPartition's hash is used to decide which data channel queue they go to. So the message order in the data channel is the same as in the source partitions.
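The queue selection described above can be sketched as follows. This is only an illustration under stated assumptions: SourcePartition and QueueSelector are hypothetical names that do not come from the proposal, and SourcePartition is a stand-in for Kafka's TopicPartition. Because every message from the same source partition maps to the same queue index, per-partition order is preserved inside the data channel.

```java
import java.util.Objects;

// Hypothetical stand-in for a source topic partition (not Kafka's TopicPartition class).
final class SourcePartition {
    final String topic;
    final int partition;

    SourcePartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SourcePartition)) {
            return false;
        }
        SourcePartition other = (SourcePartition) o;
        return topic.equals(other.topic) && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical helper that picks a data channel queue for a source partition.
final class QueueSelector {
    // Returns a queue index in [0, numQueues).
    // floorMod keeps the index non-negative even when hashCode() is negative.
    static int queueFor(SourcePartition tp, int numQueues) {
        return Math.floorMod(tp.hashCode(), numQueues);
    }
}
```

The key is the source partition rather than the message key, so keyed and non-keyed messages are treated the same way.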

Offset commit thread

...

The offset commit thread should execute periodically. A Map[TopicPartition, UnackedOffsetList] will be needed to track the offsets that are ready for commit.
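One way such tracking could work is sketched below. It is not the proposal's implementation: UnackedOffsetTracker and its method names are hypothetical, partitions are keyed by a plain string for brevity, and a sorted set stands in for the UnackedOffsetList. The sketch assumes the usual Kafka convention that the committed offset is the next offset to consume, so the safe commit point is the smallest offset still waiting for an ACK.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical tracker; names are illustrative and not taken from the proposal.
final class UnackedOffsetTracker {
    // Per partition: offsets handed to the producer but not yet acknowledged.
    private final Map<String, TreeSet<Long>> unacked = new HashMap<>();
    // Per partition: highest offset handed to the producer so far.
    private final Map<String, Long> highestSent = new HashMap<>();

    // Called when a consumed message is handed to the producer.
    synchronized void onSend(String topicPartition, long offset) {
        unacked.computeIfAbsent(topicPartition, k -> new TreeSet<>()).add(offset);
        highestSent.merge(topicPartition, offset, Long::max);
    }

    // Called from the producer callback once the target cluster has acknowledged the message.
    synchronized void onAck(String topicPartition, long offset) {
        TreeSet<Long> pending = unacked.get(topicPartition);
        if (pending != null) {
            pending.remove(offset);
        }
    }

    // Returns the offset that is safe to commit, or -1 if nothing was sent for the partition.
    synchronized long committableOffset(String topicPartition) {
        Long highest = highestSent.get(topicPartition);
        if (highest == null) {
            return -1L;
        }
        TreeSet<Long> pending = unacked.get(topicPartition);
        if (pending == null || pending.isEmpty()) {
            return highest + 1; // everything sent so far has been acknowledged
        }
        return pending.first(); // all offsets below the smallest unacked one are acknowledged
    }
}
```

A periodic commit thread would call committableOffset for each partition and commit the returned value, so an out-of-order ACK never moves the commit point past a message that is still in flight.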

On consumer rebalance

...

A new consumerRebalanceListener needs to be added to make sure there are no duplicates when a consumer rebalance occurs. The callback beforeReleasingPartitions will be invoked on consumer rebalance. It should

...

2. wait until the producer drains all remaining messages

3. commit the offsets

Ensure same partition number in source and target topics

...

Each consumer will have a local topic cache that records the topics it has ever seen.

When a new topic is created in the source cluster, a consumer rebalance will be triggered. In the ConsumerRebalanceCallback:onPartitionAssigned:

For the consumer that consumes partition 0 of a topic:

    1. Check the topic in both the source and target clusters to see if it is a new topic.
    2. If a new topic is detected, create the topic in the target cluster with the same number of partitions as in the source cluster.

For consumers that see topics that do not exist in the local topic cache:

    1. Wait until the topic is seen on the target cluster.

When a partition expansion occurs, a consumer rebalance will also be triggered. The above approach does not take care of this case.

Introduce a customRebalanceListener with the callback beforeStartFetchers(newPartitionAssignmentMap). The callback is invoked when a consumer rebalance occurs (topic creation/deletion, partition expansion). It compares the new topic partition info with the existing one to see what has changed. If a new topic has been created, the callback creates the topic in the target cluster with the same number of partitions. This callback exists in the new consumer (onPartitionsAssigned).
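The comparison step can be illustrated with a small sketch. It assumes, purely for illustration, that both the existing view and the new view can be reduced to a map from topic name to partition count; the proposal does not specify the shape of newPartitionAssignmentMap, and AssignmentDiff and its methods are hypothetical names. Actually creating the topic in the target cluster is left out.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper; the map shape (topic name -> partition count) is an assumption.
final class AssignmentDiff {
    // Topics present in the current view but absent from the known view,
    // mapped to the partition count to use when creating them in the target cluster.
    static Map<String, Integer> newTopics(Map<String, Integer> known, Map<String, Integer> current) {
        Map<String, Integer> created = new TreeMap<>();
        for (Map.Entry<String, Integer> e : current.entrySet()) {
            if (!known.containsKey(e.getKey())) {
                created.put(e.getKey(), e.getValue());
            }
        }
        return created;
    }

    // Topics already known whose partition count has grown, mapped to the new count.
    static Map<String, Integer> expandedTopics(Map<String, Integer> known, Map<String, Integer> current) {
        Map<String, Integer> expanded = new TreeMap<>();
        for (Map.Entry<String, Integer> e : current.entrySet()) {
            Integer before = known.get(e.getKey());
            if (before != null && e.getValue() > before) {
                expanded.put(e.getKey(), e.getValue());
            }
        }
        return expanded;
    }
}
```

Separating the two results keeps topic creation and partition expansion apart, since the text above only prescribes an action for newly created topics.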

Message handler in consumer thread

...

Sometimes it is useful for mirror maker to do some processing on the messages it consumes, such as filtering out some messages or converting formats. This can be done by adding a message handler in the consumer thread. The interface could be

public interface MirrorMakerMessageHandler {
    /**
     * The message handler will be executed to handle messages in mirror maker
     * consumer thread before the messages go into data channel. The message handler should
     * not block and should handle all the exceptions by itself.
     */
    public List<MirrorMakerRecord> handle(MirrorMakerRecord record);
}
The default message handler just returns a list containing the passed-in record.
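To make the handler contract concrete, here is a minimal sketch of a default (pass-through) handler and a simple filtering handler. The record type is not defined in this section, so SketchRecord and SketchMessageHandler below are hypothetical stand-ins with the same shape as the interface above; the class names and the "drop empty values" rule are assumptions made for illustration only.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical minimal record type standing in for the record class used by the real interface.
final class SketchRecord {
    final String topic;
    final byte[] key;
    final byte[] value;

    SketchRecord(String topic, byte[] key, byte[] value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Same shape as the message handler interface above, using the stand-in record type.
interface SketchMessageHandler {
    List<SketchRecord> handle(SketchRecord record);
}

// Default behaviour: return a list containing only the record that was passed in.
final class PassThroughHandler implements SketchMessageHandler {
    @Override
    public List<SketchRecord> handle(SketchRecord record) {
        return Collections.singletonList(record);
    }
}

// Example filter: drop records with a null or empty value, pass everything else through.
final class DropEmptyValueHandler implements SketchMessageHandler {
    @Override
    public List<SketchRecord> handle(SketchRecord record) {
        if (record.value == null || record.value.length == 0) {
            return Collections.emptyList();
        }
        return Collections.singletonList(record);
    }
}
```

Returning a list lets a handler drop a message (empty list), pass it through (one element), or fan it out into several records. Neither handler blocks or throws on ordinary input, in line with the contract stated in the interface comment.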

Changes to make with new consumer

As of now the new consumer is still under development. After the new consumer is available, the following changes need to be made:

  1. Change the message handler interface to take ConsumerRecord for both input and output.
  2. Change the consumer thread according to the new consumer API. The options are:
    1. Use a single consumer thread. (Preferred unless there is an apparent performance issue.)
    2. Use multiple consumer threads by decoupling consumption and processing.
A follow-up patch will be submitted after the new consumer is available.

Compatibility, Deprecation, and Migration Plan

...