Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-1997

Released: 0.8.3

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • --consumer.rebalance.listener - lets the user wire in a custom rebalance listener that implements the ConsumerRebalanceCallback interface. An internal rebalance listener that implements ConsumerRebalanceCallback is wired in by default to avoid duplicates on consumer rebalance. The user can still specify a custom listener class as a command line argument; the internal rebalance listener calls that custom listener after it finishes its default logic.
  • --consumer.rebalance.listener.args - provides arguments to the custom rebalance listener's constructor.
  • --message.handler - lets the user do some simple processing on the messages (e.g. filtering, reformatting, etc.).
  • --message.handler.args - provides arguments for the message handler's init() function.
  • --abort.on.send.fail - defines whether mirror maker should abort when a send fails (turned on by default).
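The chaining between the internal listener and a custom listener described above can be sketched roughly as follows. This is only an illustration: RebalanceCallbackSketch, its onRebalance method and the events list are hypothetical stand-ins, not the actual ConsumerRebalanceCallback signatures.

```java
import java.util.List;

// Hypothetical stand-in for the ConsumerRebalanceCallback interface named above;
// the real method names and signatures may differ.
interface RebalanceCallbackSketch {
    void onRebalance(List<String> events);
}

// Sketch of the internal listener: it runs its default logic first and then
// delegates to the user-supplied listener, if one was configured.
class InternalRebalanceListenerSketch implements RebalanceCallbackSketch {
    private final RebalanceCallbackSketch custom; // null when no custom listener was given

    InternalRebalanceListenerSketch(RebalanceCallbackSketch custom) {
        this.custom = custom;
    }

    @Override
    public void onRebalance(List<String> events) {
        // Default logic always runs first (recorded here as a marker string).
        events.add("default");
        // The custom listener is called only after the default logic finishes.
        if (custom != null) {
            custom.onRebalance(events);
        }
    }
}
```

With a custom listener that appends "custom", the events list ends up as "default" followed by "custom"; with no custom listener only "default" is recorded.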

Proposed Changes

  • Each mirror maker thread consumes messages, processes them and then sends them.
  • Scale by creating more mirror maker threads.
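As a rough illustration of the consume, process, send loop that each thread runs, the sketch below uses plain lists in place of the real consumer and producer. MessageHandlerSketch and MirrorLoopSketch are hypothetical names, and the assumption that a handler may return zero or more records per input (to allow filtering) is ours.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler: returns zero or more output records for each input record,
// so it can filter (empty list) or reformat (one changed record).
interface MessageHandlerSketch {
    List<String> handle(String record);
}

class MirrorLoopSketch {
    // One pass of a single mirror maker thread over a batch of consumed records.
    static List<String> mirror(List<String> consumed, MessageHandlerSketch handler) {
        List<String> sent = new ArrayList<>();
        for (String record : consumed) {                 // consume
            for (String out : handler.handle(record)) {  // process
                sent.add(out);                           // send (recorded, not transmitted)
            }
        }
        return sent;
    }
}
```

Scaling out would then mean running several such loops in parallel, one per thread.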

...

  • Mirror maker will only send one request to a broker at any given point.
  • If any exception is caught in a mirror maker thread, mirror maker will try to commit the acked offsets and then exit immediately.
  • For a RetriableException in the producer, the producer will retry indefinitely. If retrying does not succeed, the entire mirror maker will eventually block once the producer buffer is full.
  • For a non-retriable exception, if --abort.on.send.fail is specified, the mirror maker stops. Otherwise the producer callback records the message that was not successfully sent and lets the mirror maker move on. In this case, that message will be lost in the target cluster.
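The non-retriable failure handling above might look roughly like the following callback logic. SendFailurePolicySketch and its methods are hypothetical names for illustration; the sketch assumes the callback only ever sees non-retriable errors, since retriable ones are retried inside the producer.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of what a producer callback could do when a send completes.
class SendFailurePolicySketch {
    private final boolean abortOnSendFail;
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    private final AtomicLong droppedMessages = new AtomicLong();

    SendFailurePolicySketch(boolean abortOnSendFail) {
        this.abortOnSendFail = abortOnSendFail;
    }

    // error is null when the send succeeded.
    void onCompletion(Exception error) {
        if (error == null) {
            return;
        }
        if (abortOnSendFail) {
            // Signal the mirror maker to stop.
            shuttingDown.set(true);
        } else {
            // Record the failure and move on; this message is lost in the target cluster.
            droppedMessages.incrementAndGet();
        }
    }

    boolean isShuttingDown() {
        return shuttingDown.get();
    }

    long droppedMessages() {
        return droppedMessages.get();
    }
}
```

Atomic fields are used because a producer callback typically runs on a different thread from the one that checks the shutdown flag.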

...

1. call producer.flush

2. commit the offsets
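The ordering of the two steps above can be sketched as below. FlushableProducerSketch and OffsetCommitterSketch are hypothetical interfaces standing in for the real producer and consumer connector. The order presumably matters because flushing first means the committed offsets only cover messages that have already been sent.

```java
// Hypothetical minimal views of the producer and the offset-committing consumer.
interface FlushableProducerSketch {
    void flush();
}

interface OffsetCommitterSketch {
    void commitOffsets();
}

class FlushThenCommitSketch {
    // Step 1: flush the producer so no consumed message is still buffered.
    // Step 2: commit the consumer offsets.
    static void run(FlushableProducerSketch producer, OffsetCommitterSketch committer) {
        producer.flush();
        committer.commitOffsets();
    }
}
```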

The default rebalance listener does nothing in the beforeStartingFetchers callback. This callback can be useful in some cases, for example to create a topic in the target cluster with the same number of partitions as in the source cluster.

Message handler in consumer thread

...