Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: scala
// Controller event loop: takes one event at a time off the queue, applies its
// state change, sends the resulting requests to the brokers, and then polls the
// network client until no broker is left in the event's un-acked set.
while (!shutdown) {
  // NOTE(review): if eventQueue is a java.util.Deque, pollFirst() returns null
  // when the queue is empty -- confirm the queue blocks, or guard against null.
  val event = eventQueue.pollFirst()
  // Make state change
  try {
    val brokerRequests = event.makeStateChange(partitionStateMachine)
    // foreach rather than map: the loop body is executed only for its side effects.
    brokerRequests.foreach { case (broker, request) =>
      networkClient.send(request)
      // Track the broker until its response has been received.
      event.unAckedNode.add(broker)
    }
    while (!event.unAckedNode.isEmpty) {
      try {
        networkClient.poll(timeout)
      } catch {
        // Typed patterns are required here: a bare `case KafkaApiException =>`
        // compares against a value of that name and never matches a thrown instance.
        case _: KafkaApiException =>
          // Do something
        case _: Exception =>
          // Error handling
      }
      // Presumably drops dead brokers from the un-acked set so that this loop
      // can terminate -- TODO confirm against the helper's implementation.
      checkNodeLivenessAndIgnoreDeadNode()
    }
  } catch {
    case _: StateChangeException =>
      // handle illegal state change
  }
}

...