Status
Current state: Under discussion
JIRA: here
Pull Request: here
Discussion thread: here
Motivation
When the server expands the partitions of a topic, the producer notices the expansion first and writes some data to the newly added partitions. The consumer group notices the expansion later; once the rebalance completes, the newly added partitions are consumed from the latest offset if "auto.offset.reset" is set to "latest". For a period of time, the data in the newly added partitions is therefore skipped and lost to the consumer. A group that is already consuming does not want to skip this data. We therefore propose richer offset reset mechanisms to solve this problem and, secondly, to handle out-of-range situations more flexibly.
I ran a test for this data-loss case; see the linked JIRA for details.
Public Interfaces
The plan is to add four enumeration values to OffsetResetStrategy to represent the four new strategies, and to add a new consumer config named "nearest.offset.reset".
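A minimal sketch of the extended enum, assuming each of the four new strategies maps to one enum value and keeps the naming used in the tables below (the exact names, ordering, and placement are implementation details, not fixed by this text):

```java
package org.apache.kafka.clients.consumer;

import java.util.Locale;

public enum OffsetResetStrategy {
    // existing strategies
    LATEST,
    EARLIEST,
    NONE,
    // proposed additions (names assumed from the tables below)
    LATEST_ON_START,    // reset to latest only for partitions seen for the first time
    EARLIEST_ON_START,  // reset to earliest only for partitions seen for the first time
    SAFE_LATEST,        // reset to latest or earliest depending on a limit timestamp
    NEAREST;            // on out-of-range, reset to whichever end of the range is nearer

    @Override
    public String toString() {
        return super.toString().toLowerCase(Locale.ROOT);
    }
}
```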
Proposed Changes
- nearest: in addition to latest and earliest, we add nearest, which resets to either latest or earliest depending on the current offset; it is only triggered on out-of-range. This behavior is disabled by default; to use it, set "nearest.offset.reset=true". When some partitions without committed offsets also need their offsets reset, one of earliest, latest, or safe_latest must be used together with it.
- latest_on_start, earliest_on_start: reset to latest or earliest, respectively, only when a partition is seen for the first time without a committed offset; on out-of-range they default to none, i.e. throw an exception.
- safe_latest: an additional limitTimeStamp parameter is used when resetting offsets: we reset to latest / earliest only if the partition's first record timestamp is smaller / larger than the given limitTimeStamp; otherwise we reset to earliest / latest. By setting limitTimeStamp to the timestamp at which the consumer group started, newly added partitions are reset to earliest, which avoids the data loss described in the Motivation section.
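As a usage sketch, the combination recommended for the motivating case (read newly added partitions from earliest while the rest of the group stays latest-like, plus nearest handling of out-of-range) could be configured as below. This assumes the proposed "safe_latest" value for "auto.offset.reset" and the proposed boolean "nearest.offset.reset"; neither exists in released Kafka yet.

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SafeLatestConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Proposed: a newly started group begins from latest, but partitions created
        // after the group started are read from earliest so their data is not skipped.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "safe_latest");

        // Proposed: on out-of-range, reset to the nearest end of the valid range
        // instead of applying the strategy above.
        props.put("nearest.offset.reset", "true");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual ...
        }
    }
}
```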
To describe in more detail what these strategies mean and how they behave in various situations, we distinguish two scenarios in which offsets need to be reset: one is partitions without committed offsets, and the other is when an out-of-range exception is triggered. The tables below show the behavior of each strategy in these two scenarios.
The first scenario is resetting offsets for partitions without committed offsets:
offset reset strategy | current reset behavior | proposed reset behavior
none | throw exception | throw exception (unchanged)
earliest | reset to earliest | reset to earliest (unchanged)
latest | reset to latest | reset to latest (unchanged)
earliest_on_start | N/A (new strategy) | reset to earliest
latest_on_start | N/A (new strategy) | reset to latest
safe_latest | N/A (new strategy) | if the group is newly started, reset to latest; if new partitions are added while the group is consuming, reset to earliest for those partitions (see the sketch below the table)
nearest | N/A (new strategy) | only takes effect on out-of-range; in this scenario the behavior is determined by the earliest, latest, or safe_latest strategy used together with it
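To make the safe_latest row concrete, the decision for a partition without a committed offset could be sketched as below. The helper and its parameters are illustrative, not the actual implementation; it only encodes the limitTimeStamp comparison from the Proposed Changes, with limitTimeStamp set to the group's start time.

```java
// Illustrative only: choose the reset offset under safe_latest for a partition with
// no committed offset. If the partition's first record is older than limitTimeStamp
// (the group's start time), the partition already existed when the group started and
// resetting to latest is safe; otherwise the partition was created afterwards and we
// reset to earliest so its data is not skipped.
static long resolveSafeLatest(long firstRecordTimestampMs,
                              long limitTimeStampMs,
                              long earliestOffset,
                              long latestOffset) {
    return firstRecordTimestampMs < limitTimeStampMs ? latestOffset : earliestOffset;
}
```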
The second scenario is when an out-of-range exception is triggered:
offset reset strategy | current reset behavior | proposed reset behavior
none | throw exception | throw exception (unchanged)
earliest | reset to earliest | reset to earliest (unchanged)
latest | reset to latest | reset to latest (unchanged)
earliest_on_start | N/A (new strategy) | throw exception
latest_on_start | N/A (new strategy) | throw exception
safe_latest | N/A (new strategy) | reset to latest, consistent with latest
nearest | N/A (new strategy) | reset to earliest if the fetch offset was under the range, or to latest if it was over the range, regardless of which of earliest, latest, or safe_latest is used together with it
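The nearest behavior on out-of-range can be sketched in the same illustrative style, assuming the partition's earliest and latest offsets are already known:

```java
// Illustrative only: given a fetch offset that is out of range, nearest resets to the
// closer end of the valid range: earliest if the offset fell below the log start
// offset, latest if it fell at or beyond the log end offset.
static long resolveNearest(long outOfRangeFetchOffset,
                           long earliestOffset,
                           long latestOffset) {
    return outOfRangeFetchOffset < earliestOffset ? earliestOffset : latestOffset;
}
```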
Compatibility, Deprecation, and Migration Plan
There is no impact on existing or old behaviors. This proposal only provides additional mechanisms; users can opt in according to their own needs, and the existing behaviors are retained unchanged.
Rejected Alternatives
For the problem of losing data due to partition expansion, setting "auto.offset.reset=earliest" at startup is not a good option for a high-throughput topic: the group would aggressively consume historical data from the brokers, which affects broker performance to some extent. It is therefore necessary to consume only the newly added partitions from earliest, which is what "safe_latest" does.
The Proposed Changes have been implemented; see the pull request: https://github.com/apache/kafka/pull/10726