...

Code Block
brokerStartup()
{
  create /brokers/state/[broker_id] path if it doesn’t already exist
  register the state change handler to listen on child change ZK notifications on /brokers/state/[broker_id]
  register session expiration listener
  drain the state change queue 
  get replica info from ZK and compute AR, a list of replicas assigned to this broker
  for each r in AR
  {
      subscribe to leader changes for the r’s partition
      startReplica(r)
  }
  // broker startup procedure is complete. Register its broker_id in ZK to announce the availability of this broker
  register its broker_id in /brokers/ids/[broker_id] in ZK
}
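
For concreteness, here is a minimal Scala sketch of the startup ordering above, assuming the I0Itec ZkClient API; the helper names (readAssignedReplicasFromZk, startReplica) and the broker data payload are illustrative stand-ins, not part of this design.

Code Block

import org.I0Itec.zkclient.ZkClient

// Illustrative sketch of brokerStartup(): initialize everything first, then
// register the ephemeral broker id node last to announce availability.
class BrokerStartup(zkClient: ZkClient, brokerId: Int) {
  def startup(): Unit = {
    val statePath = "/brokers/state/" + brokerId
    if (!zkClient.exists(statePath))
      zkClient.createPersistent(statePath, true) // create parent paths if needed
    // ... register the state change handler and session expiration listener here ...
    val assignedReplicas = readAssignedReplicasFromZk()
    for (r <- assignedReplicas) {
      // ... subscribe to leader changes for r's partition ...
      startReplica(r)
    }
    // Ephemeral node: registered last so the broker is announced only once it
    // is fully initialized, and deregistered automatically if the session dies.
    zkClient.createEphemeral("/brokers/ids/" + brokerId, "host:port")
  }
  private def readAssignedReplicasFromZk(): Seq[String] = Seq.empty // stand-in
  private def startReplica(r: String): Unit = ()                    // stand-in
}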

...

Leader election

Code Block

leaderElection(r: Replica)
{
   read the current ISR and AR for r.partition.partition_id from ZK
   if( (r in AR) && (ISR is empty || r in ISR) )
   {
      wait for PreferredReplicaTime if r is not the preferred replica
      if(successfully write r as the current leader of r.partition in ZK)
        becomeLeader(r, ISR, CUR)
   }
}
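
The "successfully write r as the current leader" step has to be atomic across competing brokers. One common way to get that with ZooKeeper, sketched below assuming the I0Itec ZkClient API, is an ephemeral-node create: exactly one contender succeeds, the rest fail with ZkNodeExistsException.

Code Block

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException

// Illustrative sketch: atomic leader write via ephemeral node creation.
def tryBecomeLeader(zkClient: ZkClient, topic: String, partition: Int,
                    brokerId: Int, epoch: Int): Boolean = {
  val leaderPath = s"/brokers/topics/$topic/$partition/leader"
  try {
    zkClient.createEphemeral(leaderPath, s"$brokerId,$epoch")
    true  // this broker won the election; proceed to becomeLeader()
  } catch {
    case _: ZkNodeExistsException => false // another broker is already leader
  }
}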

State change events

On every broker

Leader change

This leader change listener is registered on every broker hosting a partition p. Each time it is triggered, the following procedure is executed -

Code Block

onLeaderChange() 
{
   if(broker_id is registered under /brokers/topics/[topic]/[partition_id]/replicas)
     leaderElection(r)
}

On State change

Each broker has a ZK path that it listens to for state change requests from the leader

Code Block

stateChangeListener() {  
// listens to state change requests issued by the leader and acts on those

    drain the state change queue
    read next state change request
    Let r be the replica that the state change request is sent for.
    // this should not happen
    Throw an error if r is not hosted on this broker                
    Let requestEpoch be the epoch of the state change request
    if(closeReplicaRequest) 
    { 
       closeReplica(r)
       // we don’t need to check epoch here to be able to handle delete topic/delete partition for dead brokers
    }
    if(startReplicaRequest) 
    { 
       Let latestPartitionEpoch be the latest epoch for this partition, obtained by reading /brokers/topics/[topic]/[partition]/leader=[broker_id],[epoch] 
       if(leader for r.partition doesn’t exist) {
          // this can only happen for new topics or new partitions of existing topics
          startReplica(r)
       }else if(requestEpoch == latestPartitionEpoch) {
          // this ensures that if a slow follower reads a state change request queued up by a previous leader, it ignores the request
          startReplica(r)
       }
    }
}
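
One plausible realization of this listener, assuming the state change queue is kept as persistent-sequential children of /brokers/state/[broker_id] and the I0Itec ZkClient API (a sketch, not the prescribed implementation):

Code Block

import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
import scala.collection.JavaConverters._

// Illustrative sketch: replay queued state change requests in enqueue order.
class StateChangeQueueListener(zkClient: ZkClient, brokerId: Int) {
  private val queuePath = s"/brokers/state/$brokerId"

  def register(): Unit =
    zkClient.subscribeChildChanges(queuePath, new IZkChildListener {
      override def handleChildChange(parent: String,
                                     children: java.util.List[String]): Unit =
        // sequential znode names sort in enqueue order
        children.asScala.sorted.foreach(processRequest)
    })

  private def processRequest(child: String): Unit = {
    val request = zkClient.readData[String](s"$queuePath/$child")
    // ... parse the request, verify its epoch against the latest leader epoch
    //     under /brokers/topics/[topic]/[partition]/leader, then act on it ...
    zkClient.delete(s"$queuePath/$child") // dequeue once handled
  }
}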

On the leader

On reassignment of partitions

Each time a partition-reassigned event is triggered on the leader, it calls onPartitionsReassigned()

Code Block

onPartitionsReassigned() 
{
  if(this broker is the leader for [partition_id])
  {
     p.RAR = the new replicas from /brokers/partitions_reassigned/[topic]/[partition_id]
     AR = /brokers/topics/[topic]/[partition_id]/replicas
     newReplicas = p.RAR - AR
     for(newReplica <- newReplicas)
       sendStateChange(“start-replica”, newReplica.broker_id, epoch)
  }
}
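
Computing the replicas that still need a start-replica request is a plain set difference. A tiny sketch (replica ids as integers; the names are illustrative):

Code Block

// Illustrative sketch: replicas in the reassigned set that are not yet in the
// current assignment must be started on their brokers.
def replicasToStart(reassigned: Set[Int], currentAssignment: Set[Int]): Set[Int] =
  reassigned -- currentAssignment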

State change communication

The leader uses this API to communicate a state change request to the followers

Code Block

sendStateChange(stateChange, followerBrokerId, leaderEpoch)
{
   stateChangeQ = new StateChangeQueue(“/brokers/state/followerBrokerId”)
   stateChangeRequest = new StateChangeRequest(stateChange, leaderEpoch)
   // check if the state change Q is full. This can happen if a broker is offline for a long time
   if(stateChangeQ.isFull) {
      // this operation retains only one close-replica request per partition, the one with the latest epoch.
      // This ensures that an offline broker, on startup, will delete old topics and partitions that it hosted
      // before going offline. Start-replica requests for a partition don’t need to be retained.
      stateChangeQ.shrink
      // if the queue is still full, throw an exception
      throw new FollowerStateChangeQueueFull
   }
   stateChangeQ.put(stateChangeRequest)
} 
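
The shrink operation can be expressed as a pure function over the queued requests. This sketch (with a hypothetical StateChangeRequest shape) keeps, per partition, only the close-replica request with the highest epoch and drops everything else, exactly as the comment above describes:

Code Block

case class StateChangeRequest(stateChange: String, topic: String,
                              partition: Int, epoch: Int)

// Illustrative sketch of stateChangeQ.shrink: retain one close-replica request
// per partition (the latest epoch); start-replica requests are dropped.
def shrink(queue: List[StateChangeRequest]): List[StateChangeRequest] =
  queue.filter(_.stateChange == "close-replica")
       .groupBy(r => (r.topic, r.partition))
       .values
       .map(_.maxBy(_.epoch))
       .toList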

State change operations

Start replica

This state change is requested by the leader or the admin command for a new replica assignment

Code Block

startReplica(r: Replica) {
  if(broker_id not in /brokers/topics/[r.topic]/[r.partition]/replicas)
    throw NotReplicaForPartitionException()
  if(r's log is not already started) {
     do local recovery of r's log
     r.hw = min(last checkpointed HW for r, r.leo)
     register a leader-change listener on partition r.partition.partition_id
  }
  if(a leader does not exist for partition r.partition.partition_id in ZK)
     leaderElection(r)
  else {
     // this broker is not the leader, so it is a follower since it is in the AR list for this partition
     becomeFollower(r)
  }
}
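
The high watermark recovery rule deserves a note: after local log recovery the replica may have lost messages from the tail of its log, so the checkpointed HW must be capped at the recovered log end offset. A one-line sketch:

Code Block

// Illustrative sketch: a replica must never claim a HW beyond its own LEO.
def recoverHighWatermark(checkpointedHw: Long, logEndOffset: Long): Long =
  math.min(checkpointedHw, logEndOffset)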

Close replica

This state change is requested by the leader when a topic or partition is deleted or moved to another broker

Code Block

closeReplica(r: Replica)
{
   stop the fetcher associated with r, if one exists
   close and delete r
}

Become follower

This state change is requested by the leader when the leader for a replica changes

Code Block

becomeFollower(r: Replica)
{
  // this is required if this replica was the last leader
  stop the commit thread, if any  
  stop the current ReplicaFetcherThread, if any
  truncate the log to r.hw
  start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
  start HW checkpoint thread for r
}
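
The truncation step is what keeps the follower consistent: entries beyond the HW were never committed and may conflict with the new leader's log. A sketch of the transition, with Log and ReplicaFetcher as stand-in types rather than Kafka classes:

Code Block

trait Log {
  def highWatermark: Long
  def logEndOffset: Long
  def truncateTo(offset: Long): Unit
}
trait ReplicaFetcher { def start(leader: String, fromOffset: Long): Unit }

// Illustrative sketch: truncate to HW, then fetch from the new leader.
def makeFollower(log: Log, fetcher: ReplicaFetcher, leader: String): Unit = {
  log.truncateTo(log.highWatermark)       // drop the possibly-uncommitted tail
  fetcher.start(leader, log.logEndOffset) // equals the HW right after truncation
}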

Become leader

This state change is done by the new leader

Code Block

becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
{
   // get a new epoch value and write it to the leader path
   epoch = getNewEpoch()
   /brokers/topics/[r.partition.topic]/[r.partition.pid]/leader=[broker_id],[epoch] 
   stop HW checkpoint thread for r
   r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
   wait until every live replica in AR catches up (i.e. its leo == r.hw) or a KeepInSyncTime has passed
   r.partition.ISR = the current set of replicas in sync with r
   r.partition.CUR = AR - ISR
   write r.partition.ISR & r.partition.CUR in ZK
   r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
   r.partition.leader = r                  // this enables reads/writes to this partition on this broker
   start a commit thread on r.partition
   start HW checkpoint thread for r
   get replica info from ZK and compute AR, a list of replicas assigned to this broker
   for each r in AR {
      if(r.broker_id is registered under /brokers/ids)
         sendStateChange(“start-replica”, r.broker_id, epoch)
   }
}
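
getNewEpoch() must hand out distinct, monotonically increasing values even when two brokers race to become leader. One way to get that from ZooKeeper, sketched here with an assumed counter znode (/brokers/epoch is illustrative) and the I0Itec ZkClient API, is a compare-and-set on the znode's version:

Code Block

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkBadVersionException
import org.apache.zookeeper.data.Stat

// Illustrative sketch of getNewEpoch(): versioned writes make the increment
// atomic; a loser of the race retries with the fresh value.
def getNewEpoch(zkClient: ZkClient, path: String = "/brokers/epoch"): Int = {
  while (true) {
    val stat = new Stat()
    val current = zkClient.readData[String](path, stat).toInt
    try {
      zkClient.writeData(path, (current + 1).toString, stat.getVersion)
      return current + 1
    } catch {
      case _: ZkBadVersionException => // another broker won; retry
    }
  }
  throw new IllegalStateException("unreachable")
}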

Create topic

The admin command does the following while creating a new topic

Code Block

createTopic(topic, numPartitions, replicationFactor, replicaAssignmentStr) 
{
   if(!cleanFailedTopicCreationAttempts(topic))
   {
      error(“Topic topic exists with live partitions”)
      exit
   }
   if(replicaAssignmentStr == “”) {
     // assignReplicas will always assign partitions only to online brokers
     replicaAssignment = assignReplicas(topic, numPartitions, replicationFactor)
   }

   // create topic path in ZK
   create /brokers/topics/topic
   for(partition <- replicaAssignment) {
     addPartition(topic, partition.id, partition.replicas) 
   }
   // report successfully started partitions for this topic
}
waitTillStateChangeRequestConsumed(partition.replicas, timeout)
{
   register watch on state change path for each replica
   In the listener, use a condition variable to await(timeout). If it doesn’t fire return false, else return true
}

cleanFailedTopicCreationAttempts(topic) 
{
   topicsForPartitionsReassignment = ls /brokers/partitions_reassigned
   for(topic <- topicsForPartitionsReassignment)
   {
      partitionsCreated = ls /brokers/topics/topic
      for(partition <- partitionsCreated) 
      {
         if(/brokers/topics/topic/partition/replicas path exists)
         {
            error(“Cannot cleanup. Topic exists with live partition”)
            return false 
         }
      }
      // partition paths can be safely deleted
      for(partition <- partitionsCreated)
         delete /brokers/topics/topic/partition
      delete /brokers/topics/topic
   } 
   return true
}
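
A sketch of the waitTillStateChangeRequestConsumed helper above, assuming the I0Itec ZkClient API and a CountDownLatch as the condition variable; each replica's count is released when its state change path empties, i.e. the request was consumed. Real code would also guard against counting the same replica down twice.

Code Block

import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

// Illustrative sketch of waitTillStateChangeRequestConsumed().
def waitTillStateChangeRequestConsumed(zkClient: ZkClient,
                                       brokerIds: Seq[Int],
                                       timeoutMs: Long): Boolean = {
  val latch = new CountDownLatch(brokerIds.size)
  for (id <- brokerIds)
    zkClient.subscribeChildChanges("/brokers/state/" + id, new IZkChildListener {
      override def handleChildChange(parent: String,
                                     children: java.util.List[String]): Unit =
        if (children == null || children.isEmpty) latch.countDown()
    })
  // false if the timeout fires before every replica consumed its request
  latch.await(timeoutMs, TimeUnit.MILLISECONDS)
}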

Delete topic

Code Block

deleteTopic(topic) 
{
   partitionsForTopic = ls /brokers/topics/topic
   for(partition <- partitionsForTopic) {
      if(!deletePartition(topic, partition))
      {
         error(“Failed to delete partition for topic”)
         exit 
      }
   }
   // delete topic path in ZK
   delete /brokers/topics/topic
}

Add partition to existing topic

Code Block

addPartition(topic, partition, replicas) 
{
     // write the partitions reassigned path for this create topic command
     /brokers/partitions_reassigned/topic/partition=replicas
     // start replicas for this new partition
     for(replica <- replicas)
        sendStateChange(“start-replica”, replica.brokerId, -1)
     // wait till state change request is consumed by all replicas
     if(!waitTillStateChangeRequestConsumed(partition.replicas, timeout)) 
     {
        error(“Failed to create topic partition partitionId for timeout ms”)
        exit
     }
     // create partition paths in ZK
     /brokers/topics/topic/partitionId/replicas=replicas
     delete /brokers/partitions_reassigned/topic/partitionId
}

Remove partition for existing topic

Code Block

deletePartition(topic, partition) 
{
   // empty list for partition reassignment means delete partition
   /brokers/partitions_reassigned/topic/partition=””
   // wait till replica is closed by all replicas
   if(!waitTillStateChangeRequestConsumed(partition.replicas, timeout)) 
   {
     error(“Failed to delete topic after timeout ms”)
     return false
   }
   // delete partition paths in ZK
   delete /brokers/topics/topic/partitionId
   delete /brokers/partitions_reassigned/topic/partitionId
   return true
}