...

  • The plan is to add 4 enumeration values to OffsetResetStrategy to represent four different strategies.

    OffsetResetStrategy.java:

    package org.apache.kafka.clients.consumer;
    
    import java.util.Locale;
    
    public enum OffsetResetStrategy {
        LATEST, EARLIEST, NONE, NEAREST, LATEST_ON_START, EARLIEST_ON_START, SAFE_LATEST;
    
        @Override
        public String toString() {
            return super.toString().toLowerCase(Locale.ROOT);
        }
    }


  • Add an additional enum class named OutOfRangeOffsetResetStrategy to represent the strategies for handling out-of-range offsets.

    OutOfRangeOffsetResetStrategy.java:

    package org.apache.kafka.clients.consumer;
    
    import java.util.Locale;
    
    public enum OutOfRangeOffsetResetStrategy {
        NONE, NEAREST;
    
        @Override
        public String toString() {
            return super.toString().toLowerCase(Locale.ROOT);
        }
    }


  • Then add a new consumer config named "offset.reset.strategy.offset-out-of-range".

    ConsumerConfig:

    public static final String OUT_OF_RANGE_OFFSET_RESET_CONFIG = "offset.reset.strategy.offset-out-of-range";
    private static final String OUT_OF_RANGE_OFFSET_RESET_CONFIG_DOC = "If not NONE, then out of range errors will reset the consumer's offset according to the strategy. For example, with NEAREST the offset is reset to the earliest end of the broker range if it was under the range, or to the latest end of the broker range if it was over the range";
    
    // Excerpt from the chained ConfigDef definition in ConsumerConfig.
    CONFIG = new ConfigDef().define(OUT_OF_RANGE_OFFSET_RESET_CONFIG,
                                    Type.STRING,
                                    OutOfRangeOffsetResetStrategy.NONE.toString(),
                                    in(Utils.enumOptions(OutOfRangeOffsetResetStrategy.class)),
                                    Importance.MEDIUM,
                                    OUT_OF_RANGE_OFFSET_RESET_CONFIG_DOC)
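
As an illustration of how a client might combine the two settings, here is a minimal sketch using plain java.util.Properties. It assumes the main strategy is still selected through the existing "auto.offset.reset" key; the "offset.reset.strategy.offset-out-of-range" key and the "safe_latest" value are the ones proposed here and are not accepted by released Kafka clients. The class name and the group name are hypothetical.

```java
import java.util.Properties;

public class ProposedConsumerProps {
    // Builds consumer properties that pair a main reset strategy with the
    // auxiliary out-of-range strategy. The out-of-range key and the
    // "safe_latest" value are proposals from this document, not released config.
    static Properties build() {
        Properties props = new Properties();
        props.put("group.id", "example-group"); // hypothetical group name
        props.put("auto.offset.reset", "safe_latest");
        props.put("offset.reset.strategy.offset-out-of-range", "nearest");
        return props;
    }

    public static void main(String[] args) {
        // Print each key=value pair so the combination is easy to inspect.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Leaving the out-of-range key unset would keep the proposed default of none.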


Proposed Changes

  1. Add an auxiliary strategy to handle out-of-range offsets. If "offset.reset.strategy.offset-out-of-range=nearest" is set, the consumer resets to either earliest or latest depending on the current offset; it is triggered only by an out-of-range error. The parameter defaults to none, so the behavior is disabled by default. Partitions without committed offsets still need a reset strategy, so one of earliest, latest, or safe_latest must be used together with it. Strictly speaking, "offset.reset.strategy.offset-out-of-range" is not a peer of earliest/latest/safe_latest/earliest_on_start/latest_on_start. It is an auxiliary strategy that deals only with out-of-range offsets.
  2. latest_on_start, earliest_on_start: reset to latest or earliest, respectively, only when the partition is seen for the first time without a committed offset. On out-of-range they behave like none, i.e. an exception is thrown.
  3. safe_latest: an additional limitTimeStamp parameter is used when resetting the offset. The consumer resets to latest only if the partition's first record timestamp is smaller than limitTimeStamp; otherwise it resets to earliest. With limitTimeStamp set to the timestamp at which the consumer group started, partitions added afterwards are reset to earliest, which avoids the data loss described in the motivation.
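
The safe_latest rule in item 3 can be sketched as a single timestamp comparison. This is only an illustration under stated assumptions: the class, enum, and method names are hypothetical, timestamps are taken to be epoch milliseconds, and a first record timestamp equal to limitTimeStamp is treated as "not smaller", which the text above does not specify.

```java
public class SafeLatestSketch {
    enum Target { EARLIEST, LATEST }

    // Hypothetical helper: picks the reset target for safe_latest.
    // Assumption: a timestamp equal to limitTimeStampMs resets to EARLIEST.
    static Target safeLatest(long firstRecordTimestampMs, long limitTimeStampMs) {
        return firstRecordTimestampMs < limitTimeStampMs ? Target.LATEST : Target.EARLIEST;
    }

    public static void main(String[] args) {
        long groupStartedMs = 1_000_000L; // stands in for the group start time

        // Partition that already had data before the group started.
        System.out.println(safeLatest(groupStartedMs - 60_000L, groupStartedMs)); // prints LATEST

        // Partition added after the group started.
        System.out.println(safeLatest(groupStartedMs + 60_000L, groupStartedMs)); // prints EARLIEST
    }
}
```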

...

The first scenario is an offset reset when there are no committed offsets:

offset reset strategy | current reset behavior | proposed reset behavior
----------------------|------------------------|------------------------
none | throw exception | throw exception
earliest | reset to earliest | reset to earliest
latest | reset to latest | reset to latest
earliest_on_start | N/A | reset to earliest
latest_on_start | N/A | reset to latest
safe_latest | N/A | if the group is newly started, reset to latest; if new partitions are added while the group is consuming, reset to earliest for these new partitions
nearest (offset.reset.strategy.offset-out-of-range) | N/A | takes effect only on out-of-range; in this scenario the behavior is determined by the earliest, latest, or safe_latest strategy used together with it

The second scenario is when an out-of-range exception is triggered:

offset reset strategy | current reset behavior | proposed reset behavior
----------------------|------------------------|------------------------
none | throw exception | throw exception
earliest | reset to earliest | reset to earliest
latest | reset to latest | reset to latest
earliest_on_start | N/A | throw exception
latest_on_start | N/A | throw exception
safe_latest | N/A | reset to latest, consistent with latest
nearest (offset.reset.strategy.offset-out-of-range) | N/A | reset to the earliest if it was under the range, or to the latest if it was over the range; this does not depend on the earliest, latest, or safe_latest strategy used together with it
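
The out-of-range table can be read as a small dispatch over the two settings. The following is a sketch, not the consumer's actual code path: the class, enum, and method names are hypothetical, an empty result stands for "throw exception", and it assumes that when the auxiliary strategy is none the main strategy's row applies.

```java
import java.util.OptionalLong;

public class OutOfRangeResolution {
    // Local stand-ins for the two proposed enums (hypothetical names).
    enum Main { NONE, EARLIEST, LATEST, EARLIEST_ON_START, LATEST_ON_START, SAFE_LATEST }
    enum Aux { NONE, NEAREST }

    // Returns the offset to reset to, or empty when an exception should be thrown.
    // Called only when `current` lies outside [earliest, latest].
    static OptionalLong resolve(Main main, Aux aux, long current, long earliest, long latest) {
        if (aux == Aux.NEAREST) {
            // nearest wins regardless of the main strategy.
            return OptionalLong.of(current < earliest ? earliest : latest);
        }
        switch (main) {
            case EARLIEST:
                return OptionalLong.of(earliest);
            case LATEST:
            case SAFE_LATEST: // consistent with latest on out-of-range
                return OptionalLong.of(latest);
            default: // none, earliest_on_start, latest_on_start
                return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(Main.LATEST_ON_START, Aux.NEAREST, 5L, 100L, 200L)); // prints OptionalLong[100]
        System.out.println(resolve(Main.LATEST_ON_START, Aux.NONE, 5L, 100L, 200L));    // prints OptionalLong.empty
    }
}
```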

Compatibility, Deprecation, and Migration Plan

...