Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
createTopic(topic, numPartitions, replicationFactor, replicaAssignmentStr) 
{
   if(!cleanFailedTopicCreationAttempt(topic))
   {
      error(“Topic topic exists with live partitions”)
      exit
   }  
   if(replicaAssignmentStr == “”) {
     // assignReplicas will always assign partitions only to online brokers
     replicaAssignment = assignReplicas(topic, numPartitions, replicationFactor)
   }

   // create topic path in ZK
   create /brokers/topics/topic
   for(partition <- replicaAssignment) {
     addPartition(topic, partition.id, partition.replicas) 
   }
   // report successfully started partitions for this topic
}
waitTillStateChangeRequestConsumed(partition.replicas, timeout)
{
   register watch on state change path for each replica
   In the listener, use a condition variable to await(timeout). If it doesn’t fire return false, else return true
}

cleanFailedTopicCreationAttempts(topic) 
{
   topicsForPartitionsReassignment = ls /brokers/partitions_reassigned
   for(topic <- topicsForPartitionsReassignment)
   {
      partitionsCreated = ls /brokers/topics/topic
      for(partition <- partitionsCreated) 
      {
		          if(/brokers/topics/topic/partition/replicas path exists)
          {
             error(“Cannot cleanup. Topic exists with live partition”)
             return false 
          }
      }
	 // partition paths can be safely deleted
      for(partition <- partitionsCreated)
         delete /brokers/topics/topic/partition
   } 
   delete /brokers/topics/topic
   } 
}

Delete topic

Code Block
deleteTopic(topic) 
{
   partitionsForTopic = ls /brokers/topics/topic
   for(partition <- partitionsForTopic) {
      if(!deletePartition(topic, partition))
      {
         error(“Failed to delete partition for topic”)
         exit 
      }
   }
   // delete topic path in ZK
   delete /brokers/topics/topic
}

...