...

Code Block
deletePartition(topic, partition)
{
   // an empty list for partition reassignment means delete partition
   /brokers/partitions_reassigned/topic/partition=""
   // wait till the close-replica state change is consumed by all replicas
   if(!waitTillStateChangeRequestConsumed(partition.replicas, timeout))
   {
     error("Failed to delete partition after timeout ms")
     return false
   }
   // delete partition paths in ZK
   delete /brokers/topics/topic/partitionId
   delete /brokers/partitions_reassigned/topic/partitionId
}
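
For illustration, here is a minimal Scala sketch of the same sequence. The `Zk` trait, `waitUntilConsumed` barrier, and path formatting are assumptions made up for this sketch; they are not the broker's actual helpers.

Code Block

// Hypothetical ZooKeeper helper; the real broker goes through its own ZK utilities.
trait Zk {
  def writeData(path: String, data: String): Unit
  def deletePath(path: String): Unit
}

def deletePartition(zk: Zk, topic: String, partition: Int,
                    waitUntilConsumed: Long => Boolean, timeoutMs: Long): Boolean = {
  // an empty reassignment list signals "delete this partition"
  zk.writeData(s"/brokers/partitions_reassigned/$topic/$partition", "")
  // wait until every replica has acted on the close-replica state change
  if (!waitUntilConsumed(timeoutMs)) {
    System.err.println(s"Failed to delete partition [$topic, $partition] after $timeoutMs ms")
    return false
  }
  // remove the partition paths from ZooKeeper
  zk.deletePath(s"/brokers/topics/$topic/$partition")
  zk.deletePath(s"/brokers/partitions_reassigned/$topic/$partition")
  true
}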

Handling produce requests

Produce request handler on the leader

Code Block

produceRequestHandler(pr: ProduceRequest)
{
   if(the request partition pr.partition doesn't have its leader replica on this broker)
      throw NotLeaderException
   log = pr.partition.leader.log
   append pr.messages to log
   pr.offset = log.LEO
   add pr to pr.partition.commitQ
}
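
A minimal Scala sketch of this handler follows. The `Log`, `Partition`, `ProduceRequest`, and `NotLeaderException` types below are simplified stand-ins invented for the sketch, not the broker's actual classes.

Code Block

import java.util.concurrent.LinkedBlockingQueue

// Simplified stand-ins for the broker's internal types (illustrative only).
class Log {
  private var endOffset = 0L
  def append(messages: Seq[Array[Byte]]): Unit = { endOffset += messages.size }
  def logEndOffset: Long = endOffset
}

class Partition(val name: String, val leaderBrokerId: Int, val leaderLog: Log) {
  val commitQ = new LinkedBlockingQueue[ProduceRequest]()
}

case class ProduceRequest(partition: Partition, messages: Seq[Array[Byte]], var offset: Long = -1L)

class NotLeaderException(msg: String) extends RuntimeException(msg)

def produceRequestHandler(pr: ProduceRequest, thisBrokerId: Int): Unit = {
  // only the leader replica for the partition may serve produce requests
  if (pr.partition.leaderBrokerId != thisBrokerId)
    throw new NotLeaderException(s"broker $thisBrokerId is not the leader for ${pr.partition.name}")
  val log = pr.partition.leaderLog
  log.append(pr.messages)       // append to the leader's local log
  pr.offset = log.logEndOffset  // record the LEO after the append
  pr.partition.commitQ.put(pr)  // queue the request; it is acknowledged once the ISR catches up
}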

Message replication

Commit thread on the leader

Code Block

while(true) {
  pr = commitQ.dequeue
  canCommit = false
  while(!canCommit) {
    canCommit = true
    for each r in ISR
      if(!offsetReached(r, pr.offset)) {
        canCommit = false
        break
      }
    if(!canCommit) {
      p.CUR.add(r)        // r is the replica that failed to catch up in time
      p.ISR.delete(r)
      write p.ISR to ZK
    }
  }
  for each c in CUR
    if(c.leo >= pr.offset) {
      p.ISR.add(c); p.CUR.delete(c); write p.ISR to ZK
    }
  checkReassignedReplicas(pr, p.RAR, p.ISR)
  checkLoadBalancing()
  p.leader.hw = pr.offset   // advance the HW to indicate that pr is committed
  send ACK to the client that pr is committed
}

offsetReached(r: Replica, offset: Long) {
   if(r.leo becomes equal to or larger than offset within KeepInSyncTime)  return true
   return false
}

checkLoadBalancing() { // see if we need to switch the leader to the preferred replica
   if(leader replica is not the preferred one & the preferred replica is in ISR) {
       delete /brokers/topics/[topic]/[partition_id]/leader  in ZK
       stop this commit thread
       stop the HW checkpoint thread
   }
 }

checkReassignedReplicas(pr: ProduceRequest, RAR: Set[Replica], ISR: Set[Replica])
{
    // see if all reassigned replicas have fully caught up and older replicas have
    // stopped fetching; if so, switch to those replicas

    // optimization: do the check periodically

    if(every replica in RAR has its leo >= pr.offset) {
       if(!sentCloseReplica.get) {
          oldReplicas = AR - RAR
          for(oldReplica <- oldReplicas) {
             if(oldReplica.broker_id != broker_id)
                sendStateChange("close-replica", oldReplica.broker_id, epoch)
          }
          sentCloseReplica.set(true)
       } else {
          // close-replica is already sent. Wait until the replicas are closed, or time out and raise an error
          if(broker_id is in (AR - RAR) && (other replicas in (AR - RAR) are not in ISR anymore)) {
             // leader is not in the reassigned replicas list
             completePartitionReassignment(RAR, ISR, AR, true)
             sentCloseReplica.set(false)
          }
          else if(every replica in (AR - RAR) is not in ISR anymore) {
             completePartitionReassignment(RAR, ISR, AR, false)
             sentCloseReplica.set(false)
          }
       }
    }
}

completePartitionReassignment(RAR: Set[Replica], ISR: Set[Replica], AR: Set[Replica], stopCommitThread: Boolean)
{
    // newly assigned replicas are in sync; switch over to the new replicas
    // need (RAR + ISR) in case we fail right after here

    write (RAR + ISR) as the new ISR in ZK
    update /brokers/topics/[topic]/[partition_id]/replicas in ZK with the new replicas in RAR
    // triggers leader election
    delete /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
    if(stopCommitThread || (broker_id is not preferred replica))
    {
      delete /brokers/topics/[topic]/[partition_id]/leader in ZK
      stop this commit thread
    }
}
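
To make the commit rule in the commit thread above concrete, here is a minimal Scala sketch of one pass of the commit check: a request is committed once every ISR replica's LEO has reached its offset, stragglers that miss the KeepInSyncTime deadline are dropped from the ISR, and the HW is then advanced. The types (PartitionState, PendingProduce, Replica) are simplified, illustrative names, and the ZK writes, reassignment check, and load-balancing check are omitted.

Code Block

import scala.collection.mutable

// Simplified stand-ins for the broker's internals (illustrative names only).
class Replica(val brokerId: Int) { @volatile var leo: Long = 0L }

class PartitionState(val isr: mutable.Set[Replica], val cur: mutable.Set[Replica]) {
  @volatile var hw: Long = 0L
}

case class PendingProduce(offset: Long, ack: () => Unit)

// One iteration of the commit check. ISR changes would also be persisted to ZK,
// which is omitted here.
def commitOne(p: PartitionState, pr: PendingProduce, keepInSyncTimeMs: Long): Unit = {
  val deadline = System.currentTimeMillis() + keepInSyncTimeMs
  var canCommit = false
  while (!canCommit) {
    p.isr.find(_.leo < pr.offset) match {
      case None => canCommit = true     // every ISR replica has reached pr.offset
      case Some(r) if System.currentTimeMillis() > deadline =>
        p.isr -= r; p.cur += r          // shrink the ISR so the commit can proceed
      case _ => Thread.sleep(10)        // give followers a chance to catch up
    }
  }
  // promote any caught-up replica back into the ISR
  for (c <- p.cur.toList if c.leo >= pr.offset) { p.cur -= c; p.isr += c }
  p.hw = pr.offset                      // advancing the HW marks pr as committed
  pr.ack()                              // acknowledge the producer
}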