Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Pull Request: here

Discussing thread: here

Vote thread: here

Motivation

When the server expands partitions for a topic, the producer firstly perceives the expansion, and some data is written in the newly expanded partitions. But the consumer group perceives the expansion later, after the rebalance is completed, the newly expanded partitions will be consumed from the latest if "auto.offset.reset" is set to "latest". Within a period of time, the data of the newly expanded partitions is skipped and lost by the consumer. But for a group that is consuming, it does not want to skip the data. Therefore, we hope to provide some richer offset reset mechanisms to solve this problem, and secondly, to deal with the problem of out of range more flexibly.

...

  • Plan is to add 3 enumeration values ​​to OffsetResetStrategy to represent four different strategies.

    Code Block
    languagejava
    firstline1
    titleOffsetResetStrategy.java
    linenumberstrue
    collapsetrue
    package org.apache.kafka.clients.consumer;
    
    import java.util.Locale;
    
    public enum OffsetResetStrategy {
        LATEST, EARLIEST, NONE, LATEST_ON_START, EARLIEST_ON_START, SAFE_LATEST;
    
        @Override
        public String toString() {
            return super.toString().toLowerCase(Locale.ROOT);
        }
    }


  • Add an additional enum class named OutOfRangeOffsetResetStrategy to represent strategies for handling out-of-range.

    Code Block
    languagejava
    firstline1
    titleOutOfRangeOffsetResetStrategy.java
    linenumberstrue
    collapsetrue
    package org.apache.kafka.clients.consumer;
    
    import java.util.Locale;
    
    public enum OutOfRangeOffsetResetStrategy {
        NONE, NEAREST;
    
        @Override
        public String toString() {
            return super.toString().toLowerCase(Locale.ROOT);
        }
    }


  • Then add a new consumer config named "offset.reset.strategy.offset-out-of-range"

    Code Block
    languagejava
    firstline1
    titleConsumerConfig
    linenumberstrue
    collapsetrue
    public static final String OUT_OF_RANGE_OFFSET_RESET_CONFIG = "offset.reset.strategy.offset-out-of-range";
    private static final String OUT_OF_RANGE_OFFSET_RESET_CONFIG_DOC = "If not NONE, thenwhen out of range errors will reset the consumer's offset according to strategy. For example of NEAREST, to the earliest end of the broker range if it was under the range, or to the latest end of the broker range if it was over the range";
    
    CONFIG = new ConfigDef().define(OUT_OF_RANGE_OFFSET_RESET_CONFIG,
                                    Type.STRING,
                                    OutOfRangeOffsetResetStrategy.NONE.toString(),
                                    in(Utils.enumOptions(OutOfRangeOffsetResetStrategy.class)),
                                    Importance.MEDIUM,
                                    OUT_OF_RANGE_OFFSET_RESET_CONFIG_DOC)


...