...

I tested this data-loss case; see the linked JIRA for details.

Public Interfaces

  • Add an additional enum class named InitialOffsetResetStrategy to represent strategies for group startup with no initial offset.

    InitialOffsetResetStrategy.java
    package org.apache.kafka.clients.consumer;
    
    import java.util.Locale;
    
    public enum InitialOffsetResetStrategy {
        LATEST, EARLIEST, NONE, LATEST_ON_START, EARLIEST_ON_START, SAFE_LATEST;
    
        @Override
        public String toString() {
            return super.toString().toLowerCase(Locale.ROOT);
        }
    }


  • Add an additional enum class named InvalidOffsetResetStrategy to represent strategies for handling invalid or out-of-range offsets.

    InvalidOffsetResetStrategy.java
    package org.apache.kafka.clients.consumer;
    
    import java.util.Locale;
    
    public enum InvalidOffsetResetStrategy {
        NONE, NEAREST;
    
        @Override
        public String toString() {
            return super.toString().toLowerCase(Locale.ROOT);
        }
    }


  • Then add two new consumer configs named "auto.offset.reset.on.initial.offset" and "auto.offset.reset.on.invalid.offset".

    ConsumerConfig
    // Strategy applied when the offset is invalid or out of range.
    public static final String INVALID_OFFSET_RESET_CONFIG = "auto.offset.reset.on.invalid.offset";
    private static final String INVALID_OFFSET_RESET_CONFIG_DOC = "If not NONE, out-of-range errors reset the consumer's offset according to this strategy. For example, NEAREST resets to the earliest end of the broker range if the offset was under the range, or to the latest end of the broker range if it was over the range. If set to NONE, fall back to auto.offset.reset.";
    // Strategy applied when the group starts with no initial offset.
    public static final String INITIAL_OFFSET_RESET_CONFIG = "auto.offset.reset.on.initial.offset";
    private static final String INITIAL_OFFSET_RESET_CONFIG_DOC = "If not NONE, when the group starts for the first time with no initial offset, the offset is reset to latest for LATEST_ON_START or to earliest for EARLIEST_ON_START. If set to NONE, fall back to auto.offset.reset.";

    CONFIG = new ConfigDef().define(INVALID_OFFSET_RESET_CONFIG,
                                    Type.STRING,
                                    InvalidOffsetResetStrategy.NONE.toString(),
                                    in(Utils.enumOptions(InvalidOffsetResetStrategy.class)),
                                    Importance.MEDIUM,
                                    INVALID_OFFSET_RESET_CONFIG_DOC)
                            .define(INITIAL_OFFSET_RESET_CONFIG,
                                    Type.STRING,
                                    InitialOffsetResetStrategy.NONE.toString(),
                                    in(Utils.enumOptions(InitialOffsetResetStrategy.class)),
                                    Importance.MEDIUM,
                                    INITIAL_OFFSET_RESET_CONFIG_DOC);
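Because the proposed enums override toString() to return the lowercase name, a config value and its enum constant differ only in case. The following is a minimal sketch of that round trip. It uses a local stand-in copy of the proposed enum and a hypothetical parse helper so that it compiles on its own; neither is taken from the Kafka code base.

```java
import java.util.Locale;

// Sketch only: StrategyParsing and parse(...) are hypothetical names.
final class StrategyParsing {

    // Local stand-in mirroring the proposed InvalidOffsetResetStrategy enum.
    enum InvalidOffsetResetStrategy {
        NONE, NEAREST;

        @Override
        public String toString() {
            return super.toString().toLowerCase(Locale.ROOT);
        }
    }

    // Maps a config value such as "nearest" back to its enum constant.
    // Throws IllegalArgumentException for values that are not enum names.
    static InvalidOffsetResetStrategy parse(String configValue) {
        return InvalidOffsetResetStrategy.valueOf(configValue.toUpperCase(Locale.ROOT));
    }
}
```

Using Locale.ROOT in both directions keeps the mapping independent of the JVM's default locale.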


Proposed Changes

  1. In addition to the "earliest", "latest", and "none" values provided by the existing "auto.offset.reset", the proposal provides richer reset semantics: "latest_on_start" (reset to latest on application startup, and throw an exception if out of range occurs), "earliest_on_start" (reset to earliest on application startup, and throw an exception if out of range occurs), and "nearest" (determined by "auto.offset.reset" when the program starts; when out of range occurs, choose earliest or latest according to the distance between the current offset and the log start offset and log end offset).
  2. "auto.offset.reset.on.no.initial.offset": the strategy used to initialize the offset. By default it follows the value configured for "auto.offset.reset", so the strategy for initializing the offset is unchanged from the previous behavior, which ensures compatibility. If it is set to "latest_on_start" or "earliest_on_start", the offset is reset according to that strategy when it is initialized. This solves the problem of data loss during partition expansion: set "auto.offset.reset.on.no.initial.offset" to "latest_on_start" and "auto.offset.reset" to "earliest".
  3. "auto.offset.reset.on.invalid.offset": the strategy used when the offset is invalid or out of range. By default it follows the value configured for "auto.offset.reset", so out-of-range handling is the same as before, which ensures compatibility. If it is set to "nearest", the "nearest" semantics apply only to the out-of-range case.
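The "nearest" choice described in the list above can be sketched as a pure function of three offsets. This is an illustration under stated assumptions, not the implementation from the pull request: the class and method names are hypothetical, and the log start and log end offsets are taken as plain parameters rather than fetched from a broker.

```java
// Sketch only: NearestReset and nearest(...) are hypothetical names.
final class NearestReset {

    // Returns the end of the valid range [logStartOffset, logEndOffset]
    // that is closest to the current offset.
    static long nearest(long currentOffset, long logStartOffset, long logEndOffset) {
        long distanceToStart = Math.abs(currentOffset - logStartOffset);
        long distanceToEnd = Math.abs(currentOffset - logEndOffset);
        return distanceToStart <= distanceToEnd ? logStartOffset : logEndOffset;
    }
}
```

For an offset below the range the start is always the closer end, and for an offset above the range the end is always closer, so comparing distances agrees with the "under the range" and "over the range" wording.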

The semantics of "auto.offset.reset" remain unchanged. To describe in more detail what these parameters mean and how they behave, we distinguish two scenarios in which an offset reset is needed: new partitions without committed offsets, and an out-of-range exception. The following describes the behavior of each parameter in these two scenarios.

The first scenario is resetting the offset when there is no committed offset, controlled by "auto.offset.reset.on.no.initial.offset":

initial offset reset strategy | proposed reset behavior when setting the initial offset
none | fall back to "auto.offset.reset": if none, throw exception; if earliest, reset to earliest; if latest, reset to latest
earliest_on_start | reset to earliest
latest_on_start | reset to latest

The second scenario is when an out-of-range exception is triggered, controlled by "auto.offset.reset.on.invalid.offset":

invalid offset reset strategy | proposed reset behavior when out of range is triggered
none | fall back to "auto.offset.reset": if none, throw exception; if earliest, reset to earliest; if latest, reset to latest
nearest | reset to the earliest if the offset was under the range, or to the latest if it was over the range
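The fallback rules for the two scenarios above can be sketched as a small decision function. All names here (ResetDecision, Action, and the method names) are hypothetical and exist only for illustration; strategies are passed as their lowercase config strings.

```java
import java.util.Locale;

// Sketch only: every name in this class is hypothetical.
final class ResetDecision {

    enum Action { RESET_TO_EARLIEST, RESET_TO_LATEST, RESET_TO_NEAREST, THROW_EXCEPTION }

    // Scenario 1: no committed offset for the partition.
    static Action onNoInitialOffset(String initialStrategy, String autoOffsetReset) {
        switch (initialStrategy.toLowerCase(Locale.ROOT)) {
            case "earliest_on_start": return Action.RESET_TO_EARLIEST;
            case "latest_on_start":   return Action.RESET_TO_LATEST;
            case "none":              return fallBack(autoOffsetReset);
            default: throw new IllegalArgumentException("Unknown strategy: " + initialStrategy);
        }
    }

    // Scenario 2: an out-of-range exception was triggered.
    static Action onInvalidOffset(String invalidStrategy, String autoOffsetReset) {
        switch (invalidStrategy.toLowerCase(Locale.ROOT)) {
            case "nearest": return Action.RESET_TO_NEAREST;
            case "none":    return fallBack(autoOffsetReset);
            default: throw new IllegalArgumentException("Unknown strategy: " + invalidStrategy);
        }
    }

    // "none" in either new config defers to the existing auto.offset.reset value.
    private static Action fallBack(String autoOffsetReset) {
        switch (autoOffsetReset.toLowerCase(Locale.ROOT)) {
            case "earliest": return Action.RESET_TO_EARLIEST;
            case "latest":   return Action.RESET_TO_LATEST;
            case "none":     return Action.THROW_EXCEPTION;
            default: throw new IllegalArgumentException("Unknown auto.offset.reset: " + autoOffsetReset);
        }
    }
}
```

With both new configs left at "none", each method returns exactly what "auto.offset.reset" alone would give, which is the compatibility property the proposal relies on.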

Compatibility, Deprecation, and Migration Plan

...

To avoid losing data when partitions are expanded, setting "auto.offset.reset=earliest" at startup is not always appropriate for a high-volume topic: it makes the group consume a large amount of historical data from the brokers, which affects broker performance to a certain extent. It is therefore necessary to consume only the new partitions from the earliest offset, which is achieved by setting "auto.offset.reset.on.no.initial.offset"="latest_on_start" and "auto.offset.reset"="earliest".
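As a configuration fragment, the combination described above might look like the sketch below. It only builds a java.util.Properties object. The key "auto.offset.reset.on.no.initial.offset" is the config proposed on this page and is assumed here, so a released Kafka client would not recognize it; the class name and the group id are hypothetical.

```java
import java.util.Properties;

// Sketch only: PartitionExpansionConfig and "example-group" are hypothetical.
final class PartitionExpansionConfig {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("group.id", "example-group");
        // Existing config, set to earliest as the text above recommends.
        props.setProperty("auto.offset.reset", "earliest");
        // Proposed config (assumption: not available in released clients).
        props.setProperty("auto.offset.reset.on.no.initial.offset", "latest_on_start");
        return props;
    }
}
```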

This has been implemented according to Proposed Changes; see PR: https://github.com/apache/kafka/pull/10726