...

Code Block
For a LeaderAndISRCommand, the broker calls on_LeaderAndISRCommand().
on_LeaderAndISRCommand(command):
1. Read the set of partitions set_P from command.
2. For each partition P in set_P
2.0 create the partition locally if not present
2.1 If the command asks this broker to be the new leader for P and this broker is not already the leader for P,
2.1.1 Stop the fetcher to the current leader
2.1.2 Become the leader and remember the ZK version of the LeaderAndISR path (call becomeLeader())
2.2 If the command asks this broker to follow a leader L and this broker is not already following L
2.2.1 stop the fetcher to the current leader
2.2.2 become a follower of L (call becomeFollower())
3. If the command has the INIT flag set, delete all local partitions not in set_P.


becomeLeader(r: Replica, command)
{
   r.partition.leaderAndISRZKVersion = command.leaderAndISRZKVersion
   r.partition.ISR = command.ISR
   stop the HW checkpoint thread for r
   wait until every replica in ISR catches up to r.leo
   r.hw = r.leo
   r.partition.leader = r                  // this enables reads/writes to this partition on this broker
   start a commit thread on r.partition
   start HW checkpoint thread for r
}

becomeFollower(r: Replica)
{
  // this is required if this replica was the last leader
  stop the commit thread, if any
  stop the current ReplicaFetcherThread, if any
  truncate the log to r.hw
  start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
  start HW checkpoint thread for r
}
For a StopReplicaCommand, the broker calls on_StopReplicaCommand().
on_StopReplicaCommand(command):
1. Read the list of partitions from command.
2. For each such partition P
2.1 delete P from local storage, if present.

...

In the critical path, the most time consuming operation is step 3, where we need to write to 1 ZK path per partition. Assuming that during a broker failover we need to change the leader for 10K partitions and each ZK write takes 4 ms, this could take 40 seconds. One possibility is to use the multi() support in ZK 3.4 to batch those writes into 1 ZK operation.
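A minimal sketch of such batching with the ZooKeeper 3.4 Java client is shown below. The path layout and the shape of the LeaderAndISR payload here are illustrative placeholders, not the exact format used by Kafka; the point is only that many versioned setData operations can be submitted in a single multi() call, which is applied atomically in one round trip.

Code Block
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooKeeper;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LeaderAndISRBatchWriter {
    private final ZooKeeper zk;

    public LeaderAndISRBatchWriter(ZooKeeper zk) {
        this.zk = zk;
    }

    /**
     * Writes the new LeaderAndISR values for many partitions in one ZK round trip.
     * Each entry maps an (illustrative) partition path to its new payload and the
     * expected ZK version, so a concurrent update to any path fails the whole batch.
     */
    public List<OpResult> writeLeaderAndISR(Map<String, VersionedValue> updates) throws Exception {
        List<Op> ops = new ArrayList<>();
        for (Map.Entry<String, VersionedValue> e : updates.entrySet()) {
            // Conditional write: applied only if the path is still at the expected version.
            ops.add(Op.setData(e.getKey(), e.getValue().data, e.getValue().expectedVersion));
        }
        // One ZK operation instead of one write per partition; atomic across all ops.
        return zk.multi(ops);
    }

    public static class VersionedValue {
        final byte[] data;
        final int expectedVersion;

        public VersionedValue(byte[] data, int expectedVersion) {
            this.data = data;
            this.expectedVersion = expectedVersion;
        }
    }
}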

2. ZKQueue vs direct RPC:

Communicating between the controller and the brokers via ZK is not efficient. Each communication requires 2 ZK writes (each costs roughly 2 RPC), 1 watcher firing and 1 ZK read. These add up to roughly 6 RPCs per communication. An alternative is to implement an admin RPC in the broker for direct communication between the controller and the brokers. Then each communication costs only 1 RPC. The admin RPC could specify a timeout, during which it expects the admin command to be completed. Using RPC means that when a broker is down, it could miss some commands from the controller. This proposal requires that the broker recover its state during startup by reading state information stored in ZK.
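As a rough illustration of the direct-RPC alternative, the controller could send one admin request per state change and wait up to the specified timeout for the broker to apply it. The interface and type names below are hypothetical, not an existing Kafka API; they only sketch what such an admin RPC with a timeout might look like.

Code Block
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical admin RPC exposed by each broker for direct controller-to-broker
 * communication. One request/response replaces the roughly 6 RPCs needed when
 * going through ZK (2 writes, a watcher firing, and a read).
 */
public interface BrokerAdminRpc {

    /** Ask the broker to become leader or follower for the given partitions. */
    CompletableFuture<AdminResponse> leaderAndISR(List<PartitionStateChange> changes,
                                                  long timeoutMs);

    /** Ask the broker to stop and delete replicas for the given partitions. */
    CompletableFuture<AdminResponse> stopReplica(List<String> partitions,
                                                 long timeoutMs);

    /** Hypothetical per-partition state carried in a LeaderAndISR request. */
    record PartitionStateChange(String partition, int leaderBrokerId,
                                List<Integer> isr, int zkVersion) {}

    /** Completed within the timeout, or reports which partitions were not applied. */
    record AdminResponse(boolean completedInTime, List<String> pendingPartitions) {}
}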

3. Dealing with multiple leaders in transition:

...