The current Kafka controller is a multi-threaded controller that emulates a state machine. It works in the following way.
Some state changes are complicated because the ReplicaStateMachine and the PartitionStateMachine are separate, while the state changes themselves are not independent. The controller therefore has to enforce an ordering between the state changes executed by the two machines. In the new design, we will look into merging them into a single state machine.
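As a rough illustration of what folding the two machines together could look like, the sketch below keeps a partition's leader, its ISR and the state of each of its replicas in one immutable value, so a single transition updates all of them at once and no cross-machine ordering is needed. All names (`ReplicaState`, `UnifiedPartitionState`, `onReplicaOffline`) and the "lowest remaining ISR member becomes leader" rule are hypothetical assumptions, not the controller's actual classes or election logic.

```scala
// Hypothetical replica states; the real controller defines its own set.
sealed trait ReplicaState
case object OnlineReplica extends ReplicaState
case object OfflineReplica extends ReplicaState

// One value holds both partition-level state (leader, ISR) and per-replica state,
// so a single transition updates both together.
final case class UnifiedPartitionState(
    leader: Int,
    isr: Set[Int],
    replicas: Map[Int, ReplicaState]) {

  // Mark a replica offline, drop it from the ISR and, if it was the leader,
  // pick the lowest remaining ISR member (-1 here means "no leader").
  def onReplicaOffline(brokerId: Int): UnifiedPartitionState = {
    val newIsr = isr - brokerId
    val newLeader =
      if (leader != brokerId) leader
      else newIsr.toSeq.sorted.headOption.getOrElse(-1)
    copy(
      leader = newLeader,
      isr = newIsr,
      replicas = replicas.updated(brokerId, OfflineReplica))
  }
}
```

Because the partition and replica changes happen in one `copy`, there is no intermediate state in which one machine has moved and the other has not.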
Many state changes are executed one partition after another, each with its own request. It would be much more efficient to batch them into one request.
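A minimal sketch of the batching idea, assuming each per-partition state change already knows which brokers must hear about it: the changes are grouped by destination broker so that each broker receives one request covering all of its partitions. `PartitionStateChange` and `RequestBatching.batchByBroker` are hypothetical names used only for illustration.

```scala
// A hypothetical per-partition state change and the brokers that must learn about it.
final case class PartitionStateChange(
    topic: String,
    partition: Int,
    leader: Int,
    isr: Set[Int],
    targetBrokers: Set[Int])

object RequestBatching {
  // Group all changes by destination broker, so each broker gets a single
  // batched request instead of one request per partition.
  def batchByBroker(changes: Seq[PartitionStateChange]): Map[Int, Seq[PartitionStateChange]] =
    changes
      .flatMap(change => change.targetBrokers.toSeq.map(broker => broker -> change))
      .groupBy { case (broker, _) => broker }
      .map { case (broker, pairs) => broker -> pairs.map(_._2) }
}
```

The number of requests then scales with the number of affected brokers rather than with the number of partitions.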
Some long-running operations need to support cancellation. For example, we want to be able to cancel the reassignment of partitions whose reassignment has not started yet. The new controller should be able to abort or cancel such long-running operations.
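One way to picture this, as a sketch only: keep pending reassignments in arrival order and track which ones have started; a cancellation succeeds only for partitions whose reassignment has not begun. `ReassignmentTracker` and its methods are hypothetical, and partitions are identified by a plain "topic-partition" string for simplicity.

```scala
import scala.collection.mutable

// Hypothetical tracker for partition reassignments, keyed by "topic-partition".
final class ReassignmentTracker {
  // Reassignments waiting to start, in arrival order, with their target replicas.
  private val pending = mutable.LinkedHashMap.empty[String, Seq[Int]]
  // Reassignments that have already started moving data.
  private val inProgress = mutable.Set.empty[String]

  def submit(partition: String, targetReplicas: Seq[Int]): Unit =
    pending.update(partition, targetReplicas)

  // Start the oldest pending reassignment, if any, and return its partition.
  def startNext(): Option[String] =
    pending.headOption.map { case (partition, _) =>
      pending.remove(partition)
      inProgress += partition
      partition
    }

  // Cancellation only succeeds before the reassignment has started.
  def cancel(partition: String): Boolean =
    pending.remove(partition).isDefined

  def isInProgress(partition: String): Boolean = inProgress.contains(partition)
}
```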
We will keep the maintained state and the failover/failback model unchanged.
We are going to change the state change propagation model, the state change execution, and the output of the state change input sources. More specifically:
We would also like to
Related tickets: KAFKA-2139, KAFKA-2029, KAFKA-1305 (and likely others; please add any related tickets here).
The controller will handle eight types of events, defined below. Each listener generates one type of event; the eighth type, ControlledShutdown, is generated by controlled shutdown requests.
```scala
object ControllerEventType extends Enumeration {
  type ControllerEventType = Value
  val AddPartition, TopicChange, DeleteTopic, BrokerChange, PreferredReplicaElection,
      PartitionReassigned, ReassignedPartitionIsrChange, ControlledShutdown = Value
}
```
A generic controller event class will be defined:
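```scala
import java.util.concurrent.CountDownLatch
import scala.collection.mutable
import org.apache.kafka.clients.{ClientRequest, ClientResponse}
import kafka.controller.PartitionStateMachine
import ControllerEventType.ControllerEventType

abstract class KafkaControllerEvent(val eventType: ControllerEventType) {
  // A set that tracks the brokers that have not yet responded
  val unAckedNode = new mutable.HashSet[Int]
  // Released once every broker has acknowledged this event's requests
  val eventDone = new CountDownLatch(1)

  // Apply the state changes and return the request for each broker, keyed by broker id
  def makeStatesChange(partitionStateMachine: PartitionStateMachine): Map[Int, ClientRequest]

  def controllerRequestCallback(response: ClientResponse): Unit = {
    handleBrokerResponse(response)
    // destination() returns the broker id as a String
    unAckedNode.remove(response.request().request().destination().toInt)
    if (unAckedNode.isEmpty) eventDone.countDown()
  }

  def handleBrokerResponse(response: ClientResponse): Unit
}
```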
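To make the acknowledgement bookkeeping concrete without depending on Kafka's client classes, here is a self-contained sketch in which a hypothetical `BrokerResponse` case class stands in for `ClientResponse`: the event records the brokers it is waiting on, every response removes its sender, and the latch opens once the set is empty. `AckTrackingEvent`, `RecordingEvent` and the error-code check are illustrative assumptions.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable

// Hypothetical stand-in for a broker's response: sender id plus an error code.
final case class BrokerResponse(destination: Int, errorCode: Short)

abstract class AckTrackingEvent {
  // Brokers that were sent a request but have not yet responded.
  val unAckedNodes = mutable.HashSet.empty[Int]
  // Released once every broker has acknowledged.
  val eventDone = new CountDownLatch(1)

  def handleBrokerResponse(response: BrokerResponse): Unit

  def onResponse(response: BrokerResponse): Unit = {
    handleBrokerResponse(response)
    unAckedNodes.remove(response.destination)
    if (unAckedNodes.isEmpty) eventDone.countDown()
  }
}

// A trivial event that records which brokers reported a non-zero error code.
final class RecordingEvent extends AckTrackingEvent {
  val failedBrokers = mutable.Set.empty[Int]
  override def handleBrokerResponse(response: BrokerResponse): Unit =
    if (response.errorCode != 0) failedBrokers += response.destination
}
```

A caller that needs to block until the event is fully acknowledged can wait on `eventDone.await(timeout, TimeUnit.MILLISECONDS)`.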
```scala
class AddPartitionEvent extends KafkaControllerEvent(ControllerEventType.AddPartition) {
  override def makeStatesChange(partitionStateMachine: PartitionStateMachine): Map[Int, ClientRequest] = {
    // make state change and generate requests to each broker
    ???
  }
  override def handleBrokerResponse(response: ClientResponse): Unit = {
    // If necessary, do something when response is received
  }
}

class TopicChangeEvent extends KafkaControllerEvent(ControllerEventType.TopicChange) {
  override def makeStatesChange(partitionStateMachine: PartitionStateMachine): Map[Int, ClientRequest] = {
    // make state change and generate requests to each broker
    ???
  }
  override def handleBrokerResponse(response: ClientResponse): Unit = {
    // If necessary, do something when response is received
  }
}

class DeleteTopicEvent extends KafkaControllerEvent(ControllerEventType.DeleteTopic) {
  override def makeStatesChange(partitionStateMachine: PartitionStateMachine): Map[Int, ClientRequest] = {
    // make state change and generate requests to each broker
    ???
  }
  override def handleBrokerResponse(response: ClientResponse): Unit = {
    // If necessary, do something when response is received
  }
}

class BrokerChangeEvent extends KafkaControllerEvent(ControllerEventType.BrokerChange) {
  override def makeStatesChange(partitionStateMachine: PartitionStateMachine): Map[Int, ClientRequest] = {
    // make state change and generate requests to each broker
    ???
  }
  override def handleBrokerResponse(response: ClientResponse): Unit = {
    // If necessary, do something when response is received
  }
}

class PreferredReplicaElectionEvent extends KafkaControllerEvent(ControllerEventType.PreferredReplicaElection) {
  override def makeStatesChange(partitionStateMachine: PartitionStateMachine): Map[Int, ClientRequest] = {
    // make state change and generate requests to each broker
    ???
  }
  override def handleBrokerResponse(response: ClientResponse): Unit = {
    // If necessary, do something when response is received
  }
}

class PartitionReassignedEvent extends KafkaControllerEvent(ControllerEventType.PartitionReassigned) {
  override def makeStatesChange(partitionStateMachine: PartitionStateMachine): Map[Int, ClientRequest] = {
    // make state change and generate requests to each broker
    ???
  }
  override def handleBrokerResponse(response: ClientResponse): Unit = {
    // If necessary, do something when response is received
  }
}

class ReassignedPartitionIsrChangeEvent extends KafkaControllerEvent(ControllerEventType.ReassignedPartitionIsrChange) {
  override def makeStatesChange(partitionStateMachine: PartitionStateMachine): Map[Int, ClientRequest] = {
    // make state change and generate requests to each broker
    ???
  }
  override def handleBrokerResponse(response: ClientResponse): Unit = {
    // If necessary, do something when response is received
  }
}

class ControlledShutdownEvent extends KafkaControllerEvent(ControllerEventType.ControlledShutdown) {
  override def makeStatesChange(partitionStateMachine: PartitionStateMachine): Map[Int, ClientRequest] = {
    // make state change and generate requests to each broker
    ???
  }
  override def handleBrokerResponse(response: ClientResponse): Unit = {
    // If necessary, do something when response is received
  }
}
```
The general event handling process would be something like this:
```scala
while (!shutdown) {
  // Block until the next event is available
  val event = eventQueue.take()
  try {
    // Make state change and build one request per affected broker
    val brokerRequests = event.makeStatesChange(partitionStateMachine)
    brokerRequests.foreach { case (broker, request) =>
      event.unAckedNode.add(broker)
      networkClient.send(request)
    }
    // Wait until every live broker has acknowledged its request
    while (event.unAckedNode.nonEmpty) {
      try {
        networkClient.poll(timeout)
      } catch {
        case e: KafkaApiException => // Do something
        case e: Exception => // Error handling
      }
      checkNodeLivenessAndIgnoreDeadNode()
    }
  } catch {
    case e: StateChangeException => // handle illegal state change
  }
}
```
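The shape of this loop can be exercised without a real network. The sketch below is a single-threaded simulation under stated assumptions: `SimulatedEvent`, `SimulatedController` and the fixed `alive` broker set are hypothetical stand-ins for the real events, `networkClient.send`/`poll` and `checkNodeLivenessAndIgnoreDeadNode()`. It processes one event at a time and stops waiting on brokers that are not alive.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical event: the brokers it must notify, plus bookkeeping for acks.
final case class SimulatedEvent(name: String, targets: Set[Int]) {
  val unAckedNodes = mutable.Set.empty[Int]
  val acked = mutable.Set.empty[Int]
}

final class SimulatedController(alive: Set[Int]) {
  val eventQueue = new LinkedBlockingQueue[SimulatedEvent]()
  val completed = mutable.ArrayBuffer.empty[String]

  // Process queued events strictly one at a time until the queue is drained.
  def drain(): Unit =
    while (!eventQueue.isEmpty) {
      val event = eventQueue.take()
      // "Send" a request to every target broker.
      event.unAckedNodes ++= event.targets
      while (event.unAckedNodes.nonEmpty) {
        // "Poll": every live broker answers, clearing its entry.
        event.unAckedNodes.filter(alive.contains).foreach { broker =>
          event.acked += broker
          event.unAckedNodes -= broker
        }
        // Liveness check: stop waiting for brokers that are not alive.
        event.unAckedNodes --= event.unAckedNodes.filterNot(alive.contains)
      }
      completed += event.name
    }
}
```

Because the next event is only taken after the current one is fully acknowledged (or its dead brokers are ignored), events never interleave their state changes.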
Currently, replica reassignment is a very long-running process. In the new controller, we want to support aborting or cancelling this process when an event that affects the partition reassignment occurs after the reassignment has started. To do so, later events will override the state changes of earlier events, guided by the principle of avoiding under-replicated partitions (URPs).
For example, a partition reassignment consists of several events:
1 PartitionReassignedEvent + N * ReassignedPartitionIsrChangeEvent.
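The event sequence above can be replayed to see when a reassignment is done. This is a sketch under the assumption that a reassignment completes once every target replica is in the ISR; `Reassign`, `IsrChange` and `ReassignmentProgress.completedAt` are hypothetical names, not the controller's event classes. A later `Reassign` simply replaces the earlier target, which mirrors the "later events override earlier ones" principle.

```scala
// Hypothetical events for a single partition's reassignment.
sealed trait ReassignmentStep
final case class Reassign(target: Set[Int]) extends ReassignmentStep
final case class IsrChange(isr: Set[Int]) extends ReassignmentStep

object ReassignmentProgress {
  // Replays the events in order and returns the index of the ISR change at which
  // every target replica has joined the ISR, or None if that never happens.
  def completedAt(events: Seq[ReassignmentStep]): Option[Int] = {
    var target: Option[Set[Int]] = None
    var result: Option[Int] = None
    val it = events.iterator.zipWithIndex
    while (result.isEmpty && it.hasNext) {
      it.next() match {
        // A later reassignment replaces the earlier target.
        case (Reassign(t), _) => target = Some(t)
        case (IsrChange(isr), i) =>
          if (target.exists(_.subsetOf(isr))) result = Some(i)
      }
    }
    result
  }
}
```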
Step | Broker 0 | Broker 1 | Broker 2 | Event |
---|---|---|---|---|
0 - Initial State | {t1p1, replica:{0,1}, leader:0, isr:{0,1}} | {t1p1, replica:{0,1}, leader:0, isr:{0,1}} | | |
1 - Reassign t1p1 to broker 1 and broker 2 | {t1p1, replica:{0,1,2}, leader:1, isr:{0,1}} | {t1p1, replica:{0,1,2}, leader:1, isr:{0,1}} | {t1p1, replica:{0,1,2}, leader:1, isr:{0,1}} | PartitionReassignedEvent |
2.A - Broker 2 down (Scenario 1) | {t1p1, replica:{0,1,2}, leader:0, isr:{0,1}} | {t1p1, replica:{0,1,2}, leader:0, isr:{0,1}} | | BrokerChangeEvent |
2.B - Broker 1 down (Scenario 2) | {t1p1, replica:{0,1,2}, leader:0, isr:{0,1}} | | | |
3 - Another partition reassignment | | | | PartitionReassignedEvent |
In scenario 2.A, we can either remove broker 2 from the replica list, or simply stop listening for ISR changes and let a later partition reassignment take care of it.
In scenario 2.B, it is better to keep broker 2 in the replica list and not to remove broker 0 from the ISR after broker 2 joins it, so that the partition is not left under-replicated.
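One possible rule consistent with scenarios 2.A and 2.B, written as a sketch (the object and method names are hypothetical): keep both the old and the new replicas until every target replica is in the ISR, and only then shrink the replica list to the target. In scenario 2.B the target {1, 2} cannot fully join the ISR while broker 1 is down, so broker 0 stays in the replica list even after broker 2 catches up.

```scala
object ReassignmentSafety {
  // While the target replicas are not all in the ISR, keep every current replica
  // plus the new ones, so no in-sync copy is dropped; once the whole target set
  // is in sync, shrink the replica list to the target.
  def replicasToKeep(current: Seq[Int], target: Seq[Int], isr: Set[Int]): Seq[Int] =
    if (target.forall(isr.contains)) target
    else (current ++ target).distinct
}
```

For scenario 2.B with current replicas {0,1,2}, target {1,2} and ISR {0,2}, the rule returns {0,1,2}; only once the ISR is {0,1,2} does it shrink to {1,2}.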
In step 3, when another partition reassignment arrives for the same topic partition that was reassigned in step 1, the second reassignment overrides the first one. It should: