In a Kafka cluster, one of the brokers serves as the controller, which is responsible for managing the states of partitions and replicas and for performing administrative tasks such as reassigning partitions. The following describes the states of partitions and replicas, and the kinds of operations that go through the controller.
PartitionStateChange:
...
- NewPartition: After creation, the partition is in the NewPartition state. In this state, the partition should have replicas assigned to it, but no leader/isr yet.
- OnlinePartition: Once a leader is elected for a partition, it is in the OnlinePartition state.
- OfflinePartition: If, after a successful leader election, the leader for the partition dies, the partition moves to the OfflinePartition state.
...
NewPartition,OnlinePartition -> OfflinePartition
- nothing other than marking the partition state as OfflinePartition
OfflinePartition -> NonExistentPartition
- nothing other than marking the partition state as NonExistentPartition
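The partition lifecycle above can be sketched as a small state machine. This is a minimal illustration, assuming the transition set implied by the state descriptions; the class and names are illustrative, not the actual KafkaController API.

```python
# Valid target states for each current partition state, per the
# descriptions above (every partition implicitly starts as
# NonExistentPartition).
VALID_TRANSITIONS = {
    "NonExistentPartition": {"NewPartition"},
    "NewPartition": {"OnlinePartition", "OfflinePartition"},
    "OnlinePartition": {"OnlinePartition", "OfflinePartition"},
    "OfflinePartition": {"OnlinePartition", "NonExistentPartition"},
}

class PartitionStateMachine:
    def __init__(self):
        self.states = {}  # partition name -> current state

    def transition(self, partition, target):
        current = self.states.get(partition, "NonExistentPartition")
        if target not in VALID_TRANSITIONS[current]:
            raise ValueError(f"illegal transition {current} -> {target}")
        self.states[partition] = target

sm = PartitionStateMachine()
sm.transition("topicA-0", "NewPartition")      # replicas assigned, no leader/isr yet
sm.transition("topicA-0", "OnlinePartition")   # leader elected
sm.transition("topicA-0", "OfflinePartition")  # leader died
```

Centralizing the legality check this way is what lets the controller reject out-of-order state changes, e.g. a partition cannot be deleted directly from the online state.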
ReplicaStateChange:
- NewReplica: When replicas are created during topic creation or partition reassignment, they start in this state. In this state, a replica can only receive a become-follower state change request.
- OnlineReplica: Once a replica is started and is part of the assigned replicas for its partition, it is in this state. In this state, it can receive either become-leader or become-follower state change requests.
- OfflineReplica: If a replica dies, it moves to this state. This happens when the broker hosting the replica goes down.
- NonExistentReplica: If a replica is deleted, it moves to this state.
Valid state transitions are:
NonExistentReplica -> NewReplica
- send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
NewReplica -> OnlineReplica
- add the new replica to the assigned replica list if needed
OnlineReplica,OfflineReplica -> OnlineReplica
- send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
NewReplica,OnlineReplica -> OfflineReplica
- send StopReplicaRequest to the replica (w/o deletion)
- remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and UpdateMetadata request for the partition to every live broker
OfflineReplica -> NonExistentReplica
- send StopReplicaRequest to the replica (with deletion)
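Each valid replica transition pairs with a side effect, so the replica state machine can be sketched as a lookup table keyed by (current, target). The action strings below are shorthand for the requests described above, and the function name is hypothetical.

```python
# (current state, target state) -> side effect performed by the controller.
REPLICA_TRANSITIONS = {
    ("NonExistentReplica", "NewReplica"): "LeaderAndIsr + UpdateMetadata",
    ("NewReplica", "OnlineReplica"): "add to assigned replica list if needed",
    ("OnlineReplica", "OnlineReplica"): "LeaderAndIsr + UpdateMetadata",
    ("OfflineReplica", "OnlineReplica"): "LeaderAndIsr + UpdateMetadata",
    ("NewReplica", "OfflineReplica"): "StopReplica (w/o deletion); shrink isr",
    ("OnlineReplica", "OfflineReplica"): "StopReplica (w/o deletion); shrink isr",
    ("OfflineReplica", "NonExistentReplica"): "StopReplica (with deletion)",
}

def replica_action(current, target):
    """Return the side effect for a transition, or raise if it is illegal."""
    action = REPLICA_TRANSITIONS.get((current, target))
    if action is None:
        raise ValueError(f"illegal transition {current} -> {target}")
    return action

# Example: a dying broker takes its online replicas offline.
print(replica_action("OnlineReplica", "OfflineReplica"))
```

Note that deletion is a two-step path: a replica must pass through OfflineReplica (StopReplica without deletion) before it can reach NonExistentReplica (StopReplica with deletion).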
KafkaController Operations:
onNewTopicCreation:
- call onNewPartitionCreation
onNewPartitionCreation:
- new partitions -> NewPartition
- all replicas of new partitions -> NewReplica
- new partitions -> OnlinePartition
- all replicas of new partitions -> OnlineReplica
onBrokerFailure:
- partitions w/o leader -> OfflinePartition
- partitions in OfflinePartition and NewPartition -> OnlinePartition (with OfflinePartitionLeaderSelector)
- each replica on the failed broker -> OfflineReplica
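The broker-failure handling above can be sketched as follows. This is a simplified stand-in, assuming controller state is held as plain dicts; in particular, picking the first live replica as the new leader only approximates OfflinePartitionLeaderSelector, which consults the isr.

```python
def on_broker_failure(failed, leaders, assignments, live_brokers):
    """Sketch of onBrokerFailure.
    leaders:     partition -> current leader broker id
    assignments: partition -> list of assigned replica broker ids
    """
    live_brokers = live_brokers - {failed}
    for partition, leader in leaders.items():
        if leader == failed:
            # partition w/o leader -> OfflinePartition, then re-elect a
            # leader from the remaining live replicas (stand-in for
            # OfflinePartitionLeaderSelector)
            live = [r for r in assignments[partition] if r in live_brokers]
            leaders[partition] = live[0] if live else None
    # each replica on the failed broker -> OfflineReplica
    partitions_with_offline_replica = {
        p for p, replicas in assignments.items() if failed in replicas
    }
    return leaders, partitions_with_offline_replica

leaders = {"t-0": 1, "t-1": 2}
assignments = {"t-0": [1, 2], "t-1": [2, 3]}
new_leaders, offline = on_broker_failure(1, leaders, assignments, {1, 2, 3})
# t-0 fails over to broker 2; t-1 keeps its leader; t-0's replica on broker 1 is offline
```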
onBrokerStartup:
- send UpdateMetadata requests for all partitions to newly started brokers
- replicas on the newly started broker -> OnlineReplica
- partitions in OfflinePartition and NewPartition -> OnlinePartition (with OfflinePartitionLeaderSelector)
- for partitions with replicas on newly started brokers, call onPartitionReassignment to complete any outstanding partition reassignment
onPartitionReassignment: (OAR: original assigned replicas; RAR: re-assigned replicas once reassignment completes)
1. update assigned replica list with OAR + RAR replicas
2. send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR)
3. replicas in RAR - OAR -> NewReplica
4. wait until replicas in RAR join isr
5. replicas in RAR -> OnlineReplica
6. set AR to RAR in memory
7. send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and a new assigned replica list (using RAR) and same isr to every broker in RAR
8. replicas in OAR - RAR -> OfflineReplica (force those replicas out of isr)
9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
10. update assigned replica list to RAR in ZK
11. update the /admin/reassign_partitions path in ZK to remove this partition
12. after electing a leader, the replicas and isr information changes, so send an UpdateMetadata request for the partition to every broker
For example, if OAR = {1,2,3} and RAR = {4,5,6}, the AR and leader/isr values in ZK may go through the following transitions:
AR               leader/isr
{1,2,3}          1/{1,2,3}            (initial state)
{1,2,3,4,5,6}    4/{1,2,3,4,5,6}      (step 7)
...
Note that we have to update AR in ZK with RAR last, since it's the only place where we store the OAR persistently. This way, if the controller crashes before that step, we can still recover.
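Most of the reassignment bookkeeping above reduces to set arithmetic over OAR and RAR. A small sketch, using the example values from the table:

```python
OAR = {1, 2, 3}   # original assigned replicas
RAR = {4, 5, 6}   # re-assigned replicas

expanded_ar = OAR | RAR   # AR during the move: OAR + RAR
to_create = RAR - OAR     # replicas created as NewReplica
to_delete = OAR - RAR     # replicas forced offline, then deleted
final_ar = RAR            # AR written to ZK last (see the note above)

assert expanded_ar == {1, 2, 3, 4, 5, 6}
assert to_create == {4, 5, 6}
assert to_delete == {1, 2, 3}
```

When RAR overlaps OAR (e.g. moving only one replica), the differences shrink accordingly, so overlapping replicas are neither recreated nor deleted.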
...
- replicaStateMachine.startup():
- initialize each replica to either OfflineReplica or OnlineReplica
- each replica -> OnlineReplica (force LeaderAndIsr request to be sent to every replica)
- partitionStateMachine.startup():
- initialize each partition to either NewPartition, OfflinePartition or OnlinePartition
- each OfflinePartition and NewPartition -> OnlinePartition (force leader election)
- resume partition reassignment, if any
- resume preferred leader election, if any
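The startup initialization above can be sketched as two small classification functions, assuming state is inferred from broker liveness and the leader info read from ZK; the function names are hypothetical.

```python
def initial_replica_state(broker_is_live):
    # replicaStateMachine.startup(): a replica on a live broker starts
    # as OnlineReplica, otherwise as OfflineReplica
    return "OnlineReplica" if broker_is_live else "OfflineReplica"

def initial_partition_state(leader, live_brokers):
    # partitionStateMachine.startup(): a partition with no leader yet is
    # NewPartition; otherwise Online/Offline depending on leader liveness
    if leader is None:
        return "NewPartition"
    return "OnlinePartition" if leader in live_brokers else "OfflinePartition"

# Partitions classified as OfflinePartition or NewPartition are then
# moved to OnlinePartition via forced leader election, as described above.
```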
...