

Paths stored in Zookeeper

Notation: When an element in a path is denoted [xyz], that means that the value of xyz is not fixed and there is in fact a znode for each possible value of xyz. For example /topics/[topic] would be a directory named /topics containing a directory for each topic name. An arrow -> is used to indicate the contents of a znode. For example /hello -> world would indicate a znode /hello containing the value "world". A path is persistent unless it's marked as ephemeral.
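
To make the notation concrete, here is a minimal Python sketch that expands a path template into one znode path per placeholder value and parses the arrow form. The helper names expand and parse_arrow and the topic names "orders" and "clicks" are assumptions made for illustration; they are not part of the design.

```python
import re


def expand(template: str, values: dict[str, list[str]]) -> list[str]:
    """Expand every [name] placeholder into one concrete znode path per value."""
    paths = [""]
    # re.split with a capturing group returns literal text at even indices
    # and placeholder names at odd indices.
    for i, part in enumerate(re.split(r"\[(\w+)\]", template)):
        if i % 2 == 0:
            paths = [p + part for p in paths]
        else:
            paths = [p + v for p in paths for v in values[part]]
    return paths


def parse_arrow(line: str) -> tuple[str, str]:
    """Split 'path -> value' into the znode path and its contents."""
    path, value = line.split("->", 1)
    return path.strip(), value.strip()


# Hypothetical topic names, used only to show the expansion.
topic_paths = expand("/topics/[topic]", {"topic": ["orders", "clicks"]})
hello = parse_arrow("/hello -> world")
```

Here topic_paths holds one path per topic name, and hello is the pair of znode path and stored value.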

We store the following paths in Zookeeper:


  1. Partition-reassigned listener:
    1. child change on /brokers/partitions_reassigned
    2. child change on /brokers/partitions_reassigned/[topic]

Zookeeper listeners on all brokers

    1. Leader-change listener: value change on /brokers/topics/[topic]/[partition_id]/leader
    2. State-change listener: child change on /brokers/state/[broker_id]
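
As an illustration of how the listed paths map to listeners, the following Python sketch matches a changed znode path against the path patterns above. It does not use a real ZooKeeper client; the LISTENERS table, the event type labels "child" and "value", and the function name listeners_for are assumptions made for this sketch.

```python
import re

# (listener label, kind of change watched, path pattern). The patterns mirror
# the paths listed above, with [^/]+ standing in for each [xyz] element.
LISTENERS = [
    ("partition-reassigned", "child", r"/brokers/partitions_reassigned(/[^/]+)?"),
    ("leader-change", "value", r"/brokers/topics/[^/]+/[^/]+/leader"),
    ("state-change", "child", r"/brokers/state/[^/]+"),
]


def listeners_for(event_type: str, path: str) -> list[str]:
    """Return the labels of all listeners interested in this change."""
    return [
        name
        for name, watched, pattern in LISTENERS
        if watched == event_type and re.fullmatch(pattern, path)
    ]
```

For example, a value change on /brokers/topics/orders/0/leader matches only the leader-change listener, while a value change on /brokers/state/3 matches nothing because that path is watched for child changes.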

Configuration parameters

    1. LeaderElectionWaitTime: controls the maximum amount of time that we wait during leader election.
    2. KeepInSyncTime: controls the maximum amount of time that a leader waits before dropping a follower from the in-sync replica set.
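
The two parameters can be pictured as simple time limits. The Python sketch below is only an illustration: the millisecond unit, the field names, and the idea of passing observed (elapsed time, log end offset) samples are assumptions, since the page does not specify units or how the leader observes a follower's progress.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ReplicationConfig:
    leader_election_wait_time_ms: int  # LeaderElectionWaitTime (unit assumed)
    keep_in_sync_time_ms: int          # KeepInSyncTime (unit assumed)


def offset_reached(leo_samples, offset, config):
    """True if the follower's log end offset reached `offset` within KeepInSyncTime.

    leo_samples is a list of (elapsed_ms, leo) pairs observed for one follower,
    where elapsed_ms is measured from when the leader started waiting.
    """
    return any(
        elapsed <= config.keep_in_sync_time_ms and leo >= offset
        for elapsed, leo in leo_samples
    )


def election_timed_out(elapsed_ms, config):
    """True once leader election has waited longer than LeaderElectionWaitTime."""
    return elapsed_ms > config.leader_election_wait_time_ms


# Hypothetical values chosen for the example.
config = ReplicationConfig(leader_election_wait_time_ms=10_000, keep_in_sync_time_ms=5_000)
in_time = offset_reached([(1_000, 90), (4_000, 100)], offset=100, config=config)
too_late = offset_reached([(1_000, 90), (6_000, 100)], offset=100, config=config)
```

A follower for which the check fails is the one the leader would drop from the in-sync replica set.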


Code Block
  if(this broker is the leader for [partition_id])
     p.RAR = the new replicas from /brokers/partitions_reassigned/[topic]/[partition_id]
     AR = /brokers/topics/[topic]/[partition_id]/replicas
     newReplicas = p.RAR - AR
     for(newReplica <- newReplicas)
       sendStateChange("start-replica", newReplica.broker_id, epoch)

     if(p.RAR is empty) 
        for(assignedReplica <- AR)
           sendStateChange("close-replica", assignedReplica.broker_id, epoch)
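
The set arithmetic in the handler above can be sketched in Python as follows. Replicas are modeled as plain broker ids and the function returns the requests instead of sending them; both choices, and the function name on_partition_reassigned, are assumptions made to keep the sketch self-contained.

```python
def on_partition_reassigned(ar: set[int], rar: set[int], epoch: int) -> list[tuple[str, int, int]]:
    """Return the (state_change, broker_id, epoch) requests the leader would send.

    ar  is the current assigned replica set (AR),
    rar is the reassigned replica set (RAR) read from ZooKeeper.
    """
    requests = []
    # Replicas that are new to the partition must be started first.
    for broker_id in sorted(rar - ar):
        requests.append(("start-replica", broker_id, epoch))
    # An empty RAR means the partition is going away: close every replica.
    if not rar:
        for broker_id in sorted(ar):
            requests.append(("close-replica", broker_id, epoch))
    return requests


# Hypothetical broker ids and epoch.
started = on_partition_reassigned({1, 2, 3}, {3, 4, 5}, epoch=7)
closed = on_partition_reassigned({1, 2}, set(), epoch=7)
```

Broker 3 appears in both sets, so it receives no start-replica request; only brokers 4 and 5 do.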

State change communication

The leader uses this API to communicate a state change request to the followers

Code Block

sendStateChange(stateChange, followerBrokerId, leaderEpoch)
{
   stateChangeQ = new StateChangeQueue("/brokers/state/followerBrokerId")
   stateChangeRequest = new StateChangeRequest(stateChange, leaderEpoch)
   // check if the state change Q is full. This can happen if a broker is offline for a long time
   if(stateChangeQ.isFull) {
      // this operation retains only one close-replica request for a partition, the one with the latest epoch.
      // This is to ensure that an offline broker, on startup, will delete old topics and partitions,
      // which it hosted before going offline. You don't have to retain any start-replica requests for a partition
      // if the queue is still full, log an error
      throw new FollowerStateChangeQueueFull
   }
}
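
The comments above describe a compaction step for a full queue: keep one close-replica request per partition (the one with the latest epoch) and drop start-replica requests. A minimal in-memory Python sketch of that behaviour follows. It is not backed by ZooKeeper, and the partition field, the capacity parameter, and the method names compact and put are assumptions, since the pseudocode does not name them.

```python
from dataclasses import dataclass


class FollowerStateChangeQueueFull(Exception):
    """Raised when the queue is still full after compaction."""


@dataclass(frozen=True)
class StateChangeRequest:
    state_change: str  # "start-replica" or "close-replica"
    partition: str     # hypothetical field identifying the partition
    leader_epoch: int


class StateChangeQueue:
    def __init__(self, capacity: int):
        self.capacity = capacity
        self.items: list[StateChangeRequest] = []

    def is_full(self) -> bool:
        return len(self.items) >= self.capacity

    def compact(self) -> None:
        """Keep only the latest-epoch close-replica request per partition."""
        latest: dict[str, StateChangeRequest] = {}
        for req in self.items:
            if req.state_change != "close-replica":
                continue  # start-replica requests need not be retained
            kept = latest.get(req.partition)
            if kept is None or req.leader_epoch > kept.leader_epoch:
                latest[req.partition] = req
        # Preserve queue order among the surviving requests.
        self.items = [r for r in self.items if latest.get(r.partition) is r]

    def put(self, req: StateChangeRequest) -> None:
        if self.is_full():
            self.compact()
            if self.is_full():
                raise FollowerStateChangeQueueFull(req.partition)
        self.items.append(req)


q = StateChangeQueue(capacity=3)
q.put(StateChangeRequest("start-replica", "t-0", 1))
q.put(StateChangeRequest("close-replica", "t-0", 2))
q.put(StateChangeRequest("close-replica", "t-0", 3))
q.put(StateChangeRequest("start-replica", "t-1", 4))  # triggers compaction
```

After the last put, the queue holds the epoch-3 close-replica request for t-0 followed by the new start-replica request for t-1.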


Code Block
while(true) {
  pr = commitQ.dequeue
  canCommit = false
  while(!canCommit) {
    canCommit = true
    for each r in ISR
      if(!offsetReached(r, pr.offset)) {
        canCommit = false
      }
    if(!canCommit) {
      write p.ISR to ZK
    }
  }
  for each c in CUR
    if(c.leo >= pr.offset) {
      p.ISR.add(c); p.CUR.delete(c); write p.ISR to ZK
    }
  checkReassignedReplicas(pr, p.RAR, p.ISR)
  r.hw = pr.offset         // increment the HW to indicate that pr is committed
  send ACK to the client that pr is committed
}

offsetReached(r: Replica, offset: Long) {
   if(r.leo becomes equal or larger than offset within KeepInSyncTime)  return true
   return false
}

checkLoadBalancing() { // see if we need to switch the leader to the preferred replica
   if(leader replica is not the preferred one && the preferred replica is in ISR) {
       delete /brokers/topics/[topic]/[partition_id]/leader in ZK
       stop this commit thread
       stop the HW checkpoint thread
   }
}

checkReassignedReplicas(pr: ProduceRequest, RAR: Set[Replica], ISR: Set[Replica])
{
    // see if all reassigned replicas have fully caught up and older replicas have stopped fetching, if so, switch to those replicas
    // optimization, do the check periodically
    if(every replica in RAR has its leo >= pr.offset) {
       if(!sentCloseReplica.get) {
          oldReplicas = AR - RAR
          for(oldReplica <- oldReplicas) {
             if(oldReplica.broker_id != broker_id)
               sendStateChange("close-replica", oldReplica.broker_id, epoch)
          }
       } else {
          // close replica is already sent. Wait until the replicas are closed or probably timeout and raise error
          if(broker_id is in (AR - RAR) && (other replicas in (AR - RAR) are not in ISR anymore)) {
             // leader is not in the reassigned replicas list
             completePartitionReassignment(RAR, ISR, AR, true)
          }
          else if(every replica in (AR-RAR) is not in ISR anymore) {
             completePartitionReassignment(RAR, ISR, AR, false)
          }
       }
    }
}

completePartitionReassignment(RAR: Set[Replica], ISR: Set[Replica], AR: Set[Replica], stopCommitThread: Boolean)
{
    //newly assigned replicas are in-sync, switch over to the new replicas
    //need (RAR + ISR) in case we fail right after here

    write (RAR + ISR) as the new ISR in ZK
    update /brokers/topics/[topic]/[partition_id]/replicas in ZK with the new replicas in RAR

    if(this broker_id is not in the new replicas in RAR)
       sendStateChange("close-replica", broker_id, epoch)
    delete /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
    if(stopCommitThread || (broker_id is not preferred replica))
    { //triggers leader election
      delete /brokers/topics/[topic]/[partition_id]/leader in ZK
      stop this commit thread
    }
}
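
The decision made once close-replica has been sent, and the sets written on completion, can be sketched in Python as below. Replicas are modeled as broker ids, ZooKeeper writes are replaced by return values, and the function names reassignment_action and complete_partition_reassignment are assumptions made for this sketch; it illustrates the set logic only, not the threading or ZooKeeper interaction.

```python
def reassignment_action(broker_id: int, ar: set[int], rar: set[int], isr: set[int]):
    """Decide what the leader does after close-replica requests were sent.

    Returns True or False as the stopCommitThread argument for completion,
    or None if old replicas are still in the ISR and the leader must wait.
    """
    old = ar - rar
    other_old = old - {broker_id}
    if broker_id in old and not (other_old & isr):
        return True   # leader is not in the reassigned replica list
    if not (old & isr):
        return False  # leader stays; all old replicas have left the ISR
    return None


def complete_partition_reassignment(broker_id: int, rar: set[int], isr: set[int]):
    """Return (new ISR, new assigned replicas, whether the leader closes itself)."""
    new_isr = rar | isr          # RAR + ISR, in case of failure right after
    new_ar = set(rar)            # replicas path now lists the new replicas
    close_self = broker_id not in rar
    return new_isr, new_ar, close_self


# Hypothetical broker ids: leader 1 is being moved off the partition.
leaving = reassignment_action(1, ar={1, 2, 3}, rar={3, 4, 5}, isr={1, 3, 4, 5})
waiting = reassignment_action(1, ar={1, 2, 3}, rar={3, 4, 5}, isr={1, 2, 3, 4, 5})
staying = reassignment_action(3, ar={1, 2, 3}, rar={3, 4, 5}, isr={3, 4, 5})
result = complete_partition_reassignment(1, rar={3, 4, 5}, isr={1, 3, 4, 5})
```

In the first case the leader itself is an old replica and the other old replica has left the ISR, so completion runs with stopCommitThread set; in the second case broker 2 is still in the ISR, so the leader keeps waiting.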
