...

Code Block
title: Step 1
Broker A:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, 20], leader epoch: 1

Broker B:
0: offsets [0, 10], leader epoch: 1

Broker B is then elected with epoch 2 due to preferred leader election, so both brokers A and B are still in ISR. Broker B appends a new batch with offsets (11, n) to its local log. We now have this:

Code Block
title: Step 2
Broker A:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, 20], leader epoch: 1

Broker B:
0: offsets [0, 10], leader epoch: 1
1: offsets: [11, n], leader epoch: 2

Normally we expect broker A to truncate to offset 11 on becoming the follower, but before it is able to do so, broker B's ZooKeeper session expires and broker A again becomes leader, now with epoch 3. It then appends a new entry in the range (21, 30). The updated logs look like this:

...

In the case of clean leader election, we still expect only one roundtrip of OffsetForLeaderEpoch request/response. In the corner case described above, the leader will respond with the largest epoch less than the requested epoch, which the follower will know about, and the follower will then truncate accordingly. For multiple roundtrips of OffsetForLeaderEpoch to happen, there must be at least three fast leader changes without changes in ISR in the first two, which should not happen in the clean leader election case (since we cannot have two preferred leader elections for the same topic partition back to back). 

In the case of unclean leader election, the solution in essence compares the complete epoch lineage between the brokers by exchanging epoch information until they find the largest epoch both brokers know about, and then truncating to the latest offset of that epoch (or not truncating if the follower's offset for that epoch is smaller than the leader's offset). So, in some cases, multiple rounds of OffsetForLeaderEpoch request/response may be required to get there. 

...

  1. As in the current protocol, when the follower sends an OffsetForLeaderEpoch request for the partition to the leader, the request includes the latest Leader Epoch in the follower's Leader Epoch Sequence. This step and the steps before it are unchanged. 
  2. The leader responds with the largest epoch less than or equal to the requested epoch (LeaderEpoch) and the end offset of this epoch (LastOffset).
  3. If the follower has the LeaderEpoch received from the leader in its Leader Epoch Sequence file, then we go to step 4. Otherwise, the follower sends an OffsetForLeaderEpoch request with the largest epoch less than LeaderEpoch, and steps 2 and 3 repeat until the follower receives an epoch it knows about (that is, an epoch in the follower's Leader Epoch Sequence file); see the sketch after this list.
  4. The follower truncates all offsets with epochs larger than the LeaderEpoch received from the leader, and then truncates its log to the leader's LastOffset.
  5. The follower starts fetching from the leader and the remainder of the protocol remains unchanged.
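The following is a minimal sketch, in Java, of the follower-side loop in steps 1-4 for the case where a common epoch exists. The names used here (LeaderClient, EpochCache, Log, EpochEndOffset) are hypothetical and only illustrate the flow; they are not the actual broker classes.

Code Block
title: Follower truncation loop (sketch, Java)
public class FollowerTruncation {

    // Leader's reply to OffsetForLeaderEpoch: the largest epoch on the leader that is
    // <= the requested epoch, together with the end offset of that epoch (LastOffset).
    record EpochEndOffset(int leaderEpoch, long endOffset) {}

    // Hypothetical view of the connection to the leader.
    interface LeaderClient {
        EpochEndOffset offsetForLeaderEpoch(int requestedEpoch);   // one OffsetForLeaderEpoch roundtrip
    }

    // Hypothetical view of the follower's Leader Epoch Sequence file.
    interface EpochCache {
        int latestEpoch();                      // latest epoch the follower knows (step 1)
        boolean contains(int epoch);            // does the follower know this epoch?
        int largestEpochLessThan(int epoch);    // next epoch to ask about, going backwards (step 3)
        long endOffsetFor(int epoch);           // follower's end offset for a known epoch
    }

    // Hypothetical view of the follower's local log.
    interface Log {
        void truncateTo(long offset);           // discard everything at and above this offset
    }

    // Steps 1-4: walk backwards through epochs until the leader answers with an epoch the
    // follower knows, then truncate to min(follower's end offset, leader's LastOffset).
    static void truncateToConsistentOffset(LeaderClient leader, EpochCache cache, Log log) {
        EpochEndOffset response = leader.offsetForLeaderEpoch(cache.latestEpoch());   // steps 1-2

        while (!cache.contains(response.leaderEpoch())) {                             // step 3
            response = leader.offsetForLeaderEpoch(cache.largestEpochLessThan(response.leaderEpoch()));
        }

        // Step 4: drop offsets with larger epochs, then cap the log at the leader's LastOffset.
        long followerEndOffset = cache.endOffsetFor(response.leaderEpoch());
        log.truncateTo(Math.min(followerEndOffset, response.endOffset()));
    }
}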

...

We will walk through several scenarios and show how the proposed solution solves the problem.

Scenario 1: Leader change due to preferred leader election followed by immediate leader change.

Consider the very first scenario described in this document, where broker A becomes leader with epoch 3 and then appends a new entry in the range (21, 30). As a reminder, the logs look like this at this point:

Code Block
title: Step 3
Broker A:
0: offsets [0, 10], leader epoch: 1
1: offsets [11, 20], leader epoch: 1
2: offsets: [21, 30], leader epoch: 3

Broker B:
0: offsets [0, 10], leader epoch: 1
1: offsets: [11, n], leader epoch: 2

On becoming follower, broker B sends OffsetForLeaderEpoch to broker A with leader_epoch 2. Broker A finds the largest epoch <= 2 and the log end offset in that epoch, and sends the response {leader_epoch=1, offset=21}. Broker B truncates all offsets for epochs > 1 (in our example, offsets [11, n]), so its LEO becomes 11. Since 21 > 11, broker B starts fetching from offset 11.
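The leader-side lookup in this scenario could be sketched as follows. The TreeMap keyed by epoch with each epoch's start offset is a simplification for illustration, not the actual broker data structure.

Code Block
title: Leader-side lookup for Scenario 1 (sketch, Java)
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class LeaderEpochLookup {

    record EpochEndOffset(int leaderEpoch, long endOffset) {}

    // Broker A's epochs mapped to their start offsets in Step 3: epoch 1 starts at offset 0,
    // epoch 3 starts at offset 21.
    static final NavigableMap<Integer, Long> epochStartOffsets = new TreeMap<>(Map.of(1, 0L, 3, 21L));
    static final long logEndOffset = 31L;   // broker A's LEO after appending [21, 30]

    // Find the largest epoch <= requestedEpoch; its end offset is the start offset of the
    // next epoch, or the LEO if it is the latest epoch. (If no such epoch existed, the
    // leader would answer UNKNOWN_OFFSET_FOR_LEADER_EPOCH instead.)
    static EpochEndOffset lookup(int requestedEpoch) {
        int epoch = epochStartOffsets.floorKey(requestedEpoch);
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(epoch);
        long endOffset = (next != null) ? next.getValue() : logEndOffset;
        return new EpochEndOffset(epoch, endOffset);
    }

    public static void main(String[] args) {
        // Broker B asks with leader_epoch 2; broker A answers {leader_epoch=1, offset=21}.
        System.out.println(lookup(2));   // EpochEndOffset[leaderEpoch=1, endOffset=21]
    }
}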

...

Here we show that scenarios fixed with KIP-101 will have the same behavior with this approach. Suppose we have brokers A and B, and B is the leader. The following is their current state, which also includes where their High Watermark is. 

...

Info
title: Unclean Leader Election Scenario
1. [LeaderEpoch0] Write a message to A (offset A:0), Stop broker A. Bring up broker B which becomes leader 
2. [LeaderEpoch1] Write a message to B (offset B:0), Stop broker B. Bring up broker A which becomes leader
3. [LeaderEpoch2] Write a message to A (offset A:1), Stop broker A. Bring up broker B which becomes leader
4. [LeaderEpoch3] Write a message to B (offset B:1), 
5. Bring up broker A. 

At step 5, broker A sends OffsetForLeaderEpoch to broker B with leader_epoch 2. Broker B responds with (leader_epoch 1, offset 1). Broker A sends another OffsetForLeaderEpoch to broker B with leader_epoch 0. Broker B responds with UNKNOWN_OFFSET_FOR_LEADER_EPOCH since it has exhausted all of its epochs (in a more common case, there will be some epoch they both know about). Broker A then truncates to its HW, which is offset 0, and starts fetching from offset 0.
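The fallback exercised at step 5 could be sketched as below: when the leader has exhausted its epochs and answers UNKNOWN_OFFSET_FOR_LEADER_EPOCH, the follower gives up on epoch-based truncation and truncates to its own High Watermark. The types here are again hypothetical and only illustrate the decision.

Code Block
title: Epoch-exhausted fallback (sketch, Java)
public class EpochExhaustedFallback {

    // Possible outcomes of an OffsetForLeaderEpoch request in this sketch.
    enum Error { NONE, UNKNOWN_OFFSET_FOR_LEADER_EPOCH }

    record EpochEndOffset(Error error, int leaderEpoch, long endOffset) {}

    // Where the follower truncates to: if the leader has run out of epochs it knows about,
    // the follower falls back to its own High Watermark; otherwise it truncates to
    // min(its own end offset for the returned epoch, the leader's end offset).
    static long truncationOffset(EpochEndOffset response, long followerEndOffsetForEpoch, long highWatermark) {
        if (response.error() == Error.UNKNOWN_OFFSET_FOR_LEADER_EPOCH) {
            return highWatermark;   // in the scenario above, broker A truncates to its HW (offset 0)
        }
        return Math.min(followerEndOffsetForEpoch, response.endOffset());
    }
}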

...

The proposed solution requires that we preserve history in the LeaderEpochSequence file. Note that this is also required in the current implementation if we want to guarantee no log divergence. The only reason for "losing" entries in the LeaderEpochSequence file is if we actually lose the file and have to rebuild it from the log: if we have deleted all offsets for a particular epoch for some topic partition, we may miss some entries in the rebuilt LeaderEpochSequence file. 

We will have to modify topic compaction logic. We will not make any changes to compaction logic in this KIP, but here are possible fixes to compaction logic:

  1. Leave a tombstone in the log if we delete all offsets for some epoch, so that the LeaderEpochSequence file can be rebuilt
  2. Do not compact further than persistent HW.

...

  1. Multiple rounds of OffsetForLeaderEpoch between the follower and the leader until they find the largest epoch both of them know about. This approach is most similar to the proposed solution, except we don't add the leader_epoch field in the response; instead, the leader responds with an UNKNOWN_OFFSET_FOR_LEADER_EPOCH error if the requested epoch is not known to the leader. The follower then sends another OffsetForLeaderEpoch request with the next epoch (going backwards), and so on, until the leader receives an OffsetForLeaderEpoch with an epoch it knows about and replies with the end offset for that epoch. Let's call that epoch leader_epoch_K. As with the proposed approach, the follower truncates all offsets with epochs larger than leader_epoch_K, which changes the follower's LEO. Then, the follower truncates the log if the new LEO is larger than the offset received from the leader. 
  2. The follower sends all of its {leader_epoch, end_offset} pairs between its HW and LEO, and the leader responds with the offset where the logs start diverging. This modifies the OffsetForLeaderEpoch request to contain a sequence of {leader_epoch, end_offset} for each topic/partition; the sequence includes the epochs between the follower's High Watermark (HW) and log end offset. The leader looks at the sequence starting with the latest epoch and finds the first matching leader_epoch, let's call it leader_epoch_match. The leader responds with offset = min(end_offset for leader_epoch_match on the leader, end_offset for leader_epoch_match on the follower); see the sketch below.
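The leader-side matching in rejected alternative 2 could be sketched as follows, under the assumption that the follower's sequence is ordered latest-epoch-first; the names are illustrative only.

Code Block
title: Alternative 2 leader-side matching (sketch, Java)
import java.util.LinkedHashMap;
import java.util.Map;

public class Alternative2 {

    // followerEpochs: the follower's {leader_epoch -> end_offset} entries between its HW and LEO,
    //                 ordered latest-epoch-first (an assumption of this sketch).
    // leaderEpochs:   the leader's {leader_epoch -> end_offset} entries.
    static long divergenceOffset(LinkedHashMap<Integer, Long> followerEpochs,
                                 Map<Integer, Long> leaderEpochs) {
        for (Map.Entry<Integer, Long> entry : followerEpochs.entrySet()) {
            Long leaderEndOffset = leaderEpochs.get(entry.getKey());
            if (leaderEndOffset != null) {                          // first epoch both sides know about
                return Math.min(leaderEndOffset, entry.getValue()); // offset the leader would respond with
            }
        }
        return -1L;  // no common epoch in the range; the follower would fall back to its HW
    }
}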

Both alternatives also require a bump in the protocol version. Approach 1 does not need changes to the OffsetForLeaderEpoch request/response, but requires a new error to distinguish the case where the follower needs to fall back to the high watermark approach. We chose our proposed solution over approach 1 because it requires fewer roundtrips of OffsetForLeaderEpoch and removes any ambiguity in the OffsetForLeaderEpoch response by indicating which leader epoch the end offset corresponds to. Approach 2 minimizes the number of roundtrips (one for clean leader election), but may still need multiple roundtrips in the case of unclean leader election (when we need to look beyond the high watermark), and it also increases the size of the OffsetForLeaderEpoch request by at least 64 bits, and in the case of unclean leader election by much more. 

...