...

Once the controller is notified that a new replica has entered the ISR for a particular partition, it will remove it from the "addingReplicas" field in `/topics/[topic]`. If "addingReplicas" becomes empty, the controller will send a new wave of LeaderAndIsr requests to retire all the old replicas and, if applicable, elect new leaders.

Algorithm

We will always add all the replicas in the "addingReplicas" field before starting to act on the "removingReplicas" field.

Code Block
# Illustrating the rebalance of a single partition.
# R is the current replicas, I is the ISR list, AR is addingReplicas and RR is removingReplicas
R: [1, 2, 3], I: [1, 2, 3], AR: [], RR: []
# Reassignment called via API with targetReplicas=[4,3,2]

R: [1, 4, 3, 2], I: [1, 2, 3], AR: [4], RR: [1] # Controller sends LeaderAndIsr requests 
# (We ignore RR until AR is empty)
R: [1, 4, 3, 2], I: [1, 2, 3, 4], AR: [4], RR: [1] # Leader 1 adds 4 to ISR
# The controller picks up on the change and removes 4 from AR.
# (in-memory) - R: [1, 4, 3, 2], I: [1, 2, 3, 4], AR: [], RR: [1]
# The controller realizes that all the replicas in AR have been added and starts work on RR. Removes all of RR from R and from the ISR, and sends LeaderAndIsr requests
R: [4, 3, 2], I: [1, 2, 3, 4], AR: [], RR: [] # at this point the reassignment is considered complete
# Leaders change, 1 retires and 4 starts leading. 4 shrinks the ISR
R: [4, 3, 2], I: [2, 3, 4], AR: [], RR: []


We iteratively shrink the AR collection as its replicas join the ISR. Once all of the replicas in AR have been added and AR is empty, we empty out RR in one step.
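
The steps traced above can be sketched as a small in-memory model. This is only an illustration under stated assumptions, not the controller's actual code: `PartitionState`, `start_reassignment` and `on_isr_change` are hypothetical names, the ordering of R (replicas being removed first, then the target list) is chosen only to match the trace, and the sketch drops RR from the ISR immediately, whereas the trace shows the ISR shrinking one step later, after the leader change.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PartitionState:
    """Hypothetical in-memory view of one partition (not a Kafka class)."""
    replicas: List[int]                                 # R
    isr: List[int]                                      # I
    adding: List[int] = field(default_factory=list)    # AR
    removing: List[int] = field(default_factory=list)  # RR


def start_reassignment(state: PartitionState, target: List[int]) -> None:
    """Record AR and RR, and expand R to the old and new replicas combined."""
    state.adding = [r for r in target if r not in state.replicas]
    state.removing = [r for r in state.replicas if r not in target]
    # Ordering assumption: replicas being removed first, then the target list.
    state.replicas = state.removing + list(target)


def on_isr_change(state: PartitionState, new_isr: List[int]) -> bool:
    """Apply an ISR update; return True once the reassignment has completed."""
    state.isr = list(new_isr)
    if not state.adding and not state.removing:
        return False  # no reassignment in progress
    # Shrink AR: drop every adding replica that has entered the ISR.
    state.adding = [r for r in state.adding if r not in state.isr]
    if state.adding:
        return False  # RR is ignored until AR is empty
    # AR is empty: retire all of RR in one step.
    state.replicas = [r for r in state.replicas if r not in state.removing]
    state.isr = [r for r in state.isr if r not in state.removing]
    state.removing = []
    return True


state = PartitionState(replicas=[1, 2, 3], isr=[1, 2, 3])
start_reassignment(state, [4, 3, 2])       # R: [1, 4, 3, 2], AR: [4], RR: [1]
done = on_isr_change(state, [1, 2, 3, 4])  # replica 4 joins the ISR
print(state, done)
```

Keeping AR and RR as separate lists makes the ordering rule explicit: nothing in RR is touched while any replica in AR is still outside the ISR.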

...

In general this algorithm is consistent with the current Kafka behavior - other brokers still get the full replica set consisting of both the original replicas and the new ones.

Note that because the AR collection shrinks iteratively, the server cannot reconstruct a "revert" of a reassignment: it only ever sees the reassignment in motion. To keep the server logic simpler, we defer to clients for potential reverts. Here is an example of a cancellation:

Code Block
# Illustrating the rebalance of a single partition.
# R is the current replicas, I is the ISR list, RR is removingReplicas and AR is addingReplicas
R: [1, 2, 3], I: [1, 2, 3], AR: [], RR: []
# Reassignment called via API with targetReplicas=[3, 4, 5]
R: [1, 2, 3, 4, 5], I: [1, 2, 3], AR: [4, 5], RR: [1, 2]
R: [1, 2, 3, 4, 5], I: [1, 2, 3, 4], AR: [4, 5], RR: [1, 2]
# Cancellation called
R: [1, 2, 3, 4], I: [1, 2, 3, 4], AR: [], RR: []

Essentially, once a cancellation is called we subtract AR from R, empty out both AR and RR, and send LeaderAndIsr requests to cancel the replica movements that have not yet completed.
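
The cancellation rule can be sketched in the same style. This is a hedged illustration rather than the controller's implementation: `PartitionState` and `cancel_reassignment` are hypothetical names, the LeaderAndIsr requests are omitted, and the ISR is left untouched. The usage line assumes that AR holds only replicas that have not yet joined the ISR (so AR is [5] once replica 4 is in sync), which is the reading under which subtracting AR from R yields R: [1, 2, 3, 4].

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PartitionState:
    """Hypothetical in-memory view of one partition (not a Kafka class)."""
    replicas: List[int]                                 # R
    isr: List[int]                                      # I
    adding: List[int] = field(default_factory=list)    # AR
    removing: List[int] = field(default_factory=list)  # RR


def cancel_reassignment(state: PartitionState) -> None:
    """Subtract AR from R, then empty out both AR and RR."""
    state.replicas = [r for r in state.replicas if r not in state.adding]
    state.adding = []
    state.removing = []
    # Sending LeaderAndIsr requests to stop the pending moves is not modeled.


# Assumed state at cancellation time: 4 has joined the ISR, 5 has not.
state = PartitionState(
    replicas=[1, 2, 3, 4, 5], isr=[1, 2, 3, 4], adding=[5], removing=[1, 2]
)
cancel_reassignment(state)
print(state.replicas)  # [1, 2, 3, 4]
```

Replicas that were scheduled for removal (1 and 2 here) stay in R, since RR is simply cleared rather than applied.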

...