THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
createTopic(topic, numPartitions, replicationFactor, replicaAssignmentStr) { if(!cleanFailedTopicCreationAttempt(topic)) { error(“Topic topic exists with live partitions”) exit } if(replicaAssignmentStr == “”) { // assignReplicas will always assign partitions only to online brokers replicaAssignment = assignReplicas(topic, numPartitions, replicationFactor) } // create topic path in ZK create /brokers/topics/topic for(partition <- replicaAssignment) { addPartition(topic, partition.id, partition.replicas) } // report successfully started partitions for this topic } waitTillStateChangeRequestConsumed(partition.replicas, timeout) { register watch on state change path for each replica In the listener, use a condition variable to await(timeout). If it doesn’t fire return false, else return true } cleanFailedTopicCreationAttempts(topic) { topicsForPartitionsReassignment = ls /brokers/partitions_reassigned for(topic <- topicsForPartitionsReassignment) { partitionsCreated = ls /brokers/partitions_reassigned/topic cleanupFailed = false for(partition <- partitionsCreated) { if(/brokers/topics/topic/partition/replicas path exists) { delete /brokers/partitions_reassigned/topic/partition error(“Cannot cleanup. Topic exists with live partition”) returncleanupFailed false= true } } if(cleanupFailed) { if(/brokers/partitions_reassigned/topic has no children) delete /brokers/partitions_reassigned/topic return false } // partition paths can be safely deleted for(partition <- partitionsCreated) { read the /brokers/partitions_reassigned/topic/partition path for each broker listed in the above step, sendStateChange(“close-replica”, [broker_id], -1) delete /brokers/topics/topic/partitions/partition delete /brokers/partitions_reassigned/topic/partition } } delete /brokers/topics/topic } |
...