Status

Current state: Accepted

Discussion thread: here

JIRA:   KAFKA-1997

Released: 0.8.3

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The following interface change will be made to mirror maker.

  • --no.data.loss - to automatically apply all the configurations necessary to make sure no data is lost.
  • --consumer.rebalance.listener - to allow the user to wire in a custom rebalance listener which implements the ConsumerRebalanceListener interface. An internal rebalance listener which implements ConsumerRebalanceListener is wired in by default to avoid duplicates on consumer rebalance. The user can still specify a custom listener class as a command line argument. The internal rebalance listener will call that custom listener after it finishes the default logic.
  • --consumer.rebalance.listener.args - to provide arguments to the custom rebalance listener constructor.
  • --message.handler - to allow the user to do some simple processing on the messages (e.g. filtering, reformatting, etc.)
  • --message.handler.args - to provide arguments for the message handler init() function.
  • --abort.on.send.fail - to define whether mirror maker should abort when a send fails (turned on by default).

Proposed Changes

  • Each mirror maker thread consumes messages, processes them, and then sends them.
  • Scale by creating more mirror maker threads.

...

A no-data-loss option is provided with the following settings, which are also the default settings:

  1. For consumer: auto.commit.enable=false
  2. For producer:
    1. max.in.flight.requests.per.connection=1
    2. retries=Int.MaxValue
    3. acks=-1
    4. block.on.buffer.full=true
  3. Set --abort.on.send.fail
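
As an illustration only, the settings listed above can be written out as plain java.util.Properties objects. The class and method names (NoDataLossConfigs, consumerProps, producerProps) are hypothetical and are not part of mirror maker. The Scala value Int.MaxValue is rendered as Integer.MAX_VALUE, and the consumer key is written as auto.commit.enable, the spelling used by the old high-level consumer.

```java
import java.util.Properties;

// Hypothetical helper for illustration; only the keys and values come from the list above.
final class NoDataLossConfigs {
    private NoDataLossConfigs() {}

    /** Consumer side: offsets are committed explicitly, never by the auto-commit timer. */
    static Properties consumerProps() {
        Properties props = new Properties();
        // Key as spelled by the old high-level consumer.
        props.setProperty("auto.commit.enable", "false");
        return props;
    }

    /** Producer side: one in-flight request, unlimited retries, full acks, block when the buffer is full. */
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("max.in.flight.requests.per.connection", "1");
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE)); // Int.MaxValue in Scala
        props.setProperty("acks", "-1");
        props.setProperty("block.on.buffer.full", "true");
        return props;
    }
}
```

Keeping a single in-flight request per connection is what prevents reordering when a failed request is retried, at some cost in throughput.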

The following actions will be taken by mirror maker:

  • Mirror maker will only send one request to a broker at any given point.
  • If any exception is caught in a mirror maker thread, mirror maker will try to commit the acked offsets and then exit immediately.
  • For a RetriableException in the producer, the producer will retry indefinitely. If retrying does not work, eventually the entire mirror maker will block because the producer buffer is full.
  • For a non-retriable exception, if --abort.on.send.fail is specified, the mirror maker stops. Otherwise the producer callback records the message that was not successfully sent but lets the mirror maker move on. In this case, that message is lost in the target cluster.
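
The non-retriable failure handling described above can be sketched as a small policy object. This is a simplified illustration and not the mirror maker code: SendFailurePolicy and its methods are hypothetical names, messages are represented as plain strings, and the sketch assumes the callback only sees failures the producer could not retry.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the producer send callback logic; not the Kafka client API.
final class SendFailurePolicy {
    private final boolean abortOnSendFail;
    private final List<String> unsentMessages = new ArrayList<>();
    private boolean shuttingDown = false;

    SendFailurePolicy(boolean abortOnSendFail) {
        this.abortOnSendFail = abortOnSendFail;
    }

    /** Called once per send; error is null when the send succeeded. */
    synchronized void onCompletion(String message, Exception error) {
        if (error == null) {
            return; // acked, nothing to do
        }
        if (abortOnSendFail) {
            shuttingDown = true;         // signal mirror maker to stop
        } else {
            unsentMessages.add(message); // record the loss and move on
        }
    }

    synchronized boolean isShuttingDown() {
        return shuttingDown;
    }

    /** Snapshot of messages that were dropped because aborting was disabled. */
    synchronized List<String> unsentMessages() {
        return Collections.unmodifiableList(new ArrayList<>(unsentMessages));
    }
}
```

With abortOnSendFail set to false, every entry in unsentMessages() corresponds to a message that is missing from the target cluster.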

...

A new ConsumerRebalanceListener needs to be added to make sure there are no duplicates when a consumer rebalance occurs.

import java.util.Map;
import java.util.Set;

import kafka.consumer.ConsumerThreadId;

/**
 * This listener is used for execution of tasks defined by user when a consumer rebalance
 * occurs in {@link kafka.consumer.ZookeeperConsumerConnector}
 */
public interface ConsumerRebalanceListener {
    /**
     * This method is called after all the fetcher threads are stopped but before the
     * ownership of partitions is released. Depending on whether auto offset commit is
     * enabled or not, offsets may or may not have been committed.
     * This listener is initially added to prevent duplicate messages on consumer rebalance
     * in mirror maker, where offset auto commit is disabled to prevent data loss. It could
     * also be used in more general cases.
     */
    public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership);

    /**
     * This method is called after the new partition assignment is finished but before fetcher
     * threads start. A map of new global partition assignment is passed in as parameter.
     * @param consumerIdString The consumer instance invoking this rebalance callback
     * @param partitionAssignment The partition assignment result of current rebalance
     */
    public void beforeStartingFetchers(String consumerIdString, Map<String, Map<Integer, ConsumerThreadId>> partitionAssignment);
}

 

The callback beforeReleasingPartitions will be invoked on consumer rebalance. It should:

  1. Call producer.flush
  2. Commit the offsets

The default rebalance listener does not do anything in the beforeStartingFetchers callback. This callback could be useful in some cases, for example, to create a topic in the target cluster with the same number of partitions as in the source cluster.
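
A minimal sketch of the flush-then-commit ordering described above might look like the following. FlushableProducer, OffsetCommitter and FlushThenCommitListener are hypothetical names introduced for this example. The class deliberately does not declare that it implements ConsumerRebalanceListener, so that it compiles without Kafka on the classpath, but its method mirrors the beforeReleasingPartitions signature.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the producer: flush() returns once buffered sends are acked. */
interface FlushableProducer {
    void flush();
}

/** Hypothetical stand-in for the consumer connector's offset commit. */
interface OffsetCommitter {
    void commitOffsets();
}

/** Sketch of the default rebalance logic: flush first, then commit. */
final class FlushThenCommitListener {
    private final FlushableProducer producer;
    private final OffsetCommitter committer;

    FlushThenCommitListener(FlushableProducer producer, OffsetCommitter committer) {
        this.producer = producer;
        this.committer = committer;
    }

    public void beforeReleasingPartitions(Map<String, Set<Integer>> partitionOwnership) {
        // 1. Make sure every consumed message has been acked by the target cluster.
        producer.flush();
        // 2. Only then commit, so the committed offsets never run ahead of what was produced.
        committer.commitOffsets();
    }
}
```

The order matters: committing before the flush could mark messages as consumed that were never delivered, while skipping the commit would let the next partition owner re-read and duplicate messages that were already mirrored.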

Message handler in consumer thread

...

public interface MirrorMakerMessageHandler {
    /**
     * Initialize the custom message handler with the passed-in arguments. This function will be called when mirror maker instantiates the custom message handler.
     */
    public void init(String args);
    /**
     * The message handler will be executed to handle messages in mirror maker
     * thread before the messages are handed to producer. The message handler should
     * not block and should handle all the exceptions by itself.
     */
    public List<ProducerRecord> handle(ConsumerRecord record);
}
The default message handler just returns a list containing the passed-in record.
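
To illustrate the kind of simple processing a custom handler could do, here is a hedged sketch of a filtering handler. The ConsumerRecord and ProducerRecord classes below are simplified stand-ins (topic and string value only) so the example compiles on its own; the real record classes differ. PrefixFilterHandler is a hypothetical name, and the interface is repeated only to keep the sketch self-contained.

```java
import java.util.Collections;
import java.util.List;

// Simplified stand-ins so the sketch compiles without Kafka; the real record classes differ.
final class ConsumerRecord {
    final String topic;
    final String value;
    ConsumerRecord(String topic, String value) { this.topic = topic; this.value = value; }
}

final class ProducerRecord {
    final String topic;
    final String value;
    ProducerRecord(String topic, String value) { this.topic = topic; this.value = value; }
}

interface MirrorMakerMessageHandler {
    void init(String args);
    List<ProducerRecord> handle(ConsumerRecord record);
}

/** Hypothetical handler: drops messages whose value starts with the prefix passed to init(). */
final class PrefixFilterHandler implements MirrorMakerMessageHandler {
    private String dropPrefix = "";

    @Override
    public void init(String args) {
        dropPrefix = (args == null) ? "" : args;
    }

    @Override
    public List<ProducerRecord> handle(ConsumerRecord record) {
        if (!dropPrefix.isEmpty() && record.value.startsWith(dropPrefix)) {
            return Collections.emptyList(); // filtered out: nothing is sent to the target
        }
        // Pass through unchanged, like the default handler.
        return Collections.singletonList(new ProducerRecord(record.topic, record.value));
    }
}
```

Returning a list lets a handler drop a message (empty list), pass it through (one record), or fan it out into several records.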

...