Status
Current state: Under discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Mirror Maker has a potential data loss issue, as explained below:
1. The Mirror Maker consumer consumes some messages and calls producer.send(). The messages sit in the producer accumulator and have not been sent yet.
2. The Mirror Maker consumer commits the offsets.
3. Mirror Maker dies while some messages are still in the producer accumulator and have not yet been successfully delivered.
4. When Mirror Maker restarts, it resumes from the committed offsets. All the messages that were consumed and committed by the consumer but not yet successfully delivered by the producer are therefore lost.
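The failure sequence above can be reproduced with a small in-memory simulation. This is a sketch, not Mirror Maker code: the source log, the producer accumulator and the target cluster are plain Java lists, and DataLossDemo and run are hypothetical names. It shows that once offsets are committed before the accumulator is drained, a crash loses exactly the records that were still buffered.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory simulation of the data loss sequence; no Kafka classes are used.
class DataLossDemo {
    // Returns the records that end up in the target cluster after a crash and restart.
    static List<String> run(List<String> source, int deliveredBeforeCrash) {
        List<String> accumulator = new ArrayList<>(); // producer buffer, not yet delivered
        List<String> target = new ArrayList<>();      // records acknowledged by the target

        // 1. Consume every source record and call send(): the records are only buffered.
        accumulator.addAll(source);
        // 2. Commit offsets for everything consumed so far.
        int committedOffset = source.size();
        // Some of the buffered records happen to be delivered before the crash.
        target.addAll(accumulator.subList(0, deliveredBeforeCrash));
        // 3. Crash: whatever was still in the accumulator is gone.
        accumulator.clear();
        // 4. Restart from the committed offset; the undelivered records are never read again.
        target.addAll(source.subList(committedOffset, source.size()));
        return target;
    }
}
```

For example, with five consumed records of which two are delivered before the crash, run returns only the first two: the other three were committed in the source cluster but never reach the target.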
The new Mirror Maker design aims to solve the above issue and to enhance Mirror Maker with the following features:
1. Finer control over the target partition.
2. Simple processing of messages (e.g. filtering or format conversion) through a wired-in message handler.
The new Mirror Maker is based on the new consumer.
Public Interfaces
The following interface changes will be made to Mirror Maker:
- --no.data.loss - automatically applies all the configurations necessary to make sure there is no data loss.
- --consumer.rebalance.listener - allows the user to wire in a custom rebalance listener that implements the ConsumerRebalanceCallback interface. An internal rebalance listener that implements ConsumerRebalanceCallback is wired in by default to avoid duplicates on consumer rebalance. The user can still specify a custom listener class as a command-line argument; the internal rebalance listener calls that custom listener after it finishes its default logic.
- --consumer.rebalance.listener.args - provides arguments to the custom rebalance listener's constructor.
- --message.handler - allows the user to do simple processing on the messages (e.g. filtering, reformatting).
- --message.handler.args - provides arguments to the custom message handler.
- --abortOnSendFail - defines whether Mirror Maker should abort when a send fails.
Proposed Changes
- Each Mirror Maker thread consumes messages, processes them and then sends them.
- Mirror Maker scales by creating more Mirror Maker threads.
Each Mirror Maker thread has its own consumer instance and is responsible for decompression, message handling, compression and offset commit. All Mirror Maker threads share a single producer.
Auto-commit of consumer offsets is turned off. Instead, offsets are committed periodically, always right after a call to producer.flush().
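The threading model can be sketched with plain Java threads. Each worker owns its input, standing in for its own consumer (KafkaConsumer is documented as not thread-safe, so one instance per thread is the natural fit), and all workers call one shared producer object (KafkaProducer is documented as thread-safe and meant to be shared). SharedProducer, mirror and the in-memory inputs are hypothetical; no Kafka client is involved.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the threading model: one private "consumer" per thread, one shared producer.
class ThreadingSketch {
    // Hypothetical stand-in for the shared producer; safe to call from many threads.
    static final class SharedProducer {
        final ConcurrentLinkedQueue<String> sent = new ConcurrentLinkedQueue<>();
        void send(String record) { sent.add(record); }
    }

    // Each inner list plays the role of the records one thread's own consumer returns.
    // Returns how many records went through the shared producer.
    static int mirror(List<List<String>> perThreadRecords) {
        SharedProducer producer = new SharedProducer();
        ExecutorService threads = Executors.newFixedThreadPool(perThreadRecords.size());
        for (List<String> consumerRecords : perThreadRecords) {
            threads.submit(() -> {
                for (String record : consumerRecords) {
                    producer.send(record); // all threads send through the same instance
                }
            });
        }
        threads.shutdown();
        try {
            threads.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return producer.sent.size();
    }
}
```

Adding threads here only adds inputs; the producer instance stays the same, which matches "scale by creating more mirror maker threads".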
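The Mirror Maker thread logic is as follows:
import scala.collection.JavaConverters._

while (!shutdown) {
  val records = consumer.poll(timeout)
  for (record <- records.asScala) {
    // The handler may return zero, one or several records to produce.
    val handledRecords = messageHandler.handle(record)
    for (handledRecord <- handledRecords.asScala)
      producer.send(handledRecord)
  }
  if (System.currentTimeMillis() - lastOffsetCommitMs > offsetCommitIntervalMs) {
    producer.flush()          // block until all records sent so far are acknowledged
    consumer.commitOffsets()  // only then commit the consumed offsets
    lastOffsetCommitMs = System.currentTimeMillis()
  }
}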
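To make the ordering concrete, here is a self-contained Java sketch of one Mirror Maker thread. The consumer, producer and message handler are hypothetical in-memory stand-ins (StubConsumer, StubProducer and a java.util.function.Function), not the Kafka client API, and the commit interval is counted in polls rather than milliseconds so the result is deterministic. What it illustrates is that commitOffsets() only ever runs right after flush(), so the committed offset never gets ahead of what the producer has delivered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Single-threaded sketch of the Mirror Maker loop with in-memory stand-ins (not the Kafka API).
class MirrorLoopSketch {
    // Hypothetical producer: send() only buffers, flush() delivers everything buffered.
    static final class StubProducer {
        final List<String> buffer = new ArrayList<>();
        final List<String> delivered = new ArrayList<>();
        void send(String record) { buffer.add(record); }
        void flush() { delivered.addAll(buffer); buffer.clear(); }
    }

    // Hypothetical consumer over a fixed source log, tracking read and committed offsets.
    static final class StubConsumer {
        final List<String> log;
        int position = 0;   // next offset to read
        int committed = 0;  // offset a restarted consumer would resume from
        StubConsumer(List<String> log) { this.log = log; }

        List<String> poll(int maxRecords) {
            int end = Math.min(position + maxRecords, log.size());
            List<String> batch = new ArrayList<>(log.subList(position, end));
            position = end;
            return batch;
        }

        void commitOffsets() { committed = position; }
    }

    // Runs until the source log is exhausted (standing in for "until shutdown").
    static void run(StubConsumer consumer, StubProducer producer,
                    Function<String, List<String>> handler, int commitEveryPolls) {
        int pollsSinceCommit = 0;
        while (consumer.position < consumer.log.size()) {
            for (String record : consumer.poll(2)) {
                handler.apply(record).forEach(producer::send);
            }
            if (++pollsSinceCommit >= commitEveryPolls) {
                producer.flush();          // everything sent so far is now delivered
                consumer.commitOffsets();  // so committing the consumed offsets is safe
                pollsSinceCommit = 0;
            }
        }
    }
}
```

With the five records m0 to m4, two records per poll and a commit every two polls, the run ends with offset 4 committed, four records delivered and m4 still buffered. If the process died at that point, a restart from offset 4 would re-read m4 instead of losing it.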
Offset commit
Each Mirror Maker thread periodically calls producer.flush() to deliver all buffered messages and then commits offsets.
No data loss option
A no data loss option is provided with the following settings:
- For the consumer: enable.auto.commit=false
- For the producer:
  - max.in.flight.requests.per.connection=1
  - retries=Int.MaxValue
  - acks=-1
  - block.on.buffer.full=true
- For Mirror Maker: --abortOnSendFail is set
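As a rough sketch of what --no.data.loss could apply, the settings above can be expressed as java.util.Properties overrides. NoDataLossConfig and apply are hypothetical names. The consumer key is written as enable.auto.commit, the name the new Java consumer uses, and retries is written as Integer.MAX_VALUE, the Java spelling of Int.MaxValue. Letting these values override anything the user configured is an assumption about precedence, not something this proposal specifies.

```java
import java.util.Properties;

// Sketch of the overrides that --no.data.loss is described as applying.
class NoDataLossConfig {
    static Properties consumerOverrides() {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "false");                // commit only after flush()
        return props;
    }

    static Properties producerOverrides() {
        Properties props = new Properties();
        props.setProperty("max.in.flight.requests.per.connection", "1"); // no reordering on retry
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE)); // retry indefinitely
        props.setProperty("acks", "-1");                                   // wait for all in-sync replicas
        props.setProperty("block.on.buffer.full", "true");                // block send() instead of failing
        return props;
    }

    // Assumed precedence: user properties are kept, then the no-data-loss values win.
    static Properties apply(Properties userProps, Properties overrides) {
        Properties merged = new Properties();
        merged.putAll(userProps);
        merged.putAll(overrides);
        return merged;
    }
}
```

A user-supplied acks=1 would therefore be replaced by acks=-1, while unrelated settings such as bootstrap.servers pass through unchanged.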
With these settings, Mirror Maker behaves as follows:
- Mirror Maker sends at most one request to a broker at any given time.
- If any exception is caught in the producer or consumer thread, or an OutOfMemoryError is caught in the offset commit thread, Mirror Maker tries to commit the acked offsets and then exits immediately.
- For a RetriableException in the producer, the producer retries indefinitely. If the retries keep failing, the producer buffer eventually fills up and the entire Mirror Maker blocks.
- For a non-retriable exception, if --abortOnSendFail is specified, Mirror Maker stops. Otherwise, the producer callback records the message that was not successfully sent and lets Mirror Maker move on; in this case, that message is lost in the target cluster.
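The handling of non-retriable send failures can be sketched as a completion callback. It is modeled loosely on Kafka's producer Callback.onCompletion(RecordMetadata, Exception), but here the first parameter is the record itself (a String) rather than RecordMetadata, and SendFailurePolicy, shutdownRequested and lostRecords are hypothetical names. The method is synchronized on the assumption that callbacks for a shared producer arrive on a single I/O thread while other threads may read the state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the producer-callback policy for non-retriable send failures.
class SendFailurePolicy {
    final boolean abortOnSendFail;                        // value of --abortOnSendFail
    final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
    final List<String> lostRecords = new ArrayList<>();   // never reach the target cluster

    SendFailurePolicy(boolean abortOnSendFail) { this.abortOnSendFail = abortOnSendFail; }

    // Invoked once per send; exception is null on success. Retriable errors are assumed
    // to be retried inside the producer (retries=Integer.MAX_VALUE), so any exception
    // seen here is treated as non-retriable.
    synchronized void onCompletion(String record, Exception exception) {
        if (exception == null) {
            return;                        // delivered, nothing to do
        }
        if (abortOnSendFail) {
            shutdownRequested.set(true);   // stop Mirror Maker
        } else {
            lostRecords.add(record);       // record the failure and move on
        }
    }
}
```

Without --abortOnSendFail a failed record ends up in lostRecords and mirroring continues; with it, the first failure requests a shutdown and nothing is silently skipped.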
On consumer rebalance
A new consumerRebalanceListener needs to be added to make sure there are no duplicates when a consumer rebalance occurs. Its beforeReleasingPartitions callback is invoked on consumer rebalance and should:
1. call producer.flush
2. commit the offsets
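The internal listener's ordering, together with the delegation to a user listener described under --consumer.rebalance.listener, can be sketched as follows. The hook name beforeReleasingPartitions comes from this proposal; the parameter type (a collection of partition numbers), the RebalanceListener and InternalListener names, and passing flush and commit in as Runnables are assumptions made so the sketch compiles without Kafka.

```java
import java.util.Collection;

// Sketch of the default (internal) rebalance listener; types are simplified stand-ins.
class RebalanceSketch {
    // Hypothetical callback shape; the parameter type is an assumption.
    interface RebalanceListener {
        void beforeReleasingPartitions(Collection<Integer> partitions);
    }

    static final class InternalListener implements RebalanceListener {
        private final Runnable flushProducer;
        private final Runnable commitOffsets;
        private final RebalanceListener custom; // from --consumer.rebalance.listener, or null

        InternalListener(Runnable flushProducer, Runnable commitOffsets, RebalanceListener custom) {
            this.flushProducer = flushProducer;
            this.commitOffsets = commitOffsets;
            this.custom = custom;
        }

        @Override
        public void beforeReleasingPartitions(Collection<Integer> partitions) {
            flushProducer.run();  // 1. deliver every record already handed to the producer
            commitOffsets.run();  // 2. commit, so the new owner resumes after those records
            if (custom != null) {
                custom.beforeReleasingPartitions(partitions); // user logic runs last
            }
        }
    }
}
```

Flushing first means the committed offsets only cover delivered records (no loss), and committing before the partitions move means the next owner does not re-mirror them (no duplicates).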
Message handler in consumer thread
Sometimes it is useful for Mirror Maker to process the messages it consumes, for example to filter out some messages or to convert their format. This can be done by adding a message handler to the consumer thread. The interface could be:
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

public interface MirrorMakerMessageHandler {
    /**
     * Initializes the custom message handler with the passed-in arguments. This method is
     * called when Mirror Maker instantiates the custom message handler.
     */
    public void init(String args);

    /**
     * The message handler is executed in the Mirror Maker thread to handle messages
     * before they are handed to the producer. The message handler should not block
     * and should handle all exceptions by itself.
     */
    public List<ProducerRecord> handle(ConsumerRecord record);
}
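As an illustration of the interface, here is a hypothetical handler that filters out records from topics named in its init() arguments. Because handle() returns a list, an empty list drops a record, a one-element list passes it through, and several elements would fan it out. InRecord, OutRecord and MessageHandler are simplified stand-ins for ConsumerRecord, ProducerRecord and MirrorMakerMessageHandler so that the example compiles without the Kafka client; the comma-separated argument format is an assumption.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for Kafka's ConsumerRecord/ProducerRecord so the sketch compiles alone.
final class InRecord {
    final String topic;
    final String value;
    InRecord(String topic, String value) { this.topic = topic; this.value = value; }
}

final class OutRecord {
    final String topic;
    final String value;
    OutRecord(String topic, String value) { this.topic = topic; this.value = value; }
}

// Simplified stand-in for MirrorMakerMessageHandler.
interface MessageHandler {
    void init(String args);
    List<OutRecord> handle(InRecord record);
}

// Example handler: drops records from the topics named in args ("a,b,c"), passes the rest through.
class TopicFilterHandler implements MessageHandler {
    private final Set<String> excluded = new HashSet<>();

    @Override
    public void init(String args) {
        if (args == null || args.isEmpty()) {
            return;                                   // no arguments: filter nothing
        }
        for (String topic : args.split(",")) {
            excluded.add(topic.trim());
        }
    }

    @Override
    public List<OutRecord> handle(InRecord record) {
        if (excluded.contains(record.topic)) {
            return Collections.emptyList();           // filtered out: nothing is produced
        }
        // Pass-through: same topic and value in the target cluster.
        return Collections.singletonList(new OutRecord(record.topic, record.value));
    }
}
```

Following the interface's contract, the handler neither blocks nor throws on ordinary input; a record from an excluded topic simply yields an empty list.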
Compatibility, Deprecation, and Migration Plan
The changes are backward compatible.