THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
createTopic(topic, numPartitions, replicationFactor, replicaAssignmentStr)
{
if(!cleanFailedTopicCreationAttempts(topic))
{
error(“Topic topic exists with live partitions”)
exit
}
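if(replicaAssignmentStr == “”) {
// assignReplicas will always assign partitions only to online brokers
replicaAssignment = assignReplicas(topic, numPartitions, replicationFactor)
} else {
// otherwise use the replica assignment supplied by the caller
replicaAssignment = parse replicaAssignmentStr into a list of (partition id, replicas)
}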
// create topic path in ZK
create /brokers/topics/topic
for(partition <- replicaAssignment) {
addPartition(topic, partition.id, partition.replicas)
}
// report successfully started partitions for this topic
}
waitTillStateChangeRequestConsumed(partition.replicas, timeout)
{
register a watch on the state change path for each replica
In the listener, use a condition variable to await(timeout). If the watch does not fire before the timeout expires, return false; otherwise return true
}
cleanFailedTopicCreationAttempts(topic)
{
topicsForPartitionsReassignment = ls /brokers/partitions_reassigned
for(topic <- topicsForPartitionsReassignment)
{
partitionsCreated = ls /brokers/partitions_reassigned/topic
for(partition <- partitionsCreated)
{
if(/brokers/topics/topic/partition/replicas path exists)
{
error(“Cannot cleanup. Topic exists with live partition”)
return false
}
}
// partition paths can be safely deleted
for(partition <- partitionsCreated)
{
read the /brokers/partitions_reassigned/topic/partition path
for each broker listed in the above step, sendStateChange(“close-replica”, [broker_id], -1)
delete /brokers/topics/topic/partitions/partition
delete /brokers/partitions_reassigned/topic/partition
}
}
delete /brokers/topics/topic
// cleanup succeeded; the caller (createTopic) checks this result
return true
}
|
...