Motivation
Summary of existing controller
The current Kafka controller is a multi-threaded controller that emulates a state machine. It works as follows.
Maintained state:
- Replicas of partitions on each machine.
- Leaders of partitions.
...
```scala
while (!shutdown) {
  val event = eventQueue.pollFirst()
  // Make state change
  try {
    val brokerRequests = event.makeStateChange(partitionStateMachine)
    brokerRequests.foreach { case (broker, request) =>
      networkClient.send(request)
      event.unAckedNode.add(broker)
    }
    while (!event.unAckedNode.isEmpty) {
      try {
        networkClient.poll(timeout)
      } catch {
        case e: KafkaApiException =>
          // Do something
        case e: Exception =>
          // Error handling
      }
      checkNodeLivenessAndIgnoreDeadNode()
    }
  } catch {
    case e: StateChangeException =>
      // handle illegal state change
  }
}
```
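The loop above can be sketched as a small self-contained program. This is only an illustration under stated assumptions: `Request`, `PartitionState`, `LeaderChange`, `FakeNetwork`, and `ControllerLoopSketch` are hypothetical stand-ins invented for the example, not classes from the Kafka code base, and the fake network acknowledges one request per poll instead of talking to real brokers.

```scala
import java.util.concurrent.LinkedBlockingDeque
import scala.collection.mutable

// Hypothetical stand-ins; none of these names come from the Kafka code base.
final case class Request(broker: Int, payload: String)

final class StateChangeException(msg: String) extends RuntimeException(msg)

// Assumed in-memory state: partition name -> current leader broker id.
final class PartitionState {
  val leaders = mutable.Map.empty[String, Int]
}

// An event validates and applies a state change, then returns the
// requests that must be acknowledged before the next event is processed.
final case class LeaderChange(partition: String, newLeader: Int, followers: Set[Int]) {
  val unAckedNodes = mutable.Set.empty[Int]

  def makeStateChange(state: PartitionState): Seq[Request] = {
    if (newLeader < 0) throw new StateChangeException(s"invalid leader for $partition")
    state.leaders(partition) = newLeader
    (followers + newLeader).toSeq.sorted
      .map(b => Request(b, s"$partition -> leader $newLeader"))
  }
}

// Fake network: send() records the request, poll() returns at most one
// acknowledging broker per call. Dead brokers never acknowledge.
final class FakeNetwork(deadBrokers: Set[Int]) {
  private val inFlight = mutable.Queue.empty[Request]
  val sent = mutable.ArrayBuffer.empty[Request]

  def send(r: Request): Unit = {
    sent += r
    if (!deadBrokers(r.broker)) inFlight.enqueue(r)
  }
  def poll(): Seq[Int] =
    if (inFlight.isEmpty) Seq.empty else Seq(inFlight.dequeue().broker)
  def isAlive(broker: Int): Boolean = !deadBrokers(broker)
}

object ControllerLoopSketch {
  // Drains the queue on the calling thread and returns the partitions
  // whose events were rejected as illegal state changes.
  def run(events: LinkedBlockingDeque[LeaderChange],
          state: PartitionState,
          net: FakeNetwork): Seq[String] = {
    val rejected = mutable.ArrayBuffer.empty[String]
    while (!events.isEmpty) {
      val event = events.pollFirst()
      try {
        event.makeStateChange(state).foreach { r =>
          net.send(r)
          event.unAckedNodes += r.broker
        }
        while (event.unAckedNodes.nonEmpty) {
          net.poll().foreach(event.unAckedNodes -= _)
          // Stop waiting for brokers that are no longer alive.
          event.unAckedNodes --= event.unAckedNodes.filterNot(net.isAlive)
        }
      } catch {
        case _: StateChangeException => rejected += event.partition
      }
    }
    rejected.toSeq
  }
}
```

In this sketch each event is fully processed before the next one is dequeued, so state is only ever touched by one thread and needs no locking. The inner loop terminates because every live broker eventually acknowledges and every dead broker is dropped from the unacknowledged set.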
...