...

To fix this problem, we propose to include in each fetch request the epoch returned in the last fetch in order to ensure that the fetched offset reflects the expected state of the log from the fetcher's perspective. This epoch is the minimum expected epoch for the message at the fetch offset. The leader will use its leader epoch cache to validate that the epoch and fetch offset are correct. Specifically, it will ensure that there does not exist a leader epoch which is larger than the fetch leader epoch, but which starts at a smaller offset than the fetch offset. If such an epoch exists, the fetcher did not see some messages from the later epoch, which means the log was truncated at some point. The leader will return a new LOG_TRUNCATION error code in this case.
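
For illustration, a minimal sketch of this leader-side check is shown below. It is not the actual broker implementation; it assumes the leader epoch cache is exposed as a list of (epoch, start offset) entries sorted by epoch.

Code Block
// Illustrative sketch of the leader-side validation described above, assuming the
// epoch cache is available as a list of (epoch, startOffset) entries sorted by epoch.
import java.util.List;

class EpochEntry {
    final int epoch;
    final long startOffset;

    EpochEntry(int epoch, long startOffset) {
        this.epoch = epoch;
        this.startOffset = startOffset;
    }
}

class FetchValidation {
    // Truncation is detected if some leader epoch is larger than the fetch epoch
    // but starts at a smaller offset than the fetch offset, i.e. the fetcher has
    // skipped over messages from a later epoch.
    static boolean isTruncated(List<EpochEntry> epochCache, int fetchEpoch, long fetchOffset) {
        for (EpochEntry entry : epochCache) {
            if (entry.epoch > fetchEpoch && entry.startOffset < fetchOffset) {
                return true; // leader responds with LOG_TRUNCATION
            }
        }
        return false;
    }
}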

The consumer and the replica will handle log truncation in slightly different ways, but both will first try to find the point of divergence in the log. For the consumer, we will either reset the offset if an auto reset policy is configured, or we will raise an error to the user. For the replica, we will truncate to the point of divergence and continue replicating from there.
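
As a rough illustration of how the point of divergence could be computed, the sketch below assumes an OffsetsForLeaderEpoch-style lookup in which the leader reports the end offset of the fetcher's last epoch; the names are illustrative rather than the actual replica fetcher code.

Code Block
// Illustrative sketch: the divergence point is the smaller of the fetcher's own end
// offset for its last epoch and the end offset reported by the leader for that epoch.
class DivergenceLookup {
    // A replica truncates its log to this offset and resumes replication from there.
    // A consumer either seeks to it (when an auto reset policy permits) or surfaces
    // it to the application.
    static long findDivergence(long localEndOffsetForEpoch, long leaderEndOffsetForEpoch) {
        return Math.min(localEndOffsetForEpoch, leaderEndOffsetForEpoch);
    }
}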

Below we discuss how the behavior changes for clients, leaders, and followers.

...

This KIP has implications for the consumer's offset reset policy, which defines what the consumer should do if its fetch offset becomes out of range. With this KIP, the only case in which this is possible is if the consumer fetches from an offset earlier than the log start offset. Otherwise, we will detect it either as a log truncation event or an instance of stale metadata (the KIP-232 scenario).

By opting into an offset reset policy, the user allows for automatic adjustments to the fetch position, so we take advantage of this to reset the offset as precisely as possible when log truncation is detected. In some pathological cases (e.g. multiple consecutive unclean leader elections), we may not be able to find this offset precisely and we will still have to resort to the reset policy.

In fact, if we have to reset offsets, we can often do better than "earliest" or "latest" because we have the timestamp index. We propose to add a third option for `auto.offset.reset` to reset the offset to the "closest" offset. If the consumer fetches from an offset lower than the log start offset, we will use the new log start offset. If the offset is larger than the log end offset or if truncation is detected, we will set the reset offset precisely if possible using the logic in this KIP. However, if we fail to find this offset, we will reset using the timestamp of the most recently committed message. Finally, if the message format does not support timestamps, we will use the latest offset. 
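
A sketch of the timestamp-based fallback is shown below. It uses the existing offsetsForTimes API; the lastCommittedTimestamp parameter is assumed to come from the timestamp stored with the committed offset as proposed in this KIP, and the surrounding wiring is hypothetical.

Code Block
// Illustrative sketch: reset a partition using the timestamp of the most recently
// committed message when the precise truncation offset cannot be found.
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

class TimestampReset {
    static void resetByTimestamp(Consumer<?, ?> consumer, TopicPartition tp, long lastCommittedTimestamp) {
        Map<TopicPartition, OffsetAndTimestamp> result =
            consumer.offsetsForTimes(Collections.singletonMap(tp, lastCommittedTimestamp));
        OffsetAndTimestamp found = result.get(tp);
        if (found != null) {
            consumer.seek(tp, found.offset());
        } else {
            // No timestamp index entry (e.g. old message format): fall back to the latest offset.
            consumer.seekToEnd(Collections.singletonList(tp));
        }
    }
}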

The following table summarizes the behavior for the three reset policies:

...

the exact offset, but we should be able to get close by finding the starting offset of the next largest epoch that the leader is aware of. We propose in this KIP to change the behavior for both the "earliest" and "latest" reset modes to do this automatically as long as the message format supports lookup by leader epoch.  
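
A minimal sketch of this lookup is shown below, assuming the epoch information is available as a map from leader epoch to start offset; the representation is hypothetical.

Code Block
// Illustrative sketch: approximate the reset offset with the start offset of the
// smallest leader epoch larger than the epoch of the last consumed message.
import java.util.Map;
import java.util.TreeMap;

class EpochReset {
    static Long closestResetOffset(TreeMap<Integer, Long> epochStartOffsets, int lastConsumedEpoch) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(lastConsumedEpoch);
        return next == null ? null : next.getValue();
    }
}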

If a user is not using an auto reset option, we will raise a TruncatedPartitionException from poll() when log truncation is detected. The exception will include the partitions that were truncated and the offset of divergence as found above. This gives applications the ability to execute any logic to revert changes if needed and rewind the fetch offset. If the user ignores the exception, we will continue fetching from the current offset, but we will drop the last fetched offset metadata from the new FetchRequest so that we do not get the same log truncation error.
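
The sketch below shows how an application without an auto reset policy might handle this exception. The divergentOffsets() accessor and the application callbacks are hypothetical; only the exception itself is proposed by this KIP.

Code Block
// Illustrative sketch of application-level handling of the proposed exception.
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

class TruncationHandler {
    void pollOnce(Consumer<String, String> consumer) {
        try {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            process(records);
        } catch (TruncatedPartitionException e) {
            // Hypothetical accessor: the truncated partitions and their offsets of divergence.
            for (Map.Entry<TopicPartition, Long> entry : e.divergentOffsets().entrySet()) {
                revertDownstreamChangesAfter(entry.getKey(), entry.getValue()); // application-specific
                consumer.seek(entry.getKey(), entry.getValue());
            }
        }
    }

    void process(ConsumerRecords<String, String> records) { /* application logic */ }

    void revertDownstreamChangesAfter(TopicPartition tp, long divergenceOffset) { /* application logic */ }
}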

...

Code Block
class ConsumerRecord<K, V> {
    int leaderEpoch();
}

class OffsetAndMetadata {
    long timestamp();
    int leaderEpoch();
}


We will also have a new API to support seeking to an offset and leader epoch. For convenience and consistency, we will reuse the OffsetAndMetadata object.
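
For example, a consumer resuming from a committed position could carry the leader epoch into its next fetch roughly as follows; the seek(TopicPartition, OffsetAndMetadata) overload is the one described above, and its final shape may differ.

Code Block
// Illustrative usage of the proposed seek overload.
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class SeekWithEpoch {
    static void resume(Consumer<?, ?> consumer, TopicPartition tp) {
        OffsetAndMetadata committed = consumer.committed(tp); // now carries leaderEpoch()
        if (committed != null) {
            consumer.seek(tp, committed); // proposed overload: seek to offset and leader epoch
        }
    }
}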

...

Code Block
OffsetCommitRequest => GroupId Generation MemberId RetentionTime [TopicName [Partition Offset Timestamp LeaderEpoch Metadata]]
  GroupId => STRING
  Generation => INT32
  MemberId => STRING
  RetentionTime => INT64
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  Timestamp => INT64    // New
  LeaderEpoch => INT32  // New
  Metadata => STRING
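
As an example of how these fields would be populated, the sketch below commits the position after a processed record, propagating its timestamp and leader epoch. The OffsetAndMetadata constructor shown is hypothetical; only the accessors proposed above are assumed.

Code Block
// Illustrative sketch: committing an offset together with the timestamp and leader
// epoch of the last consumed record. The OffsetAndMetadata constructor is hypothetical.
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommitWithEpoch {
    static void commit(Consumer<?, ?> consumer, ConsumerRecord<?, ?> record) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        OffsetAndMetadata offset =
            new OffsetAndMetadata(record.offset() + 1, record.timestamp(), record.leaderEpoch(), "");
        consumer.commitSync(Collections.singletonMap(tp, offset));
    }
}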

...

Code Block
OffsetFetchResponse => [TopicName [Partition Offset Timestamp LeaderEpoch Metadata ErrorCode]]
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  Timestamp => INT64    // New
  LeaderEpoch => INT32  // New
  Metadata => STRING
  ErrorCode => INT16

...