...
Public Interfaces
The plan is to add three enumeration values (LATEST_ON_START, EARLIEST_ON_START, and SAFE_LATEST) to OffsetResetStrategy, one for each new strategy.
OffsetResetStrategy.java
```java
package org.apache.kafka.clients.consumer;

import java.util.Locale;

public enum OffsetResetStrategy {
    LATEST, EARLIEST, NONE, LATEST_ON_START, EARLIEST_ON_START, SAFE_LATEST;

    @Override
    public String toString() {
        return super.toString().toLowerCase(Locale.ROOT);
    }
}
```
Add a new enum class named OutOfRangeOffsetResetStrategy to represent the strategies for handling out-of-range offsets.
OutOfRangeOffsetResetStrategy.java
```java
package org.apache.kafka.clients.consumer;

import java.util.Locale;

public enum OutOfRangeOffsetResetStrategy {
    NONE, NEAREST;

    @Override
    public String toString() {
        return super.toString().toLowerCase(Locale.ROOT);
    }
}
```
Then add a new consumer config named "offset.reset.strategy.offset-out-of-range":
ConsumerConfig
```java
// Excerpt: only the new config key, its documentation, and its definition are shown.
public static final String OUT_OF_RANGE_OFFSET_RESET_CONFIG = "offset.reset.strategy.offset-out-of-range";
private static final String OUT_OF_RANGE_OFFSET_RESET_CONFIG_DOC =
        "If not NONE, out-of-range errors reset the consumer's offset according to the strategy. "
        + "For example, NEAREST resets to the earliest end of the broker range if the offset was under the range, "
        + "or to the latest end of the broker range if it was over the range.";

CONFIG = new ConfigDef().define(OUT_OF_RANGE_OFFSET_RESET_CONFIG,
        Type.STRING,
        OutOfRangeOffsetResetStrategy.NONE.toString(),
        in(Utils.enumOptions(OutOfRangeOffsetResetStrategy.class)),
        Importance.MEDIUM,
        OUT_OF_RANGE_OFFSET_RESET_CONFIG_DOC);
```
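As a rough illustration of how a client might opt in, the sketch below builds consumer properties with plain `java.util.Properties`. It assumes the new OffsetResetStrategy values are selected through the existing `auto.offset.reset` setting. The class name, broker address, and group id are placeholders, and the new key exists only in this proposal, not in any released Kafka client.

```java
import java.util.Properties;

public class OutOfRangeConfigSketch {
    // Key proposed on this page; hypothetical until the proposal is implemented.
    static final String OUT_OF_RANGE_OFFSET_RESET_CONFIG = "offset.reset.strategy.offset-out-of-range";

    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "example-group");           // placeholder group id
        // Assumption: the proposed strategies are chosen via auto.offset.reset.
        props.setProperty("auto.offset.reset", "safe_latest");
        // Values are lower-case because the proposed enums lower-case toString().
        props.setProperty(OUT_OF_RANGE_OFFSET_RESET_CONFIG, "nearest");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Leaving the new key unset would keep the proposed default, `none`, so out-of-range handling would follow the main reset strategy.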
...
The first scenario is an offset reset when there are no committed offsets:
| offset reset strategy | current reset behavior | proposed reset behavior |
| --- | --- | --- |
| none | throw exception | throw exception |
| earliest | reset to earliest | reset to earliest |
| latest | reset to latest | reset to latest |
| earliest_on_start | N/A | reset to earliest |
| latest_on_start | N/A | reset to latest |
| safe_latest | N/A | If the group is newly started, reset to latest. If partitions are added while the group is consuming, reset to earliest for the new partitions. |
| nearest (offset.reset.strategy.offset-out-of-range) | N/A | Takes effect only when an out-of-range error is triggered. In this scenario, the behavior is determined by the strategy it is used with (earliest, latest, or safe_latest). |
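The proposed column of the table above can be read as a small decision function. The following is only a sketch of that table, not the consumer's implementation: the class, the `Action` enum, and the `partitionAddedWhileConsuming` flag are hypothetical names introduced for illustration.

```java
public class NoCommittedOffsetSketch {
    // Mirrors the proposed OffsetResetStrategy values.
    enum Strategy { LATEST, EARLIEST, NONE, LATEST_ON_START, EARLIEST_ON_START, SAFE_LATEST }

    // Hypothetical outcome type used only in this sketch.
    enum Action { RESET_TO_EARLIEST, RESET_TO_LATEST, THROW_EXCEPTION }

    /**
     * Resolves the action for a partition that has no committed offset.
     * partitionAddedWhileConsuming is an assumed flag: true when the partition
     * was added after the group had already started consuming.
     */
    static Action resolve(Strategy strategy, boolean partitionAddedWhileConsuming) {
        switch (strategy) {
            case EARLIEST:
            case EARLIEST_ON_START:
                return Action.RESET_TO_EARLIEST;
            case LATEST:
            case LATEST_ON_START:
                return Action.RESET_TO_LATEST;
            case SAFE_LATEST:
                // New group: latest. Partition added to a running group: earliest.
                return partitionAddedWhileConsuming ? Action.RESET_TO_EARLIEST : Action.RESET_TO_LATEST;
            case NONE:
            default:
                return Action.THROW_EXCEPTION;
        }
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            System.out.println(s + " -> " + resolve(s, false) + " / " + resolve(s, true));
        }
    }
}
```

The out-of-range setting does not appear as a parameter because, per the table, it has no effect of its own in this scenario.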
The second scenario is when an out-of-range exception is triggered:
| offset reset strategy | current reset behavior | proposed reset behavior |
| --- | --- | --- |
| none | throw exception | throw exception |
| earliest | reset to earliest | reset to earliest |
| latest | reset to latest | reset to latest |
| earliest_on_start | N/A | throw exception |
| latest_on_start | N/A | throw exception |
| safe_latest | N/A | reset to latest, consistent with latest |
| nearest (offset.reset.strategy.offset-out-of-range) | N/A | Reset to earliest if the offset was under the range, or to latest if it was over the range. This is independent of the strategy it is used with (earliest, latest, or safe_latest). |
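The out-of-range table can be sketched the same way. This is an illustrative reading of the table, assuming that `nearest` takes precedence over the main strategy whenever it is set. The class and parameter names are hypothetical, and `IllegalStateException` stands in for whatever exception the consumer would actually raise.

```java
public class OutOfRangeResetSketch {
    // Mirror the two proposed enums.
    enum Strategy { LATEST, EARLIEST, NONE, LATEST_ON_START, EARLIEST_ON_START, SAFE_LATEST }
    enum OutOfRange { NONE, NEAREST }

    /**
     * Returns the offset to resume from when `requested` lies outside
     * [logStart, logEnd]. Callers are assumed to invoke this only after an
     * out-of-range error, so `requested` is either below logStart or above logEnd.
     */
    static long resolve(Strategy strategy, OutOfRange outOfRange,
                        long requested, long logStart, long logEnd) {
        if (outOfRange == OutOfRange.NEAREST) {
            // Snap to whichever end of the range was overshot, ignoring `strategy`.
            return requested < logStart ? logStart : logEnd;
        }
        switch (strategy) {
            case EARLIEST:
                return logStart;
            case LATEST:
            case SAFE_LATEST:
                return logEnd;
            default:
                // NONE, EARLIEST_ON_START, LATEST_ON_START: surface the error.
                throw new IllegalStateException("offset " + requested + " is out of range");
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(Strategy.EARLIEST_ON_START, OutOfRange.NEAREST, 5L, 10L, 100L));
        System.out.println(resolve(Strategy.SAFE_LATEST, OutOfRange.NONE, 5L, 10L, 100L));
    }
}
```

Under these assumptions, pairing `earliest_on_start` or `latest_on_start` with `nearest` would avoid the exception those strategies otherwise raise on an out-of-range offset.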
Compatibility, Deprecation, and Migration Plan
...