...

Paths stored in Zookeeper

Notation: When an element in a path is denoted [xyz], that means that the value of xyz is not fixed and there is in fact a znode for each possible value of xyz. For example, /topics/[topic] would be a directory named /topics containing a directory for each topic name. An arrow -> is used to indicate the contents of a znode. For example, /hello -> world would indicate a znode /hello containing the value "world". A path is persistent unless it's marked as ephemeral.
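
As an illustration only (not part of the design), the sketch below shows how this notation maps onto plain ZooKeeper client reads: listing the children of /brokers/topics enumerates the [topic] values, and reading a znode yields its "->" value. The connect string is an assumption.

Code Block
import org.apache.zookeeper.{WatchedEvent, ZooKeeper}
import scala.jdk.CollectionConverters._

object PathNotationExample {
  def main(args: Array[String]): Unit = {
    // connect string is an assumption; point it at your own ensemble
    val zk = new ZooKeeper("localhost:2181", 30000, (_: WatchedEvent) => ())
    // /brokers/topics/[topic]: one child znode per topic name
    for (topic <- zk.getChildren("/brokers/topics", false).asScala) {
      // "/hello -> world" style: the value is the znode's byte payload
      val data = Option(zk.getData(s"/brokers/topics/$topic", false, null))
      println(s"/brokers/topics/$topic -> ${data.map(new String(_)).getOrElse("")}")
    }
    zk.close()
  }
}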

We store the following paths in Zookeeper:

...

  1. Partition-reassigned listener:
    1. child change on /brokers/partitions_reassigned
    2. child change on /brokers/partitions_reassigned/[topic]

Zookeeper listeners on all brokers

    1. Leader-change listener: value change on /brokers/topics/[topic]/[partition_id]/leader
    2. State-change listener: child change on /brokers/state/[broker_id] (see the registration sketch below)
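
For illustration, a minimal sketch of how these listeners could be registered with the raw ZooKeeper client follows; the class name is hypothetical and the brokers' own helper utilities are not shown here.

Code Block
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.apache.zookeeper.Watcher.Event.EventType

// Hypothetical sketch: registers the two kinds of listeners described above
class ReplicaListeners(zk: ZooKeeper) extends Watcher {
  // child-change watch, e.g. on /brokers/partitions_reassigned or /brokers/state/[broker_id]
  def watchChildren(path: String): Unit = zk.getChildren(path, this)

  // value-change watch, e.g. on /brokers/topics/[topic]/[partition_id]/leader
  def watchData(path: String): Unit = zk.getData(path, this, null)

  override def process(event: WatchedEvent): Unit = event.getType match {
    case EventType.NodeChildrenChanged =>
      println(s"children changed under ${event.getPath}")
      watchChildren(event.getPath) // ZooKeeper watches are one-shot, so re-register
    case EventType.NodeDataChanged =>
      println(s"value changed at ${event.getPath}")
      watchData(event.getPath)
    case _ => () // ignore session events, node created/deleted, etc.
  }
}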

Configuration parameters

    1. LeaderElectionWaitTime: controls the maximum amount of time that we wait during leader election.
    2. KeepInSyncTime: controls the maximum amount of time that a leader waits before dropping a follower from the in-sync replica set.
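
For concreteness, one hypothetical way to surface these two parameters as broker configuration is sketched below; the property names and default values are assumptions for illustration and are not specified by this design.

Code Block
import java.util.Properties

// Hypothetical property names and defaults, for illustration only
case class ReplicationConfig(props: Properties) {
  // LeaderElectionWaitTime: maximum time to wait during leader election
  val leaderElectionWaitTimeMs: Long =
    props.getProperty("leader.election.wait.time.ms", "1000").toLong
  // KeepInSyncTime: maximum time a leader waits before dropping a lagging follower from the in-sync replica set
  val keepInSyncTimeMs: Long =
    props.getProperty("keep.in.sync.time.ms", "10000").toLong
}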

...

Code Block
onPartitionsReassigned() 
{
   if(this broker is the leader for [partition_id])
   {
      p.RAR = the new replicas from /brokers/partitions_reassigned/[topic]/[partition_id]
      AR = /brokers/topics/[topic]/[partition_id]/replicas
      newReplicas = p.RAR - AR
      for(newReplica <- newReplicas)
         sendStateChange("start-replica", newReplica.broker_id, epoch)
      if(p.RAR is empty) 
      {
         for(assignedReplica <- AR)
            sendStateChange("close-replica", assignedReplica.broker_id, epoch)
      }
   }
}

State change communication

The leader uses this API to communicate a state change request to the followers

Code Block

sendStateChange(stateChange, followerBrokerId, leaderEpoch)
{
   stateChangeQ = new StateChangeQueue("/brokers/state/followerBrokerId")
   stateChangeRequest = new StateChangeRequest(stateChange, leaderEpoch)
   // check if the state change Q is full. This can happen if a broker is offline for a long time
   if(stateChangeQ.isFull) {
      // this operation retains only one close-replica request for a partition, the one with the latest epoch. This is to ensure that an offline broker, on startup, will delete old topics and partitions, which it hosted before going offline. You don't have to retain any start-replica requests for a partition
      stateChangeQ.shrink
      // if the queue is still full, log an error
      throw new FollowerStateChangeQueueFull
   }
   stateChangeQ.put(stateChangeRequest)
}
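
The StateChangeQueue above is an abstraction over the follower's /brokers/state/[broker_id] path. As a rough sketch only (not the actual implementation), such a queue could be modeled with persistent sequential znodes; the class below and its request encoding are assumptions.

Code Block
import java.nio.charset.StandardCharsets
import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}
import scala.jdk.CollectionConverters._

// Hypothetical ZooKeeper-backed state-change queue for one follower broker.
// Assumes the root path /brokers/state/[broker_id] already exists.
class StateChangeQueue(zk: ZooKeeper, followerBrokerId: Int, maxSize: Int = 1000) {
  private val root = s"/brokers/state/$followerBrokerId"

  // enqueue a state-change request, e.g. "start-replica:topic:0:epoch=3"
  def put(request: String): Unit =
    zk.create(s"$root/request-", request.getBytes(StandardCharsets.UTF_8),
      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL)

  // the queue is considered full when too many unconsumed requests have
  // accumulated, e.g. because the follower has been offline for a long time
  def isFull: Boolean = zk.getChildren(root, false).size >= maxSize

  // follower side: consume and delete requests in sequence order
  def drain(): Seq[String] =
    zk.getChildren(root, false).asScala.toSeq.sorted.map { child =>
      val data = new String(zk.getData(s"$root/$child", false, null), StandardCharsets.UTF_8)
      zk.delete(s"$root/$child", -1)
      data
    }
}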

...

Code Block
startReplica(r: Replica) {
  if(broker_id not in /brokers/topics/[r.topic]/[r.partition]/replicas)
    throw NotReplicaForPartitionException()
  if( r's log is not already started) {
     do local recovery of r's log
     r.hw = min(last checkpointed HW for r, r.leo)
     register a leader-change listener on partition r.partition.partition_id
  }
   if( a leader does not exist for partition r.partition.partition_id in ZK)
      leaderElection(r)
   else {
      //this broker is not the leader, then it is a follower since it is in the AR list for this partition
      if(this broker is not already the follower of the current leader)
         becomeFollower(r)
   }
}

Close replica

This state change is requested by the leader when a topic or partition is deleted or moved to another broker

...

Code Block
becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
{
   // get a new epoch value and write it to the leader path
   epoch = getNewEpoch()
   /brokers/topics/[r.partition.topic]/[r.partition.pid]/leader=broker_id, epoch
   /brokers/topics/[r.partition.topic]/[r.partition.pid]/ISR=ISR;epoch
   stop HW checkpoint thread for r
   r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
   wait until every live replica in AR catches up (i.e. its leo == r.hw) or a KeepInSyncTime has passed
   r.partition.ISR = the current set of replicas in sync with r
   r.partition.CUR = AR - ISR
   write r.partition.ISR & r.partition.CUR in ZK
   r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
   r.partition.leader = r                  // this enables reads/writes to this partition on this broker
   start a commit thread on r.partition
   start HW checkpoint thread for r
}

Admin commands

This section describes the algorithms for various admin commands like create/delete topic, add/remove partition.

...

Code Block
createTopic(topic, numPartitions, replicationFactor, replicaAssignmentStr) 
{
   if(!cleanFailedTopicCreationAttempts(topic))
   {
      error(“Topic topic exists with live partitions”)
      exit
   }  
   if(replicaAssignmentStr == “”) {
     // assignReplicas will always assign partitions only to online brokers
     replicaAssignment = assignReplicas(topic, numPartitions, replicationFactor)
   }

   // create topic path in ZK
   create /brokers/topics/topic
   for(partition <- replicaAssignment) {
     addPartition(topic, partition.id, partition.replicas) 
   }
   // report successfully started partitions for this topic
}
waitTillStateChangeRequestConsumed(partition.replicas, timeout)
{
   register watch on state change path for each replica
   In the listener, use a condition variable to await(timeout). If it doesn’t fire return false, else return true
}

cleanFailedTopicCreationAttempts(topic) 
{
   topicsForPartitionsReassignment = ls /brokers/partitions_reassigned
   for(topic <- topicsForPartitionsReassignment)
   {
      partitionsCreated = ls /brokers/partitions_reassigned/topic
      cleanupFailed = false
      for(partition <- partitionsCreated) 
      {
         if(/brokers/topics/topic/partition/replicas path exists)
         {
            error("Cannot cleanup. Topic exists with live partition") 
            cleanupFailed = true
         }
      }
      if(cleanupFailed) 
      {
         if(/brokers/partitions_reassigned/topic has no children)
            delete /brokers/partitions_reassigned/topic
         return false
      }
      // partition paths can be safely deleted
      for(partition <- partitionsCreated)
      {
         read the /brokers/partitions_reassigned/topic/partition path 
         for each broker listed in the above step, sendStateChange("close-replica", [broker_id], -1)
         delete /brokers/topics/topic/partitions/partition
         delete /brokers/partitions_reassigned/topic/partition
      }
   } 
   if(/brokers/topics/topic has no children)
      delete /brokers/topics/topic
   return true
}
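
The waitTillStateChangeRequestConsumed step above only needs a watch plus a condition variable. One possible sketch, using a CountDownLatch and treating a child change on each replica's state-change path as "request consumed" (both the object name and that interpretation are assumptions), is:

Code Block
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

object StateChangeWait {
  // returns true only if every watched state-change path fires within the timeout
  def waitTillStateChangeRequestConsumed(zk: ZooKeeper, brokerIds: Seq[Int], timeoutMs: Long): Boolean = {
    val latch = new CountDownLatch(brokerIds.size)
    for (brokerId <- brokerIds) {
      val watcher = new Watcher {
        override def process(event: WatchedEvent): Unit = latch.countDown()
      }
      // register a child watch on the state-change path for this replica
      zk.getChildren(s"/brokers/state/$brokerId", watcher)
    }
    // await(timeout): false if the watches did not all fire in time
    latch.await(timeoutMs, TimeUnit.MILLISECONDS)
  }
}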

Delete topic

Code Block
deleteTopic(topic) 
{
   partitionsForTopic = ls /brokers/topics/topic
   for(partition <- partitionsForTopic) {
      if(!deletePartition(topic, partition))
      {
         error(“Failed to delete partition for topic”)
         exit 
      }
   }
   // delete topic path in ZK
   delete /brokers/topics/topic
}
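
The ls/delete steps above correspond directly to ZooKeeper client calls. As a loose illustration (not the admin tool itself; the real deleteTopic also tears down replicas via deletePartition), removing a topic path and its children could look like this, with the connect string and topic name as placeholders:

Code Block
import org.apache.zookeeper.{WatchedEvent, ZooKeeper}
import scala.jdk.CollectionConverters._

object DeleteTopicPath {
  // recursively delete a znode and everything under it
  def deleteRecursive(zk: ZooKeeper, path: String): Unit = {
    for (child <- zk.getChildren(path, false).asScala)
      deleteRecursive(zk, s"$path/$child")
    zk.delete(path, -1) // version -1 matches any version
  }

  def main(args: Array[String]): Unit = {
    val zk = new ZooKeeper("localhost:2181", 30000, (_: WatchedEvent) => ()) // connect string is an assumption
    deleteRecursive(zk, "/brokers/topics/my-topic")                          // topic name is a placeholder
    zk.close()
  }
}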

...

Code Block
while(true) {
  pr = commitQ.dequeue
  canCommit = false
  while(!canCommit) {
    canCommit = true
    for each r in ISR
       if(!offsetReached(r, pr.offset)) {
          canCommit = false
          break
       }
    if(!canCommit) {
       p.CUR.add(r)
       p.ISR.delete(r)
       write p.ISR to ZK
    }
  }
  for each c in CUR
     if(c.leo >= pr.offset) {
        p.ISR.add(c); p.CUR.delete(c); write p.ISR to ZK
     }
  checkReassignedReplicas(pr, p.RAR, p.ISR)
  checkLoadBalancing()
  r.hw = pr.offset         // increment the HW to indicate that pr is committed
  send ACK to the client that pr is committed
}

offsetReached(r: Replica, offset: Long) {
   if(r.leo becomes equal or larger than offset within KeepInSyncTime)  return true
   return false
}

checkLoadBalancing() { // see if we need to switch the leader to the preferred replica
   if(leader replica is not the preferred one & the preferred replica is in ISR) {
       delete /brokers/topics/[topic]/[partition_id]/leader  in ZK
       stop this commit thread
       stop the HW checkpoint thread
   }
 }

checkReassignedReplicas(pr: ProduceRequest, RAR: Set[Replica], ISR: Set[Replica])
{
    // see if all reassigned replicas have fully caught up and older replicas have stopped fetching, if so, switch to those replicas
    // optimization, do the check periodically
    if(every replica in RAR has its leo >= pr.offset) {
       if(!sentCloseReplica.get) {
          oldReplicas = AR - RAR
          for(oldReplica <- oldReplicas) {
             if(oldReplica.broker_id != broker_id)
                sendStateChange("close-replica", oldReplica.broker_id, epoch)
          }
          sentCloseReplica.set(true)
       } else {
          // close replica is already sent. Wait until the replicas are closed or probably timeout and raise error
          if(broker_id is in (AR - RAR) && (other replicas in (AR - RAR) are not in ISR anymore)) {
             // leader is not in the reassigned replicas list
             completePartitionsReassignment(RAR, ISR, AR, true)
             sentCloseReplica.set(false)
          }
          else if(every replica in (AR-RAR) is not in ISR anymore) {
             completePartitionsReassignment(RAR, ISR, AR, false)
             sentCloseReplica.set(false)
          }
       }
    }
}

completePartitionsReassignment(RAR: Set[Replica], ISR: Set[Replica], AR: Set[Replica], stopCommitThread: Boolean)
{
    // newly assigned replicas are in-sync, switch over to the new replicas
    // need (RAR + ISR) in case we fail right after here
    write (RAR + ISR) as the new ISR in ZK     
    update /brokers/topics/[topic]/[partition_id]/replicas in ZK with the new replicas in RAR

    if(stopCommitThread || (broker_id is not preferred replica))
    {
       if(this broker_id is not in the new AR)
          sendStateChange("close-replica", broker_id, epoch)
       delete /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
       // triggers leader election
       delete /brokers/topics/[topic]/[partition_id]/leader in ZK 
       stop this commit thread
    }
}

...