Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Add an additional enum class named InitialOffsetResetStrategy to represent strategies for group startup with no initial offset.

    Code Block
    languagejava
    firstline1
    titleInitialOffsetResetStrategy.java
    linenumberstrue
    collapsetrue
    package org.apache.kafka.clients.consumer;
    
    import java.util.Locale;
    
    /**
     * Strategies for resetting the offset when a consumer group starts up with
     * no initial offset.
     *
     * <p>The string form of each constant is its name in lower case.
     */
    public enum InitialOffsetResetStrategy {
        /** No dedicated strategy; per the config doc, falls back to auto.offset.reset. */
        NONE,
        /** Per the config doc, reset to the latest offset on first startup. */
        LATEST_ON_START,
        /** Per the config doc, reset to the earliest offset on first startup. */
        EARLIEST_ON_START;

        /**
         * Returns the constant's name converted to lower case.
         *
         * <p>{@link Locale#ROOT} is used so the result does not depend on the
         * JVM's default locale.
         *
         * @return the lower-cased constant name
         */
        @Override
        public String toString() {
            final String constantName = name();
            return constantName.toLowerCase(Locale.ROOT);
        }
    }


  • Add an additional enum class named InvalidOffsetResetStrategy to represent strategies for handling out-of-range offsets.

    Code Block
    languagejava
    firstline1
    titleInvalidOffsetResetStrategy.java
    linenumberstrue
    collapsetrue
    package org.apache.kafka.clients.consumer;
    
    import java.util.Locale;
    
    /**
     * Strategies for resetting the offset when the consumer's offset is out of
     * range.
     *
     * <p>The string form of each constant is its name in lower case.
     */
    public enum InvalidOffsetResetStrategy {
        /** No dedicated strategy; per the config doc, falls back to auto.offset.reset. */
        NONE,
        /**
         * Per the config doc, reset to whichever end of the broker range is
         * closest: the earliest end if the offset was under the range, the
         * latest end if it was over the range.
         */
        NEAREST;

        /**
         * Returns the constant's name converted to lower case.
         *
         * <p>{@link Locale#ROOT} is used so the result does not depend on the
         * JVM's default locale.
         *
         * @return the lower-cased constant name
         */
        @Override
        public String toString() {
            final String constantName = name();
            return constantName.toLowerCase(Locale.ROOT);
        }
    }


  • Then add two new consumer configs named "auto.offset.reset.on.initial.offset" and "auto.offset.reset.on.invalid.offset".

    Code Block
    languagejava
    firstline1
    titleConsumerConfig
    linenumberstrue
    collapsetrue
    /** Config key selecting how the offset is reset after an out-of-range error. */
    public static final String INVALID_OFFSET_RESET_CONFIG = "auto.offset.reset.on.invalid.offset";
    // Values are written in lower case because the enum's toString() lower-cases
    // the constant names, and that string form is what the default value uses.
    private static final String INVALID_OFFSET_RESET_CONFIG_DOC = "Strategy for resetting the consumer's offset when an out-of-range error occurs. If set to nearest, the offset is reset to the earliest end of the broker range if it was under the range, or to the latest end of the broker range if it was over the range. If set to none, the consumer falls back to auto.offset.reset.";
    /** Config key selecting how the offset is reset when a group first starts with no initial offset. */
    public static final String INITIAL_OFFSET_RESET_CONFIG = "auto.offset.reset.on.initial.offset";
    // Values are written in lower case for the same reason as above.
    private static final String INITIAL_OFFSET_RESET_CONFIG_DOC = "Strategy for resetting the consumer's offset when a group starts up for the first time with no initial offset. If set to latest_on_start or earliest_on_start, the offset is reset to the latest or the earliest offset, respectively. If set to none, the consumer falls back to auto.offset.reset.";
    
    // Registers the two new reset options on the consumer ConfigDef. Each one is
    // a STRING option of MEDIUM importance whose default is the string form of
    // its enum's NONE constant (lower-cased by the enum's toString()), and whose
    // permitted values are whatever Utils.enumOptions(...) yields for that enum
    // (presumably enforced by the in(...) validator - confirm against ConfigDef).
    // NOTE(review): the chain is shown without a terminating ';' - presumably it
    // continues with the remaining consumer options; confirm against the full class.
    CONFIG = new ConfigDef().define(INVALID_OFFSET_RESET_CONFIG,
                                    Type.STRING,                                 
    								InvalidOffsetResetStrategy.NONE.toString(),
                                    in(Utils.enumOptions(InvalidOffsetResetStrategy.class)),
                                    Importance.MEDIUM,                                 
    								INVALID_OFFSET_RESET_CONFIG_DOC)
    						// Second option: strategy for a group starting with no initial offset.
    						.define(INITIAL_OFFSET_RESET_CONFIG,
                                    Type.STRING,                                 
            						InitialOffsetResetStrategy.NONE.toString(),
                                    in(Utils.enumOptions(InitialOffsetResetStrategy.class)),
                                    Importance.MEDIUM,                                 
            						INITIAL_OFFSET_RESET_CONFIG_DOC)


...