Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
{
   // get a new epoch value and write it to the leader path
   epoch = getNewEpoch()
   /brokers/topics/[r.partition.topic]/[r.partition.pid]/leader=broker_id, epoch
   stop HW checkpoint thread for r
   r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
   wait until every live replica in AR catches up (i.e. its leo == r.hw) or a KeepInSyncTime has passed
   r.partition.ISR = the current set of replicas in sync with r
   r.partition.CUR = AR - ISR
   write r.partition.ISR & r.partition.CUR in ZK
   r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
   r.partition.leader = r                  // this enables reads/writes to this partition on this broker
   start a commit thread on r.partition
   start HW checkpoint thread for r
    get replica info from ZK and compute AR, a list of replicas assigned to this broker
    for each r in AR {
       if(r.broker_id is registered under /brokers/ids)
          sendStateChange(“start-replica”, r.broker_id, epoch)
    }
}

Admin commands

This section describes the algorithms for various admin commands like create/delete topic, add/remove partition.

Create topic

The admin command does the following when creating a new topic:

Code Block
createTopic(topic, numPartitions, replicationFactor, replicaAssignmentStr) 
{
   if(!cleanFailedTopicCreationAttempts(topic))
   {
      error(“Topic topic exists with live partitions”)
      exit
   }  
   if(replicaAssignmentStr == “”) {
     // assignReplicas will always assign partitions only to online brokers
     replicaAssignment = assignReplicas(topic, numPartitions, replicationFactor)
   }

   // create topic path in ZK
   create /brokers/topics/topic
   for(partition <- replicaAssignment) {
     addPartition(topic, partition.id, partition.replicas) 
   }
   // report successfully started partitions for this topic
}
waitTillStateChangeRequestConsumed(partition.replicas, timeout)
{
   register watch on state change path for each replica
   In the listener, use a condition variable to await(timeout). If it doesn’t fire within the timeout, return false; otherwise return true
}

cleanFailedTopicCreationAttempts(topic) 
{
   topicsForPartitionsReassignment = ls /brokers/partitions_reassigned
   for(topic <- topicsForPartitionsReassignment)
   {
      partitionsCreated = ls /brokers/topics/topic
      for(partition <- partitionsCreated) 
      {
		if(/brokers/topics/topic/partition/replicas path exists)
          {
             error(“Cannot cleanup. Topic exists with live partition”)
             return false 
          }
      }
	 // partition paths can be safely deleted
      for(partition <- partitionsCreated)
         delete /brokers/topics/topic/partition
      delete /brokers/topics/topic
   } 
}

Delete topic

Code Block
deleteTopic(topic) 
{
   partitionsForTopic = ls /brokers/topics/topic
   for(partition <- partitionsForTopic) {
      if(!deletePartition(topic, partition))
      {
         error(“Failed to delete partition for topic”)
         exit 
      }
   }
   // delete topic path in ZK
   delete /brokers/topics/topic
}

Add partition to existing topic

Code Block
addPartition(topic, partition, replicas) 
{
     // write the partitions reassigned path for this create topic command
     /brokers/partitions_reassigned/topic/partition=replicas
	// start replicas for this new partition
     for(replica <- replicas)
        sendStateChange(“start-replica”, replica.brokerId, -1)
     // wait till state change request is consumed by all replicas
     if(!waitTillStateChangeRequestConsumed(partition.replicas, timeout)) 
     {
        error(“Failed to create topic partition partitionId for timeout ms”)
        exit
     }
     // create partition paths in ZK
     /brokers/topics/topic/partitionId/replicas=replicas
     delete /brokers/partitions_reassigned/topic/partitionId
}

Remove partition for existing topic

Code Block
deletePartition(topic, partition) 
{
   // empty list for partition reassignment means delete partition
   /brokers/partitions_reassigned/topic/partition=””
   // wait till every replica of this partition has been closed
   if(!waitTillStateChangeRequestConsumed(partition.replicas, timeout)) 
   {
     error(“Failed to delete topic after timeout ms”)
     return false
   }
   // delete partition paths in ZK
   delete /brokers/topics/topic/partitionId
   delete /brokers/partitions_reassigned/topic/partitionId
}