Status

Current state: Under Discussion

JIRA: here 

Pull Request: here

Discussion thread: here

Vote thread: here

Motivation

When the server expands the partitions of a topic, the producer perceives the expansion first and writes data into the newly added partitions. The consumer group only perceives the expansion later, and once its rebalance completes, the new partitions are consumed from the latest offset if "auto.offset.reset" is set to "latest". As a result, the data written to the new partitions in the meantime is skipped and effectively lost to the consumer. A group that is already consuming, however, typically does not want to skip that data. We therefore propose richer offset reset mechanisms: first, to solve this data-loss problem, and second, to handle the out-of-range case more flexibly.

For this data-loss scenario, I ran a test; see the linked JIRA for details.

Public Interfaces

Proposed Changes

  1. In addition to the "earliest", "latest", and "none" values provided by the existing "auto.offset.reset", we provide richer reset semantics: "latest_on_start" (on application startup, reset to latest; throw an exception if out of range occurs), "earliest_on_start" (on application startup, reset to earliest; throw an exception if out of range occurs), and "nearest" (on startup, behave as determined by "auto.offset.reset"; when out of range occurs, choose earliest or latest according to the distance between the current offset and the log start offset and log end offset).
  2. "auto.offset.reset.on.no.initial.offset": the strategy used to initialize the offset. It defaults to the value configured by "auto.offset.reset", in which case offset initialization behaves exactly as before, ensuring compatibility. If it is set to "latest_on_start" or "earliest_on_start", the offset is initialized according to the configured semantics. This solves the data-loss problem during partition expansion: configure "auto.offset.reset.on.no.initial.offset" to "latest_on_start" and "auto.offset.reset" to "earliest".
  3. "auto.offset.reset.on.invalid.offset": the strategy used when the offset is illegal or out of range. It defaults to the value configured by "auto.offset.reset", in which case out-of-range handling behaves exactly as before, ensuring compatibility. If it is set to "nearest", the "nearest" semantics described above are applied, and only to the out-of-range case.

The semantics of "auto.offset.reset" remain unchanged. To describe in more detail what these parameters mean and how they behave in various situations, we distinguish the two categories of cases in which the offset needs to be reset.

"auto.offset.reset.on.no.initial.offset"  (When the group starts consumption for the first time, the offset needs to be initialized):

initial offset reset strategy | proposed reset behavior when setting the initial offset
----------------------------- | -------------------------------------------------------
none                          | fall back to *auto.offset.reset*: if none, throw exception; if earliest, reset to earliest; if latest, reset to latest
earliest_on_start             | reset to earliest
latest_on_start               | reset to latest
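The fallback logic in the table above can be sketched as follows. The class, method, and enum names are hypothetical and only illustrate the decision, not the actual Kafka implementation:

```java
// Hypothetical sketch of initial-offset resolution per the table above;
// not the actual implementation in the linked pull request.
public class InitialOffsetResetSketch {
    enum Strategy { NONE, EARLIEST, LATEST, EARLIEST_ON_START, LATEST_ON_START }

    // Resolve where a group with no committed offset should start, given
    // auto.offset.reset.on.no.initial.offset and the auto.offset.reset
    // fallback.
    static Strategy resolveInitialReset(Strategy onNoInitialOffset, Strategy autoOffsetReset) {
        switch (onNoInitialOffset) {
            case EARLIEST_ON_START:
                return Strategy.EARLIEST;
            case LATEST_ON_START:
                return Strategy.LATEST;
            default:
                // "none" (the default) falls back to auto.offset.reset,
                // preserving the pre-KIP behavior.
                if (autoOffsetReset == Strategy.NONE) {
                    throw new IllegalStateException(
                        "no initial offset and auto.offset.reset=none");
                }
                return autoOffsetReset;
        }
    }
}
```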


"auto.offset.reset.on.invalid.offset" (When out of range or other abnormal offset inconsistencies occur during consumption):

invalid offset reset strategy | proposed reset behavior when out of range is triggered
----------------------------- | ------------------------------------------------------
none                          | fall back to *auto.offset.reset*: if none, throw exception; if earliest, reset to earliest; if latest, reset to latest
nearest                       | reset to the earliest if the offset was under the range, or to the latest if it was over the range
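The "nearest" decision in the table above can be sketched as follows; again the names are hypothetical, not the actual Kafka implementation:

```java
// Hypothetical sketch of the "nearest" out-of-range decision: pick the
// boundary the fetch offset violated.
public class NearestResetSketch {
    enum Target { EARLIEST, LATEST }

    // Below the log start offset (records already deleted) -> earliest;
    // otherwise the offset is beyond the log end offset -> latest.
    static Target nearest(long fetchOffset, long logStartOffset, long logEndOffset) {
        if (fetchOffset < logStartOffset) {
            return Target.EARLIEST;
        }
        return Target.LATEST;
    }
}
```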

Compatibility, Deprecation, and Migration Plan

Existing behavior is not affected. This proposal only provides additional mechanisms that users can opt into according to their needs; the existing behaviors are retained unchanged.

Rejected Alternatives

For the problem of losing data due to partition expansion, it is not always desirable to set "auto.offset.reset=earliest" for a high-traffic topic at startup: the group would then aggressively consume historical data from the brokers, which can affect broker performance to a certain extent. Instead, only the new partitions should be consumed from the earliest offset, which is achieved by setting "auto.offset.reset.on.no.initial.offset"="latest_on_start" and "auto.offset.reset"="earliest".

This has been implemented according to the Proposed Changes; see the pull request: https://github.com/apache/kafka/pull/10726