THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Add an additional enum class named InitialOffsetResetStrategy to represent strategies for group startup with no initial offset.
Code Block language java firstline 1 title OffsetResetStrategyInitialOffsetResetStrategy.java linenumbers true collapse true package org.apache.kafka.clients.consumer; import java.util.Locale; public enum InitialOffsetResetStrategy { NONE, LATEST_ON_START, EARLIEST_ON_START; @Override public String toString() { return super.toString().toLowerCase(Locale.ROOT); } }
Add an additional enum class named InvalidOffsetResetStrategy to represent strategies for handling out-of-range.
Code Block language java firstline 1 title OutOfRangeOffsetResetStrategyInvalidOffsetResetStrategy.java linenumbers true collapse true package org.apache.kafka.clients.consumer; import java.util.Locale; public enum InvalidOffsetResetStrategy { NONE, NEAREST; @Override public String toString() { return super.toString().toLowerCase(Locale.ROOT); } }
Then add two new consumer config named "auto.offset.reset.on.initial.offset" and "auto.offset.reset.on.invalid.offset"
Code Block language java firstline 1 title ConsumerConfig linenumbers true collapse true public static final String INVALID_OFFSET_RESET_CONFIG = "auto.offset.reset.on.invalid.offset"; private static final String INVALID_OFFSET_RESET_CONFIG_DOC = "If not NONE, when out of range errors will reset the consumer's offset according to strategy. For example of NEAREST, to the earliest end of the broker range if it was under the range, or to the latest end of the broker range if it was over the range. If set NONE, fall back to auto.offset.reset"; public static final String INITIAL_OFFSET_RESET_CONFIG = "auto.offset.reset.on.initial.offset"; private static final String INITIAL_OFFSET_RESET_CONFIG_DOC = "If not NONE, when group startup for the first time with no initial offset, it will reset to latest or earliest by LATEST_ON_START and EARLIEST_ON_START, If set NONE, fall back to auto.offset.reset"; CONFIG = new ConfigDef().define(INVALID_OFFSET_RESET_CONFIG, Type.STRING, InvalidOffsetResetStrategy.NONE.toString(), in(Utils.enumOptions(InvalidOffsetResetStrategy.class)), Importance.MEDIUM, INVALID_OFFSET_RESET_CONFIG_DOC) .define(INITIAL_OFFSET_RESET_CONFIG, Type.STRING, InitialOffsetResetStrategy.NONE.toString(), in(Utils.enumOptions(InitialOffsetResetStrategy.class)), Importance.MEDIUM, INITIAL_OFFSET_RESET_CONFIG_DOC)
...