
Code Block
brokerStartup()
{
  create /brokers/state/[broker_id] path if it doesn’t already exist
  register the state change handler to listen on child change ZK notifications on /brokers/state/[broker_id]
  register session expiration listener
  drain the state change queue 
  get replica info from ZK and compute AR, a list of replicas assigned to this broker
  for each r in AR
  {
      subscribe to leader changes for r’s partition
      startReplica(r)
  }
  // broker startup procedure is complete. Register its broker id in ZK to announce the availability of this broker
  register its broker_id in /brokers/ids/[broker_id] in ZK
}
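The startup ordering above can be sketched in Python. This is a minimal illustration, not the broker's implementation: FakeZk, Broker and the (topic, partition) assignment map are hypothetical names, and ZooKeeper is replaced by an in-memory dict so the example runs on its own. What it shows is the ordering: /brokers/ids/[broker_id] is written only after every replica in AR has been started.

```python
# Sketch of the brokerStartup() ordering. FakeZk and Broker are hypothetical
# stand-ins: ZooKeeper is modelled as an in-memory dict, not a real client.


class FakeZk:
    """In-memory stand-in for ZooKeeper: maps a znode path to its data."""

    def __init__(self):
        self.nodes = {}

    def ensure_path(self, path):
        # Create the node only if it does not already exist.
        self.nodes.setdefault(path, None)

    def create(self, path, data):
        self.nodes[path] = data

    def exists(self, path):
        return path in self.nodes


class Broker:
    def __init__(self, broker_id, zk, assignment):
        self.broker_id = broker_id
        self.zk = zk
        # Hypothetical replica info: {(topic, partition): [assigned broker ids]}
        self.assignment = assignment
        self.steps = []  # records each startup step, in order

    def startup(self):
        self.zk.ensure_path(f"/brokers/state/{self.broker_id}")
        self.steps.append("register state change handler")
        self.steps.append("register session expiration listener")
        self.steps.append("drain state change queue")
        # AR: the replicas assigned to this broker
        ar = sorted(tp for tp, brokers in self.assignment.items()
                    if self.broker_id in brokers)
        for topic, partition in ar:
            self.steps.append(f"subscribe to leader changes for {topic}-{partition}")
            self.steps.append(f"startReplica({topic}-{partition})")
        # Announce availability last, once every assigned replica is started.
        self.zk.create(f"/brokers/ids/{self.broker_id}", "available")
        self.steps.append("registered")
        return ar


zk = FakeZk()
broker = Broker(1, zk, {("t", 0): [1, 2], ("t", 1): [2, 3]})
started = broker.startup()
```

Here broker 1 is assigned only partition t-0, so that is the only replica it starts before registering itself.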

State change events

On every broker

Leader change

This leader change listener is registered on every broker hosting a partition p. Each time it is triggered, the following procedure is executed:

Code Block

onLeaderChange()
{
   // r is the replica of partition p hosted on this broker
   if(broker_id is registered under /brokers/topics/[topic]/[partition_id]/replicas)
     leaderElection(r)
}
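A minimal Python sketch of the membership check in onLeaderChange(), assuming the replica lists are available as a dict keyed by (topic, partition). The names replicas_by_partition and leader_election are hypothetical; leader_election stands in for whatever leaderElection(r) does and is passed in as a callback.

```python
# Sketch of onLeaderChange(). replicas_by_partition stands in for the
# /brokers/topics/[topic]/[partition_id]/replicas znodes; leader_election is a
# caller-supplied callback (both hypothetical).


def on_leader_change(broker_id, topic, partition, replicas_by_partition, leader_election):
    assigned = replicas_by_partition.get((topic, partition), [])
    if broker_id in assigned:
        # r is this broker's local replica of the partition
        r = (topic, partition, broker_id)
        leader_election(r)
        return True
    # This broker is not a replica of the partition, so it does nothing.
    return False
```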

On state change

Each broker has a ZK path, /brokers/state/[broker_id], on which it listens for state change requests from the leader.

Code Block

stateChangeListener()
{
    // listens to state change requests issued by the leader and acts on them
    drain the state change queue
    read next state change request
    Let r be the replica that the state change request is sent for
    // this should not happen
    Throw an error if r is not hosted on this broker
    Let requestEpoch be the epoch of the state change request
    if(closeReplicaRequest)
    {
       // no epoch check here, so that delete topic/delete partition can be handled for dead brokers
       closeReplica(r)
    }
    else if(startReplicaRequest)
    {
       Let latestPartitionEpoch be the latest epoch for this partition, obtained by reading /brokers/topics/[topic]/[partition]/leader=[broker_id],[epoch]
       if(leader for r.partition doesn’t exist) {
          // this can only happen for new topics or new partitions of existing topics
          startReplica(r)
       } else if(requestEpoch == latestPartitionEpoch) {
          // if a follower is slow and reads a state change request queued up by a previous leader, it ignores the request
          startReplica(r)
       }
    }
}
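The decision logic of stateChangeListener() can be sketched in Python as a pure function plus a FIFO drain loop. This is an illustrative sketch under stated assumptions: StateChangeRequest, handle_state_change, drain and the string action names are hypothetical, the queue is a collections.deque rather than ZK child nodes, and latest_epochs stands in for the epochs read from the leader znodes (a missing key means the partition has no leader yet).

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class StateChangeRequest:
    kind: str       # "start-replica" or "close-replica"
    topic: str
    partition: int
    broker_id: int  # broker that hosts replica r
    epoch: int      # requestEpoch: epoch of the leader that issued the request


def handle_state_change(req, my_broker_id, latest_epoch):
    """Decide what to do with one request.

    latest_epoch is the epoch read from .../leader=[broker_id],[epoch],
    or None when the partition has no leader yet.
    """
    if req.broker_id != my_broker_id:
        # "this should not happen"
        raise ValueError("replica r is not hosted on this broker")
    if req.kind == "close-replica":
        # No epoch check, so delete topic/delete partition can be handled for dead brokers.
        return "closeReplica"
    if req.kind == "start-replica":
        if latest_epoch is None or req.epoch == latest_epoch:
            return "startReplica"
        return "ignore"  # stale request queued up by a previous leader
    raise ValueError(f"unknown request kind: {req.kind}")


def drain(queue, my_broker_id, latest_epochs):
    """Process every queued request in FIFO order and return the chosen actions."""
    actions = []
    while queue:
        req = queue.popleft()
        latest = latest_epochs.get((req.topic, req.partition))
        actions.append(handle_state_change(req, my_broker_id, latest))
    return actions
```

Keeping the epoch comparison in a side-effect-free function makes the stale-request rule easy to test separately from the ZK watch plumbing.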

On the leader

On reassignment of partitions

Each time a partition-reassigned event is triggered on the leader, it calls onPartitionsReassigned():

Code Block

onPartitionsReassigned()
{
  if(this broker is the leader for [partition_id])
  {
     RAR = the new replicas from /brokers/partitions_reassigned/[topic]/[partition_id]
     AR = /brokers/topics/[topic]/[partition_id]/replicas
     newReplicas = RAR - AR
     // epoch is the leader's current epoch for this partition
     for(newReplica <- newReplicas)
       sendStateChange("start-replica", newReplica.broker_id, epoch)
  }
}
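A minimal Python sketch of the RAR - AR step, assuming replicas are identified by broker id and that send_state_change is a hypothetical callback standing in for sendStateChange. The difference is computed with a list comprehension rather than a set so the order of RAR is preserved.

```python
# Sketch of onPartitionsReassigned(). All names are hypothetical; rar and ar
# are lists of broker ids, epoch is the leader's current epoch for the partition.


def on_partitions_reassigned(my_broker_id, leader_id, rar, ar, epoch, send_state_change):
    if my_broker_id != leader_id:
        # Only the leader of the partition acts on the reassignment.
        return []
    # newReplicas = RAR - AR, keeping RAR's order
    new_replicas = [b for b in rar if b not in ar]
    for b in new_replicas:
        send_state_change("start-replica", b, epoch)
    return new_replicas
```

Replicas present in both RAR and AR receive no request, since they are already running.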