
...

  1. Add a ReassignedPartitionsContext for each reassignment in /admin/reassignments
  2. Add a ReassignedPartitionsContext for each child of /admin/reassignment_requests, but only if the context being added has the same legacy flag as any current context for the partition.
  3. Add a ReassignedPartitionsContext for each partition in /admin/reassign_partitions, but only if the context being added has the same legacy flag as any current context for the partition.
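
A minimal sketch in Scala of how this precedence might look. The types and the rebuildContexts helper are hypothetical, not the controller's actual code; only the legacy-flag rule from steps 2 and 3 is modeled:

    import scala.collection.mutable

    case class TopicPartition(topic: String, partition: Int)
    case class ReassignedPartitionsContext(newReplicas: Seq[Int], isLegacy: Boolean)

    def rebuildContexts(
        fromReassignments: Map[TopicPartition, ReassignedPartitionsContext],  // /admin/reassignments
        fromRequests: Map[TopicPartition, ReassignedPartitionsContext],       // /admin/reassignment_requests
        fromLegacyPath: Map[TopicPartition, ReassignedPartitionsContext]      // /admin/reassign_partitions
    ): Map[TopicPartition, ReassignedPartitionsContext] = {
      // Step 1: contexts from /admin/reassignments are added unconditionally.
      val contexts = mutable.Map[TopicPartition, ReassignedPartitionsContext]()
      contexts ++= fromReassignments

      // Steps 2 and 3: a candidate is added only when no context exists yet for
      // the partition, or the existing context carries the same legacy flag.
      def addIfFlagMatches(candidates: Map[TopicPartition, ReassignedPartitionsContext]): Unit =
        for ((tp, ctx) <- candidates)
          contexts.get(tp) match {
            case Some(current) if current.isLegacy != ctx.isLegacy => // skip: flag conflict
            case _ => contexts += tp -> ctx
          }

      addIfFlagMatches(fromRequests)
      addIfFlagMatches(fromLegacyPath)
      contexts.toMap
    }

Under this reading, the earlier-consulted path wins any conflict: a partition already covered by /admin/reassignments can never be overridden by a request carrying the opposite legacy flag.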

Change to the KafkaController.onPartitionReassignment() algorithm

The existing algorithm in onPartitionReassignment() is suboptimal for changing reassignments in-flight. As an example, consider an initial assignment of [1,2], reassigned to [2,3] and then changed to [2,4]. With the current algorithm, broker 3 will remain in the assigned replicas until broker 4 is in sync, even though 3 was neither one of the original assigned replicas nor one of the new assigned replicas after the second reassignment. Broker 3 will try to get, and stay, in sync during this time, wasting network bandwidth and CPU on both broker 3 and the leader.

More complex scenarios exist where such "intermediate replicas" become the leader.

To solve this problem it is necessary to compute a set of replicas to be removed from the assigned replicas when a reassignment changes.

The set drop of replicas to be removed will be computed as follows:

  1. At the point a new reassignment is created we compute the value top = len(original_assignment) and store this in ZK. We also check that top >= min.insync.replicas, logging a warning and ignoring the reassignment if the check fails.

  2. Then, initially and each time the reassignment changes (i.e. in KafkaController.onPartitionReassignment()):
    1. Define a list of out-of-sync replicas sorted by increasing lag: osr = sort(assigned -- isr, by_lag_ascending)
    2. Create a list of current replicas: existing = List(leader) ++ isr ++ osr
    3. Remove duplicates from existing (keeping the first occurrence in the list)

    4. Then the set of replicas which can be dropped is: drop = assigned -- existing[0:top-1] (that is, every assigned replica not among the first top elements of existing)
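
A self-contained sketch of steps 1-4 in Scala, using an illustrative lagOf function in place of the controller's real lag tracking (computeDrop and its parameters are hypothetical names, not controller API):

    // Sketch of the drop computation; inputs would come from controller state.
    def computeDrop(
        leader: Int,
        isr: Seq[Int],
        assigned: Seq[Int],
        top: Int,
        lagOf: Int => Long  // current replication lag of a replica
    ): Set[Int] = {
      // 1. Out-of-sync replicas, least-lagged first.
      val osr = (assigned.toSet -- isr.toSet).toSeq.sortBy(lagOf)
      // 2. + 3. Leader first, then ISR, then OSR, keeping first occurrences only.
      val existing = (Seq(leader) ++ isr ++ osr).distinct
      // 4. Drop everything in assigned that did not make the first `top` replicas.
      assigned.toSet -- existing.take(top).toSet
    }

    // The example from above: original assignment [1,2] (so top = 2), reassigned
    // to [2,3], then changed to [2,4]. Suppose broker 2 leads, isr = {2}, and the
    // assigned replicas are currently [2,3,4] (illustrative), with broker 4 less
    // lagged than broker 3:
    val drop = computeDrop(leader = 2, isr = Seq(2), assigned = Seq(2, 3, 4),
                           top = 2, lagOf = Map(3 -> 100L, 4 -> 10L))
    // drop == Set(3): the intermediate replica is removed, while the leader
    // (always existing(0), and top > 0) is kept, as the notes below observe.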

Notes:

  • Since top > 0, the leader (the first element of existing) can never be dropped, and thus it's impossible that a leader election will be needed at this point.
  • By the assumption that top >= min.insync.replicas, it's impossible to shrink the ISR to a point where producers are blocked, assuming the ISR was already sufficient.

In order to actually drop those replicas:

  1. Make the transitions in the replica state machine (RSM): OfflineReplica -> ReplicaDeletionStarted -> ReplicaDeletionSuccessful -> NonExistentReplica

  2. Update the assigned replicas in memory and in ZK
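
A schematic Scala model of these two steps. transition and updateAssignmentInZk are stand-ins for the controller's replica state machine and ZooKeeper write, not actual API:

    sealed trait ReplicaState
    case object OfflineReplica extends ReplicaState
    case object ReplicaDeletionStarted extends ReplicaState
    case object ReplicaDeletionSuccessful extends ReplicaState
    case object NonExistentReplica extends ReplicaState

    def dropReplicas(
        partition: String,
        assigned: Seq[Int],
        drop: Set[Int],
        transition: (String, Int, ReplicaState) => Unit,   // stand-in for the RSM
        updateAssignmentInZk: (String, Seq[Int]) => Unit   // stand-in for the ZK write
    ): Seq[Int] = {
      // 1. Walk each dropped replica through the deletion transitions.
      for (replica <- drop; state <- Seq(OfflineReplica, ReplicaDeletionStarted,
                                         ReplicaDeletionSuccessful, NonExistentReplica))
        transition(partition, replica, state)

      // 2. Shrink the assigned replicas and persist the new assignment.
      val newAssigned = assigned.filterNot(drop.contains)
      updateAssignmentInZk(partition, newAssigned)
      newAssigned
    }

Because drop never contains the leader (see the notes above), these transitions never trigger a leader election.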

Compatibility, Deprecation, and Migration Plan

...