End-to-end latency during a broker failure:
1. broker shutdown (after closing socket server, need to close request handler, close log)
2. broker watcher gets triggered (ZK RPC)
3. make leadership change and write new ISR (ZK reads/writes)
4. wait for followers in ISR to connect (Kafka RPC)
4.1 follower will need to truncate log first (I/O)

The following is a draft design that uses a controller for leader election and other admin-related tasks.

...

Code Block
on_broker_change():
The controller keeps in memory, for every partition, its leader and AR (assigned replica list)
1. call change_leaders() on the current list of partitions

change_leaders():
Input: a list of partitions and their leader, AR
1. Read the current live broker list
2. Determine the set of partitions whose leader is not in the live broker list.
3. For each such partition P
3.1 Read the current ISR of P from ZK
3.2 Determine the new leader and the new ISR of P:
    If ISR has at least 1 broker in the live broker list, select one of those brokers as the new leader. The new ISR includes all brokers in the current ISR that are alive.
    Otherwise, select one of the live brokers in AR as the new leader and set that broker as the new ISR (potential data loss in this case).
    Question A.1, what happens if none of the brokers in AR is alive?
4. For each such partition P, write the new leader and ISR to /brokers/topics/[topic]/[partition_id]/leaderAndISR
5. Publish the new leader/ISR for each affected partition to the ZKQueue of the affected brokers. For efficiency, the controller can write the decision for all affected partitions in 1 path in ZKQueue.
(Ideally we want to use ZK multi to conditionally write 3 and 4 in 1 transaction for better latency and correctness.)
Question A.2, should the broker publish decisions to the brokers that are currently down? If not, how do we guarantee that no decision changes are lost?
Question A.3, is broker down the only failure scenario that we worry about? Do we worry about leader failure at individual partition level?
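
To make the leader-selection rule in step 3.2 concrete, here is a minimal Java sketch. LeaderElector, LeaderAndIsr and electLeader() are illustrative names (not existing Kafka code), and the ZK reads/writes around this step are omitted.

Code Block
import java.util.*;

// Illustrative sketch of step 3.2 of change_leaders(); names are hypothetical.
class LeaderElector {

    static class LeaderAndIsr {
        final int leader;
        final List<Integer> isr;
        LeaderAndIsr(int leader, List<Integer> isr) { this.leader = leader; this.isr = isr; }
    }

    // currentIsr: ISR of P read from ZK (step 3.1)
    // assignedReplicas: AR of P from the controller's in-memory state
    // liveBrokers: current live broker list (step 1)
    // returns null if no assigned replica is alive (Question A.1)
    static LeaderAndIsr electLeader(List<Integer> currentIsr,
                                    List<Integer> assignedReplicas,
                                    Set<Integer> liveBrokers) {
        // Prefer a live broker from the current ISR: no data loss.
        List<Integer> liveIsr = new ArrayList<>();
        for (int b : currentIsr)
            if (liveBrokers.contains(b)) liveIsr.add(b);
        if (!liveIsr.isEmpty())
            return new LeaderAndIsr(liveIsr.get(0), liveIsr);

        // Otherwise fall back to any live broker in AR: potential data loss.
        for (int b : assignedReplicas)
            if (liveBrokers.contains(b))
                return new LeaderAndIsr(b, Collections.singletonList(b));

        // No live replica at all (Question A.1): the partition stays offline.
        return null;
    }
}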
B. Broker acts on leadership change.

Each broker registers a child watcher on its ZKQueue. When the watcher gets triggered, it calls on_leader_assignment_change().

Code Block
on_leader_assignment_change():
1. Read from this broker's ZKQueue the list of partitions whose leader/ISR has changed.
2. For each such partition P
2.1 If this broker is the new leader,
2.1.1 stop the fetcher to the current leader
2.1.2 become the leader (This is critical: the leader can only update ISR in the future if it hasn't been changed by the controller)
2.2 If this broker is following a new leader
2.2.1 stop the fetcher to the current leader
2.2.2 become a follower
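
As a rough illustration of B, the broker-side handler might look like the following Java sketch; PartitionDecision and the stopFetcher/becomeLeader/becomeFollower methods are placeholders for broker internals, not actual Kafka classes.

Code Block
import java.util.List;

// Hypothetical sketch of on_leader_assignment_change() on a broker.
class LeaderAssignmentListener {
    private final int brokerId;

    LeaderAssignmentListener(int brokerId) { this.brokerId = brokerId; }

    // decisions: the leader/ISR changes read from this broker's ZKQueue (step 1)
    void onLeaderAssignmentChange(List<PartitionDecision> decisions) {
        for (PartitionDecision d : decisions) {
            if (d.leader == brokerId) {
                stopFetcher(d.topic, d.partition);  // 2.1.1 stop fetching from the old leader
                becomeLeader(d);                    // 2.1.2 only this leader may later update the ISR
            } else if (d.replicas.contains(brokerId)) {
                stopFetcher(d.topic, d.partition);  // 2.2.1 stop fetching from the old leader
                becomeFollower(d);                  // 2.2.2 start fetching from the new leader
            }
        }
    }

    static class PartitionDecision {
        String topic; int partition; int leader; List<Integer> isr; List<Integer> replicas;
    }

    // Placeholders for broker internals.
    private void stopFetcher(String topic, int partition) { }
    private void becomeLeader(PartitionDecision d) { }
    private void becomeFollower(PartitionDecision d) { }
}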
C. Creating/deleting topics.

Controller watches child changes of /brokers/topics. When the watcher gets triggered, it calls on_topic_change().

...

(Ideally we want to use ZK multi to conditionally write 2 and 3 in 1 transaction for better latency and correctness.)

D. Handling controller failure.

Each broker sets an exists watch on /controller (an ephemeral node). When the watcher gets triggered, it calls on_controller_failover(). Basically, the new controller needs to inform each broker of all the decisions that it has made in the past (since it cannot be sure whether any decision was lost during the controller failover). A broker can ignore decisions that it has already applied.

Code Block
on_controller_failover():
1. create /controller -> this broker id
2. if successful
2.1 write all published decisions (leader/ISR for each partition) to the ZKQueue of every broker.
2.2 change_leaders()
2.3 for the list of partitions without a leader, call init_leaders().
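
A minimal sketch of the election part of on_controller_failover(), using the plain ZooKeeper Java client; republishDecisions(), changeLeaders() and initLeaders() are placeholders for the controller logic described above.

Code Block
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.nio.charset.StandardCharsets;

class ControllerFailover {
    private final ZooKeeper zk;
    private final int brokerId;

    ControllerFailover(ZooKeeper zk, int brokerId) { this.zk = zk; this.brokerId = brokerId; }

    void onControllerFailover() throws KeeperException, InterruptedException {
        try {
            // 1. try to become the controller; the ephemeral node goes away if this broker dies.
            zk.create("/controller",
                      Integer.toString(brokerId).getBytes(StandardCharsets.UTF_8),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE,
                      CreateMode.EPHEMERAL);
        } catch (KeeperException.NodeExistsException e) {
            return;  // another broker won the election; keep watching /controller
        }
        // 2. this broker is now the controller: resend history and re-run leader election.
        republishDecisions();  // 2.1 write all published leader/ISR decisions to every broker's ZKQueue
        changeLeaders();       // 2.2 move leadership off dead brokers
        initLeaders();         // 2.3 elect leaders for partitions that never had one
    }

    private void republishDecisions() { }
    private void changeLeaders() { }
    private void initLeaders() { }
}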
E. Broker startup.

When a broker starts up, it calls on_broker_startup(). Basically, the broker needs to first read all published decisions about each partition.

Code Block
on_broker_startup():
1. read all /brokers/topics/[topic]
2. read /broker/leader_assignment
3. for each replica assigned to this broker
3.1 start replica
3.2 if this broker is a leader of this partition, become leader. (shouldn't happen in general)
3.3 if this broker is a follower of this partition, become follower.
4. subscribe to changes in this broker's ZKQueue.
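
The startup sequence could be sketched as below; ReplicaAssignment and the helper methods are illustrative placeholders, since the actual ZK paths and broker internals are only described at a high level here.

Code Block
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of on_broker_startup().
class BrokerStartup {
    private final int brokerId;

    BrokerStartup(int brokerId) { this.brokerId = brokerId; }

    void onBrokerStartup() {
        // 1-2. read topic metadata and the published leader/ISR decisions from ZK
        List<ReplicaAssignment> assignments = readAssignedReplicas(brokerId);
        for (ReplicaAssignment a : assignments) {
            startReplica(a);                               // 3.1
            if (a.leader == brokerId) becomeLeader(a);     // 3.2 (unusual for a freshly started broker)
            else becomeFollower(a);                        // 3.3
        }
        subscribeToZkQueue(brokerId);                      // 4. watch this broker's ZKQueue for new decisions
    }

    static class ReplicaAssignment { String topic; int partition; int leader; }

    // Placeholders for ZK reads and broker internals.
    private List<ReplicaAssignment> readAssignedReplicas(int brokerId) { return Collections.emptyList(); }
    private void startReplica(ReplicaAssignment a) { }
    private void becomeLeader(ReplicaAssignment a) { }
    private void becomeFollower(ReplicaAssignment a) { }
    private void subscribeToZkQueue(int brokerId) { }
}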
F. Replica reassignment:

Controller watches child changes in /brokers/partitions_reassigned/[topic]. When the watcher gets triggered, it calls on_partitions_reassigned().

Code Block
on_partitions_reassigned():
1. read /brokers/partitions_reassigned/[topic]
2. issue StartReplica command to the right brokers.
3. periodically check ISR of affected partitions
3.1 if ISR == AR+RAR, update ISR (??? need to do that in individual ISR and leader path), and send StopReplica command to the right brokers.
3.2 update /brokers/topics/[topic] to change AR to the new replica set
3.3 delete /brokers/partitions_reassigned/[topic]
(An alternative approach to 3 is to set watches on ISR and do the check only when ISR is changed.)
4. inform the current leader of the ISR change by writing an ISRState change to ZKQueue
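
The completion check in step 3 could be sketched as follows; the set comparison is the essential part, while the ZK updates and the StartReplica/StopReplica commands are placeholder methods.

Code Block
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the periodic check in on_partitions_reassigned().
class ReassignmentChecker {

    // The reassignment is done once the ISR covers both the old assignment (AR)
    // and the reassigned replicas (RAR), i.e. the new replicas have caught up.
    static boolean reassignmentComplete(Set<Integer> isr, Set<Integer> ar, Set<Integer> rar) {
        Set<Integer> target = new HashSet<>(ar);
        target.addAll(rar);
        return isr.containsAll(target);
    }

    void checkPartition(String topic, int partition,
                        Set<Integer> isr, Set<Integer> ar, Set<Integer> rar) {
        if (!reassignmentComplete(isr, ar, rar))
            return;                                              // keep polling (or watch the ISR path instead)
        shrinkIsrAndStopOldReplicas(topic, partition, ar, rar);  // 3.1
        writeNewAssignedReplicas(topic, rar);                    // 3.2 AR := RAR in /brokers/topics/[topic]
        deleteReassignmentPath(topic);                           // 3.3
        informLeaderOfIsrChange(topic, partition);               // 4. via an ISRState change in ZKQueue
    }

    // Placeholders for ZK writes and replica commands.
    private void shrinkIsrAndStopOldReplicas(String t, int p, Set<Integer> ar, Set<Integer> rar) { }
    private void writeNewAssignedReplicas(String t, Set<Integer> rar) { }
    private void deleteReassignmentPath(String t) { }
    private void informLeaderOfIsrChange(String t, int p) { }
}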
End-to-end latency during a broker failure:
  1. broker shutdown (after closing socket server, need to close request handler, close log)
  2. broker watcher gets triggered in controller
  3. make leadership change and publish the new leader/ISR in ZK (1 ZK write per affected partition)
  4. inform each broker of the leadership change by writing to its ZKQueue (1 ZK write per broker)
  5. leader waits for followers in ISR to connect (Kafka RPC)
  6. follower truncates its log first (a potential I/O) and then starts fetching from leader

In the critical path, the most time-consuming operation is step 3, where we need to write one ZK path per partition. Assuming that during a broker failover we need to change the leader for 10K partitions and each ZK write takes 2 ms, this could take 20 seconds. One possibility is to use the multi support in ZK to batch those per-partition writes, as sketched below.
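
As a rough sketch of that idea, the per-partition leaderAndISR writes could be batched with ZooKeeper's multi() (available since ZK 3.4); the path layout follows the design above and the value serialization is simplified to a plain string. In practice a batch this large would likely have to be split into chunks to stay under ZK's default ~1 MB request size limit.

Code Block
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooKeeper;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class BatchedLeaderAndIsrWriter {

    // leaderAndIsrByPath: /brokers/topics/[topic]/[partition_id]/leaderAndISR -> serialized leader/ISR
    static void writeLeaderAndIsr(ZooKeeper zk, Map<String, String> leaderAndIsrByPath)
            throws KeeperException, InterruptedException {
        List<Op> ops = new ArrayList<>();
        for (Map.Entry<String, String> e : leaderAndIsrByPath.entrySet()) {
            ops.add(Op.setData(e.getKey(),
                               e.getValue().getBytes(StandardCharsets.UTF_8),
                               -1));  // -1 = any version; pass the expected version for a conditional write
        }
        // One round trip commits (or aborts) all writes atomically, instead of
        // paying one ~2 ms round trip per partition.
        zk.multi(ops);
    }
}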