...

The proposal in this KIP is to have the consumer behave more like a follower. The consumer will obtain the current leader epoch using the Metadata API. When fetching from a new leader, the consumer will first check for truncation using the OffsetForLeaderEpoch API. To enable this, we need to keep track of the last epoch that was consumed. If we do not have one (e.g. because the user has seeked to a particular offset or because the message format is older), then the consumer will skip this step. To support this tracking, we will extend the OffsetCommit API to include the leader epoch if one is available.
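
The check described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `TruncationCheck`, `truncationPoint`, and the `endOffsetForEpoch` callback are hypothetical names standing in for an OffsetForLeaderEpoch lookup against the new leader, and they are not part of the consumer API proposed here.

```java
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.IntToLongFunction;

/** Hypothetical helper for illustration; not part of the Kafka client API. */
final class TruncationCheck {
    private TruncationCheck() {}

    /**
     * @param fetchPosition     next offset the consumer intends to fetch
     * @param lastConsumedEpoch leader epoch of the last consumed record, if known
     * @param endOffsetForEpoch stand-in for an OffsetForLeaderEpoch lookup on the new leader
     * @return the offset the log was truncated to, or empty if no truncation
     *         was detected or the check was skipped
     */
    static OptionalLong truncationPoint(long fetchPosition,
                                        Optional<Integer> lastConsumedEpoch,
                                        IntToLongFunction endOffsetForEpoch) {
        // No epoch known (e.g. after a plain seek or with an old message format): skip the check.
        if (!lastConsumedEpoch.isPresent()) {
            return OptionalLong.empty();
        }
        long endOffset = endOffsetForEpoch.applyAsLong(lastConsumedEpoch.get());
        // If the epoch ends before our position on the new leader, the log diverged there.
        return endOffset < fetchPosition ? OptionalLong.of(endOffset) : OptionalLong.empty();
    }
}
```

For example, a consumer positioned at offset 100 whose last consumed epoch ends at offset 80 on the new leader would detect truncation at offset 80.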

Leader changes are detected either through a metadata refresh or in response to a FENCED_LEADER_EPOCH error. It is also possible that the consumer sees an UNKNOWN_LEADER_EPOCH in a fetch response if its metadata has gotten ahead of the leader. In this case, it will simply retry the fetch. 
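
A minimal sketch of how a consumer might react to these two errors is shown below. The enum and method names are hypothetical, and the choice to refresh metadata before re-validating on FENCED_LEADER_EPOCH is an assumption based on the description above rather than a statement of the actual implementation.

```java
/** Hypothetical illustration of the consumer's reaction to epoch-related fetch errors. */
final class FetchErrorHandling {
    private FetchErrorHandling() {}

    enum FetchError { NONE, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

    enum Action { PROCESS_RECORDS, REFRESH_METADATA_THEN_CHECK_TRUNCATION, RETRY_FETCH }

    static Action actionFor(FetchError error) {
        switch (error) {
            case FENCED_LEADER_EPOCH:
                // Our epoch is stale: the leader has changed, so learn the new
                // epoch and check for truncation before fetching again (assumed flow).
                return Action.REFRESH_METADATA_THEN_CHECK_TRUNCATION;
            case UNKNOWN_LEADER_EPOCH:
                // Our metadata is ahead of the broker: simply retry the fetch.
                return Action.RETRY_FETCH;
            default:
                return Action.PROCESS_RECORDS;
        }
    }
}
```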

This change in behavior has implications for the consumer's offset reset policy, which defines what the consumer should do if its fetch offset becomes out of range. With this KIP, the only case in which this is possible other than an out of range seek is if the consumer fetches from an offset earlier than the log start offset. By opting into an offset reset policy, the user allows for automatic adjustments to the fetch position, so we take advantage of this to reset the offset as precisely as possible when log truncation is detected. In some pathological cases (e.g. multiple consecutive unclean leader elections), we may not be able to find the exact offset, but we should be able to get close by finding the starting offset of the next largest epoch that the leader is aware of. We propose in this KIP to change the behavior for both the "earliest" and "latest" reset modes to do this automatically as long as the message format supports lookup by leader epoch. The consumer will log a message to indicate that the truncation was detected, but will reset the position automatically.
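
To make the "starting offset of the next largest epoch" lookup concrete, here is a small sketch. It assumes the leader keeps a sorted map from each epoch it knows about to that epoch's start offset; `EpochEndOffsetLookup` and `endOffsetFor` are hypothetical names. As a simplification, a requested epoch newer than anything in the map simply falls through to the log end offset, which may not match real broker behavior.

```java
import java.util.Map;
import java.util.NavigableMap;

/** Hypothetical illustration of resolving the end offset for a consumer's last epoch. */
final class EpochEndOffsetLookup {
    private EpochEndOffsetLookup() {}

    /**
     * @param epochStartOffsets leader's view: epoch -> first offset written in that epoch
     * @param requestedEpoch    the last epoch the consumer read from
     * @param logEndOffset      the leader's current log end offset
     * @return the start offset of the next larger epoch known to the leader,
     *         or the log end offset if no larger epoch exists
     */
    static long endOffsetFor(NavigableMap<Integer, Long> epochStartOffsets,
                             int requestedEpoch,
                             long logEndOffset) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(requestedEpoch);
        return next == null ? logEndOffset : next.getValue();
    }
}
```

With a leader that knows epochs 0, 2 and 5 starting at offsets 0, 50 and 90, a consumer whose last epoch was 1 (lost in an unclean election) would be reset to offset 50, the start of epoch 2.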

...

For consumers, we propose some additional extensions:

  1. When the consumer needs to reset offsets, it uses the ListOffsets API to query the leader. To avoid querying stale leaders, we will add epoch fencing. Additionally, we will modify this API to return the corresponding epoch for any offsets looked up.
  2. We will also provide the leader epoch in the offset commit and fetch APIs. This allows consumer groups to detect truncation across rebalances or restarts.
  3. For users that store offsets in an external system, we will provide APIs which expose the leader epoch of each record and we will provide an alternative seek API so that users can initialize the offset and leader epoch.

...

Code Block
class UnknownLeaderEpochException extends RetriableException {}


class FencedLeaderEpochException extends InvalidMetadataException {}

This will be located in the public `errors` package, but the consumer will internally retry when it receives this error.

...

Code Block
class ConsumerRecords<K, V> {
  /**
   * Get the next offsets that the consumer will consume.
   * This can be passed directly to commitSync(), for example,
   * after the full batch has been consumed. For finer-grained
   * offset tracking, you should use the offset information from
   * the individual ConsumerRecord instances.
   */  
  Map<TopicPartition, OffsetAndMetadata> nextOffsets();
}

Finally, we may as well protect the offsets found through the offsetsForTimes() API. An unclean leader election may invalidate the results of a lookup by time, so it would be unsafe to use seek() without considering the epoch information for the returned offsets. To support this, we will add the leader epoch to the OffsetAndTimestamp object which is returned from offsetsForTimes().

Code Block
class OffsetAndTimestamp {
  /**
   * Get the leader epoch corresponding to the offset that was 
   * found (if one exists). This can be provided to seek() to 
   * ensure that the log hasn't been truncated prior to fetching.
   */
  Optional<Integer> leaderEpoch();
}

Protocol Changes

Fetch

We will bump the fetch request version in order to include the current leader epoch. For older versions, we will skip epoch validation as before.

The new schema is given below:

...

Code Block
OffsetCommitRequest => GroupId Generation MemberId [TopicName [Partition Offset Timestamp LastLeaderEpoch Metadata]]
  GroupId => STRING
  Generation => INT32
  MemberId => STRING
  RetentionTime => INT64
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  LastLeaderEpoch => INT32  // New
  Metadata => STRING

OffsetFetch

The OffsetFetch response schema will be similarly modified. The request schema will remain the same.

Code Block
OffsetFetchResponse => [TopicName [Partition Offset Timestamp LastLeaderEpoch Metadata ErrorCode]]
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  LastLeaderEpoch => INT32  // New
  Metadata => STRING
  ErrorCode => INT16

ListOffsets

We need two changes to the ListOffsets API. First, we add the current leader epoch to the request schema in order to protect clients from querying stale leaders.

Code Block
ListOffsetRequest => ReplicaId [TopicName [Partition CurrentLeaderEpoch Time]]
  ReplicaId => INT32
  TopicName => STRING
  Partition => INT32
  CurrentLeaderEpoch => INT32  // New
  Time => INT64

Second, we add the leader epoch to the response that corresponds with the returned offset. The client can use this to reconcile the log prior to fetching from a new leader.

Code Block
ListOffsetResponse => [TopicName [Partition Offset LeaderEpoch Timestamp ErrorCode]]
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  LeaderEpoch => INT32  // New
  Timestamp => INT64
  ErrorCode => INT16

As with the Fetch and OffsetForLeaderEpoch APIs, the response will support the FENCED_LEADER_EPOCH and UNKNOWN_LEADER_EPOCH error codes.
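
The fencing rule behind these two error codes can be sketched as a simple comparison on the broker side. The class and method names below are hypothetical, and treating a missing epoch as "skip validation" is an assumption that mirrors how older request versions are handled for Fetch.

```java
import java.util.OptionalInt;

/** Hypothetical illustration of broker-side leader epoch validation. */
final class LeaderEpochFencing {
    private LeaderEpochFencing() {}

    enum Result { OK, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

    /**
     * @param requestEpoch CurrentLeaderEpoch from the request; empty for older clients
     * @param brokerEpoch  the leader epoch the broker currently has for the partition
     */
    static Result validate(OptionalInt requestEpoch, int brokerEpoch) {
        if (!requestEpoch.isPresent()) {
            return Result.OK; // older request version: no epoch validation
        }
        int epoch = requestEpoch.getAsInt();
        if (epoch < brokerEpoch) {
            return Result.FENCED_LEADER_EPOCH;  // the client's view is stale
        }
        if (epoch > brokerEpoch) {
            return Result.UNKNOWN_LEADER_EPOCH; // the client is ahead of this broker
        }
        return Result.OK;
    }
}
```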

Offset Schema Changes

This KIP introduces a new schema version for committed offsets in the internal __consumer_offsets topic. This is not a public API, but we mention it for compatibility implications. The version will be bumped to 3. The new schema is given below and corresponds with the changes to the OffsetCommit APIs:

Code Block
Offset Commit Value Schema (Version: 3) =>
  Offset => INT64
  LastLeaderEpoch => INT32 // New
  Metadata => STRING
  CommitTimestamp => INT64
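
As a rough illustration of the field layout above, the sketch below writes the four fields in order into a big-endian buffer. It is not the broker's actual serialization code: `OffsetCommitValueV3` is a hypothetical name, the version prefix of the stored value is not modeled, STRING is assumed to be an INT16 length followed by UTF-8 bytes, and using -1 to mean "no epoch available" is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical encoder illustrating the version 3 field order; not broker code. */
final class OffsetCommitValueV3 {
    private OffsetCommitValueV3() {}

    /** Assumed sentinel for "no leader epoch available". */
    static final int NO_LEADER_EPOCH = -1;

    static ByteBuffer encode(long offset, int lastLeaderEpoch, String metadata, long commitTimestamp) {
        byte[] metadataBytes = metadata.getBytes(StandardCharsets.UTF_8);
        if (metadataBytes.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("metadata too long for an INT16 length prefix");
        }
        ByteBuffer buffer = ByteBuffer.allocate(8 + 4 + 2 + metadataBytes.length + 8);
        buffer.putLong(offset);                       // Offset => INT64
        buffer.putInt(lastLeaderEpoch);               // LastLeaderEpoch => INT32 (new)
        buffer.putShort((short) metadataBytes.length); // Metadata => STRING (length prefix)
        buffer.put(metadataBytes);                    // Metadata bytes
        buffer.putLong(commitTimestamp);              // CommitTimestamp => INT64
        buffer.flip();
        return buffer;
    }
}
```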

...