
Status

Current state: Under Discussion

Discussion thread: TBD

JIRA: KAFKA-7132

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently in Kafka, when a consumer falls out of a consumer group, it restarts processing from the last checkpointed offset. This design can introduce a lag that some users cannot afford. For example, let's say a consumer crashed at offset 100, with the last checkpointed offset being 70. By the time it recovers, the log has grown to offset 120, yet the consumer must restart from 70 and reprocess old data, leaving it behind by an offset range of 50 (120 - 70).

Public Interfaces

This new mode will be disabled by default in KafkaConsumer. Users who want it can activate it explicitly.

Proposed Changes

When rebalancing, instead of having the current consumer (consumer1) restart from the last committed offset (70), we allow it to start processing from the latest offset (120) that can be polled at the time of the consumer's recovery (assuming that the log has grown while the process was down). Meanwhile, a new KafkaConsumer (consumer2) is created to start processing from the last committed offset (70) and runs concurrently with consumer1. Consumer2 terminates when it has reached offset 120. By default, this setup supports at-least-once semantics.
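The handoff described above could be sketched as follows. This is a minimal simulation of the boundary logic only, using the offsets from the example; the class and variable names are illustrative and not part of any actual Kafka API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: consumer1 resumes from the log-end offset observed at
// recovery, while a short-lived consumer2 replays the backlog
// [lastCommitted, logEndAtRecovery) and then terminates.
public class CatchUpSketch {
    public static void main(String[] args) {
        long lastCommitted = 70;     // last checkpointed offset
        long logEndAtRecovery = 120; // latest pollable offset at recovery time

        List<Long> consumer2Backlog = new ArrayList<>();
        for (long offset = lastCommitted; offset < logEndAtRecovery; offset++) {
            consumer2Backlog.add(offset); // consumer2 reprocesses these
        }
        // consumer2 terminates once it reaches logEndAtRecovery
        System.out.println("consumer2 replays " + consumer2Backlog.size() + " offsets");
        System.out.println("consumer1 starts at " + logEndAtRecovery);
    }
}
```

Consumer2's backlog is exactly the 50-offset lag from the motivating example, which is why it can be bounded and allowed to terminate.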

Ordering Guarantees

Two threads operating simultaneously on one log would naturally break ordering guarantees. To ensure that offsets arrive in the correct sequence, we would have these two processes write into two distinct buffers. One of them (buffer2) will contain the offsets from 70 to 120, while the other (buffer1) will contain the offsets from 120 onwards.
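A minimal sketch of this routing, assuming the boundary between the two buffers is the log-end offset (120) observed at recovery; the class name and structure are illustrative only:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the two-buffer scheme: records below the recovery
// boundary go to buffer2 (consumer2's backlog), the rest to buffer1
// (consumer1's live tail), so each buffer stays internally ordered.
public class DualBufferRouter {
    final long boundary;
    final Deque<Long> buffer2 = new ArrayDeque<>(); // backlog: [committed, boundary)
    final Deque<Long> buffer1 = new ArrayDeque<>(); // live tail: [boundary, ...)

    DualBufferRouter(long boundary) {
        this.boundary = boundary;
    }

    void route(long offset) {
        if (offset < boundary) {
            buffer2.addLast(offset);
        } else {
            buffer1.addLast(offset);
        }
    }

    public static void main(String[] args) {
        DualBufferRouter router = new DualBufferRouter(120);
        for (long offset = 70; offset < 125; offset++) {
            router.route(offset);
        }
        System.out.println("buffer2 size = " + router.buffer2.size()); // 50
        System.out.println("buffer1 size = " + router.buffer1.size()); // 5
    }
}
```

Because each consumer writes only to its own buffer, ordering within each buffer follows directly from each consumer's sequential reads.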

Kafka Checkpointing

In this scenario, if auto commit is enabled, we commit offsets from buffer2 directly into the original commit log, while the offsets found in buffer1 are checkpointed in another Kafka topic. Once consumer2 has terminated, we merge both commit logs into a single larger one.
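The merge step might look like the following sketch, which models the two commit logs as plain lists of offsets; the variable names are illustrative, and real commit metadata would of course carry more than a bare offset:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: after consumer2 terminates, the checkpoints written to
// the side topic (from buffer1) are folded into the original commit log (from
// buffer2), leaving one commit history in offset order.
public class CommitLogMergeSketch {
    public static void main(String[] args) {
        List<Long> mainCommits = new ArrayList<>(List.of(80L, 100L, 119L)); // buffer2 checkpoints
        List<Long> sideCommits = List.of(125L, 140L);                       // buffer1 checkpoints

        mainCommits.addAll(sideCommits);
        Collections.sort(mainCommits); // merged log stays in offset order
        System.out.println(mainCommits); // [80, 100, 119, 125, 140]
    }
}
```

Since buffer2's offsets are all below buffer1's, the merge is effectively a concatenation, and sorting is only a safety measure.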

Effects on KafkaConsumer#poll()

When the user calls poll() for new offsets, if consumer2 has not finished, then only offsets from buffer2 are returned to the user. However, once consumer2 has terminated, all offsets that have accumulated in buffer1 are also returned (with buffer2 being discarded, now that it is no longer used). In this manner, we can also guarantee ordering for the user.
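The gating behavior could be sketched as below. This is a simplified stand-in for the real poll() path, with illustrative names and plain offset lists in place of consumer records:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: while consumer2 is still catching up, poll() drains
// only buffer2; once consumer2 terminates, buffer1 becomes visible as well,
// so the user never sees an offset out of order.
public class OrderedPollSketch {
    final Deque<Long> buffer2 = new ArrayDeque<>(); // backlog offsets
    final Deque<Long> buffer1 = new ArrayDeque<>(); // live-tail offsets
    boolean consumer2Done = false;

    List<Long> poll() {
        List<Long> out = new ArrayList<>();
        while (!buffer2.isEmpty()) {
            out.add(buffer2.pollFirst());
        }
        if (consumer2Done) { // buffer1 is held back until the backlog is done
            while (!buffer1.isEmpty()) {
                out.add(buffer1.pollFirst());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        OrderedPollSketch sketch = new OrderedPollSketch();
        sketch.buffer2.addAll(List.of(70L, 71L));
        sketch.buffer1.addAll(List.of(120L, 121L));

        System.out.println(sketch.poll()); // [70, 71] - buffer1 held back
        sketch.consumer2Done = true;
        System.out.println(sketch.poll()); // [120, 121]
    }
}
```

Holding buffer1 back until consumer2 terminates is what trades extra buffering memory for the ordering guarantee.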


Compatibility, Deprecation, and Migration Plan

No compatibility issues; this proposal only changes the internals of how some existing methods work.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
