...

Code Block
on_broker_change():
1. Get the current live brokers from BrokerPath in ZK
2. Determine set_p, the set of partitions whose leader is no longer live.
3. For each partition P in set_p
3.1 Read the current ISR of P from LeaderAndISR path in ZK
3.2 Determine the new leader and the new ISR of P:
    If ISR has at least 1 broker in the live broker list, select one of those brokers as the new leader. The new ISR includes all brokers in the current ISR that are alive.
    Otherwise, select one of the live brokers in AR as the new leader and set that broker as the new ISR (potential data loss in this case).
    Finally, if none of the brokers in AR is alive, set the new leader to -1.
3.3 Write the new leader, ISR and a new epoch (increase the current epoch by 1) in /brokers/topics/[topic]/[partition_id]/leaderAndISR.
    This write has to be done conditionally. If the version of the LeaderAndISR path has changed between 3.1 and 3.3, go back to 3.1.
4. Send a LeaderAndISRCommand (contains the new leader/ISR and the ZK version of the LeaderAndISR path) for each partition in set_p to the affected brokers.
   For efficiency, we can put multiple commands in one RPC request.
(Ideally we want to use ZK multi to do the reads and writes in step 3.1 and 3.3.)
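
The conditional write in step 3.3 is essentially a compare-and-set on the znode version. Below is a minimal sketch of steps 3.1-3.3 for a single partition using the ZooKeeper Java client; the value layout of the leaderAndISR znode ("leader;epoch;isr1,isr2,...") is invented here purely for illustration and is not the format specified by this design.

Code Block
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Illustrative sketch of steps 3.1-3.3 for one partition.
public class LeaderElectionSketch {
    private final ZooKeeper zk;

    public LeaderElectionSketch(ZooKeeper zk) { this.zk = zk; }

    public void electLeader(String topic, int partition,
                            List<Integer> liveBrokers,
                            List<Integer> assignedReplicas) throws Exception {
        String path = "/brokers/topics/" + topic + "/" + partition + "/leaderAndISR";
        while (true) {
            // 3.1 Read the current leader/ISR and remember the znode version.
            Stat stat = new Stat();
            String[] parts = new String(zk.getData(path, false, stat),
                                        StandardCharsets.UTF_8).split(";");
            int epoch = Integer.parseInt(parts[1]);
            List<Integer> isr = Arrays.stream(parts[2].split(","))
                    .map(Integer::parseInt).collect(Collectors.toList());

            // 3.2 Prefer a live broker from the ISR; otherwise any live assigned
            // replica (potential data loss); otherwise leader = -1.
            List<Integer> liveIsr = isr.stream()
                    .filter(liveBrokers::contains).collect(Collectors.toList());
            int newLeader;
            List<Integer> newIsr;
            if (!liveIsr.isEmpty()) {
                newLeader = liveIsr.get(0);
                newIsr = liveIsr;
            } else {
                List<Integer> liveAr = assignedReplicas.stream()
                        .filter(liveBrokers::contains).collect(Collectors.toList());
                newLeader = liveAr.isEmpty() ? -1 : liveAr.get(0);
                newIsr = liveAr.isEmpty() ? new ArrayList<>() : List.of(newLeader);
            }

            // 3.3 Conditional write: only succeeds if the znode version is unchanged.
            String newValue = newLeader + ";" + (epoch + 1) + ";" + newIsr.stream()
                    .map(String::valueOf).collect(Collectors.joining(","));
            try {
                zk.setData(path, newValue.getBytes(StandardCharsets.UTF_8),
                           stat.getVersion());
                return;
            } catch (KeeperException.BadVersionException e) {
                // The path changed between 3.1 and 3.3; go back to 3.1.
            }
        }
    }
}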

...

B. Creating/deleting topics.

The controller watches for child changes of /brokers/topics. When the watcher gets triggered, it calls on_topic_change().

Code Block

on_topic_change():
The controller keeps in memory a list of existing topics.
1. If a new topic is created, read the TopicPath in ZK to get the topic's replica assignment.
1.1. call init_leaders() on all newly created partitions.
2. If a topic is deleted, send the StopReplicaCommand to all affected brokers.

init_leaders(set_p):
Input: set_p, a set of partitions
0. Read the current live broker list from the BrokerPath in ZK
1. For each partition P in set_p
1.1 Select one of the live brokers in AR as the new leader and set all live brokers in AR as the new ISR.
1.2 Write the new leader and ISR in /brokers/topics/[topic]/[partition_id]/leaderAndISR
2. Send the LeaderAndISRCommand to the affected brokers. Again, for efficiency, the controller can send multiple commands in 1 RPC.
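
A minimal sketch of the watch-and-diff logic in on_topic_change(), assuming the ZooKeeper Java client: the controller re-reads the children of /brokers/topics whenever the watch fires, diffs them against its in-memory topic list, and invokes the placeholder initLeaders()/stopReplicas() actions that stand in for the steps described above. ZK watches are one-shot, so the watcher re-registers itself on every read.

Code Block
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// Illustrative sketch of the child watch on /brokers/topics.
public class TopicChangeWatcherSketch implements Watcher {
    private final ZooKeeper zk;
    private final Set<String> knownTopics = new HashSet<>();

    public TopicChangeWatcherSketch(ZooKeeper zk) { this.zk = zk; }

    public void register() throws Exception {
        // Re-reads the children and re-arms the watch (ZK watches are one-shot).
        List<String> topics = zk.getChildren("/brokers/topics", this);
        onTopicChange(new HashSet<>(topics));
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
            try {
                register();
            } catch (Exception e) {
                // A real controller would retry and log here.
            }
        }
    }

    private void onTopicChange(Set<String> currentTopics) {
        for (String topic : currentTopics) {
            if (knownTopics.add(topic)) {
                initLeaders(topic);   // new topic: assign leaders for its partitions
            }
        }
        knownTopics.removeIf(topic -> {
            if (!currentTopics.contains(topic)) {
                stopReplicas(topic);  // deleted topic: tell brokers to drop replicas
                return true;
            }
            return false;
        });
    }

    // Placeholders for the controller actions described in the pseudocode above.
    private void initLeaders(String topic) { /* pick leaders, write leaderAndISR */ }
    private void stopReplicas(String topic) { /* send StopReplicaCommand */ }
}
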
C. Broker acts on commands from the controller.

Each broker listens to commands from the controller through RPC. 

Code Block

For LeaderAndISRCommand, it calls on_LeaderAndISRCommand().
on_LeaderAndISRCommand(command):
1. Read the set of partitions set_p from the command.
2. For each partition P in set_p
2.1 If the command asks this broker to be the new leader for P and this broker is not already the leader for P,
2.1.1 Stop the fetcher to the current leader
2.1.2 Become the leader and remember the ZK version of the LeaderAndISR path
2.2 If the command asks this broker to follow a leader L and the broker is not already following L
2.2.1 stop the fetcher to the current leader
2.2.2 become a follower to L

3. If the command has a flag INIT, delete all local partitions not in set_p.


For StopReplicaCommand, it calls on_StopReplicaCommand().
on_StopReplicaCommand(command):
1. Read the list of partitions from command.
2. For each such partition P
2.1 delete p from local storage, if present.
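
The broker-side handling above is mostly a matter of stopping the old fetcher and switching roles. The sketch below mirrors that control flow in Java; the PartitionState type and the replica-manager hooks (isLeader, stopFetcher, becomeLeader, becomeFollower, and so on) are hypothetical placeholders rather than an actual broker API.

Code Block
import java.util.List;

// Illustrative sketch of on_LeaderAndISRCommand() on a broker.
public class LeaderAndIsrHandlerSketch {

    public static class PartitionState {
        public final String topic;
        public final int partition;
        public final int leader;        // broker id of the designated leader
        public final int zkVersion;     // version of the leaderAndISR path
        public PartitionState(String topic, int partition, int leader, int zkVersion) {
            this.topic = topic; this.partition = partition;
            this.leader = leader; this.zkVersion = zkVersion;
        }
    }

    private final int brokerId;

    public LeaderAndIsrHandlerSketch(int brokerId) { this.brokerId = brokerId; }

    public void onLeaderAndIsrCommand(List<PartitionState> partitions, boolean init) {
        for (PartitionState p : partitions) {
            if (p.leader == brokerId && !isLeader(p)) {
                stopFetcher(p);               // 2.1.1 stop fetching from the old leader
                becomeLeader(p, p.zkVersion); // 2.1.2 remember the ZK version
            } else if (p.leader != brokerId && !isFollowing(p, p.leader)) {
                stopFetcher(p);               // 2.2.1
                becomeFollower(p, p.leader);  // 2.2.2 start fetching from the new leader
            }
        }
        if (init) {
            deleteLocalPartitionsNotIn(partitions); // 3. INIT flag: drop stale partitions
        }
    }

    // Hypothetical replica-manager hooks; not part of any published API.
    private boolean isLeader(PartitionState p) { return false; }
    private boolean isFollowing(PartitionState p, int leader) { return false; }
    private void stopFetcher(PartitionState p) {}
    private void becomeLeader(PartitionState p, int zkVersion) {}
    private void becomeFollower(PartitionState p, int leader) {}
    private void deleteLocalPartitionsNotIn(List<PartitionState> partitions) {}
}
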
D. Handling controller failure.

...

Code Block
on_controller_failover():
1. create /controller -> {this broker id}
2. if not successful, return
3. read the LeaderAndISR path from ZK for each partition
4. send a LeaderAndISR command (with a special flag INIT) for each partition to relevant brokers. Those commands can be sent in 1 RPC request.
5. call on_broker_change()
6. for the list of partitions without a leader, call init_leaders().

7. call on_partitions_reassigned()
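
Steps 1 and 2 map naturally onto an ephemeral znode race in ZooKeeper: every broker tries to create /controller with its own id, and only one create succeeds. A minimal sketch using the ZooKeeper Java client follows; the remaining failover steps (3-7) are omitted.

Code Block
import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Illustrative sketch of steps 1-2 of on_controller_failover().
public class ControllerElectionSketch {
    private final ZooKeeper zk;
    private final int brokerId;

    public ControllerElectionSketch(ZooKeeper zk, int brokerId) {
        this.zk = zk;
        this.brokerId = brokerId;
    }

    public boolean tryBecomeController() throws Exception {
        try {
            // Ephemeral: the znode disappears automatically if this broker's ZK
            // session dies, which lets another broker win the next election.
            zk.create("/controller",
                      String.valueOf(brokerId).getBytes(StandardCharsets.UTF_8),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE,
                      CreateMode.EPHEMERAL);
            return true;   // proceed with steps 3-7 of on_controller_failover()
        } catch (KeeperException.NodeExistsException e) {
            return false;  // another broker is already the controller
        }
    }
}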

...

A broker can be down for a long time, during which a topic can be deleted and recreated. Do we need a partition version id in order to clean up an outdated partition from local storage? The way our replica assignment works is that we only assign replicas to live brokers, which means that while a broker is down, no new replicas will be assigned to it. So it seems we won't get into the situation where a failed broker comes back, a partition is still assigned to it, and yet the locally stored partition is outdated. Therefore, we don't really need a partition version id to address this issue. On broker startup, the broker will read all local partition directories, delete each directory whose partition is no longer assigned to it, and then load the last segment of each of the remaining directories.
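
As an illustration of that startup cleanup, the sketch below scans a local log directory and deletes every partition directory that is no longer assigned to this broker. The "topic-partition" directory naming and the assignedPartitions set are assumptions made for the example, not a specified on-disk format.

Code Block
import java.io.File;
import java.util.Set;

// Illustrative sketch of the broker-startup cleanup described above.
public class StartupCleanupSketch {

    public static void cleanLocalLogs(File logDir, Set<String> assignedPartitions) {
        File[] dirs = logDir.listFiles(File::isDirectory);
        if (dirs == null) return;
        for (File dir : dirs) {
            if (!assignedPartitions.contains(dir.getName())) {
                deleteRecursively(dir);   // partition no longer assigned to this broker
            }
            // otherwise: load the last segment of this partition (not shown)
        }
    }

    private static void deleteRecursively(File f) {
        File[] children = f.listFiles();
        if (children != null) {
            for (File child : children) deleteRecursively(child);
        }
        f.delete();
    }
}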

6. What happens to client routing during a broker failover?

In this proposal, the controller first publishes the new leader for the affected partitions in the LeaderAndISR path in ZK, then sends the leadership change commands to the brokers. The brokers then act on those leadership change commands. Since we use the LeaderAndISR path to route client requests, there is a window (potentially small) during which a client is routed to a broker that is the new leader, but that broker is not yet ready to act as the new leader.

For reference, HBase only updates the metadata (for client routing) after the regionserver responds to close/open region commands. So, one would think that instead of the controller directly updating the LeaderAndISR path, we can let each broker update that path after it completes the execution of the command. There is actually a critical reason that the leaderAndISR path has to be updated by the controller. This is because we rely on the leaderAndISR path in ZK to synchronize between the controller and the leader of a partition. After the controller makes a leadership change decision, it doesn't want the ISR to be changed by the current leader any more. Otherwise, the newly elected leader could be taken out of ISR by the current leader before the new leader takes over the leadership. By publishing the new leader immediately in the leaderAndISR path, the controller prevents the current leader from updating the ISR any more.
One possibility is to use another ZK path, ExternalView, for client routing. The controller only updates ExternalView after the broker responds positively to the leadership change command. There is a tradeoff between using 1 ExternalView path for all partitions or 1 ExternalView path per partition. The former has less ZK overhead, but potentially forces unnecessary rebalancing on the consumers.

Another way to think about this is that in the normal case, leadership change commands are executed very quickly, so we probably can just rely on client-side retry logic to handle the transition period. In the uncommon case that it somehow takes too long for a broker to become a leader (likely due to a bug), the controller will get a timeout and can trigger an alert so that an admin can take a look. So, we probably can start without the ExternalView path and reconsider it if it's really needed.
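
To make the retry idea concrete, here is a small, hypothetical sketch of client-side behavior during the transition window: if the broker named by the routing path is not yet acting as leader, the client refreshes the leader and retries with a short backoff. The MetadataClient and NotLeaderException types below are illustrative placeholders, not part of any existing client API.

Code Block
// Illustrative sketch of client-side retry during a leadership transition.
public class ClientRetrySketch {

    public interface MetadataClient {
        int currentLeader(String topic, int partition);        // read from the routing path
        void send(int brokerId, byte[] request) throws NotLeaderException;
    }

    public static class NotLeaderException extends Exception {}

    public static void sendWithRetry(MetadataClient client, String topic, int partition,
                                     byte[] request, int maxRetries) throws Exception {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            int leader = client.currentLeader(topic, partition);
            try {
                client.send(leader, request);
                return;
            } catch (NotLeaderException e) {
                // The routing info is ahead of the broker; the transition is normally
                // fast, so back off briefly and re-resolve the leader.
                Thread.sleep(100L * (attempt + 1));
            }
        }
        throw new Exception("leader for " + topic + "-" + partition
                + " did not become available after " + maxRetries + " retries");
    }
}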

...