...
Code Block |
---|
For LeaderAndISRCommand, it calls on_LeaderAndISRCommand().
on_LeaderAndISRCommand(command):
1. Read the set of partitions set_P from command.
2. For each partition P in set_P
2.0 create the partition locally if not present
2.1 If the command asks this broker to be the new leader for P and this broker is not already the leader for P,
2.1.1 stop the fetcher to the current leader
2.1.2 call becomeLeader()
2.2 If the command asks this broker to follow a leader L and this broker is not already following L,
2.2.1 stop the fetcher to the current leader
2.2.2 call becomeFollower()
3. If the command has the INIT flag set, delete all local partitions not in set_P.
becomeLeader(r: Replica, command)
{
r.partition.leaderAndISRZKVersion = command.leaderAndISRZKVersion
r.partition.ISR = command.ISR
stop the HW checkpoint thread for r
wait until every replica in ISR catches up to r.leo
r.hw = r.leo
r.partition.leader = r // this enables reads/writes to this partition on this broker
start a commit thread on r.partition
start HW checkpoint thread for r
}
becomeFollower(r: Replica)
{
// this is required if this replica was the last leader
stop the commit thread, if any
stop the current ReplicaFetcherThread, if any
truncate the log to r.hw
start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
start HW checkpoint thread for r
}
For StopReplicaCommand, it calls on_StopReplicaCommand().
on_StopReplicaCommand(command):
1. Read the list of partitions from command.
2. For each such partition P
2.1 delete P from local storage, if present.
|
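The handlers above can be sketched as a small in-memory model. This is only an illustration under stated assumptions, not the broker implementation: the names Replica, Broker, fetching_from, role and the dictionary layout of the commands are all hypothetical, the fetcher, commit and HW checkpoint threads are reduced to plain fields, and the "wait until every replica in ISR catches up" step is assumed to have already completed.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Replica:
    """Hypothetical stand-in for one local replica of a partition."""
    partition: str
    log: list = field(default_factory=list)  # len(log) plays the role of the LEO
    hw: int = 0                               # high watermark
    isr: list = field(default_factory=list)
    zk_version: int = -1
    role: str = "none"                        # "leader", "follower" or "none"
    fetching_from: Optional[int] = None       # broker id the fetcher targets

    @property
    def leo(self):
        return len(self.log)


class Broker:
    def __init__(self, broker_id):
        self.broker_id = broker_id
        self.replicas = {}  # partition name -> Replica

    def on_leader_and_isr(self, command):
        # Assumed layout: {"init": bool,
        #   "partitions": {name: {"leader": id, "isr": [...], "zk_version": n}}}
        for name, state in command["partitions"].items():
            r = self.replicas.setdefault(name, Replica(name))  # step 2.0
            if state["leader"] == self.broker_id:
                if r.role != "leader":                         # step 2.1
                    r.fetching_from = None                     # step 2.1.1
                    self.become_leader(r, state)               # step 2.1.2
            elif r.fetching_from != state["leader"]:           # step 2.2
                r.fetching_from = None                         # step 2.2.1
                self.become_follower(r, state["leader"])       # step 2.2.2
        if command.get("init"):                                # step 3
            for name in list(self.replicas):
                if name not in command["partitions"]:
                    del self.replicas[name]

    def become_leader(self, r, state):
        r.zk_version = state["zk_version"]
        r.isr = list(state["isr"])
        # Assumption: every ISR replica has already caught up to r.leo,
        # so the wait in the pseudocode is skipped here.
        r.hw = r.leo
        r.role = "leader"  # reads/writes would be enabled from this point

    def become_follower(self, r, leader_id):
        del r.log[r.hw:]              # truncate the log to r.hw
        r.role = "follower"
        r.fetching_from = leader_id   # a new fetcher would start from r.leo

    def on_stop_replica(self, command):
        # Assumed layout: {"partitions": [name, ...]}
        for name in command["partitions"]:
            self.replicas.pop(name, None)  # delete if present
```

For example, a broker with id 1 that receives a command naming it leader of one partition and follower of another ends up with one replica in each role. A later command with the INIT flag that lists only the first partition removes the second one locally.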
...