Status
Current state: Discussion
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP aims to solve two related problems in the handling of fetch requests.
Consumer handling of unclean truncation: When unclean leader election is enabled, we may lose committed data. A consumer which is reading from the end of the log will typically see an out of range error, which will cause it to apply its auto.offset.reset policy. To avoid losing data, users should use the "earliest" option, but that means consuming the log from the beginning.
It is also possible that prior to sending the next fetch, new data is written to the log so that the consumer's fetch offset becomes valid again. In this case, the consumer will just miss whatever data had been written between the truncation point and its fetch offset.
Neither behavior is ideal, but we tend to overlook it because the user has opted into weaker semantics by enabling unclean leader election. Unfortunately, in some situations we have to enable unclean leader election in order to recover from serious faults on the brokers. Some users also keep unclean leader election enabled because they cannot afford to sacrifice availability.
Inadequate Replica Fencing: We encountered a situation in KAFKA-6880 in which a deadlock caused a broker to enter a zombie state in which it was no longer registered in zookeeper. Nevertheless, its fetchers continued attempting to make progress by fetching from leaders. Since it was no longer receiving updates from the controller, the leader metadata became stale. However, the replica was still able to make progress and even rejoin the ISR as the assigned leader periodically became accurate again.
The problem in this situation is that the replica, which is no longer registered in Zookeeper, does not receive any notifications from the controller about leader changes. We depend on these notifications so that the follower can use our log truncation protocol to find the right starting offset for the new leader.
Proposed Changes
The problem in both of these cases is that the leader receiving the fetch request has no way to validate whether the data previously fetched was accurate given the state of its own log. We can look back to KIP-101 to understand the underlying problem. An offset alone does not uniquely identify a record. You also need the leader epoch in which it was written to the log. Similarly, a fetch offset does not uniquely identify the position in the log because it does not take the leader epoch into account. The broker can only assume that the fetcher had accurately consumed the log up to that point. As we saw in KAFKA-6880, this assumption is not always true. It also isn't true for consumers when unclean leader election is enabled.
To fix this problem, we propose to include in each fetch request the epoch returned in the last fetch, which ensures that the fetch offset reflects the expected state of the log from the fetcher's perspective. This epoch is the minimum expected epoch for the message at the fetch offset. The leader will use its leader epoch cache to verify 1) that the actual epoch at the fetch offset is greater than or equal to the epoch in the fetch request, and 2) if the actual epoch is greater, that the newer epoch does not begin at an earlier offset than the fetch offset. If the second check fails, the leader can respond with a new LOG_TRUNCATION error code to indicate that truncation has occurred. The consumer and the replica handle this error slightly differently, but in both cases the first step is to find the point in the log where they diverged. Below we discuss how the behavior changes for clients, leaders, and followers.
Leader Fetch Handling
When a broker receives a fetch request, it will validate the leader epoch (if provided). There are two cases that interest us:
Case 1: The epoch provided in the fetch request is larger than that of the leader itself. This is probably the result of stale metadata such as was observed in KIP-232. The broker will respond with a new UNKNOWN_LEADER_EPOCH error code which will cause the consumer to refresh metadata and try again.
Case 2: There exists a larger leader epoch with a starting offset which is smaller than the fetch offset. In this case, we know the log was truncated, so we will return a LOG_TRUNCATION error code.
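For illustration, the leader-side check might look roughly like the following sketch. The class, the error enum, and the epoch cache representation are illustrative only and do not reflect the broker's actual internals:

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Illustrative sketch only: the real broker code (Scala) and its leader epoch cache API differ.
class FetchEpochValidator {
    enum FetchError { NONE, UNKNOWN_LEADER_EPOCH, LOG_TRUNCATION }

    // Leader epoch -> start offset of that epoch, as recorded by the leader epoch cache.
    private final NavigableMap<Integer, Long> epochStartOffsets = new TreeMap<>();
    private final int currentLeaderEpoch;

    FetchEpochValidator(int currentLeaderEpoch) {
        this.currentLeaderEpoch = currentLeaderEpoch;
    }

    void assignEpochStartOffset(int epoch, long startOffset) {
        epochStartOffsets.put(epoch, startOffset);
    }

    FetchError validate(int lastFetchedEpoch, long fetchOffset) {
        if (lastFetchedEpoch < 0)
            return FetchError.NONE;  // sentinel -1: no epoch supplied, skip validation

        // Case 1: the fetcher's epoch is newer than anything this broker knows about,
        // which indicates stale metadata; the fetcher should refresh and retry.
        if (lastFetchedEpoch > currentLeaderEpoch)
            return FetchError.UNKNOWN_LEADER_EPOCH;

        // Case 2: some later epoch begins before the fetch offset, so the data previously
        // read under lastFetchedEpoch at this position has been truncated.
        Map.Entry<Integer, Long> nextEpoch = epochStartOffsets.higherEntry(lastFetchedEpoch);
        if (nextEpoch != null && nextEpoch.getValue() < fetchOffset)
            return FetchError.LOG_TRUNCATION;

        return FetchError.NONE;
    }
}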
Consumer Handling
In order to handle the LOG_TRUNCATION error code, the consumer will need to find the offset at which its log diverged. We propose to use the OffsetForLeaderEpoch API that was introduced in KIP-101. As the consumer is fetching from a partition, it will keep a small cache of the recent epochs that were fetched for each partition. In the case of a log truncation error, it will use this cache to find the point of divergence.
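As a rough sketch of this client-side logic (the cache layout and the EpochEndOffsetLookup wrapper around OffsetsForLeaderEpoch are assumptions for illustration, not existing client classes):

import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Illustrative sketch only: the Java client does not expose OffsetsForLeaderEpoch publicly today.
class TruncationDetector {

    static class EpochEndOffset {
        final int leaderEpoch;   // largest epoch on the leader that is <= the requested epoch
        final long endOffset;    // end offset of that epoch on the leader
        EpochEndOffset(int leaderEpoch, long endOffset) {
            this.leaderEpoch = leaderEpoch;
            this.endOffset = endOffset;
        }
    }

    interface EpochEndOffsetLookup {
        EpochEndOffset lookup(int requestedEpoch);  // backed by the OffsetsForLeaderEpoch API
    }

    // Small cache of recently fetched epochs: epoch -> end offset fetched in that epoch.
    private final NavigableMap<Integer, Long> fetchedEpochs = new TreeMap<>();

    void onRecordFetched(int epoch, long offset) {
        fetchedEpochs.merge(epoch, offset + 1, Math::max);
    }

    // On LOG_TRUNCATION, walk backwards through the cached epochs until we find one that the
    // leader also has; the logs diverge at the smaller of the two end offsets for that epoch.
    Optional<Long> findDivergenceOffset(EpochEndOffsetLookup leader) {
        for (Map.Entry<Integer, Long> cached : fetchedEpochs.descendingMap().entrySet()) {
            EpochEndOffset onLeader = leader.lookup(cached.getKey());
            if (onLeader.leaderEpoch == cached.getKey())
                return Optional.of(Math.min(onLeader.endOffset, cached.getValue()));
        }
        return Optional.empty();  // nothing in common with the leader; fall back to the reset policy
    }
}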
This KIP has implications for the consumer's offset reset policy, which defines what the consumer should do if its fetch offset becomes out of range. With this KIP, the only case in which this is possible is if the consumer fetches from an offset earlier than the log start offset. Otherwise, we will detect it either as a log truncation event or an instance of stale metadata (the KIP-232 scenario). By opting into an offset reset policy, the user allows for automatic adjustments to the fetch position, so we take advantage of this to reset the offset precisely whenever possible. In some edge cases, it may not be possible to find this offset precisely, and we still have to resort to the reset policy.
In fact, if we have to reset offsets, we can often do better than "earliest" or "latest" because we have the timestamp index. We propose to add a third option for `auto.offset.reset` to reset the offset to the "closest" offset. If the consumer fetches from an offset lower than the log start offset, we will use the new log start offset. If the offset is larger than the log end offset or if truncation is detected, we will set the reset offset precisely if possible using the logic in this KIP. However, if we fail to find this offset, we will reset using the timestamp of the most recently committed message. Finally, if the message format does not support timestamps, we will use the latest offset.
The following table summarizes the behavior for the three reset policies:
| auto.offset.reset | Initialization | Offset out of range (too low) | Offset out of range (precisely known truncation offset) | Offset out of range (unknown truncation offset) |
|---|---|---|---|---|
| latest | log end offset | log end offset | truncation offset | log end offset |
| earliest | log start offset | log start offset | truncation offset | log start offset |
| closest | log start offset | log start offset | truncation offset | reset using timestamp if possible, otherwise use log end offset |
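The decision logic behind the "closest" policy might look roughly like the following sketch; the helper interface and method names are illustrative only:

import java.util.Optional;

// Illustrative decision logic for auto.offset.reset=closest. The lookup interface is an
// assumption for the sketch, not an existing client API.
class ClosestResetPolicy {

    interface OffsetForTimestampLookup {
        Optional<Long> offsetForTimestamp(long timestamp);  // e.g. backed by the ListOffsets API
    }

    long resolve(long fetchOffset,
                 long logStartOffset,
                 long logEndOffset,
                 Optional<Long> truncationOffset,        // precise point of divergence, if known
                 Optional<Long> lastConsumedTimestamp,   // timestamp of the last consumed record
                 OffsetForTimestampLookup byTimestamp) {
        if (fetchOffset < logStartOffset)
            return logStartOffset;                       // fetch offset fell below the log start offset
        if (truncationOffset.isPresent())
            return truncationOffset.get();               // truncation detected, divergence point known
        if (lastConsumedTimestamp.isPresent()) {
            // Unknown divergence point: reset using the timestamp of the last consumed record.
            Optional<Long> offset = byTimestamp.offsetForTimestamp(lastConsumedTimestamp.get());
            if (offset.isPresent())
                return offset.get();
        }
        return logEndOffset;                             // message format without timestamps
    }
}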
If a user is not using an auto reset option, we will raise a TruncatedPartitionException from poll() when log truncation is detected. The exception will include the partitions that were truncated and the offset of divergence found as described above. This gives the application the ability to reset state if needed, revert changes to downstream systems, and rewind the fetch offset. If the user ignores the exception, we will continue fetching from the current offset, but we will drop the last fetched epoch from the next FetchRequest so that we do not hit the same log truncation error again.
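For example, an application running with auto.offset.reset=none might handle truncation along the lines of the sketch below. This is only a sketch: process() and revertDownstreamChangesAfter() stand in for application-specific logic, and TruncatedPartitionException is the exception type proposed under Public Interfaces.

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Sketch of application-level handling of the proposed TruncatedPartitionException.
abstract class TruncationAwareProcessor {

    abstract void process(ConsumerRecords<String, String> records);

    abstract void revertDownstreamChangesAfter(TopicPartition partition, long offset);

    void pollLoop(Consumer<String, String> consumer) {
        while (true) {
            try {
                process(consumer.poll(Duration.ofSeconds(1)));
            } catch (TruncatedPartitionException e) {
                for (Map.Entry<TopicPartition, Optional<Long>> entry : e.truncatedOffsets().entrySet()) {
                    TopicPartition tp = entry.getKey();
                    entry.getValue().ifPresent(divergence -> {
                        // Undo downstream effects of the truncated records, then rewind so the
                        // surviving leader's log is re-consumed from the divergence point.
                        revertDownstreamChangesAfter(tp, divergence);
                        consumer.seek(tp, divergence);
                    });
                    // If the divergence point is unknown (Optional.empty()), continuing to
                    // poll() resumes fetching without the last fetched epoch.
                }
            }
        }
    }
}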
For consumers, we propose some additional extensions:
- To fix the problem with KIP-232, we will add the leader epoch to the ListOffsets response. The consumer will use this in its first fetch request after resetting offsets. In the event that stale metadata redirects our Fetch request to an old leader and the broker returns UNKNOWN_LEADER_EPOCH, the consumer will refresh metadata and retry.
- We will also provide the leader epoch in the offset commit and fetch APIs. This allows consumer groups to detect truncation across rebalances or restarts.
- For users that store offsets in an external system, we will provide APIs which expose the leader epoch of each record, and we will provide an alternative seek API so that users can initialize both the offset and the leader epoch.
Replica Handling
When a replica receives the LOG_TRUNCATION error, it will execute its usual truncation logic by attempting to find the offset of divergence using the OffsetForLeaderEpoch API. Effectively it is treated as a leader change.
We will also change the leader behavior so that a replica is not permitted to be added to the ISR if it is marked as offline by the controller. Allowing this, as we do currently, weakens acks=all semantics since the zombie contributes to the min.insync.replicas requirement but is not actually eligible to become leader.
Public Interfaces
Config Changes
As described above, the consumer's `auto.offset.reset` will now support an additional option to reset offsets to the "closest" position.
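For example, a consumer could opt into the new policy as follows (a minimal, hypothetical configuration fragment):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Hypothetical consumer configuration opting into the proposed "closest" reset policy.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "closest");  // new value proposed by this KIP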
API Changes
We will introduce a new exception type, which will be raised from KafkaConsumer.poll(Duration) as described above. This exception extends `OffsetOutOfRangeException` for compatibility: if a user's offset reset policy is set to "none," existing code that catches `OffsetOutOfRangeException` will continue to work.
class TruncatedPartitionException extends OffsetOutOfRangeException {
    /**
     * Get a map of the partitions which were truncated and the offset
     * at which the logs diverged from what we read locally. In other
     * words, offsets between this offset and the current fetch offset
     * were truncated from the broker's log. If the consumer was unable
     * to determine the point of divergence, then the value will be
     * `Optional.empty()`.
     */
    Map<TopicPartition, Optional<Long>> truncatedOffsets();
}
The leader epoch will be exposed in the ConsumerRecord and OffsetAndMetadata objects.
class ConsumerRecord<K, V> {
    int leaderEpoch();
}

class OffsetAndMetadata {
    long timestamp();
    int leaderEpoch();
}
We will also have a new API to support seeking to an offset and leader epoch. For convenience and consistency, we will reuse the OffsetAndMetadata object.
void seek(TopicPartition partition, OffsetAndMetadata offset);
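To illustrate the external-offset-storage flow described earlier, the following sketch assumes that loadFromExternalStore(), saveToExternalStore(), and process() are application code, and that OffsetAndMetadata also gains a constructor accepting the leader epoch (not shown above):

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch of externally stored offsets that also carry the leader epoch.
void consumeWithExternalOffsets(Consumer<String, String> consumer, TopicPartition tp) {
    // On startup, restore the offset and the leader epoch of the last processed record,
    // then use the proposed seek overload so the first fetch can be validated.
    OffsetAndMetadata stored = loadFromExternalStore(tp);
    consumer.seek(tp, stored);

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records.records(tp)) {
            process(record);
            // Persist the next offset to consume together with the epoch of the record just
            // read, so that truncation can be detected after a restart.
            saveToExternalStore(tp, new OffsetAndMetadata(record.offset() + 1, record.leaderEpoch(), ""));
        }
    }
}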
Protocol Changes
We will bump the fetch request version in order to include the last fetched leader epoch. This will only be provided by the fetcher for consecutive fetches. If the consumer seeks to the middle of the log, for example, then we will use the sentinel value -1 and the leader will skip the epoch validation. The replica should always have the previous epoch except when starting from the beginning of the log.
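To make the fetcher-side behavior concrete, a client might track the epoch to attach to the next fetch along the lines of the sketch below; the class and field names are illustrative only, not taken from the actual implementation:

// Illustrative sketch of per-partition fetch state on the client.
class PartitionFetchState {
    static final int UNDEFINED_EPOCH = -1;  // sentinel: leader skips epoch validation

    private long fetchOffset;
    private int lastFetchedEpoch = UNDEFINED_EPOCH;

    // After a seek (or offset reset), we no longer know the epoch at the new position,
    // so the next fetch carries the sentinel and the leader skips validation.
    void seek(long offset) {
        this.fetchOffset = offset;
        this.lastFetchedEpoch = UNDEFINED_EPOCH;
    }

    // After successfully consuming a batch, remember the epoch of the last record so the
    // next (consecutive) fetch can be validated against the leader's log.
    void onSuccessfulFetch(long nextFetchOffset, int epochOfLastRecord) {
        this.fetchOffset = nextFetchOffset;
        this.lastFetchedEpoch = epochOfLastRecord;
    }

    long fetchOffset() { return fetchOffset; }
    int lastFetchedEpoch() { return lastFetchedEpoch; }
}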
The new schema is given below:
FetchRequest => MaxWaitTime ReplicaId MinBytes IsolationLevel FetchSessionId FetchSessionEpoch [Topics] [RemovedTopics]
  MaxWaitTime => INT32
  ReplicaId => INT32
  MinBytes => INT32
  IsolationLevel => INT8
  FetchSessionId => INT32
  FetchSessionEpoch => INT32
  Topics => TopicName Partitions
    TopicName => STRING
    Partitions => [Partition FetchOffset StartOffset LeaderEpoch MaxBytes]
      Partition => INT32
      FetchOffset => INT64
      StartOffset => INT64
      LeaderEpoch => INT32  // New
      MaxBytes => INT32
  RemovedTopics => RemovedTopicName [RemovedPartition]
    RemovedTopicName => STRING
    RemovedPartition => INT32
The response schema will not change, but we will have two new error codes as mentioned above:
- LOG_TRUNCATION: This error code indicates that the last fetched offset and epoch do not match the local state of the log.
- UNKNOWN_LEADER_EPOCH: This is a retriable error code which indicates that the epoch in the fetch request was larger than any epoch known to the broker.
The new OffsetCommit request schema is provided below. The response schema matches the previous version. We have added fields for the leader epoch and the timestamp.
OffsetCommitRequest => GroupId Generation MemberId RetentionTime [TopicName [Partition Offset Timestamp LeaderEpoch Metadata]]
  GroupId => STRING
  Generation => INT32
  MemberId => STRING
  RetentionTime => INT64
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  Timestamp => INT64    // New
  LeaderEpoch => INT32  // New
  Metadata => STRING
The OffsetFetch response schema will be similarly modified. The request schema will remain the same.
OffsetFetchResponse => [TopicName [Partition Offset Timestamp LeaderEpoch Metadata ErrorCode]]
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  Timestamp => INT64    // New
  LeaderEpoch => INT32  // New
  Metadata => STRING
  ErrorCode => INT16
Finally, we also modify the ListOffsets response schema.
ListOffsetResponse => [TopicName [Partition Offset LeaderEpoch Timestamp ErrorCode]]
  TopicName => STRING
  Partition => INT32
  Offset => INT64
  LeaderEpoch => INT32  // New
  Timestamp => INT64
  ErrorCode => INT16
ACL Changes
Currently the OffsetForLeaderEpoch request is restricted to inter-broker communication. It requires authorization to the Cluster resource. As part of this KIP, we will change this to require Describe access on the Topic resource. For backwards compatibility, we will continue to allow the old authorization.
Compatibility, Deprecation, and Migration Plan
Older versions of the modified APIs will continue to work as expected. When using older message format versions, which do not support the leader epoch, we will use a sentinel value (-1) in the APIs that expose it.
Rejected Alternatives
None yet