Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
onPartitionsReassigned() 
{
  if(this broker is the leader for [partition_id])
  {
  	p.RAR = the new replicas from /brokers/partitions_reassigned/[topic]/[partition_id]
     AR = /brokers/topics/[topic]/[partition_id]/replicas
     newReplicas = p.RAR - AR
     for(newReplica <- newReplicas)
       sendStateChange(“start-replica”, newReplica.broker_id, epoch)
  }
}

State change communication

The leader uses this API to communicate a state change request to the followers

Code Block

sendStateChange(stateChange, followerBrokerId, leaderEpoch)
{
   stateChangeQ = new StateChangeQueue(“/brokers/state/followerBrokerId”)
   stateChangeRequest = new StateChangeRequest(stateChange, leaderEpoch)
   // check if the state change Q is full. This can happen if a broker is offline for a long time
   if(stateChangeQ.isFull) {
      // this operation retains only one close-replica request for a partition, the one with the latest epoch. This is to ensure that an offline broker, on startup, will delete old topics and partitions, which it hosted before going offline. You don’t have to retain any start-replica requests for a partition
      stateChangeQ.shrink
      // if the queue is still full, log an error
      throw new FollowerStateChangeQueueFull
   }
   stateChangeQ.put(stateChangeRequest)
} 

State change operations

Start replica

This state change is requested by the leader or the admin command for a new replica assignment

Code Block

startReplica(r: Replica) {
  if(broker_id not in /brokers/topics/[r.topic]/[r.partition]/replicas)
    throw NotReplicaForPartitionException()
  if( r's log is not already started) {
     do local recovery of r's log
     r.hw = min(last checkpointed HW for r, r.leo)
     register a leader-change listener on partition r.partition.partition_id
  }
   if( a leader does not exist for partition r.partition.partition_id in ZK)
      leaderElection(r)
   else {
     if(this broker is not the leader, then it is a follower since it is in the AR list for this partition)
      becomeFollower(r)
   }
}

Close replica

This state change is requested by the leader when a topic or partition is deleted or moved to another broker

Code Block

closeReplica(r: Replica)
{
   stop the fetcher associated with r, if one exists
   close and delete r
}

Become follower

This state change is requested by the leader when the leader for a replica changes

Code Block

becomeFollower(r: Replica)
{
  // this is required if this replica was the last leader
  stop the commit thread, if any                  
  stop the current ReplicaFetcherThread, if any
  truncate the log to r.hw
  start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
  start HW checkpoint thread for r
}

Become leader

This state change is done by the new leader

Code Block

becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
{
   // get a new epoch value and write it to the leader path
   epoch = getNewEpoch()
   /brokers/topics/[r.partition.topic]/[r.partition.pid]/leader=broker_id, epoch
   stop HW checkpoint thread for r
   r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
   wait until every live replica in AR catches up (i.e. its leo == r.hw) or a KeepInSyncTime has passed
   r.partition.ISR = the current set of replicas in sync with r
   r.partition.CUR = AR - ISR
   write r.partition.ISR & r.partition.CUR in ZK
   r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
   r.partition.leader = r                  // this enables reads/writes to this partition on this broker
   start a commit thread on r.partition
   start HW checkpoint thread for r
    get replica info from ZK and compute AR, a list of replicas assigned to this broker
    for each r in AR {
       if(r.broker_id is registered under /brokers/ids)
          sendStateChange(“start-replica”, r.broker_id, epoch)
    }
}