...
- Add a ReassignedPartitionsContext for each reassignment in /admin/reassignments.
- Add a ReassignedPartitionsContext for each child of /admin/reassignment_requests, but only if the context being added has the same legacy flag as any current context for the partition.
- Add a ReassignedPartitionsContext for each partition in /admin/reassign_partitions, but only if the context being added has the same legacy flag as any current context for the partition.
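The legacy-flag check in the last two steps can be sketched as follows. This is illustrative only: the ZK paths come from this proposal, but ContextLoader, the simplified Context class, and the String partition key are hypothetical stand-ins for the controller's internal ReassignedPartitionsContext handling.

```java
import java.util.*;

class ContextLoader {
    static class Context {
        final List<Integer> newReplicas;
        final boolean legacy;
        Context(List<Integer> newReplicas, boolean legacy) {
            this.newReplicas = newReplicas;
            this.legacy = legacy;
        }
    }

    // Contexts already registered, keyed by a "topic-partition" string
    final Map<String, Context> contexts = new LinkedHashMap<>();

    // Contexts under /admin/reassignments are registered unconditionally
    void addFromReassignments(Map<String, Context> fromZk) {
        contexts.putAll(fromZk);
    }

    // Contexts from /admin/reassignment_requests or /admin/reassign_partitions
    // are only registered when their legacy flag agrees with any context
    // already present for that partition
    void addIfLegacyMatches(String partition, Context candidate) {
        Context current = contexts.get(partition);
        if (current == null || current.legacy == candidate.legacy) {
            contexts.put(partition, candidate);
        }
    }
}
```

A candidate whose legacy flag disagrees with the existing context for the same partition is silently skipped; partitions with no existing context accept either flag.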
Change to the KafkaController.onPartitionReassignment() algorithm

The existing algorithm in onPartitionReassignment() is suboptimal for changing reassignments in-flight. As an example, consider an initial assignment of [1,2], reassigned to [2,3] and then changed to [2,4]. With the current algorithm broker 3 will remain in the assigned replicas until broker 4 is in sync, even though 3 was neither one of the original assigned replicas nor one of the new assigned replicas after the second reassignment. Broker 3 will try to get and stay in-sync during this time, wasting network bandwidth and CPU on both broker 3 and the leader.
More complex scenarios exist where such "intermediate replicas" become the leader.
To solve this problem it is necessary to compute a set of replicas to be removed from the assigned replicas when a reassignment changes.
The set drop of replicas to be removed will be computed as follows:

- At the point a new reassignment is created we compute the value top = len(original_assignment) and store this in ZK. We also check top >= min.insync.replicas, logging a warning and ignoring the reassignment if the check fails.
- Then, initially, and each time the reassignment changes (i.e. in KafkaController.onPartitionReassignment()):
  - Define a sorted list of out-of-sync replicas: osr = sort(assigned -- isr, by_lag_ascending)
  - Create a list of current replicas: existing = List(leader) ++ isr ++ osr
  - Remove duplicates from existing (keeping the first occurrence in the list)
  - Then the set of replicas which can be dropped is: drop = assigned -- existing[0:top-1]
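The steps above can be sketched as follows. Broker ids are plain ints and the lag function is a Map; both are illustrative stand-ins, not the controller's real types.

```java
import java.util.*;
import java.util.stream.*;

class DropSet {
    static Set<Integer> computeDrop(List<Integer> assigned, List<Integer> isr,
                                    int leader, Map<Integer, Long> lag, int top) {
        // osr = sort(assigned -- isr, by_lag_ascending)
        List<Integer> osr = assigned.stream()
                .filter(r -> !isr.contains(r))
                .sorted(Comparator.comparingLong(
                        (Integer r) -> lag.getOrDefault(r, Long.MAX_VALUE)))
                .collect(Collectors.toList());
        // existing = List(leader) ++ isr ++ osr, de-duplicated keeping the
        // first occurrence of each replica
        LinkedHashSet<Integer> existing = new LinkedHashSet<>();
        existing.add(leader);
        existing.addAll(isr);
        existing.addAll(osr);
        // drop = assigned -- existing[0:top-1]: the first `top` entries survive
        Set<Integer> keep = existing.stream().limit(top).collect(Collectors.toSet());
        Set<Integer> drop = new HashSet<>(assigned);
        drop.removeAll(keep);
        return drop;
    }
}
```

For the motivating example, suppose the reassignment to [2,3] is changed while assigned is [1,2,3] with leader 1, isr [1,2], broker 3 still lagging, and top = 2: the computation yields drop = {3}, so broker 3 stops replicating immediately instead of waiting for the new target to catch up.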
Notes:

- Since top > 0 it is impossible to drop the leader, and thus it's impossible that a leader election will be needed at this point. By the assumption top >= min.insync.replicas it's impossible to shrink the ISR to a point where producers are blocked, assuming the ISR was already sufficient.
In order to actually drop those replicas:

- Make the transitions in the RSM: -> Offline -> ReplicaDeletionStarted -> ReplicaDeletionSuccessful -> NonExistentReplica
- Update the assigned replicas in memory and in ZK
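The transition chain above can be made concrete with a small sketch. The state names mirror Kafka's ReplicaStateMachine, but this validator is purely illustrative, not the controller's implementation.

```java
import java.util.*;

class ReplicaDropPath {
    enum State { Offline, ReplicaDeletionStarted, ReplicaDeletionSuccessful, NonExistentReplica }

    // The drop path visits the states strictly in this order
    static final List<State> DROP_PATH = List.of(
            State.Offline,
            State.ReplicaDeletionStarted,
            State.ReplicaDeletionSuccessful,
            State.NonExistentReplica);

    // On the drop path, a transition is legal only between adjacent states
    static boolean isLegal(State from, State to) {
        return DROP_PATH.indexOf(to) - DROP_PATH.indexOf(from) == 1;
    }
}
```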
Compatibility, Deprecation, and Migration Plan
...