You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 19 Next »

Status

Current state: Under Discussion

Discussion thread: Here

JIRA: KAFKA-7132

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, in Kafka, when a consumer falls out of a consumer group, it will restart processing from the last checkpointed offset. However, this design could result in a lag which some users could not afford to let happen. For example, lets say a consumer crashed at offset 100, with the last checkpointed offset being at 70. When it recovers at a later offset (say, 120), it will be behind by an offset range of 50 (120 - 70). This is because the consumer restarted at 70, forcing it to reprocess old data. This lag is not acceptable in some computing applications (e.g. traffic control) who needs information by the millisecond, therefore, they will need a faster option for processing the offset lags that results from a crash.

Public Interfaces

This new mode (name is enable.parallel.rebalance) will by default not be used by KafkaConsumer (e.g. will be by default equal to false). If the user chooses, he or she can activate this mode by setting the config's value to true. Note that when using this mode, offsets would no longer be given to the user in correct sequence (or order), but rather in a more haphazard manner. The mentality of this mode could be described as "give me all the offsets as fast as you can, I don't care about order." To illustrate, offsets could be sent to the user in this order:  1,2,3,6,4,7,5,8. Please note that with the enabling of this mode, there will be an increased burden on a computer's processor cores because an extra thread will be instantiated (per current design).

Proposed Changes

When rebalancing, instead of allowing the current consumer (consumer1) to restart from the last committed offset (70), we should allow it to start processing from the latest offset (120) that could be polled at the time of the consumer's recovery (assuming that the log has grown within the time that the process was down). Meanwhile, a new KafkaConsumer (consumer2) will be created to start processing from the last committed offset (70) and run in concurrency with consumer1. Please note that consumer2 will not terminate once it has reached offset 120, as it will continue running. If consumer1 crashes again, the respective lag that results will be taken up by consumer2 for processing. We could think of consumer2 as the safety net which catches any offsets which had slipped by consumer1 when it crashed.

Effects on KafkaConsumer#poll()

When the user polls for offsets, all offsets that has been processed by both consumer1 and consumer2 will be returned to the user. It is guaranteed that all offsets will be received at-least-once, but it will be important to reiterate that offsets will not be returned in order.

Kafka checkpointing

In this scenario, if auto commit is enabled, we would commit offsets from consumer2 and consumer1 directly into the same commit log. Ordering of the checkpointed offsets is not guaranteed. If consumer2 crashes, note that it will start processing from the last offset which it has committed – not consumer1's last committed offset. 

Compatibility, Deprecation, and Migration Plan

No compatibility issues - - only changing the internals of how some of the methods work.

Rejected Alternatives

It has been considered whether or not to allow this mode to support ordering guarantees. After some thought, it was rejected since the implicit costs that comes with it is too high.

For there to be ordering guarantees, it will require the processes involved to either wait for the latest uncommitted offsets to be checkpointed (i.e. all offsets are stored in the same log), or write into distinct logs which could be queried. The former option is too costly in terms of latency, particularly if there is another crash. The latter option is better in terms of latency in that we don't have to wait for other offsets to be fed into the commit log before checkpointing. However, such an approach could lead to an explosion in the number of topics that store the committed offsets. In such a scenario, if there were several crashes within a certain period of time, the number of topics would skyrocket. This is not acceptable.

As for polling(), ordering guarantees could not be supported either, particularly since the process which is consuming the earliest uncheckpointed offsets could be the only thread which returns it results. The other thread(s) involved will not be able to return anything until the first process finished with those offsets.


  • No labels