...

In order to handle the LOG_TRUNCATION error code, the consumer will need to find the offset at which its log diverged. We propose to use the OffsetForLeaderEpoch API that was introduced in KIP-101. As the consumer is fetching from a partition, it will keep a small cache of the recent epochs that were fetched for each partition. In the case of a log truncation error, it will use this cache to find the point of divergence.
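The divergence search can be sketched in Java as below. This is a minimal sketch under stated assumptions, not the consumer's actual implementation:

- `EpochCache`, `onFetchedBatch`, and `findDivergence` are hypothetical names.
- The `leaderEndOffset` function stands in for an OffsetForLeaderEpoch request. It is assumed to return the leader's end offset for a given epoch, or nothing if the leader cannot answer for that epoch.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical per-partition cache of recently fetched leader epochs.
// Each entry maps an epoch to the first offset the consumer fetched in that epoch.
final class EpochCache {
    private final NavigableMap<Integer, Long> epochStartOffsets = new TreeMap<>();
    private final int maxEntries;

    EpochCache(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    // Called for every fetched batch; only the first offset seen per epoch is kept.
    void onFetchedBatch(int leaderEpoch, long baseOffset) {
        epochStartOffsets.putIfAbsent(leaderEpoch, baseOffset);
        // Keep the cache small by evicting the oldest epochs first.
        while (epochStartOffsets.size() > maxEntries) {
            epochStartOffsets.pollFirstEntry();
        }
    }

    // leaderEndOffset is an assumed stand-in for an OffsetForLeaderEpoch lookup.
    // Returns the divergence offset, or empty if it cannot be determined precisely.
    Optional<Long> findDivergence(long fetchOffset,
                                  Function<Integer, Optional<Long>> leaderEndOffset) {
        // Locally, the newest epoch ends at the current fetch offset.
        long localEndOffset = fetchOffset;
        for (Map.Entry<Integer, Long> entry : epochStartOffsets.descendingMap().entrySet()) {
            Optional<Long> leaderEnd = leaderEndOffset.apply(entry.getKey());
            if (leaderEnd.isPresent()) {
                // The logs agree up to whichever copy of this epoch ends first.
                return Optional.of(Math.min(leaderEnd.get(), localEndOffset));
            }
            // Try the previous epoch, which locally ended where this one began.
            localEndOffset = entry.getValue();
        }
        return Optional.empty();
    }
}
```

If none of the cached epochs can be resolved, the sketch returns `Optional.empty()`. This corresponds to the edge case where the consumer must fall back to its reset policy.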

This KIP has implications for the consumer's offset reset policy, which defines what the consumer should do if its fetch offset becomes out of range. With this KIP, the only case in which this is possible is if the consumer fetches from an offset earlier than the log start offset. Otherwise, we will detect it either as a log truncation event or as an instance of stale metadata (the KIP-232 scenario). By opting into an offset reset policy, the user allows automatic adjustments to the fetch position, so we take advantage of this to reset the offset precisely whenever possible. In some edge cases it may not be possible to find this offset precisely, and we still have to fall back to the reset policy.

In fact, if we have to reset offsets, we can often do better than "earliest" or "latest" because we have the timestamp index. We propose to add a new option, "closest", for `auto.offset.reset`. The "closest" policy behaves as follows:

- If the consumer fetches from an offset lower than the log start offset, we will reset to the new log start offset.
- If the offset is larger than the log end offset, or if truncation is detected, we will use the logic in this KIP to find the truncation offset precisely if possible.
- If we fail to find this offset, we will reset using the timestamp of the most recently committed message.
- Finally, if the message format does not support timestamps, we will use the log end offset.

The following table summarizes the behavior for the three reset policies:

| auto.offset.reset | Initialization | Offset out of range (too low) | Offset out of range (truncation offset known precisely) | Offset out of range (truncation offset unknown) |
|---|---|---|---|---|
| latest | log end offset | log end offset | truncation offset | log end offset |
| earliest | log start offset | log start offset | truncation offset | log start offset |
| closest | log start offset | log start offset | truncation offset | reset using timestamp if possible, otherwise log end offset |
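The table can be read as a small decision function, and the following Java sketch encodes it. It rests on a few assumptions:

- `ResetPolicy`, `ResetCause`, and `resetOffset` are hypothetical names.
- `truncationOffset` is the precisely known divergence point, if any.
- `offsetForTimestamp` is assumed to be the result of looking up the timestamp of the most recently committed message. It is empty when the message format has no timestamps or the lookup fails.

```java
import java.util.OptionalLong;

// Hypothetical names for the three policies in the table.
enum ResetPolicy { LATEST, EARLIEST, CLOSEST }

// Hypothetical classification of why a reset is needed.
enum ResetCause { INITIALIZATION, BELOW_LOG_START, OUT_OF_RANGE }

final class OffsetResetter {
    static long resetOffset(ResetPolicy policy, ResetCause cause,
                            long logStartOffset, long logEndOffset,
                            OptionalLong truncationOffset,
                            OptionalLong offsetForTimestamp) {
        // Initialization and "too low": only "latest" jumps to the log end offset.
        if (cause != ResetCause.OUT_OF_RANGE) {
            return policy == ResetPolicy.LATEST ? logEndOffset : logStartOffset;
        }
        // Every policy uses a precisely known truncation offset.
        if (truncationOffset.isPresent()) {
            return truncationOffset.getAsLong();
        }
        // Truncation offset unknown: fall back according to the policy.
        switch (policy) {
            case LATEST:
                return logEndOffset;
            case EARLIEST:
                return logStartOffset;
            default:
                // CLOSEST: use the timestamp lookup if it produced an offset.
                return offsetForTimestamp.orElse(logEndOffset);
        }
    }
}
```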


If a user is not using an auto reset option, we will raise a TruncatedPartitionException from poll() when log truncation is detected, to signal that truncation has occurred. The exception will include the partitions that were truncated and the offset of divergence as found above. This gives applications the ability to reset state, revert changes to downstream systems, and rewind the fetch offset if needed. If the user ignores the exception, we will continue fetching from the current offset. However, we will drop the last fetched offset metadata from the new FetchRequest so that we do not get the same log truncation error again.
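One way an application might react to the exception is sketched below. `TruncationHandler` and `rewindPositions` are hypothetical helpers, not part of the proposed API. Partitions are keyed by plain strings instead of `TopicPartition` to keep the example self-contained, and the `truncated` map mirrors what `truncatedOffsets()` would return.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class TruncationHandler {
    // truncated: partition -> divergence offset (empty if unknown), as from truncatedOffsets().
    // positions: partition -> current fetch position.
    // Returns the position each partition should resume from.
    static Map<String, Long> rewindPositions(Map<String, Optional<Long>> truncated,
                                             Map<String, Long> positions) {
        Map<String, Long> result = new HashMap<>(positions);
        // Rewind only where the divergence point is known; otherwise keep the current position.
        truncated.forEach((partition, divergence) ->
                divergence.ifPresent(offset -> result.put(partition, offset)));
        return result;
    }
}
```

After reverting any downstream state, an application would pass each resulting position to `KafkaConsumer.seek(TopicPartition, long)`. Partitions whose divergence point is unknown keep their current position, which matches the behavior when the exception is ignored.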

...

We will also change the leader behavior so that it is not permitted to add a replica to the ISR if the controller has marked it as offline. Allowing this today weakens acks=all semantics: the zombie replica counts toward the min.insync.replicas requirement, but it is not actually eligible to become leader.
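A simplified sketch of the proposed leader-side rule follows. `IsrPolicy`, `maybeExpandIsr`, and the `caughtUp` flag are hypothetical, and a real broker tracks follower progress and ISR changes in far more detail. The point it illustrates is that an offline replica never enters the ISR, so it can no longer help an acks=all write satisfy min.insync.replicas.

```java
import java.util.HashSet;
import java.util.Set;

final class IsrPolicy {
    // Leader-side check: a caught-up follower joins the ISR only if the
    // controller has not marked it offline. Returns the (possibly unchanged) ISR.
    static Set<Integer> maybeExpandIsr(Set<Integer> isr, int replicaId,
                                       boolean caughtUp, Set<Integer> offlineReplicas) {
        if (!caughtUp || offlineReplicas.contains(replicaId)) {
            return isr;
        }
        Set<Integer> expanded = new HashSet<>(isr);
        expanded.add(replicaId);
        return expanded;
    }

    // An acks=all write is accepted only if the ISR meets min.insync.replicas.
    static boolean satisfiesMinIsr(Set<Integer> isr, int minInsyncReplicas) {
        return isr.size() >= minInsyncReplicas;
    }
}
```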

Public Interfaces

Config Changes

As described above, the consumer's `auto.offset.reset` will now support a fourth option to reset offsets to the "closest" position.
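Opting into the new policy only changes the value passed for `auto.offset.reset`. The snippet below uses plain `java.util.Properties`. The bootstrap servers and group id are placeholders, and `closest` is the value proposed by this KIP; clients that predate it would presumably reject it.

```java
import java.util.Properties;

final class ConsumerConfigExample {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "example-group");           // placeholder
        // Proposed new value; existing values are "latest", "earliest", and "none".
        props.setProperty("auto.offset.reset", "closest");
        return props;
    }
}
```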

API Changes

We will introduce a new exception type, which will be raised from KafkaConsumer.poll(Duration) as described above. This exception extends `OffsetOutOfRangeException` for compatibility: if a user's offset reset policy is set to "none", they can still catch it as an `OffsetOutOfRangeException`.

```java
class TruncatedPartitionException extends OffsetOutOfRangeException {
    /**
     * Get a map of the partitions which were truncated and the offset
     * at which the logs diverged from what we read locally. In other
     * words, offsets between this offset and the current fetch offset
     * were truncated from the broker's log. If the consumer was unable
     * to determine the point of divergence, then the value will be
     * `Optional.empty()`.
     */
    Map<TopicPartition, Optional<Long>> truncatedOffsets();
}
```

...

```java
class ConsumerRecord<K, V> {
    int leaderEpoch();
}

class OffsetAndMetadata {
    long timestamp();
    int leaderEpoch();
}
```
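With the new accessors, a consumer can commit the epoch and timestamp of the last record it processed alongside the offset. The sketch below is hypothetical: `CommitPosition` and `afterRecord` are illustrative names, not the proposed API. It follows the usual Kafka convention that the committed offset is the next offset to consume, that is, the last processed record's offset plus one.

```java
// Hypothetical value holder for what would be committed after processing a record.
final class CommitPosition {
    final long offset;      // next offset to consume
    final int leaderEpoch;  // epoch of the last processed record
    final long timestamp;   // timestamp of the last processed record

    CommitPosition(long offset, int leaderEpoch, long timestamp) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
        this.timestamp = timestamp;
    }

    // Build the commit position from the last processed record's metadata.
    static CommitPosition afterRecord(long recordOffset, int recordLeaderEpoch, long recordTimestamp) {
        return new CommitPosition(recordOffset + 1, recordLeaderEpoch, recordTimestamp);
    }
}
```

Keeping the timestamp alongside the offset is what would let the "closest" policy fall back to a timestamp lookup when the truncation offset cannot be found.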


We will also have a new API to support seeking to an offset and leader epoch. For convenience and consistency, we will reuse the OffsetAndMetadata object.

...

```
FetchRequest => MaxWaitTime ReplicaId MinBytes IsolationLevel FetchSessionId FetchSessionEpoch [Topics] [RemovedTopics]
  MaxWaitTime => INT32
  ReplicaId => INT32
  MinBytes => INT32
  IsolationLevel => INT8
  FetchSessionId => INT32
  FetchSessionEpoch => INT32
  Topics => TopicName Partitions
    TopicName => STRING
    Partitions => [Partition FetchOffset StartOffset LeaderEpoch MaxBytes]
      Partition => INT32
      FetchOffset => INT64
      StartOffset => INT64
      LeaderEpoch => INT32   // New
      MaxBytes => INT32
  RemovedTopics => RemovedTopicName [RemovedPartition]
    RemovedTopicName => STRING
    RemovedPartition => INT32
```

The response schema will not change, but we will add two new error codes, as mentioned above.
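The new per-partition field can be sketched as follows. It rests on a few assumptions:

- `PartitionFetchData` and `forPartition` are hypothetical names.
- Using -1 as the "no epoch" value is an assumption made for illustration.
- When the last fetched epoch is unknown, or after the application has ignored a `TruncatedPartitionException`, the consumer sends no epoch. The broker then has nothing to check against, so the same truncation error is not raised again.

```java
import java.util.Optional;

// Hypothetical model of one partition entry in the new FetchRequest version.
final class PartitionFetchData {
    // Assumption: -1 means "no epoch", so the broker skips the truncation check.
    static final int NO_LEADER_EPOCH = -1;

    final int partition;
    final long fetchOffset;
    final long startOffset;
    final int leaderEpoch;
    final int maxBytes;

    PartitionFetchData(int partition, long fetchOffset, long startOffset,
                       int leaderEpoch, int maxBytes) {
        this.partition = partition;
        this.fetchOffset = fetchOffset;
        this.startOffset = startOffset;
        this.leaderEpoch = leaderEpoch;
        this.maxBytes = maxBytes;
    }

    // lastFetchedEpoch is empty before the first fetch, or after the epoch
    // metadata was dropped because a truncation error was ignored.
    static PartitionFetchData forPartition(int partition, long fetchOffset, long startOffset,
                                           Optional<Integer> lastFetchedEpoch, int maxBytes) {
        return new PartitionFetchData(partition, fetchOffset, startOffset,
                lastFetchedEpoch.orElse(NO_LEADER_EPOCH), maxBytes);
    }
}
```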

...

The new OffsetCommit request schema is provided below. The response schema matches the previous version. We have added fields for the leader epoch and the timestamp.

```
OffsetCommitRequest => GroupId Generation MemberId [TopicName [Partition Offset Timestamp LeaderEpoch Metadata]]
  GroupId => STRING
  Generation => INT32
  MemberId => STRING
  RetentionTime => INT64
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  Timestamp => INT64    // New
  LeaderEpoch => INT32  // New
  Metadata => STRING
```

The OffsetFetch response schema will be similarly modified. The request schema will remain the same.

```
OffsetFetchResponse => [TopicName [Partition Offset Timestamp LeaderEpoch Metadata ErrorCode]]
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  Timestamp => INT64    // New
  LeaderEpoch => INT32  // New
  Metadata => STRING
  ErrorCode => INT16
```

...

```
ListOffsetResponse => [TopicName [Partition Offset LeaderEpoch Timestamp ErrorCode]]
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  LeaderEpoch => INT32  // New
  Timestamp => INT64
  ErrorCode => INT16
```

...