
...

Neither behavior is ideal, but we tend to overlook it because the user has opted into weaker semantics by enabling unclean leader election. Unfortunately, in some situations we have to enable unclean leader election in order to recover from serious faults on the brokers. Some users have also opted to keep unclean leader election enabled because they can never sacrifice availability. We would like to offer better client semantics for these situations.

Inadequate Replica Fencing: We encountered a situation in KAFKA-6880 in which a deadlock caused a broker to enter a zombie state: it was no longer registered in ZooKeeper, yet its fetchers continued attempting to make progress by fetching from leaders. Since it was no longer receiving updates from the controller, its leader metadata became stale. However, the replica was still able to make progress and even rejoin the ISR as the assigned leader periodically became accurate again.

...

The problem in both of these cases is that the leader receiving the fetch request has no way to validate whether the data previously fetched was accurate given the state of its own log. We can look back to KIP-101 to understand the underlying problem. An offset alone does not uniquely identify a record. You also need the leader epoch in which it was written to the log. Similarly, a fetch offset does not uniquely identify the position in the log because it does not take the leader epoch into account. The broker can only assume that the fetcher had accurately consumed the log up to that point. As we saw in KAFKA-6880, this assumption is not always true. It also isn't true for consumers when unclean leader election is enabled.

To fix these problems, we need to be able to verify that the fetcher's expectation about the current state of the log is correct. For the consumer, we need to check whether the log has been truncated since the previous fetch request. For the follower, we need to verify that the follower and the leader have matching epochs.

In both cases, we need the Fetch request to include a field for the leader epoch. For the consumer, this epoch is the minimum expected epoch for the message at the fetch offset. The leader will use its leader epoch cache to validate that the epoch and fetch offset are correct. Specifically, it will ensure that there are no larger leader epochs which began at a smaller offset than the fetch offset. This would imply that the consumer did not see some messages from the later epoch, which means the log was truncated at some point. The leader will return a new LOG_TRUNCATION error code in this case. If the consumer has a larger epoch than the replica is aware of (the KIP-232 scenario), the leader will return UNKNOWN_LEADER_EPOCH.

For the replica, the leader epoch will be the current leader epoch. If the leader receives a fetch request from a replica with a lower epoch, it will return a FENCED_REPLICA error. As in the case of the consumer, if the epoch on the follower is larger than the leader is aware of, it will return UNKNOWN_LEADER_EPOCH. In fact, the need to fence replicas extends to the truncation step in the become follower transition, so we propose to add the current leader epoch to the OffsetsForLeaderEpoch request as well. The epoch will be validated in the same way that we validate the epoch in the fetch request.

Below we discuss how the behavior changes for clients, leaders, and followers.

Leader Fetch Handling

When a broker receives a fetch request, it will validate the leader epoch (if provided). There are two cases that interest us:

Case 1: The epoch provided does not match the current leader epoch. For replicas, the check is simply whether the epoch in the fetch request matches the current leader epoch; if the follower's epoch is smaller than the leader's, we return the new FENCED_REPLICA error code.

Case 2: There exists a larger leader epoch with a starting offset which is smaller than the fetch offset. In this case, we know the log was truncated, so the leader will return a LOG_TRUNCATION error code in the fetch response. This is how log truncation is detected for the consumer.

Whether the fetch is from a replica or a consumer, if the epoch provided is larger than that of the leader itself, the broker will respond with a new UNKNOWN_LEADER_EPOCH error code, which will cause the fetcher to refresh metadata and try again. This is probably the result of stale metadata such as was observed in KIP-232.
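
To make the three outcomes concrete, here is a rough sketch of the leader-side check. This is illustrative only: the epochCache map, the Error enum, and the class itself are stand-ins for broker internals, assuming the consumer supplies its last fetched epoch and the replica supplies its current epoch as described above.

Code Block
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch of the fetch epoch validation described above.
class FetchEpochValidator {
    // Hypothetical cache mapping each leader epoch to its start offset in the local log.
    private final NavigableMap<Integer, Long> epochCache = new TreeMap<>();
    private int currentLeaderEpoch;

    enum Error { NONE, FENCED_REPLICA, UNKNOWN_LEADER_EPOCH, LOG_TRUNCATION }

    Error validate(boolean fromReplica, int fetchEpoch, long fetchOffset) {
        if (fetchEpoch < 0)
            return Error.NONE;                  // sentinel (-1): no epoch provided, skip validation
        if (fetchEpoch > currentLeaderEpoch)
            return Error.UNKNOWN_LEADER_EPOCH;  // stale metadata, the KIP-232 scenario
        if (fromReplica)                        // Case 1: the follower must be on the leader's epoch
            return fetchEpoch < currentLeaderEpoch ? Error.FENCED_REPLICA : Error.NONE;
        // Case 2: a larger epoch starting before the fetch offset means the log was truncated
        Map.Entry<Integer, Long> nextEpoch = epochCache.higherEntry(fetchEpoch);
        if (nextEpoch != null && nextEpoch.getValue() < fetchOffset)
            return Error.LOG_TRUNCATION;
        return Error.NONE;
    }
}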

We will also change the leader behavior so that it is not permitted to add a replica to the ISR if it is marked as offline by the controller. By currently allowing this, we weaken acks=all semantics since the zombie contributes to the min.isr requirement, but is not actually eligible to become leader.

Consumer Handling

In order to handle the LOG_TRUNCATION error code, the consumer will need to find the offset at which its log diverged. We propose to use the OffsetForLeaderEpoch API that was introduced in KIP-101. As the consumer is fetching from a partition, it will keep a small cache of the recent epochs that were fetched for each partition. In the case of a log truncation error, it will use this cache to find the point of divergence.
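
As a rough sketch of how this might work, the cache could map each fetched epoch to the last offset fetched in that epoch, and the divergence search could walk the cached epochs from newest to oldest. Everything below is illustrative: EpochEndOffset, LeaderEpochLookup, and the method names are hypothetical, and the lookup is assumed to return the largest epoch known to the leader that is less than or equal to the requested epoch, together with that epoch's end offset.

Code Block
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch of the consumer-side divergence search described above.
class TruncationDetector {
    /** Hypothetical result of an OffsetsForLeaderEpoch lookup. */
    record EpochEndOffset(int epoch, long endOffset) {}

    /** Hypothetical round trip to the leader's OffsetsForLeaderEpoch API. */
    interface LeaderEpochLookup {
        EpochEndOffset endOffsetFor(int requestedEpoch);
    }

    // Small per-partition cache: leader epoch -> last offset fetched in that epoch.
    private final NavigableMap<Integer, Long> fetchedEpochs = new TreeMap<>();

    void onRecordFetched(int leaderEpoch, long offset) {
        fetchedEpochs.merge(leaderEpoch, offset, Math::max);
    }

    /** Returns the first offset at which this consumer's view diverges from the leader's log, or -1. */
    long findDivergence(LeaderEpochLookup leader) {
        for (Map.Entry<Integer, Long> cached : fetchedEpochs.descendingMap().entrySet()) {
            EpochEndOffset leaderView = leader.endOffsetFor(cached.getKey());
            if (leaderView.epoch() == cached.getKey()) {
                // Shared epoch: the logs match up to the smaller of the two end points.
                return Math.min(leaderView.endOffset(), cached.getValue() + 1);
            }
            // The leader does not know this epoch; retry with an older cached epoch.
        }
        return -1; // no shared epoch in the cache; fall back to the reset policy or raise an error
    }
}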

On the other hand, if the fetch returns an UNKNOWN_LEADER_EPOCH error code, then the consumer will refresh metadata and retry the fetch.

This KIP has implications for the consumer's offset reset policy, which defines what the consumer should do if its fetch offset becomes out of range. With this KIP, the only case in which this is possible is if the consumer fetches from an offset earlier than the log start offset. Otherwise, we will detect it either as a log truncation event or an instance of stale metadata (the KIP-232 scenario). 

By opting into an offset reset policy, the user allows for automatic adjustments to the fetch position, so we take advantage of this to reset the offset as precisely as possible when log truncation is detected. In some pathological cases (e.g. multiple consecutive unclean leader elections), we may not be able to find the exact offset, but we should be able to get close by finding the starting offset of the next largest epoch that the leader is aware of. We propose in this KIP to change the behavior for both the "earliest" and "latest" reset modes to do this automatically as long as the message format supports lookup by leader epoch.

If a user is not using an auto reset option, we will raise a LogTruncationException from poll() when log truncation is detected. The exception will include the partitions that were truncated and the offset of divergence as found above. This gives applications the ability to execute any logic to revert changes to downstream systems and rewind the fetch offset if needed. If the user ignores the exception, we will continue fetching from the current offset, but we will drop the last fetched offset metadata from the new FetchRequest so that we do not get the same log truncation error.
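
As a usage illustration, an application that maintains downstream state might handle the exception roughly as follows. Note that this targets the APIs proposed in this KIP (LogTruncationException, seekToNearest()), so it will not compile against today's clients, and divergentOffsets() is a hypothetical name for however the exception exposes the truncated partitions and divergence offsets.

Code Block
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hedged usage sketch against the APIs proposed in this KIP.
class TruncationAwarePoller {
    void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            try {
                consumer.poll(Duration.ofSeconds(1)).forEach(record -> {
                    // apply the record to downstream state, remembering record.leaderEpoch()
                });
            } catch (LogTruncationException e) {            // exception proposed in this KIP
                for (Map.Entry<TopicPartition, OffsetAndMetadata> diverged :
                        e.divergentOffsets().entrySet()) {  // hypothetical accessor name
                    // revert any downstream effects past the divergence point, then rewind
                    consumer.seekToNearest(diverged.getKey(), diverged.getValue()); // proposed API
                }
            }
        }
    }
}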

...

  1. To fix the problem with KIP-232, we will add the leader epoch to the ListOffsets response. The consumer will use this in its first fetch request after resetting offsets. In the event that stale metadata redirects our Fetch request to an old leader and the broker returns UNKNOWN_LEADER_EPOCH, the consumer will refresh metadata and retry.
  2. We will also provide the leader epoch in the offset commit and fetch APIs. This allows consumer groups to detect truncation across rebalances or restarts.
  3. For users that store offsets in an external system, we will provide APIs which expose the leader epoch of each record, and we will provide an alternative seek API so that users can initialize the offset and leader epoch, as sketched below.
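
For illustration, a user storing offsets in an external system might capture the epoch alongside the offset and restore both on startup roughly as follows. The storage calls and the OffsetAndMetadata constructor form are assumptions for the sketch; leaderEpoch() and seekToNearest() are the APIs proposed in this KIP.

Code Block
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hedged sketch of external offset storage using the proposed leader epoch APIs.
class ExternalOffsetStore {
    record StoredPosition(long offset, int epoch) {}

    void afterProcessing(TopicPartition tp, ConsumerRecord<byte[], byte[]> record) {
        // Persist the next offset and its leader epoch together with the processing results.
        save(tp, record.offset() + 1, record.leaderEpoch()); // leaderEpoch() proposed above
    }

    void onStartup(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp) {
        StoredPosition pos = load(tp);
        // Initialize both offset and epoch; if the log was truncated in the meantime, the
        // consumer lands on the closest safe offset instead of silently reading past the gap.
        consumer.seekToNearest(tp, new OffsetAndMetadata(pos.offset(), pos.epoch(), "")); // proposed API; constructor assumed
    }

    private void save(TopicPartition tp, long offset, int epoch) { /* e.g. a transactional DB write */ }
    private StoredPosition load(TopicPartition tp) { return new StoredPosition(0L, -1); }
}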

Replica Handling

This KIP adds replica fencing for both fetching and truncation. When a replica becomes a follower, it will attempt to find the offset of divergence using the OffsetsForLeaderEpoch API. The new epoch will be included in this request so that the follower is ensured that it is truncating using the log from the leader in the correct epoch. Without this validation, it is possible to truncate using the log of a leader with stale log state, which can lead to log divergence.

After the follower has truncated its log, it will begin fetching as it does today. It will similarly include the current leader epoch, which will be validated by the leader. If the fetch response contains either the FENCED_REPLICA or UNKNOWN_LEADER_EPOCH error code, the follower will simply retry since both errors may be transient (e.g. if the propagation of the LeaderAndIsr request is delayed).  
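
A minimal sketch of the follower-side reaction described above is given below; the error names mirror the new codes in this KIP, while the surrounding class and the backoff call are illustrative stand-ins rather than actual broker code.

Code Block
// Illustrative follower fetch error handling for the new error codes.
class FollowerFetchErrorHandler {
    enum Error { NONE, FENCED_REPLICA, UNKNOWN_LEADER_EPOCH, OTHER }

    void handle(Error error) {
        switch (error) {
            case FENCED_REPLICA:
            case UNKNOWN_LEADER_EPOCH:
                // Likely transient (e.g. a delayed LeaderAndIsr update): keep the
                // fetcher alive and retry until the epochs agree.
                backoffAndRetry();
                break;
            default:
                // defer to the existing fetcher error handling
                break;
        }
    }

    private void backoffAndRetry() { /* omitted: schedule the next fetch attempt */ }
}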

Public Interfaces

API Changes

We will introduce a new exception type, which will be raised from KafkaConsumer.poll(Duration) as described above. This exception extends from `OffsetOutOfRangeException` for compatibility. If a user's offset reset policy is set to "none," they will still be able to catch `OffsetOutOfRangeException`. For new usage, users can catch the more specific exception type and use the new `seekToNearest()` API defined below.


Code Block
class LogTruncationException extends OffsetOutOfRangeException {}

We will also add a new retriable exception type for the UNKNOWN_LEADER_EPOCH error code:

Code Block
class UnknownLeaderEpochException extends RetriableException {}

This will be located in the public `errors` package, but the consumer will internally retry when it receives this error.

...

Code Block
class ConsumerRecord<K, V> {
    int leaderEpoch();
}

class OffsetAndMetadata {
    int leaderEpoch();
}


We will also have a new API to support seeking to an offset and leader epoch. For convenience and consistency, we will reuse the OffsetAndMetadata object.

Code Block
/**
 * Seek to the nearest offset taking into account leader epoch information.
 * If the offset still exists and has the same leader epoch, then this will
 * set the position to that offset. Otherwise, if the log has been truncated
 * (e.g. due to unclean leader election), then the offset will be the first 
 * offset of the next largest leader epoch.
 */ 
void seekToNearest(TopicPartition partition, OffsetAndMetadata offset);


Protocol Changes

Fetch

We will bump the fetch request version in order to include the last fetched leader epoch. This will only be provided by the fetcher for consecutive fetches. If the consumer seeks to the middle of the log, for example, then we will use the sentinel value -1 and the leader will skip the epoch validation. The replica should always have the previous epoch except when fetching from the very beginning of the log.

...

  1. LOG_TRUNCATION: This error code indicates that the last fetched offset and epoch do not match the local state of the log. This is only possible for consumers.
  2. FENCED_REPLICA: The replica has a lower epoch than the leader. This is retriable.
  3. UNKNOWN_LEADER_EPOCH: This is a retriable error code which indicates that the epoch in the fetch request was larger than any known by the broker.

OffsetsForLeaderEpoch

The changes to the OffsetsForLeaderEpoch request API are similar.

Code Block
OffsetForLeaderEpochRequest => [Topic]
  Topic => TopicName [Partition]
    TopicName => STRING
    Partition => PartitionId CurrentLeaderEpoch LeaderEpoch
      PartitionId => INT32
      CurrentLeaderEpoch => INT32  // New
      LeaderEpoch => INT32

If the current leader epoch does not match that of the leader, then we will send either FENCED_REPLICA or UNKNOWN_LEADER_EPOCH as we do for the Fetch API.

Note that consumers will send a sentinel value (-1) for the current epoch and the broker will simply disregard that validation. 

OffsetCommit

The new OffsetCommit request schema is provided below. The response schema matches the previous version. We have added fields for the leader epoch and the timestamp.

Code Block
OffsetCommitRequest => GroupId Generation MemberId [TopicName [Partition Offset Timestamp LeaderEpoch Metadata]]
  GroupId => STRING
  Generation => INT32
  MemberId => STRING
  RetentionTime => INT64
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  Timestamp => INT64  // New
  LeaderEpoch => INT32  // New
  Metadata => STRING

OffsetFetch

The OffsetFetch response schema will be similarly modified. The request schema will remain the same.

Code Block
linenumberstrue
OffsetFetchResponse => [TopicName [Partition Offset Timestamp LeaderEpoch Metadata ErrorCode]]
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  LeaderEpoch => INT32  // New
  Metadata => STRING
  ErrorCode => INT16


ListOffsets

Finally, we also modify the ListOffsets response schema.

Code Block
ListOffsetResponse => [TopicName [Partition Offset LeaderEpoch Timestamp ErrorCode]]
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  LeaderEpoch => INT32  // New
  Timestamp => INT64
  ErrorCode => INT16

...

Offset Schema Changes

This KIP introduces a new schema version for committed offsets in the internal __consumer_offsets topic. This is not a public API, but we mention it for compatibility implications. The version will be bumped to 3. The new schema is given below:

Code Block
Offset Commit Value Schema (Version: 3) =>
  Offset => INT64
  LeaderEpoch => INT32
  Metadata => STRING
  CommitTimestamp => INT64


As with previous changes to this schema, we will not begin using the new schema after an upgrade until the inter-broker version has been updated. This ensures that replicas on older versions do not fail if they need to parse the new schema version.

ACL Changes

Currently the OffsetForLeaderEpoch request is restricted to inter-broker communication. It requires authorization to the Cluster resource. As part of this KIP, we will change this to require Describe access on the Topic resource. For backwards compatibility, we will continue to allow the old authorization.

Compatibility, Deprecation, and Migration Plan

Older versions of the modified APIs will continue to work as expected. When using older message formats, which do not support the leader epoch, we will use a sentinel value (-1) in the APIs that expose it.

Rejected Alternatives

When the consumer cannot find the precise truncation point, another option is to use the timestamp of the last consumed message in order to find the right offset to reset to. One benefit of this is that it works for older message formats. The downsides are 1) it complicates the API and 2) the truncation point determined using timestamp is not as accurate as what can be determined using the leader epoch. Ultimately we decided it was not worthwhile complicating the APIs for the old message format.

Initially, this proposal suggested that the follower should behave more like the consumer and include the expected next epoch in the fetch request. Unfortunately, model checking showed a weakness in this approach. To make the advancement of the high watermark safe, the leader must be able to guarantee that all followers in the ISR are on the correct epoch. Any disagreement about the current leader epoch and the ISR can lead to the loss of committed data. See https://github.com/hachikuji/kafka-specification/blob/master/Kip320FirstTry.tla for more detail.
