Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
leaderElection(r: Replica)
  read the current ISR and AR for r.partition.partition_id from ZK
  if( (r in AR) && (ISR is empty || r in ISR) ) 
  {
     wait for PreferredReplicaTime if r is not the preferred replica
     if(successfully write r as the current leader of r.partition in ZK)
       becomeLeader(r, ISR, CUR)
     else
       becomeFollower(r)
  }
} 

State change events

On every broker

...

Code Block
becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
{
   // get a new epoch value and write it to the leader path
   epoch = getNewEpoch()
   /brokers/topics/[r.partition.topic]/[r.partition.pid]/leader=broker_id, epoch
   stop HW checkpoint thread for r
   r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
   wait until every live replica in AR catches up (i.e. its leo == r.hw) or a KeepInSyncTime has passed
   r.partition.ISR = the current set of replicas in sync with r
   r.partition.CUR = AR - ISR
   write r.partition.ISR & r.partition.CUR in ZK
   r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
   r.partition.leader = r                  // this enables reads/writes to this partition on this broker
   start a commit thread on r.partition
   start HW checkpoint thread for r
    get replica info from ZK and compute AR, a list of replicas assigned to this broker
    for each r in AR {
       if(r.broker_id is registered under /brokers/ids)
          sendStateChange(“start-replica”, r.broker_id, epoch)
    }
}

Admin commands

This section describes the algorithms for various admin commands like create/delete topic, add/remove partition.

...