Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
createTopic(topic, numPartitions, replicationFactor, replicaAssignmentStr) 
{
   if(!cleanFailedTopicCreationAttempt(topic))
   {
      error(“Topic topic exists with live partitions”)
      exit
   }  
   if(replicaAssignmentStr == “”) {
     // assignReplicas will always assign partitions only to online brokers
     replicaAssignment = assignReplicas(topic, numPartitions, replicationFactor)
   }

   // create topic path in ZK
   create /brokers/topics/topic
   for(partition <- replicaAssignment) {
     addPartition(topic, partition.id, partition.replicas) 
   }
   // report successfully started partitions for this topic
}
waitTillStateChangeRequestConsumed(partition.replicas, timeout)
{
   register watch on state change path for each replica
   In the listener, use a condition variable to await(timeout). If it doesn’t fire return false, else return true
}

cleanFailedTopicCreationAttempts(topic) 
{
   topicsForPartitionsReassignment = ls /brokers/partitions_reassigned
   for(topic <- topicsForPartitionsReassignment)
   {
      partitionsCreated = ls /brokers/partitions_reassigned/topic
      cleanupFailed = false
      for(partition <- partitionsCreated) 
      {
          if(/brokers/topics/topic/partition/replicas path exists)
          {
             delete /brokers/partitions_reassigned/topic/partition
             error(“Cannot cleanup. Topic exists with live partition”) 
             returncleanupFailed false= true
          }
      }
	      if(cleanupFailed) {
        if(/brokers/partitions_reassigned/topic has no children)
           delete /brokers/partitions_reassigned/topic
        return false
      }
      // partition paths can be safely deleted
      for(partition <- partitionsCreated)
      {
         read the /brokers/partitions_reassigned/topic/partition path 
         for each broker listed in the above step, sendStateChange(“close-replica”, [broker_id], -1)
         delete /brokers/topics/topic/partitions/partition
         delete /brokers/partitions_reassigned/topic/partition
      }
   } 
   delete /brokers/topics/topic
}

...