The following is a draft design that uses a controller for leader election and other admin related tasks.

Major changes compared with the v2 proposal:
  • Leadership changes are now made by a controller.
  • The controller detects broker failures and elects a new leader for each affected partition.
  • Each leadership change is communicated by the controller to each affected broker.
  • The communication between the controller and the broker is done through direct RPC, instead of via Zookeeper.
Overview:

One of the brokers is elected as the controller for the whole cluster. It will be responsible for:

  1. Leadership change of a partition (each leader can independently update ISR)
  2. New topics; deleted topics
  3. Replica re-assignment

After the controller makes a decision, it publishes the decision permanently in ZK and also sends the new decisions to affected brokers through direct RPC. The published decisions are the source of truth and they are used by clients for request routing and by each broker during startup to recover its state. After the broker is started, it picks up new decisions made by the controller through RPC.

Potential benefits:

  1. Easier debugging since leadership changes are made in a central place.
  2. ZK reads/writes needed for leadership changes can be batched (also easier to exploit ZK multi) and thus reduce end-to-end latency during failover.
  3. Fewer ZK watchers.
  4. More efficient communication of state changes by using direct RPC, instead of via a queue implementation in Zookeeper.

Potential downside:

  1. Need controller failover.

Paths:

  1. Epoc path: stores the current epoc.
    /epoc --> {long} (generating a monotonically increasing number; used to identify leader generations) 
  2. Controller path: stores the current controller info.
    /controller --> {brokerid} (ephemeral; created by controller) 
  3. Broker path: stores the information of all live brokers.
    /brokers/ids/[broker_id] --> host:port (ephemeral; created by admin) 
  4. Topic path: stores the replication assignment for all partitions in a topic. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. Note that for a given partition, there is at most 1 replica on a broker. Therefore, the broker id can be used as the replica id.
    /brokers/topics/[topic] --> {part1: [broker1, broker2], part2: [broker2, broker3] ...}  (created by admin) 
  5. LeaderAndISR path: stores leader and ISR of a partition
     /brokers/topics/[topic]/[partition_id]/leaderAndISR --> {leader_epoc: epoc, leader: broker_id, ISR: {broker1, broker2}}
     
     This path is updated by the controller or the current leader. The current leader only updates the ISR part.
     Updating the path requires synchronization using conditional updates to Zookeeper. 
  6. PartitionReassignment path: This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores a list of new replicas and their corresponding assigned brokers. This path is created by an administrative process and is automatically removed once the partition has been moved successfully.
     /brokers/partitions_reassigned/[topic]/[partition_id] --> {broker_id …} (created by admin) 
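
The conditional update required on the LeaderAndISR path can be sketched with an in-memory stand-in for ZooKeeper. `VersionedStore` and `BadVersionError` are hypothetical names used only for illustration, and the topic name and broker ids are made up; against a real ensemble the same check would be done by passing the expected version to ZooKeeper's versioned set operation.

```python
from dataclasses import dataclass, field


class BadVersionError(Exception):
    """Raised when the caller's expected version does not match the node."""


@dataclass
class VersionedStore:
    """Hypothetical in-memory stand-in for ZK nodes: path -> (data, version)."""
    nodes: dict = field(default_factory=dict)

    def create(self, path, data):
        self.nodes[path] = (data, 0)

    def get(self, path):
        return self.nodes[path]  # (data, version)

    def set(self, path, data, expected_version):
        _, version = self.nodes[path]
        if version != expected_version:
            raise BadVersionError(path)
        self.nodes[path] = (data, version + 1)
        return version + 1


store = VersionedStore()
path = "/brokers/topics/t1/0/leaderAndISR"
store.create(path, {"leader_epoc": 1, "leader": 1, "ISR": [1, 2, 3]})

# The leader reads the node, then shrinks the ISR using the version it read.
data, version = store.get(path)
new_version = store.set(path, {**data, "ISR": [1, 2]}, version)

# A writer that still holds the old version is rejected and must re-read.
try:
    store.set(path, {**data, "ISR": [1]}, version)
    stale_write_rejected = False
except BadVersionError:
    stale_write_rejected = True
```

The rejected write is what lets the controller and the current leader share one node without overwriting each other's changes.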


Terminologies:

AR: assigned replicas, ISR: in-sync replicas

A. Failover during broker failure.

Controller watches child changes of /brokers/ids path. When the watcher gets triggered, it calls on_broker_change().

on_broker_change():
1. Get the current live brokers from BrokerPath in ZK
2. Determine set_p, the set of partitions whose leader is no longer live.
3. For each partition P in set_p
3.1 Read the current ISR of P from LeaderAndISR path in ZK
3.2 Determine the new leader and the new ISR of P:
    If ISR has at least 1 broker in the live broker list, select one of those brokers as the new leader. The new ISR includes all brokers in the current ISR that are alive.
    Otherwise, select one of the live brokers in AR as the new leader and set that broker as the new ISR (potential data loss in this case).
    Finally, if none of the brokers in AR is alive, set the new leader to -1.
3.3 Write the new leader, ISR and a new epoc (increase current epoc by 1) in /brokers/topics/[topic]/[partition_id]/leaderAndISR.
    This write has to be done conditionally. If the version of the LeaderAndISR path has changed between 3.1 and 3.3, go back to 3.1.
4. Send a LeaderAndISRCommand (containing the new leader/ISR and the ZK version of the LeaderAndISR path) for each partition in set_p to the affected brokers.
   For efficiency, we can put multiple commands in one RPC request.
(Ideally we want to use ZK multi to do the reads and writes in step 3.1 and 3.3.)
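
The selection rule in step 3.2 can be sketched as follows. This is a minimal illustration, not the implementation: it works on plain dicts instead of ZK, omits the conditional write of step 3.3, and makes three assumptions that the steps above do not state: the first live candidate is chosen, the ISR is empty when the leader is -1, and the epoc is incremented once per partition written.

```python
def elect_leader(isr, ar, live_brokers):
    """Return (new_leader, new_isr) for one partition, following step 3.2."""
    live_isr = [b for b in isr if b in live_brokers]
    if live_isr:
        # Assumption: pick the first live ISR member.
        return live_isr[0], live_isr
    live_ar = [b for b in ar if b in live_brokers]
    if live_ar:
        # No live ISR member left: potential data loss in this case.
        return live_ar[0], [live_ar[0]]
    # No assigned replica is alive. Assumption: the ISR becomes empty.
    return -1, []


def on_broker_change(partitions, live_brokers, epoc):
    """partitions maps a name to {"leader", "ISR", "AR"}; it is updated in place.

    Returns (commands, epoc), where commands holds the new leader/ISR for
    every partition whose leader was no longer live.
    """
    commands = {}
    for name, p in partitions.items():
        if p["leader"] in live_brokers:
            continue  # leader still alive, nothing to do
        leader, isr = elect_leader(p["ISR"], p["AR"], live_brokers)
        epoc += 1
        p.update(leader=leader, ISR=isr, leader_epoc=epoc)
        commands[name] = {"leader": leader, "ISR": isr}
    return commands, epoc


partitions = {
    "t1-0": {"leader": 1, "ISR": [1, 2, 3], "AR": [1, 2, 3]},
    "t1-1": {"leader": 2, "ISR": [2, 3], "AR": [2, 3, 1]},
    "t1-2": {"leader": 1, "ISR": [1], "AR": [1, 3]},
    "t1-3": {"leader": 1, "ISR": [1], "AR": [1]},
}
commands, epoc = on_broker_change(partitions, live_brokers={2, 3}, epoc=5)
```

In this example broker 1 has failed: t1-0 fails over within its ISR, t1-2 falls back to a live replica outside the ISR, t1-3 is left without a leader, and t1-1 is untouched.
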
B. Broker acts on commands from the controller.

Each broker listens to commands from the controller through RPC. 

For LeaderAndISRCommand, it calls on_LeaderAndISRCommand().
on_LeaderAndISRCommand(command):
1. Read the set of partitions set_p from command.
2. For each partition P in set_p
2.1 If the command asks this broker to be the new leader for P and this broker is not already the leader for P,
2.1.1 Stop the fetcher to the current leader
2.1.2 Become the leader and remember the ZK version of the LeaderAndISR path
2.2 If the command asks this broker to follow a leader L and the broker is not already following L,
2.2.1 stop the fetcher to the current leader
2.2.2 become a follower to L

3. If the command has a flag INIT, delete all local partitions not in set_p.


For StopReplicaCommand, it calls on_StopReplicaCommand().
on_StopReplicaCommand(command):
1. Read the list of partitions from command.
2. For each such partition P
2.1 delete P from local storage, if present.
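
The two broker-side handlers can be sketched as a small state machine. The `Broker` class, the command layout (`"partitions"`, `"init"`, `"zk_version"`) and the bookkeeping dicts are hypothetical; fetchers and log storage are reduced to dictionary entries. Dropping leadership when asked to follow is an assumption implied by, but not spelled out in, step 2.2.

```python
class Broker:
    """Hypothetical broker state: which partitions it leads or follows."""

    def __init__(self, broker_id):
        self.broker_id = broker_id
        self.local = set()    # partitions stored on this broker
        self.leading = {}     # partition -> ZK version of its LeaderAndISR path
        self.following = {}   # partition -> broker id of the leader being fetched from

    def on_leader_and_isr_command(self, command):
        partitions = command["partitions"]
        for p, info in partitions.items():
            self.local.add(p)
            if info["leader"] == self.broker_id:
                if p not in self.leading:
                    self.following.pop(p, None)           # 2.1.1 stop the fetcher
                    self.leading[p] = info["zk_version"]  # 2.1.2 become the leader
            elif self.following.get(p) != info["leader"]:
                self.following.pop(p, None)               # 2.2.1 stop the fetcher
                self.leading.pop(p, None)                 # assumption: give up leadership
                self.following[p] = info["leader"]        # 2.2.2 follow L
        if command.get("init"):
            # Step 3: drop local partitions that the command does not mention.
            self._delete(self.local - set(partitions))

    def on_stop_replica_command(self, command):
        self._delete(command["partitions"])

    def _delete(self, partitions):
        for p in list(partitions):
            self.local.discard(p)
            self.leading.pop(p, None)
            self.following.pop(p, None)


broker = Broker(2)
broker.on_leader_and_isr_command({"partitions": {
    "t1-0": {"leader": 2, "ISR": [2, 3], "zk_version": 4},
    "t1-1": {"leader": 3, "ISR": [3, 2], "zk_version": 1},
}})
```
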
C. Creating/deleting topics.

The controller watches child changes of /brokers/topics. When the watcher gets triggered, it calls on_topic_change().

on_topic_change():
The controller keeps in memory a list of existing topics.
1. If a new topic is created, read the TopicPath in ZK to get topic's replica assignment.
1.1. call init_leaders() on all newly created partitions.
2. If a topic is deleted, send the StopReplicaCommand to all affected brokers.

init_leaders(set_p):
Input: set_p, a set of partitions
0. Read the current live broker list from the BrokerPath in ZK
1. For each partition P in set_p
1.1 Select one of the live brokers in AR as the new leader and set all live brokers in AR as the new ISR.
1.2 Write the new leader and ISR in /brokers/topics/[topic]/[partition_id]/leaderAndISR
2. Send the LeaderAndISRCommand to the affected brokers. Again, for efficiency, the controller can send multiple commands in 1 RPC.
D. Handling controller failure.

Each broker sets an exists watcher on the ControllerPath. When the watcher gets triggered, it calls on_controller_failover(). Basically, the new controller needs to inform all brokers of the current state stored in ZK (since some state change commands could be lost during the controller failover).

on_controller_failover():
1. create /controller -> {this broker id}
2. if not successful, return
3. read the LeaderAndISR path from ZK for each partition
4. send a LeaderAndISR command (with a special flag INIT) for each partition to relevant brokers. Those commands can be sent in 1 RPC request.
5. call on_broker_change()
6. for the list of partitions without a leader, call init_leaders().
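
Steps 1 to 4 of on_controller_failover() can be sketched as below. `FakeZk` is a hypothetical in-memory stand-in whose `create_ephemeral` fails when the node already exists, which is the property the election relies on. Treating "relevant brokers" as every broker in a partition's AR is an assumption, and steps 5 and 6 are left out.

```python
class FakeZk:
    """Hypothetical stand-in: creating an existing node fails."""

    def __init__(self):
        self.nodes = {}

    def create_ephemeral(self, path, data):
        if path in self.nodes:
            return False
        self.nodes[path] = data
        return True

    def delete(self, path):
        # Simulates the ephemeral node vanishing when its owner's session ends.
        self.nodes.pop(path, None)


def on_controller_failover(zk, broker_id, leader_and_isr):
    """Return INIT commands grouped by target broker, or None if the election was lost."""
    if not zk.create_ephemeral("/controller", broker_id):
        return None  # step 2: another broker is already the controller
    commands = {}
    for partition, state in leader_and_isr.items():
        for target in state["AR"]:  # assumption: relevant brokers = AR
            cmd = commands.setdefault(target, {"init": True, "partitions": {}})
            cmd["partitions"][partition] = {
                "leader": state["leader"],
                "ISR": state["ISR"],
            }
    return commands  # one batched command per broker, i.e. 1 RPC each


zk = FakeZk()
state = {"t1-0": {"leader": 2, "ISR": [2, 3], "AR": [1, 2, 3]}}
won = on_controller_failover(zk, 2, state)   # broker 2 creates /controller first
lost = on_controller_failover(zk, 3, state)  # broker 3 finds the node taken
```
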
E. Broker startup.

When a broker starts up, it calls on_broker_startup(). Basically, the broker needs to first read the current state of each partition from ZK.

on_broker_startup():
1. read the replica assignment of all topics from the TopicPath in ZK
2. read the leader and the ISR of each partition assigned to this broker from the LeaderAndISR path in ZK
3. for each replica assigned to this broker
3.1 start replica
3.2 if this broker is a leader of this partition, become leader. (shouldn't happen in general)
3.3 if this broker is a follower of this partition, become follower.
4. Delete local partitions no longer assigned to this broker (partitions deleted while the broker is down).
5. subscribe to changes in ZKQueue for this broker.
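
Steps 1 to 4 of on_broker_startup() amount to deriving a role per assigned partition and a list of stale local partitions. The sketch below takes the ZK contents as plain dicts; the function signature and the role strings are hypothetical, and a leader value of -1 is not treated specially here.

```python
def on_broker_startup(broker_id, assignment, leader_and_isr, local_partitions):
    """Return (roles, to_delete) for a starting broker.

    assignment:       partition -> list of assigned broker ids (TopicPath)
    leader_and_isr:   partition -> {"leader": ..., "ISR": [...]}
    local_partitions: partitions found on local disk
    """
    roles = {}
    for partition, replicas in assignment.items():
        if broker_id not in replicas:
            continue  # not assigned to this broker
        leader = leader_and_isr[partition]["leader"]
        # Step 3.2 / 3.3: any other leader value makes this broker a follower.
        roles[partition] = "leader" if leader == broker_id else "follower"
    # Step 4: partitions on disk that are no longer assigned to this broker.
    to_delete = sorted(set(local_partitions) - set(roles))
    return roles, to_delete


assignment = {"t1-0": [1, 2], "t1-1": [2, 3], "t2-0": [3, 1]}
leader_and_isr = {
    "t1-0": {"leader": 1, "ISR": [1, 2]},
    "t1-1": {"leader": 2, "ISR": [2, 3]},
    "t2-0": {"leader": 3, "ISR": [3, 1]},
}
roles, to_delete = on_broker_startup(2, assignment, leader_and_isr, ["t1-0", "t0-0"])
```
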
F. Replica reassignment:

Controller watches child changes in /brokers/partitions_reassigned/[topic]. When the watcher gets triggered, it calls on_partitions_reassigned().

on_partitions_reassigned():
1. read /brokers/partitions_reassigned/[topic]
2. issue StartReplica command to the right brokers.
3. periodically check ISR of affected partitions
3.1 if ISR == AR+RAR, update ISR, and send StartReplica (to inform the leader of the new ISR) and StopReplica command to the right brokers.
3.2 update /brokers/topics/[topic] to change AR to the new replica set
3.3 delete /brokers/partitions_reassigned/[topic]
(An alternative approach to 3 is to set watches on ISR and do the check only when ISR is changed.)
4. inform the current leader of the ISR change by writing an ISRState change to ZKQueue
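
The periodic check in step 3 can be sketched as a pure function. Here RAR denotes the new replica list read from the reassignment path. Two points are interpretations rather than statements from the steps above: "update ISR" is taken to mean shrinking the ISR to the brokers in RAR, and StopReplica is sent to the brokers that are in AR but not in RAR.

```python
def reassignment_step(ar, rar, isr):
    """Return the actions to take once ISR == AR + RAR, else None.

    ar:  currently assigned replicas (broker ids)
    rar: replicas listed in the reassignment path
    isr: current in-sync replicas
    """
    if set(isr) != set(ar) | set(rar):
        return None  # new replicas have not caught up yet; check again later
    return {
        "new_isr": [b for b in isr if b in rar],          # assumption: shrink to RAR
        "stop_replica": [b for b in ar if b not in rar],  # assumption: old-only replicas
        "new_ar": list(rar),                              # step 3.2
    }


still_waiting = reassignment_step(ar=[1, 2], rar=[2, 3], isr=[1, 2])
done = reassignment_step(ar=[1, 2], rar=[2, 3], isr=[1, 2, 3])
```
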
Discussions:
1. End-to-end latency during a broker failure:
  1. broker shutdown (after closing socket server, need to close request handler, close log)
  2. broker watcher gets triggered in controller
  3. make leadership change and publish the new leader/ISR in ZK (1 ZK write per affected partition)
  4. inform the leadership change to each broker by writing to ZKQueue (1 ZK write per broker)
  5. leader waits for followers in ISR to connect (Kafka RPC)
  6. follower truncates its log first (a potential I/O) and then starts fetching from leader

In the critical path, the most time-consuming operation is step 3, where we need to write 1 ZK path per partition. Assuming that during a broker failover we need to change the leader for 10K partitions and each ZK write takes 4ms, this could take 40 seconds. One possibility is to use the multi() support in ZK 3.4 to batch those writes in 1 ZK operation.
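
The 40-second estimate is simple arithmetic, reproduced below together with a rough view of what batching might buy. The function name and the `batch_size` parameter are hypothetical, and the model assumes that one batched operation costs about as much as a single write, which the text above does not claim and which would need to be measured.

```python
import math


def failover_write_time_s(partitions, write_ms, batch_size=1):
    """Estimated time spent on sequential ZK writes during failover.

    Assumption: each batch of up to batch_size writes costs one write_ms.
    """
    operations = math.ceil(partitions / batch_size)
    return operations * write_ms / 1000


sequential = failover_write_time_s(10_000, 4)                # one write per partition
batched = failover_write_time_s(10_000, 4, batch_size=1000)  # 10 batched operations
```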

2. ZKQueue:

Communicating between the controller and the brokers via ZK is not efficient. Each communication requires 2 ZK writes (each costs roughly 2 RPCs), 1 watcher firing and 1 ZK read. These add up to roughly 6 RPCs per communication. An alternative is to implement an admin RPC in the broker for direct communication between the controller and the brokers. Then each communication costs only 1 RPC. The admin RPC could specify a timeout, during which it expects the admin command to be completed.

3. Dealing with multiple leaders in transition:

Occasionally, it's possible for multiple brokers to simultaneously assume that they are the leader of a partition. For example, broker A is the initial leader of a partition and the ISR of that partition is {A,B,C}. Then, broker A goes into GC and loses its ZK registration. The controller assumes that broker A is dead, assigns the leader of the partition to broker B and sets the new ISR in ZK to {B,C}. Broker B becomes the leader and at the same time, broker A wakes up from GC but hasn't acted on the leadership change command sent by the controller. Now, both broker A and B think they are the leader. It would be bad if we allowed both broker A and B to commit new messages, since the data among replicas would be out of sync.

Our current design actually prevents this from happening. The claim is that after broker B becomes the new leader, broker A can no longer commit new messages. For broker A to commit a message m, it needs every replica in the ISR to receive m. At the moment, broker A still thinks the ISR is {A,B,C} (its local copy, although the ISR in ZK has changed). Broker B will never receive message m, because by becoming the new leader it must have first stopped fetching data from the previous leader. Therefore broker A can't commit message m without shrinking the ISR first. In order to shrink the ISR, broker A has to write the new ISR in ZK. However, it can't do that, because the conditional update fails: the leaderAndISR node in ZK is no longer on the version that broker A assumes (it has already been changed by the controller). At this moment, broker A will realize that it's no longer the leader.

Question A.3, is broker down the only failure scenario that we worry about? Do we worry about leader failure at individual partition level?

How to deal with repeated topic deletion/creation? A broker can be down for a long time during which a topic can be deleted and recreated. When the broker comes up, the topic it has locally may not match the content of the newly created topic. There are a couple of ways of dealing with this.
