
...

Current state: WIP

JIRA: here 

Pull Request: here

Please keep the discussion on the Jira above.

...

  • The plan is to add four new values to the OffsetResetStrategy enum: NEAREST, LATEST_ON_START, EARLIEST_ON_START, and SAFE_LATEST.

    OffsetResetStrategy.java
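    package org.apache.kafka.clients.consumer;
    
    import java.util.Locale;
    
    public enum OffsetResetStrategy {
        // Existing strategies.
        LATEST, EARLIEST, NONE,
        // New: on out-of-range, reset to the earliest offset if below the valid range, or the latest if above it.
        NEAREST,
        // New: reset only partitions that have no committed offset; out-of-range behaves like NONE.
        LATEST_ON_START, EARLIEST_ON_START,
        // New: like LATEST, but partitions whose first record is newer than a limit timestamp reset to EARLIEST.
        SAFE_LATEST;
    
        @Override
        public String toString() {
            // Lower-case name, e.g. SAFE_LATEST -> "safe_latest".
            return super.toString().toLowerCase(Locale.ROOT);
        }
    }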


  • Then add a new boolean consumer config, "nearest.offset.reset", which is false by default.

    ConsumerConfig
    // Opt-in switch for the "nearest" reset behavior on out-of-range errors.
    public static final String NEAREST_OFFSET_RESET_CONFIG = "nearest.offset.reset";
    private static final String NEAREST_OFFSET_RESET_DOC = "If true, an out-of-range error resets the consumer's offset to the nearest valid offset: "
            + "the earliest offset on the broker if the position is below the range, or the latest offset if it is above the range.";
    public static final boolean DEFAULT_NEAREST_OFFSET_RESET = false;
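As a minimal sketch of how the two settings would be combined, the Java below copies the proposed enum locally (so it compiles on its own) and sets both properties on a plain java.util.Properties object. The class names and the fromConfigValue helper are hypothetical, and the values "safe_latest" and "nearest.offset.reset" are only meaningful to a client that includes this proposal; a client without the change validates auto.offset.reset and is expected to reject "safe_latest".

```java
import java.util.Locale;
import java.util.Properties;

// Local copy of the proposed enum so this sketch is self-contained.
enum ProposedOffsetResetStrategy {
    LATEST, EARLIEST, NONE, NEAREST, LATEST_ON_START, EARLIEST_ON_START, SAFE_LATEST;

    @Override
    public String toString() {
        return super.toString().toLowerCase(Locale.ROOT);
    }

    // Hypothetical helper: maps a config value such as "safe_latest" back to its enum constant.
    static ProposedOffsetResetStrategy fromConfigValue(String value) {
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}

final class ProposedResetConfigExample {
    // Builds consumer properties that combine the proposed settings.
    static Properties consumerProps() {
        Properties props = new Properties();
        // Strategy for partitions without a committed offset.
        props.put("auto.offset.reset", ProposedOffsetResetStrategy.SAFE_LATEST.toString());
        // Proposed boolean switch: resolve out-of-range errors to the nearest end of the log.
        props.put("nearest.offset.reset", "true");
        return props;
    }
}
```

Because toString() lower-cases the constant name, the string written into the config and the enum constant round-trip through fromConfigValue.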


Proposed Changes

  1. Besides latest and earliest, we also add nearest: it resets to either the latest or the earliest offset depending on the current offset, and it is only triggered by out-of-range errors. It is disabled by default; to use it, set "nearest.offset.reset=true". Partitions without committed offsets still need a reset strategy, so one of earliest, latest, or safe_latest must be configured alongside it.
  2. latest_on_start, earliest_on_start: reset to the latest or earliest offset only when a partition is seen for the first time and has no committed offset; on an out-of-range error they default to none, i.e. an exception is thrown.
  3. safe_latest: uses an additional limitTimeStamp parameter when resetting offsets. A partition is reset to latest if its first record's timestamp is smaller than limitTimeStamp, and to earliest if it is larger. If limitTimeStamp is set to the time the consumer group started, partitions added later are reset to earliest, which avoids the data loss described in the motivation.
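The offset-selection rules in items 1 and 3 can be sketched as two small functions. This is an illustration under stated assumptions, not the code from the PR: the class and method names are hypothetical, the valid range is taken to be [logStartOffset, logEndOffset], and a first-record timestamp exactly equal to limitTimeStamp is treated as new (reset to earliest), a case the text leaves open.

```java
// Illustrative only: the nearest and safe_latest rules, with hypothetical names.
final class ProposedResetRules {
    enum End { EARLIEST, LATEST }

    /**
     * nearest: moves an out-of-range position to the closest valid offset.
     * logStartOffset is the earliest offset on the broker, logEndOffset the latest.
     */
    static long nearestOffset(long position, long logStartOffset, long logEndOffset) {
        if (position < logStartOffset) {
            return logStartOffset;   // under the range -> earliest end
        }
        if (position > logEndOffset) {
            return logEndOffset;     // over the range -> latest end
        }
        throw new IllegalArgumentException("position " + position + " is within the valid range");
    }

    /**
     * safe_latest: partitions whose first record predates limitTimestamp
     * (e.g. the time the consumer group started) reset to latest; newer ones reset to earliest.
     * Assumption: a timestamp equal to the limit counts as new and resets to earliest.
     */
    static End safeLatest(long firstRecordTimestamp, long limitTimestamp) {
        return firstRecordTimestamp < limitTimestamp ? End.LATEST : End.EARLIEST;
    }
}
```

For example, with a valid range of 100 to 200, a position of 5 resolves to 100 and a position of 250 resolves to 200.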

To describe in more detail what these parameters mean and how they behave, we distinguish two situations that require an offset reset: partitions that have no committed offset, and an out-of-range error. The following is the behavior of each parameter in these two scenarios.

...
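A hypothetical dispatch over the two scenarios, based only on the numbered items in Proposed Changes, might look like the sketch below. It assumes that an enabled nearest.offset.reset takes precedence over auto.offset.reset on out-of-range errors, and it deliberately leaves safe_latest on out-of-range unhandled because the prose does not state it; all type and method names are illustrative, not Kafka client API.

```java
// Sketch of the two reset scenarios; names are hypothetical.
final class ResetDecision {
    enum Scenario { NO_COMMITTED_OFFSET, OUT_OF_RANGE }

    enum Strategy { EARLIEST, LATEST, NONE, LATEST_ON_START, EARLIEST_ON_START, SAFE_LATEST }

    enum Action { RESET_EARLIEST, RESET_LATEST, RESET_NEAREST, RESET_BY_TIMESTAMP, THROW }

    /**
     * @param strategy       value of auto.offset.reset
     * @param nearestEnabled value of the proposed nearest.offset.reset flag
     */
    static Action decide(Scenario scenario, Strategy strategy, boolean nearestEnabled) {
        if (scenario == Scenario.OUT_OF_RANGE) {
            // nearest only applies to out-of-range errors.
            // Assumption: when enabled, it takes precedence over auto.offset.reset.
            if (nearestEnabled) {
                return Action.RESET_NEAREST;
            }
            switch (strategy) {
                case EARLIEST: return Action.RESET_EARLIEST;
                case LATEST: return Action.RESET_LATEST;
                // *_on_start strategies default to none on out-of-range.
                case LATEST_ON_START:
                case EARLIEST_ON_START:
                case NONE: return Action.THROW;
                default:
                    // safe_latest on out-of-range is not specified in the prose, so this sketch refuses it.
                    throw new UnsupportedOperationException("not covered by this sketch: " + strategy);
            }
        }
        // The partition has no committed offset (e.g. it is seen for the first time).
        switch (strategy) {
            case EARLIEST:
            case EARLIEST_ON_START: return Action.RESET_EARLIEST;
            case LATEST:
            case LATEST_ON_START: return Action.RESET_LATEST;
            // Compare the partition's first record timestamp with limitTimeStamp.
            case SAFE_LATEST: return Action.RESET_BY_TIMESTAMP;
            // none
            default: return Action.THROW;
        }
    }
}
```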

Setting "auto.offset.reset=earliest" is not necessarily a good answer to the data loss caused by expanding partitions: for a high-volume topic, a group starting up with that setting consumes the broker's historical data at full speed, which can noticeably hurt broker performance. It is therefore better to consume only the newly added partitions from the earliest offset, which is what "safe_latest" does.

This has been implemented according to the Proposed Changes; see the PR: https://github.com/apache/kafka/pull/10726