Motivation
Summary of existing controller
The current Kafka controller is a multi-threaded controller that emulates a state machine. It works in the following way.
Maintained state:
- Partition replicas: replicas of partitions on each machine.
- Leaders of partitions.
...
- State changes are executed by different listeners concurrently. Hence, complicated synchronization is needed, which is error-prone and difficult to debug.
- State propagation is not synchronized. Brokers might be in different states for an undetermined amount of time, which leads to unnecessary extra data loss.
- During the controlled shutdown process, two connections are used for controller-to-broker communication. This can cause state change propagation and controlled shutdown approval to arrive out of order.
Some of the state changes are complicated because the ReplicaStateMachine and PartitionStateMachine are separate, but the state changes themselves are not. So the controller has to maintain some sort of ordering between the state changes executed by the individual state machines. In the new design, we will look into folding them into a single state machine, as sketched below.
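As a rough illustration of that folding, the two state machines could be merged behind one entry point so that the relative order of partition and replica transitions is decided in a single place. The names below (UnifiedStateMachine, StateChange, etc.) are hypothetical and only sketch the idea; they are not existing controller classes.

```scala
// Hypothetical sketch only: a single state machine that owns both partition
// and replica transitions, so their relative order is decided in one place
// rather than coordinated across two separate classes.
object UnifiedStateMachineSketch {
  sealed trait StateChange
  case class PartitionChange(topicPartition: String, newLeader: Int, newIsr: Set[Int]) extends StateChange
  case class ReplicaChange(topicPartition: String, brokerId: Int, online: Boolean) extends StateChange

  class UnifiedStateMachine {
    // All changes flow through one method, so ordering between partition and
    // replica transitions no longer needs cross-state-machine synchronization.
    def handleStateChanges(changes: Seq[StateChange]): Unit = {
      changes.foreach {
        case p: PartitionChange => applyPartitionChange(p)
        case r: ReplicaChange   => applyReplicaChange(r)
      }
    }

    private def applyPartitionChange(p: PartitionChange): Unit = { /* update leader/ISR state */ }
    private def applyReplicaChange(r: ReplicaChange): Unit = { /* update replica state */ }
  }
}
```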
Many state changes are executed one partition at a time. It would be much more efficient to batch the changes for many partitions into one request, as sketched below.
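A minimal sketch of the batching idea follows. The request and helper types here (BatchedStateChangeRequest, replicasOf) are stand-ins for illustration, not the actual controller request builders.

```scala
// Hypothetical sketch: accumulate per-partition state changes and send one
// request per broker instead of one request per partition.
case class PartitionStateChange(topic: String, partition: Int, leader: Int, isr: Set[Int])
case class BatchedStateChangeRequest(changes: Seq[PartitionStateChange])

def buildBatchedRequests(changes: Seq[PartitionStateChange],
                         replicasOf: PartitionStateChange => Set[Int]): Map[Int, BatchedStateChangeRequest] = {
  // Group changes by the brokers that host a replica of each partition, so
  // each broker receives a single request covering all of its partitions.
  changes
    .flatMap(change => replicasOf(change).map(broker => broker -> change))
    .groupBy(_._1)
    .map { case (broker, brokerChanges) =>
      broker -> BatchedStateChangeRequest(brokerChanges.map(_._2))
    }
}
```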
Some long running operations need to support cancellation. For example, we want to support cancellation of partition reassignment for those partitions whose reassignment hasn't started yet. The new controller should be able to abort/cancel such long running processes, as sketched below.
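One possible shape for this, sketched with hypothetical names, is a long running operation that checks an abort flag between per-partition steps, so partitions whose reassignment has not started yet can still be skipped.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch: a cancellable long running operation. Partitions whose
// reassignment has not started yet are skipped once abort() is called.
class CancellableReassignment(partitions: Seq[String]) {
  private val aborted = new AtomicBoolean(false)

  def abort(): Unit = aborted.set(true)

  def run(startReassignment: String => Unit): Unit = {
    partitions.foreach { tp =>
      // Check the flag before each partition, so cancellation only affects
      // partitions whose reassignment has not been kicked off yet.
      if (!aborted.get()) startReassignment(tp)
    }
  }
}
```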
New controller design
Outline
...
There will be eight types of events in the controller, defined as below. Each listener generates one type of event; controlled shutdown is the eighth event type.
```scala
object ControllerEventType extends Enumeration {
  type ControllerEventType = Value
  val AddPartition, TopicChange, DeleteTopic, BrokerChange, PreferredReplicaElection,
      PartitionReassigned, ReassignedPartitionIsrChange, ControlledShutdown = Value
}
```
...
```scala
while (!shutdown) {
  val event = eventQueue.pollFirst()
  // Make state change
  try {
    val brokerRequests = event.makeStateChange(partitionStateMachine)
    brokerRequests.foreach { case (broker, request) =>
      networkClient.send(request)
      event.unAckedNode.add(broker)
    }
    // Wait until every broker has acknowledged the state change, ignoring dead brokers.
    while (!event.unAckedNode.isEmpty) {
      try {
        networkClient.poll(timeout)
      } catch {
        case e: KafkaApiException => // Do something
        case e: Exception => // Error handling
      }
      checkNodeLivenessAndIgnoreDeadNode()
    }
  } catch {
    case e: StateChangeException => // handle illegal state change
  }
}
```
Long Running Process Handling
Currently, replica reassignment is a really long running process. In the new controller, we want to support abort/cancellation of a long running process when some event that affects the partition reassignment occurs after the process starts. In the new controller, we will let later events override the state changes of previous events, based on the principle of avoiding under replicated partitions (URP).
For example, partition reassignment consists of several events:
1 PartitionReassignedEvent + N * ReassignedPartitionIsrChangeEvent.
| Time | Broker 0 | Broker 1 | Broker 2 | Event |
|---|---|---|---|---|
| 0 - Initial state | {t1p1, replica:{0,1}, leader:0, isr:{0,1}} | {t1p1, replica:{0,1}, leader:0, isr:{0,1}} | | |
| 1 - Reassign t1p1 to broker 1 and broker 2 | {t1p1, replica:{0,1,2}, leader:1, isr:{0,1}} | {t1p1, replica:{0,1,2}, leader:1, isr:{0,1}} | {t1p1, replica:{0,1,2}, leader:1, isr:{0,1}} | PartitionReassignedEvent |
| 2.A - Broker 2 down (Scenario 1) | {t1p1, replica:{0,1,2}, leader:0, isr:{0,1}} | {t1p1, replica:{0,1,2}, leader:0, isr:{0,1}} | | BrokerChangeEvent |
| 2.B - Broker 1 down (Scenario 2) | {t1p1, replica:{0,1,2}, leader:0, isr:{0,1}} | | | BrokerChangeEvent |
| 3 - Another partition reassignment | | | | PartitionReassignedEvent |
In scenario 2.A, we may choose to remove broker 2 from the replica list, or we can just stop listening to ISR changes and let a later partition reassignment take care of this.
In scenario 2.B, we are better off keeping broker 2 in the replica list, and not removing broker 0 from the ISR after broker 2 enters the ISR, so we do not expose an under replicated partition.
In step 3, when another partition reassignment comes, if the reassigned topic partitions in step 1 and step 3 are the same, the second partition reassignment will override the first one. It should (see the sketch after this list):
- clean up the listeners of the previous partition reassignment on the overridden partitions,
- start the normal partition reassignment process.
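A rough sketch of that override step follows. The class and helper names (ReassignmentManager, removeIsrChangeListener) are hypothetical bookkeeping for illustration, not the actual controller code.

```scala
// Hypothetical sketch: a later PartitionReassignedEvent overrides a pending
// reassignment of the same partitions before starting the normal process.
class ReassignmentManager {
  // Partitions with an in-flight reassignment, mapped to their ISR-change listener handles.
  private var pendingListeners: Map[String, AnyRef] = Map.empty

  def onPartitionReassigned(partitions: Set[String], startReassignment: String => Unit): Unit = {
    val overridden = pendingListeners.keySet.intersect(partitions)
    // 1. Clean up the ISR-change listeners left by the previous reassignment
    //    on the overridden partitions.
    overridden.foreach { tp =>
      removeIsrChangeListener(pendingListeners(tp))
      pendingListeners -= tp
    }
    // 2. Start the normal partition reassignment process for the new assignment.
    partitions.foreach(startReassignment)
  }

  private def removeIsrChangeListener(handle: AnyRef): Unit = { /* unregister from ZooKeeper */ }
}
```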
Discussion Required
- As stated in KAFKA-2029, current state change propagation has an issue in an unbalanced cluster: state changes on a heavily loaded broker are much slower than on a lightly loaded broker. This is because of the following two reasons:
- Controller traffic is not prioritized on the broker side, so controller messages need to wait until some client requests are handled, which takes much longer on a heavily loaded broker.
- A heavily loaded broker needs to process state changes for more partitions, while the same amount of state changes is distributed among several brokers acting as followers.
- Ideally, we should try to keep the brokers in a consistent state if possible. That indicates that we should put synchronization barriers between two events, which means we do not execute the next event until:
- A callback has been fired on each broker, or
- A broker is down, in which case we just skip waiting for its callback and move on.
- We actually have two ways to determine whether a broker is alive: from ZooKeeper or from NetworkClient. Currently we take ZooKeeper as the source of truth, but when we send messages, NetworkClient connectivity is what actually matters. Because a ZooKeeper timeout can take a while, what should we do if ZooKeeper says a broker is alive but NetworkClient shows it is disconnected? Should we block the current event processing? If there is a long GC, is it possible that NetworkClient says the broker is connected but ZooKeeper says it is dead? This ties back to question 2, which is about when we consider "a broker is down".
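To make the question concrete, here is a small sketch of the two liveness signals and the conflicting combinations the discussion is about. The verdict names and function are purely illustrative, not a proposed API.

```scala
// Hypothetical sketch of the two liveness signals and their disagreement cases.
sealed trait LivenessVerdict
case object Alive extends LivenessVerdict
case object Dead extends LivenessVerdict
case object ZkAliveButDisconnected extends LivenessVerdict // ZK session alive, NetworkClient disconnected
case object ConnectedButZkExpired extends LivenessVerdict  // e.g. a long GC expired the ZK session

def brokerLiveness(zkSaysAlive: Boolean, networkClientConnected: Boolean): LivenessVerdict =
  (zkSaysAlive, networkClientConnected) match {
    case (true, true)   => Alive
    case (false, false) => Dead
    case (true, false)  => ZkAliveButDisconnected // should the controller block the current event?
    case (false, true)  => ConnectedButZkExpired  // should the controller treat the broker as down?
  }
```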
...