...

For LeaderAndISRCommand, it calls on_LeaderAndISRCommand().
on_LeaderAndISRCommand(command):
1. Read the set of partitions set_P from command.
2. For each partition P in set_P
2.0 create the partition locally if not present
2.1 If the command asks this broker to be the new leader for P and this broker is not already the leader for P:
2.1.1 stop the fetcher to the current leader
2.1.2 call becomeLeader()
2.2 If the command asks this broker to follow a leader L and this broker is not already following L:
2.2.1 stop the fetcher to the current leader
2.2.2 call becomeFollower()
3. If the command has the INIT flag set, delete all local partitions not in set_P.
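
The dispatch steps above can be sketched in Python as follows. This is only an illustration under stated assumptions: the `Broker` and `LeaderAndISRCommand` classes, the `leaders` mapping, and the `log` list are hypothetical names invented here, and the fetcher and role transitions are reduced to log entries rather than real threads.

```python
from dataclasses import dataclass, field


@dataclass
class LeaderAndISRCommand:
    # Hypothetical shape: partition name -> broker id that should lead it.
    leaders: dict
    init: bool = False


@dataclass
class Broker:
    broker_id: int
    # Partition name -> broker id this broker currently treats as leader.
    partitions: dict = field(default_factory=dict)
    # Records which transitions ran, standing in for the real side effects.
    log: list = field(default_factory=list)

    def on_leader_and_isr_command(self, command):
        for p, new_leader in command.leaders.items():
            # 2.0 create the partition locally if not present
            self.partitions.setdefault(p, None)
            current = self.partitions[p]
            if new_leader == self.broker_id and current != self.broker_id:
                # 2.1 stop the fetcher, then become leader
                self.log.append(("become_leader", p))
                self.partitions[p] = self.broker_id
            elif new_leader != self.broker_id and current != new_leader:
                # 2.2 stop the fetcher, then follow the new leader
                self.log.append(("become_follower", p, new_leader))
                self.partitions[p] = new_leader
        if command.init:
            # 3. drop every local partition the command did not mention
            for p in list(self.partitions):
                if p not in command.leaders:
                    del self.partitions[p]
                    self.log.append(("delete", p))
```

A partition whose role already matches the command triggers no transition, so resending the same command is a no-op in this sketch.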


becomeLeader(r: Replica, command)
{
   r.partition.leaderAndISRZKVersion = command.leaderAndISRZKVersion
   r.partition.ISR = command.ISR
   stop the HW checkpoint thread for r
   wait until every replica in ISR catches up to r.leo
   r.hw = r.leo
   r.partition.leader = r                  // this enables reads/writes to this partition on this broker
   start a commit thread on r.partition
   start HW checkpoint thread for r
}
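
As a rough illustration of the ordering in becomeLeader(), the Python sketch below waits for the ISR to reach the leader's log end offset before advancing the high watermark and exposing the partition. The `Partition` and `Replica` classes, the `follower_leo` callback, and the polling loop with a timeout are all assumptions made for this example; the pseudocode above does not say how the wait is implemented, and the commit and checkpoint threads are omitted.

```python
import time
from dataclasses import dataclass, field


@dataclass
class Partition:
    isr: list = field(default_factory=list)
    leader_and_isr_zk_version: int = -1
    leader: object = None


@dataclass
class Replica:
    leo: int  # log end offset
    hw: int   # high watermark
    partition: Partition


def become_leader(r, command, follower_leo, poll_interval=0.01, timeout=1.0):
    """follower_leo(broker_id) is a hypothetical probe returning that replica's LEO."""
    r.partition.leader_and_isr_zk_version = command["leaderAndISRZKVersion"]
    r.partition.isr = list(command["ISR"])
    target = r.leo
    deadline = time.monotonic() + timeout
    # Wait until every replica in the ISR has caught up to the leader's LEO.
    while any(follower_leo(b) < target for b in r.partition.isr):
        if time.monotonic() > deadline:
            raise TimeoutError("ISR did not catch up in time")
        time.sleep(poll_interval)
    r.hw = target
    # Only now is the partition opened for reads and writes on this broker.
    r.partition.leader = r
```

Setting `r.partition.leader` last mirrors the pseudocode: the partition is not served until the high watermark reflects what the ISR holds.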

becomeFollower(r: Replica)
{
  // this is required if this replica was the last leader
  stop the commit thread, if any
  stop the current ReplicaFetcherThread, if any
  truncate the log to r.hw
  start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
  start HW checkpoint thread for r
}
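
The truncate-then-fetch step in becomeFollower() can be illustrated with a small Python sketch. It assumes, purely for illustration, that a log is a Python list indexed by offset and that the leader's log is directly readable; `become_follower` and its parameters are hypothetical names, and the threads are left out. Entries past `hw` were not known to be committed, so they may differ from the new leader's log and are discarded before fetching resumes.

```python
def become_follower(log, hw, leader_log):
    """Truncate the local log to hw, then catch up from the leader.

    log and leader_log are lists indexed by offset; hw is the local
    high watermark. Returns the updated local log.
    """
    # Truncate the log to r.hw: drop everything at offset >= hw.
    del log[hw:]
    # After truncation the log end offset equals hw.
    leo = len(log)
    # Stand-in for the ReplicaFetcherThread: copy from offset leo onward.
    log.extend(leader_log[leo:])
    return log
```

For example, a follower holding `["a", "b", "c", "x", "y"]` with `hw = 3` drops `"x"` and `"y"` and then copies whatever the leader has from offset 3 onward.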



For StopReplicaCommand, it calls on_StopReplicaCommand().
on_StopReplicaCommand(command):
1. Read the list of partitions from command.
2. For each such partition P
2.1 delete P from local storage, if present.
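
A minimal Python sketch of the handler above, assuming local storage is modeled as a dictionary keyed by partition name (the function and parameter names are hypothetical):

```python
def on_stop_replica_command(local_partitions, command_partitions):
    """Delete each listed partition from local storage if it exists.

    Returns the names that were actually deleted.
    """
    deleted = []
    for p in command_partitions:
        if p in local_partitions:
            del local_partitions[p]
            deleted.append(p)
    return deleted
```

Because partitions that are absent are skipped, applying the same command twice leaves the same state as applying it once.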

...