...

The proposal in this KIP is to have the consumer behave more like a follower. When a leader is changed, the consumer will first check for truncation using the OffsetForLeaderEpoch API. In order to enable this, we need to keep track of the last epoch that was consumed. If we do not have one (e.g. because the user has seeked to a particular offset or because the message format is older), then the consumer will skip this step. To support this tracking, we will extend the OffsetCommit API to include the leader epoch if one is available.

Leader changes are detected either through a metadata refresh or in response to a FENCED_LEADER_EPOCH error. It is also possible that the consumer sees an UNKNOWN_LEADER_EPOCH in a fetch response if its metadata has gotten ahead of the leader. In this case, it will simply retry the fetch.
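The error handling described above can be sketched as a small decision function. This is only an illustration, not the consumer's actual implementation: the `FetchErrorHandling` class, the `Action` enum, and the choice to refresh metadata after a FENCED_LEADER_EPOCH error are assumptions made for the example.

```java
import java.util.Optional;

/** Hypothetical sketch of how a consumer might react to the new fetch errors. */
final class FetchErrorHandling {
    enum FetchError { NONE, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

    // Illustrative action names; they are not part of the KIP.
    enum Action { CONTINUE, RETRY_FETCH, REFRESH_METADATA, REFRESH_METADATA_AND_CHECK_TRUNCATION }

    static Action nextAction(FetchError error, Optional<Integer> lastConsumedEpoch) {
        switch (error) {
            case FENCED_LEADER_EPOCH:
                // The leader has changed. Truncation can only be checked when the
                // last consumed epoch is known; otherwise that step is skipped.
                return lastConsumedEpoch.isPresent()
                        ? Action.REFRESH_METADATA_AND_CHECK_TRUNCATION
                        : Action.REFRESH_METADATA;
            case UNKNOWN_LEADER_EPOCH:
                // The consumer's metadata is ahead of the leader: simply retry.
                return Action.RETRY_FETCH;
            default:
                return Action.CONTINUE;
        }
    }
}
```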

This change in behavior has implications for the consumer's offset reset policy, which defines what the consumer should do if its fetch offset becomes out of range. With this KIP, the only case in which this is possible, other than an out-of-range seek, is if the consumer fetches from an offset earlier than the log start offset. By opting into an offset reset policy, the user allows for automatic adjustments to the fetch position, so we take advantage of this to reset the offset as precisely as possible when log truncation is detected. In some pathological cases (e.g. multiple consecutive unclean leader elections), we may not be able to find the exact offset, but we should be able to get close by finding the starting offset of the next largest epoch that the leader is aware of. We propose in this KIP to change the behavior for both the "earliest" and "latest" reset modes to do this automatically as long as the message format supports lookup by leader epoch. The consumer will log a message to indicate that the truncation was detected, but will reset the position automatically.
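To make the reset rule concrete, here is a minimal sketch of the lookup. It assumes the leader keeps a map from each known epoch to its starting offset. The class `TruncationCheck` and its method names are hypothetical, and the floor-entry lookup is one plausible reading of "the starting offset of the next largest epoch", not a statement of the broker's actual code.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;

/** Hypothetical sketch of truncation detection by leader epoch. */
final class TruncationCheck {
    /**
     * Leader side: find the largest known epoch <= requestedEpoch and return
     * where it ends, i.e. the start offset of the next larger known epoch,
     * or the log end offset if it is the latest epoch.
     */
    static Optional<Long> endOffsetFor(NavigableMap<Integer, Long> epochStartOffsets,
                                       long logEndOffset, int requestedEpoch) {
        Map.Entry<Integer, Long> floor = epochStartOffsets.floorEntry(requestedEpoch);
        if (floor == null) {
            return Optional.empty(); // requested epoch precedes everything the leader knows
        }
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(floor.getKey());
        return Optional.of(next == null ? logEndOffset : next.getValue());
    }

    /**
     * Consumer side: if the leader's end offset for our epoch is behind our
     * position, the log was truncated and we reset to that end offset.
     */
    static long positionAfterCheck(long position, Optional<Long> leaderEndOffset) {
        return leaderEndOffset.filter(end -> end < position).orElse(position);
    }
}
```

For example, if the leader knows epochs 0, 1 and 3 starting at offsets 0, 100 and 150, a consumer at offset 180 whose last consumed epoch was 2 would be reset to offset 150, because the leader never saw epoch 2 and epoch 1 ends where epoch 3 begins.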

...

Code Block
class ConsumerRecord<K, V> {
  /**
   * Get the leader epoch or empty if it is unknown.
   */
  Optional<Integer> leaderEpoch();
}

class OffsetAndMetadata {
  /**
   * New constructor including optional leader epoch. Old constructors
   * will still be supported and will use Optional.empty() as the default
   * leader epoch.
   */
  OffsetAndMetadata(long offset, String metadata, Optional<Integer> leaderEpoch);

  /** 
   * Get the leader epoch or empty if it is unknown. 
   */
  Optional<Integer> leaderEpoch();
}

We will also have a new API to support seeking to an offset and leader epoch. This is required in order to support storage of offsets in an external store. When the consumer is initialized, the user will call seek() with the offset and leader epoch that were stored, which initializes the consumer's position.

Like the other seek() overload, this method does not make any remote calls. If a leader epoch has been provided, then in the next call to poll(), the consumer will use the OffsetForLeaderEpoch API to check for truncation. If the log has been truncated since the time the offsets were stored, the next call to poll() will raise a LogTruncationException as described above.
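The deferred validation can be illustrated with a small stand-alone model. Everything here is hypothetical: `SeekSketch` is not the consumer class, `TruncationDetected` stands in for the LogTruncationException mentioned above, and the `IntToLongFunction` argument stands in for the OffsetForLeaderEpoch round trip.

```java
import java.util.Optional;
import java.util.function.IntToLongFunction;

/** Hypothetical model of seek() followed by validation in poll(). */
final class SeekSketch {
    /** Stand-in for the truncation exception; the name is illustrative. */
    static final class TruncationDetected extends RuntimeException {
        final long divergentOffset;

        TruncationDetected(long divergentOffset) {
            super("log truncated at offset " + divergentOffset);
            this.divergentOffset = divergentOffset;
        }
    }

    private long position;
    private Optional<Integer> epoch = Optional.empty();
    private boolean awaitingValidation;

    /** Purely local: records the position and defers any check to poll(). */
    void seek(long offset, Optional<Integer> leaderEpoch) {
        position = offset;
        epoch = leaderEpoch;
        awaitingValidation = leaderEpoch.isPresent();
    }

    /** endOffsetLookup maps an epoch to the leader's end offset for that epoch. */
    long poll(IntToLongFunction endOffsetLookup) {
        if (awaitingValidation) {
            long endOffset = endOffsetLookup.applyAsLong(epoch.get());
            awaitingValidation = false;
            if (endOffset < position) {
                throw new TruncationDetected(endOffset);
            }
        }
        return position;
    }
}
```

In this model, a seek without an epoch never triggers the lookup, which mirrors the rule that the consumer skips the truncation check when no epoch is available.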

...

We will bump the fetch request version in order to include the current leader epoch. This will only be provided by the fetcher for consecutive fetches. If the consumer seeks to the middle of the log, for example, then we will use the sentinel value -1 and the leader will skip the epoch validation. The replica should always have the previous epoch except when starting from the beginning of the log. For older versions, we will skip epoch validation as before.

The new schema is given below:

Code Block
FetchRequest => MaxWaitTime ReplicaId MinBytes IsolationLevel FetchSessionId FetchSessionEpoch [Topics] [RemovedTopics]
  MaxWaitTime => INT32
  ReplicaId => INT32
  MinBytes => INT32
  IsolationLevel => INT8
  FetchSessionId => INT32
  FetchSessionEpoch => INT32
  Topics => TopicName Partitions
    TopicName => STRING
    Partitions => [Partition FetchOffset StartOffset CurrentLeaderEpoch MaxBytes]
      Partition => INT32
      FetchOffset => INT64
      StartOffset => INT64
      CurrentLeaderEpoch => INT32   // New
      MaxBytes => INT32
  RemovedTopics => RemovedTopicName [RemovedPartition]
    RemovedTopicName => STRING
    RemovedPartition => INT32

The response schema will not change, but we will have two new error codes as mentioned above:

  1. FENCED_LEADER_EPOCH: The replica has a lower epoch than the leader. This is retriable.
  2. UNKNOWN_LEADER_EPOCH: This is a retriable error code which indicates that the epoch in the fetch request was larger than any known by the broker.

Previously, we would return the NOT_LEADER_FOR_PARTITION error code if a follower receives an unexpected Fetch request. In this case, the requested epoch is different from what the follower has, so we can now return one of the error codes above, which contain more specific information.
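The validation rule implied by the two error codes can be written as a short function. This is a sketch under stated assumptions: the class name `LeaderEpochValidation` and the constant `NO_EPOCH` are invented for the example, and only the -1 sentinel and the two error names come from the text above.

```java
/** Hypothetical sketch of broker-side validation of the epoch in a request. */
final class LeaderEpochValidation {
    /** Sentinel meaning "no epoch supplied"; validation is skipped. */
    static final int NO_EPOCH = -1;

    enum Error { NONE, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

    static Error validate(int requestEpoch, int brokerEpoch) {
        if (requestEpoch == NO_EPOCH) {
            return Error.NONE; // older clients or a fresh seek: nothing to compare
        }
        if (requestEpoch < brokerEpoch) {
            return Error.FENCED_LEADER_EPOCH; // the requester is behind the broker
        }
        if (requestEpoch > brokerEpoch) {
            return Error.UNKNOWN_LEADER_EPOCH; // the broker has not yet seen this epoch
        }
        return Error.NONE;
    }
}
```

Both errors are retriable, so a caller would typically refresh its view of the partition and try again rather than fail.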

OffsetsForLeaderEpoch

The changes to the OffsetsForLeaderEpoch request API are similar.

Code Block
OffsetForLeaderEpochRequest => [Topic]
  Topic => TopicName [Partition] 
    TopicName => STRING
    Partition => PartitionId CurrentLeaderEpoch LeaderEpoch
      PartitionId => INT32
      CurrentLeaderEpoch => INT32 // New
      LeaderEpoch => INT32

If the current leader epoch does not match that of the leader, then we will send either FENCED_LEADER_EPOCH or UNKNOWN_LEADER_EPOCH as we do for the Fetch API.

...