...

In both cases, we need the Fetch request to include a field for the leader epoch. For the consumer, this epoch is the minimum expected epoch for the message at the fetch offset. The leader will use its leader epoch cache to validate that the epoch and fetch offset are correct. Specifically, it will ensure that there are no larger leader epochs which began at a smaller offset than the fetch offset. This would imply that the consumer did not see some messages from the later epoch, which means the log was truncated at some point. The leader will return a new LOG_TRUNCATION error code in this case. If the consumer has a larger epoch than the replica is aware of (the KIP-232 scenario), the leader will return UNKNOWN_LEADER_EPOCH.
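The consumer-side check described above can be sketched as follows. This is only an illustrative model, not the broker implementation: the epoch cache is assumed to be a list of (epoch, start offset) pairs sorted by epoch, and the function name `validate_consumer_fetch` and the string error codes are hypothetical stand-ins.

```python
from typing import List, Tuple

# Illustrative stand-ins for the error codes named in the text.
NONE = "NONE"
LOG_TRUNCATION = "LOG_TRUNCATION"
UNKNOWN_LEADER_EPOCH = "UNKNOWN_LEADER_EPOCH"


def validate_consumer_fetch(
    epoch_cache: List[Tuple[int, int]],
    fetch_offset: int,
    expected_epoch: int,
) -> str:
    """Validate a consumer fetch against the leader's epoch cache.

    epoch_cache is assumed to hold (epoch, start_offset) pairs sorted by
    epoch; this representation is an assumption made for the sketch.
    """
    latest_epoch = epoch_cache[-1][0] if epoch_cache else -1

    # The consumer knows of a larger epoch than the leader does.
    if expected_epoch > latest_epoch:
        return UNKNOWN_LEADER_EPOCH

    # A larger epoch that began before the fetch offset means the consumer
    # missed messages from that epoch, so the log must have been truncated.
    for epoch, start_offset in epoch_cache:
        if epoch > expected_epoch and start_offset < fetch_offset:
            return LOG_TRUNCATION

    return NONE
```

For example, with a cache of `[(0, 0), (1, 10), (2, 25)]`, a fetch at offset 30 with expected epoch 1 is rejected with LOG_TRUNCATION because epoch 2 began at offset 25, while a fetch at offset 20 with the same expected epoch passes.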

The requirements for a following replica are slightly different. Upon becoming a follower, the replica has a truncation protocol which ensures that its log does not diverge from the leader. What is missing is proper fencing. A leader or follower change could be delayed indefinitely or even not received at all (as in the case of KAFKA-6880). The leader and follower must be able to ensure that they are on the same epoch for the replication protocol to work correctly.

For the replica, the leader epoch in the fetch request will be the current leader epoch. If the leader receives a fetch request from a replica with a lower epoch, it will return a FENCED_REPLICA error. As in the case of the consumer, if the epoch on the follower is larger than the leader is aware of, the leader will return UNKNOWN_LEADER_EPOCH.

In fact, the need to fence replicas extends to the truncation step in the become follower transition, so we propose to add the current leader epoch to the OffsetsForLeaderEpoch request as well. The epoch will be validated in the same way that we validate the epoch in the fetch request.
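The replica fencing rule reduces to a three-way comparison of epochs, which can be sketched as below. The function name `validate_replica_epoch` and the string error codes are hypothetical; the sketch assumes the same check is applied to both the Fetch and OffsetsForLeaderEpoch requests, as proposed above.

```python
# Illustrative stand-ins for the error codes named in the text.
NONE = "NONE"
FENCED_REPLICA = "FENCED_REPLICA"
UNKNOWN_LEADER_EPOCH = "UNKNOWN_LEADER_EPOCH"


def validate_replica_epoch(leader_epoch: int, request_epoch: int) -> str:
    """Compare the epoch sent by a follower with the leader's current epoch."""
    if request_epoch < leader_epoch:
        # The follower has not yet learned of the latest leader change.
        return FENCED_REPLICA
    if request_epoch > leader_epoch:
        # The leader itself is behind; the follower should retry later.
        return UNKNOWN_LEADER_EPOCH
    return NONE
```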

...

In order to handle the LOG_TRUNCATION error code, the consumer will need to find the offset at which its log diverged. We propose to use the OffsetForLeaderEpoch API that was introduced in KIP-101. As the consumer is fetching from a partition, it will keep track of the last epoch that was fetched for each partition. In the case of a log truncation error, it will use this epoch and the last fetched offset to find the point of divergence.
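A minimal sketch of the divergence lookup follows. It assumes the KIP-101 semantics for the epoch lookup: the leader answers with the start offset of the first epoch larger than the requested one, or with its log end offset if no larger epoch exists. The function names and the (epoch, start offset) cache representation are hypothetical, and the sketch ignores the case where the leader has no record of the requested epoch.

```python
from typing import List, Tuple


def end_offset_for_epoch(
    epoch_cache: List[Tuple[int, int]],
    log_end_offset: int,
    requested_epoch: int,
) -> int:
    """Leader side: return the end offset of requested_epoch.

    epoch_cache is assumed to hold (epoch, start_offset) pairs sorted by epoch.
    """
    for epoch, start_offset in epoch_cache:
        if epoch > requested_epoch:
            return start_offset
    return log_end_offset


def find_divergence(
    epoch_cache: List[Tuple[int, int]],
    log_end_offset: int,
    last_fetched_epoch: int,
    last_fetched_offset: int,
) -> int:
    """Consumer side: the log diverged at the smaller of the two offsets."""
    end_offset = end_offset_for_epoch(epoch_cache, log_end_offset, last_fetched_epoch)
    return min(end_offset, last_fetched_offset)
```

For example, if the consumer last fetched epoch 1 up to offset 20 and the leader's cache is `[(0, 0), (1, 10), (3, 15)]`, epoch 1 ended at offset 15 on the leader, so offset 15 is the point of divergence.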

...

If a user is not using an auto reset option, we will raise a LogTruncationException from poll() when log truncation is detected. This gives users the ability to reset state if needed or revert changes to downstream systems. The exception will include the partitions that were truncated and the offset of divergence as found above. This gives applications the ability to execute any logic to revert changes if needed and rewind the fetch offset. Users must handle this exception and reset the position of the consumer if no auto reset policy is enabled.
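The handling pattern an application might follow is sketched below. This is a simulation, not the consumer API: the exception class is a local stand-in, and `FakeConsumer`, the `divergent_offsets` attribute, `poll_with_rewind`, and the `revert_downstream` callback are all hypothetical names introduced for illustration.

```python
from typing import Callable, Dict, List, Optional


class LogTruncationException(Exception):
    """Stand-in exception carrying truncated partitions and divergence offsets."""

    def __init__(self, divergent_offsets: Dict[str, int]):
        super().__init__("log truncation detected: %s" % divergent_offsets)
        self.divergent_offsets = dict(divergent_offsets)


class FakeConsumer:
    """Tiny in-memory model of a consumer, used only for this sketch."""

    def __init__(self, positions: Dict[str, int],
                 truncated: Optional[Dict[str, int]] = None):
        self.positions = dict(positions)
        self._pending = truncated

    def poll(self) -> List[str]:
        # Raise once if a truncation is pending, then behave normally.
        if self._pending:
            pending, self._pending = self._pending, None
            raise LogTruncationException(pending)
        return []

    def seek(self, partition: str, offset: int) -> None:
        self.positions[partition] = offset


def poll_with_rewind(consumer: FakeConsumer,
                     revert_downstream: Callable[[str, int], None]) -> List[str]:
    """Poll once; on truncation, revert downstream state and rewind."""
    try:
        return consumer.poll()
    except LogTruncationException as e:
        for partition, offset in e.divergent_offsets.items():
            revert_downstream(partition, offset)  # application-specific cleanup
            consumer.seek(partition, offset)      # reset the fetch position
        return []
```

The point of the design is that the application, not the consumer, decides what reverting means; the consumer only reports where the logs diverged.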

For consumers, we propose some additional extensions:

...

Initially, this proposal suggested that the follower should behave more like the consumer and include the expected next epoch in the fetch request. Unfortunately, model checking showed a weakness in this approach. To make the advancement of the high watermark safe, the leader must be able to guarantee that all followers in the ISR are on the correct epoch. Any disagreement about the current leader epoch and the ISR can lead to the loss of committed data. See https://github.com/hachikuji/kafka-specification/blob/master/Kip320FirstTry.tla for more detail. We considered including both the current epoch and the expected epoch, but in neither case did we strictly require both epochs.