...
Scenario 1: new empty follower
A broker is added to the cluster and assigned as a replica for a partition. This broker has no local data, as it has just become a follower for the first time. It will try to fetch from offset 0. If that offset no longer exists on the leader's local log, the follower will receive the OFFSET_MOVED_TO_TIERED_STORAGE error. The follower will then send a ListOffsets request with timestamp = EARLIEST_LOCAL_TIMESTAMP, and will receive the offset of the leader's earliest local message (ELO).
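This bootstrap handshake can be sketched as follows. The `Leader` class, `fetch`, and `list_offsets` are illustrative stand-ins for the replica fetcher's interaction with the leader, not real Kafka APIs, and the sentinel value for EARLIEST_LOCAL_TIMESTAMP is assumed:

```python
EARLIEST_LOCAL_TIMESTAMP = -4  # sentinel timestamp (illustrative value)

class OffsetMovedToTieredStorage(Exception):
    """Models the OFFSET_MOVED_TO_TIERED_STORAGE fetch error."""

class Leader:
    """Toy leader whose offsets below `elo` live only in remote storage."""
    def __init__(self, elo, log):
        self.elo = elo  # earliest local offset
        self.log = log  # offset -> (message, leader_epoch)

    def fetch(self, offset):
        if offset < self.elo:
            raise OffsetMovedToTieredStorage(offset)
        return self.log[offset]

    def list_offsets(self, timestamp):
        if timestamp == EARLIEST_LOCAL_TIMESTAMP:
            return self.elo
        raise NotImplementedError(timestamp)

def bootstrap_fetch_offset(leader):
    """Offset an empty follower should start replicating from."""
    try:
        leader.fetch(0)  # a brand-new follower always starts at offset 0
        return 0
    except OffsetMovedToTieredStorage:
        # offset 0 was tiered away: ask for the earliest local offset (ELO)
        return leader.list_offsets(EARLIEST_LOCAL_TIMESTAMP)
```

With a leader whose local log starts at offset 3, as in the step-1 table, `bootstrap_fetch_offset` returns 3.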
...
Step 1: fetch remote segment info and rebuild the leader epoch sequence.
Broker A (Leader):
    log:
        3: msg 3 LE-1
        4: msg 4 LE-1
        5: msg 5 LE-2
        6: msg 6 LE-2
        7: msg 7 LE-3 (HW)
    leader_epochs: (LE-0, 0) (LE-1, 3) (LE-2, 5) (LE-3, 7)

Broker B (Follower):
    1. Fetch LE-1, 0
    2. Receives OMRS (OFFSET_MOVED_TO_TIERED_STORAGE)
    3. Receives ELO 3, LE-1
    4. Fetch remote segment info and build local leader epoch sequence until ELO:
       leader_epochs: (LE-0, 0) (LE-1, 3) (LE-2, 5)

Remote Storage:
    seg-0-2 (uuid-1):
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
            2: msg 2 LE-0
        epochs: (LE-0, 0)
    seg-3-5 (uuid-2):
        log:
            3: msg 3 LE-1
            4: msg 4 LE-1
            5: msg 5 LE-2
        epochs: (LE-0, 0) (LE-1, 3) (LE-2, 5)

RL metadata storage:
    seg-0-2, uuid-1: segment epochs (LE-0, 0)
    seg-3-5, uuid-2: segment epochs (LE-1, 3) (LE-2, 5)
step 2:
continue fetching from the leader
Broker A (Leader):
    log:
        3: msg 3 LE-1
        4: msg 4 LE-1
        5: msg 5 LE-2
        6: msg 6 LE-2
        7: msg 7 LE-3 (HW)
    leader_epochs: (LE-0, 0) (LE-1, 3) (LE-2, 5) (LE-3, 7)

Broker B (Follower):
    Fetch from ELO to HW:
        3: msg 3 LE-1
        4: msg 4 LE-1
        5: msg 5 LE-2
        6: msg 6 LE-2
        7: msg 7 LE-3 (HW)
    leader_epochs: (LE-0, 0) (LE-1, 3) (LE-2, 5) (LE-3, 7)

Remote Storage: (unchanged from step 1)

RL metadata storage: (unchanged from step 1)
Scenario 2: out-of-sync follower catching up
...
An out-of-sync follower (broker B) has local data up to offset 3.
Broker A (Leader):
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0
        2: msg 2 LE-0
        3: msg 3 LE-1
        4: msg 4 LE-1
        5: msg 5 LE-2
        6: msg 6 LE-2
        7: msg 7 LE-3
        8: msg 8 LE-3
        9: msg 9 LE-3 (HW)
    leader_epochs: (LE-0, 0) (LE-1, 3) (LE-2, 5) (LE-3, 7)

Broker B (Follower):
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0
        2: msg 2 LE-0
        3: msg 3 LE-1
    leader_epochs: (LE-0, 0) (LE-1, 3)
    1. Because the latest leader epoch in local storage (LE-1) does not equal
       the current leader epoch (LE-3), the follower starts in the Truncating state.
    2. fetchLeaderEpochEndOffsets(LE-1) returns 5, which is larger than the latest
       local offset. With the existing truncation logic, the local log is not
       truncated, and the follower moves to the Fetching state.

Remote Storage:
    seg-0-2 (uuid-1):
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
            2: msg 2 LE-0
        epochs: (LE-0, 0)
    seg-3-5 (uuid-2):
        log:
            3: msg 3 LE-1
            4: msg 4 LE-1
            5: msg 5 LE-2
        epochs: (LE-0, 0) (LE-1, 3) (LE-2, 5)

RL metadata storage:
    seg-0-2, uuid-1: segment epochs (LE-0, 0)
    seg-3-5, uuid-2: segment epochs (LE-1, 3) (LE-2, 5)
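The no-truncation decision the follower makes here (fetchLeaderEpochEndOffsets returning an offset past the local log end) can be reproduced with a minimal sketch of the KIP-101 check. The helper name is illustrative, not the real replica fetcher code:

```python
def truncation_point(local_log_end_offset, leader_epoch_end_offset):
    """KIP-101-style truncation: cut the local log back to the leader's
    end offset for the follower's latest local epoch, if it is shorter
    than the local log end offset. Returning the local log end offset
    means no truncation is needed."""
    return min(local_log_end_offset, leader_epoch_end_offset)
```

In this scenario the follower's log end offset is 4 (last message at offset 3) and the leader reports that LE-1 ends at offset 5, so `truncation_point(4, 5)` is 4: nothing is cut, and the follower moves to the Fetching state.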
step 2:
Local segments on the leader are deleted because of retention, and then the follower starts trying to catch up with the leader.
Broker A (Leader):
    log:
        9: msg 9 LE-3
        10: msg 10 LE-3
        11: msg 11 LE-3 (HW)
        [segments up to offset 8 were deleted by local retention]
    leader_epochs: (LE-0, 0) (LE-1, 3) (LE-2, 5) (LE-3, 7)

Broker B (Follower):
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0
        2: msg 2 LE-0
        3: msg 3 LE-1
    leader_epochs: (LE-0, 0) (LE-1, 3)
    <Fetch state>
    1. Fetch LE-1, 4
    2. Receives OMRS, truncates local segments.
    3. Fetches ELO, receives ELO 9, LE-3, and moves to the BuildingRemoteLogAux state.

Remote Storage:
    seg-0-2 (uuid-1):
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
            2: msg 2 LE-0
        epochs: (LE-0, 0)
    seg-3-5 (uuid-2):
        log:
            3: msg 3 LE-1
            4: msg 4 LE-1
            5: msg 5 LE-2
        epochs: (LE-0, 0) (LE-1, 3) (LE-2, 5)
    seg-6-8 (uuid-3):
        log:
            6: msg 6 LE-2
            7: msg 7 LE-3
            8: msg 8 LE-3
        epochs: (LE-0, 0) (LE-1, 3) (LE-2, 5) (LE-3, 7)

RL metadata storage:
    seg-0-2, uuid-1: segment epochs (LE-0, 0)
    seg-3-5, uuid-2: segment epochs (LE-1, 3) (LE-2, 5)
    seg-6-8, uuid-3: segment epochs (LE-2, 5) (LE-3, 7)
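The follower states named in these tables (Truncating, Fetching, BuildingRemoteLogAux) can be summarized as a small transition sketch. The state names come from the tables; the transition function and event names are illustrative, not the real fetcher implementation:

```python
from enum import Enum, auto

class FetchState(Enum):
    TRUNCATING = auto()
    FETCHING = auto()
    BUILDING_REMOTE_LOG_AUX = auto()

def next_state(state, event):
    """Illustrative transitions distilled from scenarios 1 and 2."""
    transitions = {
        # truncation check complete (possibly a no-op): start fetching
        (FetchState.TRUNCATING, "truncation_done"): FetchState.FETCHING,
        # leader no longer has the requested offset locally
        (FetchState.FETCHING, "offset_moved_to_tiered_storage"):
            FetchState.BUILDING_REMOTE_LOG_AUX,
        # leader-epoch cache rebuilt from remote segment metadata up to ELO
        (FetchState.BUILDING_REMOTE_LOG_AUX, "aux_state_built"):
            FetchState.FETCHING,
    }
    return transitions.get((state, event), state)
```

Any unrecognized (state, event) pair leaves the state unchanged, which keeps the sketch honest about transitions the tables do not cover.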
step 3:
After deleting the local data, this case becomes the same as scenario 1.
Broker A (Leader):
    log:
        9: msg 9 LE-3
        10: msg 10 LE-3
        11: msg 11 LE-3 (HW)
        [segments up to offset 8 were deleted by local retention]
    leader_epochs: (LE-0, 0) (LE-1, 3) (LE-2, 5) (LE-3, 7)

Broker B (Follower):
    1. The follower rebuilds the leader epoch sequence up to LE-3 using
       remote segment metadata and remote data:
       leader_epochs: (LE-0, 0) (LE-1, 3) (LE-2, 5) (LE-3, 7)
    2. The follower continues fetching from the leader starting at ELO (9, LE-3):
        9: msg 9 LE-3
        10: msg 10 LE-3
        11: msg 11 LE-3 (HW)

Remote Storage: (unchanged from step 2)

RL metadata storage: (unchanged from step 2)
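The epoch-sequence rebuild the follower performs from remote segment metadata might look like this sketch. The metadata layout is a simplified stand-in for the RL metadata entries shown in the tables:

```python
def rebuild_epoch_cache(remote_segment_epochs, target_epoch):
    """Merge per-segment (epoch, start_offset) entries from the RL metadata
    store into one leader-epoch sequence, up to and including target_epoch."""
    cache = {}
    for entries in remote_segment_epochs:
        for epoch, start in entries:
            if epoch <= target_epoch:
                # keep the smallest known start offset for each epoch
                cache[epoch] = min(start, cache.get(epoch, start))
    return sorted(cache.items())

# Segment-epoch entries from scenario 2's RL metadata storage
# (seg-0-2, seg-3-5, seg-6-8), with LE-n written as the integer n:
segments = [[(0, 0)], [(1, 3), (2, 5)], [(2, 5), (3, 7)]]
```

`rebuild_epoch_cache(segments, 3)` yields `[(0, 0), (1, 3), (2, 5), (3, 7)]`, matching the leader_epochs sequence the follower builds in the table above.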
Scenario 3: Multiple hard failures (Scenario 2 of KIP-101)
Step 1:
Broker A (Leader):
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0
        2: msg 2 LE-0 (HW)
    leader_epochs: (LE-0, 0)

Broker B:
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0
        2: msg 2 LE-0 (HW)
    leader_epochs: (LE-0, 0)

Remote Storage:
    seg-0-1:
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
        epochs: (LE-0, 0)

RL metadata storage:
    seg-0-1, uuid-1: segment epochs (LE-0, 0)
Broker A has shipped its 1st log segment to remote storage.
...
In this case, it is acceptable to lose data, but we have to keep the same behaviour as described in KIP-101.
Broker A (stopped):
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0
        2: msg 2 LE-0 (HW)
    leader_epochs: (LE-0, 0)

Broker B (Leader):
    log:
        0: msg 0 LE-0 (HW)
        1: msg 3 LE-1
    leader_epochs: (LE-0, 0) (LE-1, 1)

Remote Storage:
    seg-0-1:
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
        epochs: (LE-0, 0)

RL metadata storage:
    seg-0-1, uuid-1: segment epochs (LE-0, 0)
After restart, B loses messages 1 and 2. B becomes the new leader, and receives a new message 3 (LE-1, offset 1).
...
After restart, broker A truncates offsets 1 and 2 (LE-0), and receives the new message (LE-1, offset 1).
Broker A (follower):
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0 (truncated)
        2: msg 2 LE-0 (truncated)
        1: msg 3 LE-1 (HW)
    leader_epochs: (LE-0, 0) (LE-1, 1)

Broker B (Leader):
    log:
        0: msg 0 LE-0
        1: msg 3 LE-1 (HW)
    leader_epochs: (LE-0, 0) (LE-1, 1)

Remote Storage:
    seg-0-1:
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
        epochs: (LE-0, 0)

RL metadata storage:
    seg-0-1, uuid-1: segment epochs (LE-0, 0)
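The truncation broker A performs here is driven by KIP-101's leader-epoch lookup on the new leader. A minimal sketch of that lookup, with illustrative names (the real logic lives in the leader's handling of OffsetsForLeaderEpoch requests):

```python
def end_offset_for_epoch(leader_epochs, log_end_offset, requested_epoch):
    """Leader-side answer to an OffsetsForLeaderEpoch-style request:
    the start offset of the first epoch larger than requested_epoch,
    or the log end offset if requested_epoch is still the latest epoch.
    leader_epochs maps epoch -> start offset."""
    for epoch, start in sorted(leader_epochs.items()):
        if epoch > requested_epoch:
            return start
    return log_end_offset
```

With broker B's epoch chain `{0: 0, 1: 1}` and log end offset 2, `end_offset_for_epoch({0: 0, 1: 1}, 2, 0)` returns 1, so broker A (whose latest local epoch is LE-0) truncates to offset 1, dropping messages 1 and 2 before fetching message 3.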
Step 4:
Broker A (follower):
    log:
        0: msg 0 LE-0
        1: msg 3 LE-1
        2: msg 4 LE-1 (HW)
    leader_epochs: (LE-0, 0) (LE-1, 1) (LE-2, 2)

Broker B (Leader):
    log:
        0: msg 0 LE-0
        1: msg 3 LE-1
        2: msg 4 LE-1 (HW)
    leader_epochs: (LE-0, 0) (LE-1, 1) (LE-2, 2)

Remote Storage:
    seg-0-1:
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
        epochs: (LE-0, 0)
    seg-1-1:
        log:
            1: msg 3 LE-1
        epochs: (LE-0, 0) (LE-1, 1)

RL metadata storage:
    seg-0-1, uuid-1: segment epochs (LE-0, 0)
    seg-1-1, uuid-2: segment epochs (LE-1, 1)
A new message (message 4) is received. The 2nd segment on broker B (seg-1-1) is shipped to remote storage.
...
Scenario 4: unclean leader election including truncation.
Step 1:
Broker A (Leader):
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0
        2: msg 2 LE-0
        3: msg 3 LE-0 (HW)
    leader_epochs: (LE-0, 0)

Broker B (out-of-sync):
    log:
        0: msg 0 LE-0 (HW)
    leader_epochs: (LE-0, 0)

Remote Storage:
    seg-0-2:
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
            2: msg 2 LE-0
        epochs: (LE-0, 0)

RL metadata storage:
    seg-0-2, uuid-1: segment epochs (LE-0, 0)
Step 2:
Broker A (Stopped):
    (broker is down)

Broker B (Leader):
    log:
        0: msg 0 LE-0
        1: msg 4 LE-1
        2: msg 5 LE-1 (HW)
    leader_epochs: (LE-0, 0) (LE-1, 1)

Remote Storage:
    seg-0-2:
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
            2: msg 2 LE-0
        epochs: (LE-0, 0)
    seg-0-1:
        log:
            0: msg 0 LE-0
            1: msg 4 LE-1
        epochs: (LE-0, 0) (LE-1, 1)

RL metadata storage:
    seg-0-2, uuid-1: segment epochs (LE-0, 0)
    seg-0-1, uuid-2: segment epochs (LE-0, 0) (LE-1, 1)
Broker A stopped, and the out-of-sync replica (broker B) became the new leader. With unclean leader election, it is acceptable to lose data, but we have to make sure the existing Kafka behaviour is not changed.
...
Broker B ships its local segment (seg-0-1) to remote storage after the high watermark moves to 2 (message 5).
Step 3:
Broker A (Stopped):
    (broker is down)

Broker B (Leader):
    log:
        2: msg 5 LE-1 (HW)
    leader_epochs: (LE-0, 0) (LE-1, 1)

Remote Storage:
    seg-0-2:
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
            2: msg 2 LE-0
        epochs: (LE-0, 0)
    seg-0-1:
        log:
            0: msg 0 LE-0
            1: msg 4 LE-1
        epochs: (LE-0, 0) (LE-1, 1)

RL metadata storage:
    seg-0-2, uuid-1: segment epochs (LE-0, 0)
    seg-0-1, uuid-2: segment epochs (LE-0, 0) (LE-1, 1)
The 1st local segment on broker B expired.
...
Scenario 5: log divergence in remote storage - unclean leader election
step 1
Broker A (Leader):
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0
        2: msg 2 LE-0
        3: msg 3 LE-0
        4: msg 4 LE-0 (HW)
    leader_epochs: (LE-0, 0)
    (broker A shipped one segment to remote storage)

Broker B:
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0
    leader_epochs: (LE-0, 0)
    (broker B is out-of-sync)

Remote Storage:
    seg-0-3:
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
            2: msg 2 LE-0
            3: msg 3 LE-0
        epochs: (LE-0, 0)

RL metadata storage:
    seg-0-3, uuid1: segment epochs (LE-0, 0)
step 2
The out-of-sync broker B becomes the new leader after broker A goes down (unclean leader election).
Broker A (stopped):
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0
        2: msg 2 LE-0
        3: msg 3 LE-0
        4: msg 4 LE-0
    leader_epochs: (LE-0, 0)

Broker B (Leader):
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0
        2: msg 4 LE-1
        3: msg 5 LE-1
        4: msg 6 LE-1
    leader_epochs: (LE-0, 0) (LE-1, 2)
    (After becoming the new leader, B received several new messages and
    shipped one segment to remote storage.)

Remote Storage:
    seg-0-3 (uuid1):
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
            2: msg 2 LE-0
            3: msg 3 LE-0
        epochs: (LE-0, 0)
    seg-0-3 (uuid2):
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
            2: msg 4 LE-1
            3: msg 5 LE-1
        epochs: (LE-0, 0) (LE-1, 2)

RL metadata storage:
    seg-0-3, uuid1: segment epochs (LE-0, 0)
    seg-0-3, uuid2: segment epochs (LE-0, 0) (LE-1, 2)
step 3
Broker B is down. Broker A restarts without knowing about LE-1 (another unclean leader election).
Broker A (Leader):
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0
        2: msg 2 LE-0
        3: msg 3 LE-0
        4: msg 4 LE-0
        5: msg 7 LE-2
        6: msg 8 LE-2
    leader_epochs: (LE-0, 0) (LE-2, 5)
    1. Broker A receives two new messages in LE-2.
    2. Broker A ships seg-4-5 to remote storage.

Broker B (stopped):
    log:
        0: msg 0 LE-0
        1: msg 1 LE-0
        2: msg 4 LE-1
        3: msg 5 LE-1
        4: msg 6 LE-1
    leader_epochs: (LE-0, 0) (LE-1, 2)

Remote Storage:
    seg-0-3 (uuid1):
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
            2: msg 2 LE-0
            3: msg 3 LE-0
        epochs: (LE-0, 0)
    seg-0-3 (uuid2):
        log:
            0: msg 0 LE-0
            1: msg 1 LE-0
            2: msg 4 LE-1
            3: msg 5 LE-1
        epochs: (LE-0, 0) (LE-1, 2)
    seg-4-5 (uuid3):
        epochs: (LE-0, 0) (LE-2, 5)

RL metadata storage:
    seg-0-3, uuid1: segment epochs (LE-0, 0)
    seg-0-3, uuid2: segment epochs (LE-0, 0) (LE-1, 2)
    seg-4-5, uuid3: segment epochs (LE-0, 0) (LE-2, 5)
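Divergent segments can cover the same offset range (the two seg-0-3 entries above), so remote segments are distinguished by uuid, and a reader can check a segment's epoch entries against the current leader epoch chain to decide which lineage it belongs to. A hypothetical filter, not the real RemoteLogManager logic:

```python
def matches_leader_chain(segment_epochs, leader_chain):
    """True iff every (epoch, start_offset) entry the segment carries also
    appears in the current leader epoch chain, i.e. the segment was written
    on this leadership lineage."""
    chain = dict(leader_chain)
    return all(chain.get(epoch) == start for epoch, start in segment_epochs)

# State after step 3: broker A's chain is (LE-0, 0) (LE-2, 5)
leader_chain = [(0, 0), (2, 5)]
segments = {
    "uuid1": [(0, 0)],          # seg-0-3 written by A under LE-0
    "uuid2": [(0, 0), (1, 2)],  # seg-0-3 written by B under LE-1 (divergent)
    "uuid3": [(0, 0), (2, 5)],  # seg-4-5 written by A under LE-2
}
valid = [u for u, e in segments.items() if matches_leader_chain(e, leader_chain)]
```

Only uuid1 and uuid3 survive the filter: uuid2 carries the (LE-1, 2) entry from the abandoned lineage, which is absent from broker A's chain.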
step 4
Broker B is reimaged and loses all of its local data.
Broker A (Leader):
    log:
        6: msg 8 LE-2
    leader_epochs: (LE-0, 0) (LE-2, 5)

Broker B (restarted):
    1. Broker B fetches offset 0, and receives the OMRS error.
    2. Broker B receives ELO 6, LE-2.
    3. In the BuildingRemoteLogAux state, broker B finds that seg-4-5 has LE-2,
       so it builds the local leader epoch cache from seg-4-5:
       leader_epochs: (LE-0, 0) (LE-2, 5)
    4. Broker B continues fetching the leader's local messages from offset 6, LE-2.
    5. Broker B joins the ISR.

Remote Storage: (unchanged from step 3)

RL metadata storage: (unchanged from step 3)
A consumer that fetches offset 3 with LE-1 from broker B will be fenced.
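That fence can be modeled as a leader-epoch validation on the fetch path. This is a sketch with illustrative names; real Kafka performs the check via the epoch field in fetch requests and the OffsetsForLeaderEpoch API:

```python
def epoch_for_offset(chain, offset):
    """Leader epoch governing `offset`, given a chain of (epoch, start_offset)
    pairs sorted by start offset."""
    current = None
    for epoch, start in chain:
        if start <= offset:
            current = epoch
    return current

def validate_consumer_fetch(chain, offset, consumer_epoch):
    """A consumer presenting an epoch from a divergent lineage is fenced."""
    if epoch_for_offset(chain, offset) != consumer_epoch:
        return "FENCED_LEADER_EPOCH"
    return "OK"
```

With broker B's rebuilt chain `[(0, 0), (2, 5)]`, a fetch at offset 3 claiming LE-1 is rejected (offset 3 belongs to LE-0 on this lineage), while a fetch at offset 6 claiming LE-2 succeeds.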
...