Status
Current state: Under Discussion
Discussion thread: Here
JIRA: KAFKA-7132
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, in Kafka, when a consumer falls out of a consumer group, it will restart processing from the last checkpointed offset. However, this design can result in a lag which some users cannot afford. For example, let's say a consumer crashed at offset 100, with the last checkpointed offset being 70. When it recovers at a later log-end offset (say, 120), it will be behind by an offset range of 50 (120 - 70). This is because the consumer restarted at 70, forcing it to reprocess old data.
Public Interfaces
This new mode (named enable.parallel.rebalance) will by default not be used by KafkaConsumer (i.e. it will default to false). If the user chooses, he or she can activate this mode by setting the config's value to true.
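As a sketch, enabling the proposed mode would only require setting the new config key. The key enable.parallel.rebalance comes from this KIP; the other property names are standard consumer configs, and the class below is purely illustrative:

```java
import java.util.Properties;

public class ParallelRebalanceConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        // Proposed config from this KIP; defaults to false.
        props.put("enable.parallel.rebalance", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("enable.parallel.rebalance"));
    }
}
```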
Proposed Changes
When rebalancing, instead of allowing the current consumer (consumer1) to restart from the last committed offset (70), we should allow it to start processing from the latest offset (120) that could be polled at the time of the consumer's recovery (assuming that the log has grown while the process was down). Meanwhile, a new KafkaConsumer (consumer2) will be created to start processing from the last committed offset (70) and run concurrently with consumer1. Consumer2 will terminate when it has reached offset 120. By default, this setup will support at-least-once semantics.
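The split can be sketched as follows; the class and method names are hypothetical, used only to illustrate that consumer2's catch-up range is bounded while consumer1 starts at the log end:

```java
public class RebalanceRanges {
    /** Number of offsets consumer2 must reprocess: the range [lastCommitted, logEndAtRecovery). */
    public static long catchUpCount(long lastCommitted, long logEndAtRecovery) {
        return logEndAtRecovery - lastCommitted;
    }

    /** consumer1 skips the backlog entirely and begins at the log end. */
    public static long consumer1Start(long logEndAtRecovery) {
        return logEndAtRecovery;
    }

    public static void main(String[] args) {
        long lastCommitted = 70, logEnd = 120;
        // consumer2 covers [70, 120) and terminates at 120;
        // consumer1 begins at 120 and keeps up with new records.
        System.out.println("consumer2 reprocesses " + catchUpCount(lastCommitted, logEnd)
                + " offsets; consumer1 starts at " + consumer1Start(logEnd));
    }
}
```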
Ordering Guarantees
Two consumers operating simultaneously on one log would naturally break ordering guarantees. To ensure that records arrive in the correct sequence, we would have these two processes write into two distinct buffers. One of them (buffer2) will contain the records for offsets 70 up to 120, while the other (buffer1) will contain the records from offset 120 onwards.
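A minimal sketch of the routing invariant, using plain queues to stand in for the internal buffers (the names buffer1/buffer2 follow the text; nothing here is existing KafkaConsumer code): every offset routed to buffer2 is strictly below every offset routed to buffer1, so delivering buffer2 first preserves order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class TwoBuffers {
    static final long LOG_END_AT_RECOVERY = 120; // offset where consumer1 starts

    final Deque<Long> buffer2 = new ArrayDeque<>(); // consumer2: [committed, 120)
    final Deque<Long> buffer1 = new ArrayDeque<>(); // consumer1: [120, ...)

    void route(long offset) {
        if (offset < LOG_END_AT_RECOVERY) buffer2.addLast(offset);
        else buffer1.addLast(offset);
    }

    public static void main(String[] args) {
        TwoBuffers b = new TwoBuffers();
        for (long o = 70; o < 125; o++) b.route(o);
        // Ordering holds because buffer2 is always delivered before buffer1.
        System.out.println("buffer2 last=" + b.buffer2.peekLast()
                + ", buffer1 first=" + b.buffer1.peekFirst());
    }
}
```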
Kafka checkpointing
In this scenario, if auto commit is enabled, we would commit offsets from buffer2 directly into the original commit log. Meanwhile, the offsets found in buffer1 will be checkpointed in another Kafka topic. When calling KafkaConsumer#committed, if consumer2 has not terminated, then only offsets checkpointed by consumer2 will be returned. Otherwise, the offsets checkpointed by consumer1 will be returned instead.
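The committed() behavior described above can be sketched as follows (an illustrative toy class, not the real KafkaConsumer internals): consumer2's progress is reported while it runs, and consumer1's progress only after consumer2 terminates.

```java
public class CommittedView {
    Long consumer2Committed;     // progress committed to the original commit log
    Long consumer1Committed;     // progress checkpointed to the side topic
    boolean consumer2Done = false;

    /** While consumer2 runs, report its offsets; afterwards, consumer1's. */
    long committed() {
        return consumer2Done ? consumer1Committed : consumer2Committed;
    }

    public static void main(String[] args) {
        CommittedView v = new CommittedView();
        v.consumer2Committed = 85L;   // consumer2 mid catch-up
        v.consumer1Committed = 140L;  // consumer1 ahead on live data
        System.out.println(v.committed()); // 85 while consumer2 runs
        v.consumer2Done = true;
        System.out.println(v.committed()); // 140 after consumer2 terminates
    }
}
```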
Effects on KafkaConsumer#poll()
When the user calls poll() for new records, if consumer2 has not finished, then only records from buffer2 will be sent to the user. After each poll(), buffer2 will be emptied of the records that have been returned to the user, to free up memory. (Note that if some offsets in buffer2 have not been committed yet, then those records will still be kept.) However, if consumer2 has terminated, then all records that have accumulated in buffer1 will also be sent (with both buffer2 and buffer1 being discarded, now that neither is needed). In this manner, we can also guarantee ordering for the user.
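The poll() semantics can be sketched with the same two-buffer model (again a hypothetical toy class, not actual consumer code): buffer1 is held back until consumer2 terminates, so the user never sees an offset out of order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ParallelPoll {
    final Deque<Long> buffer2 = new ArrayDeque<>(); // consumer2: catch-up range
    final Deque<Long> buffer1 = new ArrayDeque<>(); // consumer1: live records
    boolean consumer2Done = false;

    /** Drain buffer2 first; drain buffer1 only after consumer2 terminates. */
    List<Long> poll() {
        List<Long> out = new ArrayList<>();
        while (!buffer2.isEmpty()) out.add(buffer2.pollFirst());
        if (consumer2Done) {
            while (!buffer1.isEmpty()) out.add(buffer1.pollFirst());
        }
        return out;
    }

    public static void main(String[] args) {
        ParallelPoll p = new ParallelPoll();
        for (long o = 70; o < 73; o++) p.buffer2.addLast(o);
        for (long o = 120; o < 123; o++) p.buffer1.addLast(o);
        System.out.println(p.poll()); // buffer1 held back: [70, 71, 72]
        p.consumer2Done = true;
        System.out.println(p.poll()); // buffer1 now drains: [120, 121, 122]
    }
}
```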
Corner Cases
There is a possibility, however remote, of a chained failure, in which either consumer1 or consumer2 fails. If consumer1 were to fail, we could simply repeat the policy above and instantiate another consumer to process the intervening lag. However, if consumer2 were to fail, no new process will be created. This is because the offset range that consumer2 is processing is bounded, so consumer2 does not have to deal with a continuous influx of new records – there is technically no "lag" to speak of. Note that if consumer1 crashes N times, N child processes will be created, and checkpointing will write the progress of these N child processes into N distinct logs.
Compatibility, Deprecation, and Migration Plan
No compatibility issues – this change only affects the internals of how some of the methods work.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.