...

Code Block
Replica {                                // a replica of a partition
  broker_id               : int
  partition               : Partition
  log                     : Log          // local log associated with this replica
  hw                      : long         // offset of the last committed message
  leo                     : long         // log end offset
  isLeader                : Boolean      // whether this replica is the leader
}

Partition {                              // a partition in a topic
  topic                   : string
  partition_id            : int
  leader                  : Replica      // the leader replica of this partition
  ISR                     : Set[Replica] // In-sync replica set, maintained at the leader
  AR                      : Set[Replica] // All replicas assigned for this partition
  LeaderAndISRVersionInZK : long         // version id of the LeaderAndISR path; used to conditionally update the LeaderAndISR path in ZK
}
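The two structures above can be modelled in a minimal Python sketch. The snake_case field names and the `new_partition` helper are hypothetical, and the local Log is left out. The point is only the object graph: each Replica points back to its Partition, and the Partition tracks its leader, its ISR and its AR.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set


@dataclass(eq=False)  # eq=False keeps identity hashing, so replicas can be stored in sets
class Replica:
    """A replica of a partition; the local Log is omitted in this sketch."""
    broker_id: int
    partition: Optional["Partition"] = field(default=None, repr=False)  # back-reference
    hw: int = 0             # offset of the last committed message
    leo: int = 0            # log end offset
    is_leader: bool = False


@dataclass(eq=False)
class Partition:
    """A partition in a topic."""
    topic: str
    partition_id: int
    leader: Optional[Replica] = None
    isr: Set[Replica] = field(default_factory=set)   # in-sync replicas, maintained at the leader
    ar: Set[Replica] = field(default_factory=set)    # all replicas assigned to this partition
    leader_and_isr_version_in_zk: int = 0            # used for conditional updates in ZK


def new_partition(topic: str, partition_id: int,
                  broker_ids: List[int], leader_id: int) -> Partition:
    """Hypothetical helper: wire up a partition whose assigned replicas all start in sync."""
    p = Partition(topic, partition_id)
    for broker_id in broker_ids:
        r = Replica(broker_id=broker_id, partition=p, is_leader=(broker_id == leader_id))
        p.ar.add(r)
        p.isr.add(r)
        if r.is_leader:
            p.leader = r
    return p
```

For example, `new_partition("orders", 0, [1, 2, 3], leader_id=2)` yields a partition with three assigned replicas, all in the ISR, with broker 2 as leader.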



LeaderAndISRCommand {
  isInit                  : byte         // whether this is the first command issued by a controller
  leaderAndISRMap         : Map[(topic: String, partitionId: int) => LeaderAndISR] // a LeaderAndISR per partition
}

LeaderAndISR {
  leader                  : int          // broker id of the leader
  leaderGenId             : int          // leader generation id, incremented on each leadership change
  ISR                     : Set[int]     // ids of the brokers in the ISR
  zkVersion               : long         // version of the LeaderAndISR path in ZK
}

LeaderAndISRResponse {
  responseMap             : Map[(topic: String, partitionId: int) => int] // an error code per partition
}
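A minimal sketch of how a broker might apply a LeaderAndISRCommand and build the matching LeaderAndISRResponse. The error-code values, the `Broker` class, and the rule of rejecting an entry whose leaderGenId is older than the one already applied are assumptions made for illustration; the message definitions above only say that the response carries an int error code per partition.

```python
from dataclasses import dataclass, field
from typing import Dict, FrozenSet, Tuple

TopicPartition = Tuple[str, int]  # (topic, partitionId)

# Hypothetical error codes; the design does not define concrete values.
NO_ERROR = 0
STALE_LEADER_GEN_ID = 1


@dataclass(frozen=True)
class LeaderAndISR:
    leader: int             # broker id of the leader (-1 if none)
    leader_gen_id: int      # incremented on each leadership change
    isr: FrozenSet[int]     # broker ids in the ISR
    zk_version: int         # version of the LeaderAndISR path in ZK


@dataclass
class LeaderAndISRCommand:
    is_init: bool                                   # first command from this controller?
    leader_and_isr_map: Dict[TopicPartition, LeaderAndISR]


@dataclass
class Broker:
    broker_id: int
    # Latest LeaderAndISR this broker has applied, per partition.
    state: Dict[TopicPartition, LeaderAndISR] = field(default_factory=dict)

    def handle_leader_and_isr(self, cmd: LeaderAndISRCommand) -> Dict[TopicPartition, int]:
        """Apply each entry and return one error code per partition.

        Assumption for this sketch: an entry with an older leaderGenId is rejected."""
        response: Dict[TopicPartition, int] = {}
        for tp, info in cmd.leader_and_isr_map.items():
            current = self.state.get(tp)
            if current is not None and info.leader_gen_id < current.leader_gen_id:
                response[tp] = STALE_LEADER_GEN_ID
                continue
            self.state[tp] = info  # a real broker would also switch to leader/follower here
            response[tp] = NO_ERROR
        return response
```

Comparing leaderGenId rather than zkVersion is a design choice of the sketch: leaderGenId changes only on leadership changes, so it orders commands about who the leader is.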


StopReplicaCommand {
  stopReplicaSet          : Set[(topic: String, partitionId: int)] // a set of partitions to be stopped
}


StopReplicaResponse {
  responseMap             : Map[(topic: String, partitionId: int) => int] // an error code per partition
}
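A similar sketch for StopReplicaCommand. It assumes, hypothetically, that a broker answers with an error code for any listed partition it does not host; removing the partition from a set stands in for actually stopping the replica.

```python
from typing import Dict, Set, Tuple

TopicPartition = Tuple[str, int]  # (topic, partitionId)

# Hypothetical error codes; the design does not define concrete values.
NO_ERROR = 0
UNKNOWN_PARTITION = 1


def handle_stop_replica(hosted: Set[TopicPartition],
                        stop_replica_set: Set[TopicPartition]) -> Dict[TopicPartition, int]:
    """Stop every listed replica this broker hosts; return one error code per partition."""
    response: Dict[TopicPartition, int] = {}
    for tp in stop_replica_set:
        if tp in hosted:
            hosted.discard(tp)  # stand-in for actually stopping the replica
            response[tp] = NO_ERROR
        else:
            response[tp] = UNKNOWN_PARTITION
    return response
```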
A. Failover during broker failure.

...

Code Block
on_broker_change():
1. Get the current live brokers from BrokerPath in ZK.
2. Determine set_p, a set of partitions whose leader is no longer live or whose ISR will change because a broker is down.
3. For each partition P in set_p
3.1 Read the current ISR of P from LeaderAndISR path in ZK
3.2 Determine the new leader and the new ISR of P:
If the ISR has at least one broker in the live broker list, select one of those brokers as the new leader; the new ISR consists of all live brokers in the current ISR.
Otherwise, select one of the live brokers in AR as the new leader and make it the sole member of the new ISR (potential data loss in this case).
If no broker in AR is alive either, set the new leader to -1.
3.3 Write the new leader, the new ISR and a new epoch (the current epoch plus 1) to /brokers/topics/[topic]/[partition_id]/leaderAndISR.
This write must be conditional: if the version of the LeaderAndISR path has changed between steps 3.1 and 3.3, go back to step 3.1.
4. Send a LeaderAndISRCommand (containing the new leader/ISR and the ZK version of the LeaderAndISR path) for each partition in set_p to the affected brokers.
For efficiency, multiple commands can be put in one RPC request.
(Ideally, we would use ZK multi to do the reads and writes in steps 3.1 and 3.3 atomically.)
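Steps 2 and 3.2 of on_broker_change can be sketched as two pure functions over broker ids. The design leaves open which eligible broker to pick, so the sketch arbitrarily takes the lowest live ISR id and, in the unclean case, the first live broker in AR order. Returning an empty ISR when no AR broker is alive is also an assumption, since the design only fixes the leader at -1.

```python
from typing import Dict, List, Set, Tuple

TopicPartition = Tuple[str, int]  # (topic, partitionId)
NO_LEADER = -1


def affected_partitions(leaders: Dict[TopicPartition, int],
                        isrs: Dict[TopicPartition, Set[int]],
                        live: Set[int]) -> Set[TopicPartition]:
    """Step 2: partitions whose leader died or whose ISR contains a dead broker."""
    return {tp for tp in leaders
            if leaders[tp] not in live or not isrs[tp] <= live}


def elect(isr: Set[int], ar: List[int], live: Set[int]) -> Tuple[int, Set[int]]:
    """Step 3.2: pick a new leader and a new ISR for one partition."""
    live_isr = isr & live
    if live_isr:
        # Clean election: any surviving ISR member has every committed message.
        # Picking the lowest id is an arbitrary tie-break.
        return min(live_isr), live_isr
    live_ar = [b for b in ar if b in live]
    if live_ar:
        # Unclean election: the new leader may be missing committed messages.
        return live_ar[0], {live_ar[0]}
    return NO_LEADER, set()
```

For instance, with live brokers {1, 3}, a partition with ISR {2, 3} and AR [2, 3, 1] elects broker 3 with ISR {3}.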

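The conditional write in step 3.3 and the batching in step 4 of on_broker_change can be sketched against a hypothetical in-memory store, `FakeZk`, that mimics ZK's versioned writes: a write succeeds only if the caller's expected version still matches, and each successful write bumps the version. `FakeZk` is not a real ZooKeeper client API. Which brokers count as "affected" is passed in by the caller rather than derived.

```python
from collections import defaultdict
from typing import Callable, Dict, Set, Tuple

TopicPartition = Tuple[str, int]   # (topic, partitionId)
Record = Tuple[int, int, frozenset]  # (leader, epoch, ISR)


class BadVersion(Exception):
    """Raised when the expected version no longer matches the stored one."""


class FakeZk:
    """Hypothetical in-memory stand-in for ZK nodes with versioned, conditional writes."""

    def __init__(self) -> None:
        self.data: Dict[str, Tuple[Record, int]] = {}

    def get(self, path: str) -> Tuple[Record, int]:
        return self.data[path]

    def set(self, path: str, value: Record, expected_version: int) -> int:
        _, version = self.data[path]
        if version != expected_version:
            raise BadVersion(path)
        self.data[path] = (value, version + 1)
        return version + 1


def update_leader_and_isr(zk: FakeZk, path: str,
                          decide: Callable[[Record], Tuple[int, frozenset]]) -> Tuple[Record, int]:
    """Steps 3.1-3.3: read, decide, write conditionally; restart at 3.1 on a conflict."""
    while True:
        (leader, epoch, isr), version = zk.get(path)        # 3.1
        new_leader, new_isr = decide((leader, epoch, isr))  # 3.2
        record = (new_leader, epoch + 1, new_isr)           # 3.3: bump the epoch
        try:
            return record, zk.set(path, record, version)
        except BadVersion:
            continue  # the path changed since 3.1; read it again


def batch_by_broker(updates: Dict[TopicPartition, Tuple[Record, int]],
                    targets: Dict[TopicPartition, Set[int]]
                    ) -> Dict[int, Dict[TopicPartition, Tuple[Record, int]]]:
    """Step 4: group per-partition commands so each broker receives one request."""
    batches: Dict[int, Dict[TopicPartition, Tuple[Record, int]]] = defaultdict(dict)
    for tp, update in updates.items():
        for broker in targets[tp]:
            batches[broker][tp] = update
    return dict(batches)
```

A real controller would call its ZooKeeper client's setData with the expected version in place of `FakeZk.set`; the retry loop stays the same.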
...