
Currently, in Kafka, when a consumer falls out of a consumer group, it restarts processing from the last checkpointed offset. However, this design can introduce lag that some users cannot afford. For example, let's say a consumer crashed at offset 100, with the last checkpointed offset at 70. By the time it recovers, the log has grown to a later offset (say, 120), so the consumer is behind by 50 offsets (120 - 70). This is because the consumer restarts at 70, forcing it to reprocess data it had already handled before the crash (offsets 70 to 100).
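To make the arithmetic concrete, here is a small sketch that models offsets as plain integers, with no Kafka client involved. `restart_cost` is a hypothetical helper, and it assumes the crash offset is the first record the consumer had not yet processed.

```python
def restart_cost(last_committed: int, crash_offset: int, log_end: int) -> dict:
    """For a consumer that restarts from its last committed offset, return how
    far behind the log end it is and how many records it processes twice.
    Assumption: crash_offset is the first record not yet processed."""
    return {
        # records between the restart position and the log end at recovery time
        "lag": log_end - last_committed,
        # records processed before the crash but never committed
        "reprocessed": crash_offset - last_committed,
    }


cost = restart_cost(last_committed=70, crash_offset=100, log_end=120)
print(cost)  # {'lag': 50, 'reprocessed': 30}
```

Under this model, both numbers grow with the distance between commits, which is the cost the proposal below tries to avoid.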

Public Interfaces

In this change, we will introduce a new ConsumerBuffer class which can be queried by the user for offsets.

KafkaConsumer will not use this new mode by default; users can choose to activate it. Some new methods will also be added to KafkaConsumer: for example, one that notifies the user when a rebalance is in progress, and another that returns the buffers and/or the newly created KafkaConsumer.

Names for these methods have not been determined yet.
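Since the method names are still open, the following is only a hypothetical sketch of what a user-queryable buffer could look like. The class name comes from this proposal, but every method name (`append`, `peek_offset`, `drain`) is an illustrative assumption, and records are modeled as `(offset, value)` tuples.

```python
from collections import deque
from threading import Lock
from typing import Deque, List, Optional, Tuple

Record = Tuple[int, str]  # (offset, value); a stand-in for a consumer record


class ConsumerBuffer:
    """Hypothetical thread-safe FIFO filled by one consumer and queried by the user."""

    def __init__(self) -> None:
        self._records: Deque[Record] = deque()
        self._lock = Lock()

    def append(self, offset: int, value: str) -> None:
        # called by the consumer thread that owns this buffer
        with self._lock:
            self._records.append((offset, value))

    def peek_offset(self) -> Optional[int]:
        # lets the user query the next offset without removing it
        with self._lock:
            return self._records[0][0] if self._records else None

    def drain(self, max_records: int) -> List[Record]:
        # removes and returns up to max_records records in arrival order
        with self._lock:
            n = min(max_records, len(self._records))
            return [self._records.popleft() for _ in range(n)]

    def __len__(self) -> int:
        with self._lock:
            return len(self._records)


buf = ConsumerBuffer()
buf.append(70, "a")
buf.append(71, "b")
print(buf.peek_offset())  # 70
print(buf.drain(1))       # [(70, 'a')]
```

The lock matters because the consumer thread writes while the user's thread queries; each buffer has a single writer, so records stay in the order that consumer read them.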

Proposed Changes

When rebalancing, instead of having the current consumer (consumer1) restart from the last committed offset (70), we should let it start processing from the latest offset (120) that can be polled at the time of the consumer's recovery (assuming the log has grown while the consumer was down). Meanwhile, a new KafkaConsumer (consumer2) will be created to start processing from the last committed offset (70) and run concurrently with consumer1. Consumer2 will terminate when it reaches offset 120. By default, this setup will provide at-least-once semantics.
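The split between the two consumers can be simulated with two threads over an in-memory list standing in for a partition. This is a sketch under stated assumptions: `consume_range` and `recover` are hypothetical helpers, consumer2 reads the half-open range [70, 120), and records past offset 119 stand in for data that arrives after recovery.

```python
import threading
from typing import List, Tuple


def consume_range(log: List[str], start: int, stop: int, out: List[int]) -> None:
    """Simulated consumer: read offsets [start, stop) from an in-memory log."""
    for offset in range(start, min(stop, len(log))):
        out.append(offset)  # "process" the record by remembering its offset


def recover(log: List[str], last_committed: int, log_end: int) -> Tuple[List[int], List[int]]:
    gap: List[int] = []   # written only by consumer2
    live: List[int] = []  # written only by consumer1
    # consumer2 replays the gap and terminates once it reaches log_end
    consumer2 = threading.Thread(target=consume_range, args=(log, last_committed, log_end, gap))
    # consumer1 skips ahead to the log end observed at recovery time
    consumer1 = threading.Thread(target=consume_range, args=(log, log_end, len(log), live))
    consumer1.start()
    consumer2.start()
    consumer1.join()
    consumer2.join()
    return gap, live


log = [f"record-{i}" for i in range(130)]  # offsets 120..129 arrive after recovery
gap, live = recover(log, last_committed=70, log_end=120)
print(gap[0], gap[-1], live[0], live[-1])  # 70 119 120 129
```

Each thread writes to its own list, so the simulation needs no locking; the next section explains why the proposal keeps the two outputs separate.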

Ordering Guarantees

However, two threads consuming the same log concurrently would break ordering guarantees. To ensure that the offsets are delivered in the correct sequence, the two consumers would write into two distinct buffers (which the user can query). One of them (buffer2) will contain the offsets from 70 to 120, while the other (buffer1) will contain the offsets from 120 onwards.
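The ordering argument can be checked with a deterministic sketch. Here `interleave` is a hypothetical stand-in for one possible thread schedule writing into a single shared buffer, and buffer2 is assumed to cover the half-open range [70, 120).

```python
from itertools import chain, zip_longest
from typing import List


def interleave(a: List[int], b: List[int]) -> List[int]:
    """One possible schedule of two threads appending to a single shared buffer."""
    merged: List[int] = []
    for x, y in zip_longest(a, b):
        if x is not None:
            merged.append(x)
        if y is not None:
            merged.append(y)
    return merged


consumer2_offsets = list(range(70, 120))   # replayed gap
consumer1_offsets = list(range(120, 130))  # live data after recovery

shared = interleave(consumer2_offsets, consumer1_offsets)
print(shared == sorted(shared))  # False: one shared buffer loses ordering

buffer2, buffer1 = consumer2_offsets, consumer1_offsets
delivered = list(chain(buffer2, buffer1))  # all of buffer2 first, then buffer1
print(delivered == sorted(delivered))      # True
```

Each buffer is ordered because it has a single writer, and every offset in buffer2 is below every offset in buffer1, so delivering them back to back restores the total order.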

Kafka checkpointing

In this scenario, if auto commit is enabled, we would only commit offsets from buffer2 directly into the original commit log. Meanwhile, the offsets found in buffer1 will be checkpointed in another Kafka topic. Once consumer2 has terminated, we would merge both commit logs into a larger one.
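A minimal sketch of the two commit logs, using Python lists in place of Kafka topics. `DualCommitLog` and its methods are hypothetical, and the rule that recovery reads only the original log until the merge is an assumption made for illustration.

```python
from typing import List, Optional


class DualCommitLog:
    """Hypothetical model of checkpointing while consumer2 is still running."""

    def __init__(self) -> None:
        self.original: List[int] = []  # stands in for the regular commit log (buffer2)
        self.side: List[int] = []      # stands in for the extra topic (buffer1)
        self.merged: Optional[List[int]] = None

    def commit_buffer2(self, offset: int) -> None:
        self.original.append(offset)

    def commit_buffer1(self, offset: int) -> None:
        self.side.append(offset)

    def merge(self) -> List[int]:
        # called once consumer2 has terminated; buffer2's range precedes
        # buffer1's, so appending the side log keeps commits in offset order
        self.merged = self.original + self.side
        return self.merged

    def committed_position(self) -> Optional[int]:
        # assumption: until the merge, recovery reads only the original log
        log = self.merged if self.merged is not None else self.original
        return log[-1] if log else None


commits = DualCommitLog()
commits.commit_buffer2(90)
commits.commit_buffer1(125)
print(commits.committed_position())  # 90
commits.commit_buffer2(119)
print(commits.merge())               # [90, 119, 125]
print(commits.committed_position())  # 125
```

Under that assumption, a crash before the merge resumes inside the gap and may reprocess records, which is consistent with the at-least-once default described above.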

Effects on KafkaConsumer#poll()

When the user calls poll() for new offsets and consumer2 has not terminated yet, only offsets from buffer2 will be sent to the user. Once consumer2 has terminated, we would start committing from buffer1, and any remaining offsets in buffer2 followed by all offsets that have accumulated in buffer1 will be sent (with buffer2 then being discarded, now that it is no longer used). In this manner, we can also guarantee ordering for the user.
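The poll() rule can be sketched as a single-threaded state machine. `TwoBufferPoller` and its `consumer2_done` flag are hypothetical names; in a real implementation the flag and buffers would be updated by the consumer threads and would need synchronization.

```python
from collections import deque
from typing import Deque, List


class TwoBufferPoller:
    """Hypothetical poll() rule: serve buffer2 until consumer2 has terminated
    and buffer2 is empty, then serve buffer1."""

    def __init__(self) -> None:
        self.buffer2: Deque[int] = deque()  # gap offsets from consumer2
        self.buffer1: Deque[int] = deque()  # live offsets from consumer1
        self.consumer2_done = False

    def poll(self, max_records: int) -> List[int]:
        out: List[int] = []
        while len(out) < max_records and self.buffer2:
            out.append(self.buffer2.popleft())
        # buffer1 is only released once buffer2 can never receive more offsets
        if self.consumer2_done and not self.buffer2:
            while len(out) < max_records and self.buffer1:
                out.append(self.buffer1.popleft())
        return out


p = TwoBufferPoller()
p.buffer2.extend(range(70, 73))
p.buffer1.extend(range(120, 122))
print(p.poll(10))  # [70, 71, 72]; buffer1 is withheld while consumer2 runs
p.consumer2_done = True
print(p.poll(10))  # [120, 121]
```

Withholding buffer1 until buffer2 is both finished and drained is what keeps the delivered sequence in offset order.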


Compatibility, Deprecation, and Migration Plan

No compatibility issues: the change only adds new methods and modifies the internals of how some existing methods work, without changing the return signatures of any preexisting methods.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.