Status
Current state: Under Discussion
Discussion thread: TBD
JIRA: KAFKA-7132
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, in Kafka, when a consumer falls out of a consumer group, it will restart processing from the last checkpointed offset. However, this design could result in a lag which some users could not afford to let happen. For example, lets say a consumer crashed at offset 100, with the last checkpointed offset being at 70. When it recovers at a later offset (say, 120), it will be behind by an offset range of 50 (120 - 70). This is because the consumer restarted at 70, forcing it to reprocess old data.
Public Interfaces
In this change, we will introduce a new ConsumerBuffer class which can be queried by the user for offsets.
This new mode will by default not be used by KafkaConsumer. If the user chooses, he or she can activate this mode. Some new methods will also be introduced to KafkaConsumer: for example, a method that notifies the user when a rebalance is in progress. Another method will be introduced which returns the buffers and/or the new KafkaConsumer created.
Names for these methods have not been determined yet.
Proposed Changes
When rebalancing, instead of allowing the current consumer (consumer1) to restart from the last committed offset (70), we should allow it to start processing from the latest offset (120) that could be polled at the time of the consumer's recovery (assuming that the log has grown within the time that the process was down). Meanwhile, a new KafkaConsumer (consumer2) will be created to start processing from the last committed offset (70) and run in concurrency with consumer1. Consumer2 will terminate when it has reached offset 120. By default, this setup will support at-least-once semantics.
However, two threads operating simultaneously on one log would naturally mess up ordering guarantees. To ensure that the offsets will arrive in correct sequence, we would have these two processes write into two distinct buffers (which the user could query). One of them (buffer2) will contain the offsets from 70-120, while the other (buffer1) would contain the offsets from 120 onwards. In this scenario, if auto commit is enabled, we would only commit offsets from buffer2 if consumer2 has not terminated yet. Once consumer2 has terminated, we would start committing from buffer1. In this manner, we could also guarantee ordering for the user.
Compatibility, Deprecation, and Migration Plan
No compatibility issues - - only adding new methods, not changing the return signatures of any preexisting methods.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.