...
JIRA: here
Pull Request: here
Discussion thread: here
Motivation
When the partitions of a topic are expanded on the server, the producer perceives the expansion first and starts writing data to the new partitions. The consumer group perceives the expansion later, only after a rebalance completes, and if "auto.offset.reset" is set to "latest" it starts consuming the new partitions from the latest offset. Any data written to the new partitions in the meantime is skipped, which from the consumer's point of view is data loss. A group that is already consuming does not want to skip this data. We therefore propose richer offset reset mechanisms, first to solve this problem and second to handle out-of-range offsets more flexibly.
I ran a test that reproduces this data loss; see the linked JIRA for details.
Public Interfaces
The plan is to add four enumeration values to OffsetResetStrategy, representing four different strategies.
OffsetResetStrategy.java

    package org.apache.kafka.clients.consumer;

    import java.util.Locale;

    public enum OffsetResetStrategy {
        LATEST, EARLIEST, NONE, NEAREST, LATEST_ON_START, EARLIEST_ON_START, SAFE_LATEST;

        @Override
        public String toString() {
            return super.toString().toLowerCase(Locale.ROOT);
        }
    }
Then add a new consumer config named "nearest.offset.reset":
ConsumerConfig

    public static final String NEAREST_OFFSET_RESET_CONFIG = "nearest.offset.reset";
    private static final String NEAREST_OFFSET_RESET_DOC = "If true, then out of range errors will reset the consumer's offset to the nearest offset: "
        + "to the earliest end of the broker range if it was under the range, "
        + "or to the latest end of the broker range if it was over the range.";
    public static final boolean DEFAULT_NEAREST_OFFSET_RESET = false;
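The behavior described by the "nearest.offset.reset" doc string can be illustrated with a minimal sketch. This is not the proposed implementation: the class NearestOffsetResetSketch, the method nearestOffset, and its parameter names are hypothetical, and the sketch assumes the broker's valid range is given as a log start offset and a log end offset.

```java
// Hypothetical illustration only; none of these names exist in the Kafka client.
public final class NearestOffsetResetSketch {

    private NearestOffsetResetSketch() { }

    /**
     * Returns the offset a consumer would be reset to under the "nearest" rule.
     *
     * @param requested      the offset the consumer tried to fetch from
     * @param logStartOffset earliest offset available on the broker (assumed input)
     * @param logEndOffset   latest offset available on the broker (assumed input)
     */
    public static long nearestOffset(long requested, long logStartOffset, long logEndOffset) {
        if (logStartOffset > logEndOffset) {
            throw new IllegalArgumentException("log start offset must not exceed log end offset");
        }
        if (requested < logStartOffset) {
            // Under the range: reset to the earliest end of the broker range.
            return logStartOffset;
        }
        if (requested > logEndOffset) {
            // Over the range: reset to the latest end of the broker range.
            return logEndOffset;
        }
        // In range: no reset is needed.
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(nearestOffset(5L, 100L, 200L));   // prints 100
        System.out.println(nearestOffset(250L, 100L, 200L)); // prints 200
        System.out.println(nearestOffset(150L, 100L, 200L)); // prints 150
    }
}
```

With the default of false, an out-of-range offset would presumably continue to be handled by the strategy configured through "auto.offset.reset".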
...