...
Code Block |
---|
Replica { // a replica of a partition
broker_id : int
partition : Partition
log : Log // local log associated with this replica
hw : long // offset of the last committed message
leo : long // log end offset
isLeader : Boolean // is this replica leader
}
Partition { //a partition in a topic
topic : string
partition_id : int
leader : Replica // the leader replica of this partition
ISR : Set[Replica] // In-sync replica set, maintained at the leader
AR : Set[Replica] // All replicas assigned for this partition
LeaderAndISRVersionInZK : long // version id of the LeaderAndISR path; used to conditionally update the LeaderAndISR path in ZK
}
LeaderAndISRCommand {
isInit : byte // whether this is the first command issued by a controller
leaderAndISRMap : Map[(topic: String, partitionId: int) => LeaderAndISR] // a map of LeaderAndISR
}
LeaderAndISR {
leader : int // broker id of the leader
leaderGenId : int // leader generation id, incremented on each leadership change
ISR : Set[int] // a set of the id of each broker in ISR
zkVersion : long // version of the LeaderAndISR path in ZK
}
LeaderAndISRResponse {
responseMap : Map[(topic: String, partitionId: int) => int] // a map of error codes
}
StopReplicaCommand {
stopReplicaSet : Set[(topic: String, partitionId: int)] // a set of partitions to be stopped
}
StopReplicaResponse {
responseMap : Map[(topic: String, partitionId: int) => int] // a map of error codes
}
|
A. Failover during broker failure.
...
Code Block |
---|
on_broker_change():
1. Get the current live brokers from BrokerPath in ZK
2. Determine set_p, a set of partitions whose leader is no longer live or whose ISR will change because a broker is down.
3. For each partition P in set_p
  3.1 Read the current ISR of P from LeaderAndISR path in ZK
  3.2 Determine the new leader and the new ISR of P:
      If ISR has at least 1 broker in the live broker list, select one of those brokers as the new leader. The new ISR includes all brokers in the current ISR that are alive.
      Otherwise, select one of the live brokers in AR as the new leader and set that broker as the new ISR (potential data loss in this case).
      Finally, if none of the brokers in AR is alive, set the new leader to -1.
  3.3 Write the new leader, ISR and a new epoch (increase current epoch by 1) in /brokers/topics/[topic]/[partition_id]/leaderAndISR.
      This write has to be done conditionally. If the version of the LeaderAndISR path has changed between 3.1 and 3.3, go back to 3.1.
4. Send a LeaderAndISRCommand (containing the new leader/ISR and the ZK version of the LeaderAndISR path) for each partition in set_p to the affected brokers.
   For efficiency, we can put multiple commands in one RPC request.
(Ideally we want to use ZK multi to do the reads and writes in step 3.1 and 3.3.)
|
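The read / decide / conditional-write loop in steps 3.1 to 3.3 can be sketched in Python as below. This is only an illustration under stated assumptions, not the controller's actual code. `FakeZk`, `elect` and `update_partition` are hypothetical names, and `FakeZk` is an in-memory stand-in for the LeaderAndISR path rather than a real ZooKeeper client. The text does not say which live ISR member to pick, so the sketch assumes the current leader is kept if it is still alive, and that the ISR is empty when the leader is set to -1.

```python
from dataclasses import dataclass
from typing import List, Set, Tuple

NO_LEADER = -1  # step 3.2: leader value when no replica in AR is alive


@dataclass(frozen=True)
class LeaderAndISR:
    leader: int
    epoch: int
    isr: Tuple[int, ...]  # broker ids, kept in stored order


def elect(current: LeaderAndISR, assigned: List[int], live: Set[int]) -> LeaderAndISR:
    """Step 3.2: choose the new leader and ISR for one partition."""
    live_isr = tuple(b for b in current.isr if b in live)
    if live_isr:
        # Assumption: keep the current leader if it survived, else take the first live ISR member.
        leader = current.leader if current.leader in live_isr else live_isr[0]
        isr = live_isr
    else:
        live_ar = [b for b in assigned if b in live]
        if live_ar:
            # No ISR member is alive: fall back to AR (potential data loss).
            leader, isr = live_ar[0], (live_ar[0],)
        else:
            # Assumption: the ISR is left empty when there is no leader.
            leader, isr = NO_LEADER, ()
    return LeaderAndISR(leader, current.epoch + 1, isr)


class FakeZk:
    """Hypothetical in-memory stand-in for the LeaderAndISR path (value plus version)."""

    def __init__(self, value: LeaderAndISR) -> None:
        self.value = value
        self.version = 0

    def read(self) -> Tuple[LeaderAndISR, int]:
        return self.value, self.version

    def conditional_write(self, value: LeaderAndISR, expected_version: int) -> bool:
        # The write succeeds only if nobody changed the path since it was read.
        if expected_version != self.version:
            return False
        self.value = value
        self.version += 1
        return True


def update_partition(zk: FakeZk, assigned: List[int], live: Set[int]) -> Tuple[LeaderAndISR, int]:
    """Steps 3.1 to 3.3: retry until the conditional write succeeds."""
    while True:
        current, version = zk.read()            # 3.1
        new_state = elect(current, assigned, live)  # 3.2
        if zk.conditional_write(new_state, version):  # 3.3
            return new_state, zk.version
```

The returned pair corresponds to what step 4 places in the LeaderAndISRCommand: the new leader/ISR and the ZK version of the path after the write.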
...