...
To solve this problem it is necessary to compute a set of replicas to be removed from the assigned replicas when a reassignment changes.
The set drop of replicas to be removed will be computed as follows:
- At the point a new reassignment is created we compute the value top = len(original_assignment). We check top >= min.insync.replicas, logging a warning and ignoring the reassignment if the check fails.
- Otherwise we store top in the /admin/reassignments/$topic/$partition znode in ZK.
- Then, initially, and each time the reassignment changes (i.e. in KafkaController.onPartitionReassignment()):
- Let assigned be the currently assigned replicas, i.e. assigned = ControllerContext.partitionReplicaAssignment(tp)
- Define a sorted list of out-of-sync replicas:
osr = assigned -- isr
- Create a list of current replicas
existing = List(leader) ++ isr ++ osr
Remove duplicates from existing (keeping the first occurrence in the list).
- Then, the set of replicas which can be dropped is:
drop = assigned -- existing[0:top-1]
- Let
...
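The computation of top and drop described above can be sketched in Scala as follows. This is a minimal illustration, not the controller's actual code: the object DropReplicas and the helpers computeTop and computeDrop are hypothetical names, replica ids are assumed to be plain Int values, and the slice existing[0:top-1] is assumed to be inclusive, i.e. it keeps the first top entries of existing.

```scala
// Hypothetical sketch of the steps above; not taken from KafkaController.
object DropReplicas {

  // Returns Some(top) if the reassignment may proceed, or None if the
  // check top >= min.insync.replicas fails and it should be ignored.
  def computeTop(originalAssignment: Seq[Int], minInsyncReplicas: Int): Option[Int] = {
    val top = originalAssignment.size
    if (top >= minInsyncReplicas) Some(top) else None
  }

  // Computes the set of replicas which can be dropped.
  def computeDrop(assigned: Seq[Int], isr: Seq[Int], leader: Int, top: Int): Set[Int] = {
    // Sorted list of out-of-sync replicas: osr = assigned -- isr
    val osr = assigned.filterNot(r => isr.contains(r)).sorted
    // Leader first, then in-sync replicas, then out-of-sync replicas;
    // distinct keeps the first occurrence of each replica id.
    val existing = (Seq(leader) ++ isr ++ osr).distinct
    // Assumption: existing[0:top-1] is inclusive, so the first `top` entries are kept.
    assigned.toSet -- existing.take(top)
  }
}
```

For example, with assigned = Seq(1, 2, 3, 4, 5), isr = Seq(3, 4, 5, 1), leader = 3 and top = 3, existing becomes Seq(3, 4, 5, 1, 2), so replicas 3, 4 and 5 are kept and computeDrop returns Set(1, 2).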