...

Note that /admin/reassign_cancel will not be deleted after the cancellation is completed. AdminClient CLI options will be added to submit a reassignment cancellation and to remove the cancellation, e.g. the two proposed new options: --cancel and --remove-cancel

No Format
georgeli@kafka679-sjc1:~$ zkcli -h kafkazk80-sjc1 ls /kafka-sjc1-secpoc/admin/
[u'delete_topics']


georgeli@kafka679-sjc1:~$ /usr/lib/kafka/bin/kafka-reassign-partitions.sh --zookeeper kafkazk80-sjc1/kafka-sjc1-secpoc --cancel
Rolling back the current pending reassignments Map(test_topic-25 -> Map(replicas -> Buffer(832, 679, 680), original_replicas -> Buffer(832, 679, 681)))
Successfully submitted cancellation of reassignments.

georgeli@kafka679-sjc1:~$ zkcli -h kafkazk80-sjc1 ls /kafka-sjc1-secpoc/admin/
[u'reassign_cancel',
 u'delete_topics']


georgeli@kafka679-sjc1:~$ /usr/lib/kafka/bin/kafka-reassign-partitions.sh --zookeeper kafkazk80-sjc1/kafka-sjc1-secpoc --remove-cancel
Successfully removed the znode /admin/reassign_cancel.


georgeli@kafka679-sjc1:~$ zkcli -h kafkazk80-sjc1 ls /kafka-sjc1-secpoc/admin/
[u'delete_topics']

...

While the znode /admin/reassign_cancel is present, no new reassignments can be submitted. This is enforced both before writing to /admin/reassign_partitions and at the start of onPartitionReassignment():

Code Block
language scala
private def onPartitionReassignment(topicPartition: TopicPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
  if (zkClient.reassignCancelInPlace()) return // if the ReassignCancelZNode exists, skip the reassignment
  ...
}


To support submitting extra reassignments while existing reassignments are still in flight, an extra znode /admin/reassign_partitions_extra is introduced, with the same JSON format as /admin/reassign_partitions. Three extra options (--generate-extra, --verify-extra, --execute-extra) are added to kafka-reassign-partitions.sh. The controller will be informed of the extra reassignments via a ZooKeeper watch. It will read all topic/partitions from /admin/reassign_partitions_extra, add them to /admin/reassign_partitions, and then trigger the reassignments via onPartitionReassignment() for those topic/partitions.

If /admin/reassign_partitions_extra contains topic/partitions that also exist in /admin/reassign_partitions as pending reassignments, the conflict resolution for those duplicate topic/partitions is to first cancel/roll back their reassignments in /admin/reassign_partitions, then submit the new reassignments from /admin/reassign_partitions_extra. This approach is simpler than the algorithm previously proposed by Tom to infer the final replicas for those duplicate topic/partitions. After a topic/partition is put in /admin/reassign_partitions, it is removed from /admin/reassign_partitions_extra; when /admin/reassign_partitions_extra is empty, the znode is deleted.
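
For illustration only, a /admin/reassign_partitions_extra payload would use the same schema as /admin/reassign_partitions (the topic name here is hypothetical):

No Format
{
  "version": 1,
  "partitions": [
    {"topic": "extra-topic", "partition": 0, "replicas": [4, 5, 6]}
  ]
}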


Instead, a protocol similar to that used for config changes will be used. To start a reassignment (of a single partition, or a collection of partitions) a persistent sequenced znode will be created in /admin/reassignment_requests. The controller will be informed of this new request via a ZooKeeper watch. It will initiate the reassignment, create a znode /admin/reassignments/$topic/$partition and then delete the /admin/reassignment_requests/request_NNNN znode.

A client can subsequently create more requests involving the same partitions (by creating a /admin/reassignment_requests/request_MMMM znode), and the controller would re-initiate reassignment as the assignment changes. This means it is possible to "cancel" the reassignment of a single partition by changing the reassignment to the old assigned replicas. Note, however, that the controller itself doesn't remember the old assigned replicas – it is up to the client to record the old state prior to starting a reassignment if that client needs to support cancellation.
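
For example, if a client recorded that my-topic partition 0 was assigned [1,2,3] before it requested a reassignment to [4,5,6], it could cancel that reassignment by submitting a new request znode (content illustrative) that restores the old replicas:

No Format
{
    "version":1,
    "assignments":{
        "my-topic": {"0":[1,2,3]}
    }
}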

When the ISR includes all the newly assigned replicas (given as the contents of the reassignment znode), the controller would remove the relevant child of /admin/reassignments (i.e. at the same time the controller currently updates/removes the /admin/reassign_partitions znode).

In this way the child nodes of /admin/reassignments correspond precisely to the replicas currently being reassigned. Moreover, the ZooKeeper czxid of a child node of /admin/reassignments identifies a reassignment uniquely. It is then possible to determine whether that exact reassignment is still ongoing, or (by comparing the czxid with the mzxid) has been changed.
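
A minimal sketch of that uniqueness check using the plain ZooKeeper client (the stored expectedCzxid and the method name are assumptions for illustration, not part of the proposal):

Code Block
language scala
import org.apache.zookeeper.ZooKeeper

// Hedged sketch: true if the znode still holds the exact reassignment identified
// by expectedCzxid and has not been modified since it was created.
def isSameOngoingReassignment(zk: ZooKeeper, topic: String, partition: Int, expectedCzxid: Long): Boolean = {
  val stat = zk.exists(s"/admin/reassignments/$topic/$partition", false)
  stat != null && stat.getCzxid == expectedCzxid && stat.getMzxid == stat.getCzxid
}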

The following sections describe, in detail, the algorithms used to manage the various znodes.

Supporting the legacy znode protocol (/admin/reassign_partitions)

  1. Some ZK client creates /admin/reassign_partitions with the same JSON content as currently
  2. ZK triggers watch on /admin/reassign_partitions, controller notified
  3. Controller creates /admin/reassignment_requests/request_xxx (where xxx is the znode sequence number)

The JSON in step 3 would look like:

No Format
{
    "version":1,
    "assignments":{
        "my-topic": {"0":[1,2,3]},
        "your-topic": {"3":[4,5,6]}
    },
    "legacy":true
}

The "legacy" key is used to track that this request originates from /admin/reassign_partitions.

The new znode protocol

  1. Some ZK client creates /admin/reassignment_requests/request_xxx (without the "legacy" key mentioned above, unless it was created via the legacy protocol described above)
  2. ZK triggers watch on /admin/reassignment_requests/request_xxx, controller notified

  3. For each /admin/reassignment_requests/request_xxx the controller:

    1. Reads the znode to find the partitions being reassigned and the newly assigned brokers.

    2. Checks whether a reassignment for the same partition exists:
      1. If it does, and one is a legacy request and the other is not, then it logs a warning that the request is being ignored.
      2. Otherwise:

        1. The controller initiates reassignment

        2. The controller creates or updates  /admin/reassignments/$topic/$partition corresponding to those partitions
    3. The controller deletes /admin/reassignment_requests/request_xxx

The check in 3.b.i prevents the same partition being reassigned via different routes.
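
A hedged, self-contained sketch of steps 3.a-3.c (the types and callbacks are stand-ins for the controller's internals, not actual Kafka APIs):

Code Block
language scala
import org.apache.kafka.common.TopicPartition

// Illustrative only: a request read from /admin/reassignment_requests/request_xxx.
case class ReassignmentRequest(assignments: Map[TopicPartition, Seq[Int]], legacy: Boolean)

def processRequest(request: ReassignmentRequest,
                   inFlight: Map[TopicPartition, Boolean], // partition -> legacy flag of its current context
                   initiate: (TopicPartition, Seq[Int]) => Unit): Unit = {
  request.assignments.foreach { case (tp, newReplicas) =>
    inFlight.get(tp) match {
      case Some(existingLegacy) if existingLegacy != request.legacy =>
        // step 3.b.i: same partition already being reassigned via the other route; ignore
        println(s"WARN: ignoring conflicting reassignment request for $tp")
      case _ =>
        // steps 3.b.ii.1-2: initiate reassignment and create/update
        // /admin/reassignments/$topic/$partition
        initiate(tp, newReplicas)
    }
  }
  // step 3.c: the controller would now delete /admin/reassignment_requests/request_xxx
}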

Revised protocol when a partition "catches up"

  1. If the reassignment was via the legacy path:
    1. The controller removes the partition from /admin/reassign_partitions; if it is the last partition being reassigned via /admin/reassign_partitions, that znode is deleted.
  2. The controller removes /admin/reassignments/$topic/$partition
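
A minimal sketch of this completion step (both callbacks are stand-ins for the controller's ZooKeeper operations):

Code Block
language scala
import org.apache.kafka.common.TopicPartition

// Hedged sketch: what the controller does once a reassigned partition has caught up.
def onCaughtUp(tp: TopicPartition,
               legacy: Boolean,
               removeFromLegacyZnode: TopicPartition => Unit,  // step 1.a: prunes /admin/reassign_partitions, deleting it when empty
               deleteReassignmentZnode: TopicPartition => Unit  // step 2: removes /admin/reassignments/$topic/$partition
              ): Unit = {
  if (legacy) removeFromLegacyZnode(tp)
  deleteReassignmentZnode(tp)
}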

On controller failover

When failing over, the new controller needs to cope with three cases:

  1. Creation of /admin/reassign_partitions while there was no active controller
  2. Creation of children of /admin/reassignment_requests while there was no active controller
  3. Restarting reassignment of all the in-flight reassignments in /admin/reassignments

The controller context's partitionsBeingReassigned map can be updated as follows:

  1. Add a ReassignedPartitionsContext for each reassignment in /admin/reassignments
  2. Add ReassignedPartitionsContexts for each child of /admin/reassignment_requests, but only if the context being added has the same legacy flag as any current context for the partition.
  3. Add ReassignedPartitionsContexts for each partition in /admin/reassign_partitions, but only if the context being added has the same legacy flag as any current context for the partition.
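
A hedged sketch of that merge, representing each candidate context only by its legacy flag (the map-based shape is an assumption for illustration, not the controller's actual data structures):

Code Block
language scala
import org.apache.kafka.common.TopicPartition

// Hedged sketch: rebuild partitionsBeingReassigned on controller failover.
// Values are the legacy flag of each candidate context.
def rebuildPartitionsBeingReassigned(
    reassignments: Map[TopicPartition, Boolean],  // step 1: /admin/reassignments
    requests: Map[TopicPartition, Boolean],       // step 2: children of /admin/reassignment_requests
    legacyZnode: Map[TopicPartition, Boolean]     // step 3: /admin/reassign_partitions (legacy = true)
): Map[TopicPartition, Boolean] = {
  def merge(base: Map[TopicPartition, Boolean], extra: Map[TopicPartition, Boolean]) =
    base ++ extra.filter { case (tp, legacy) => base.get(tp).forall(_ == legacy) } // add only when legacy flags agree
  merge(merge(reassignments, requests), legacyZnode)
}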

Change to the KafkaController.onPartitionReassignment() algorithm

The existing algorithm in onPartitionReassignment() is suboptimal for changing reassignments in-flight. As an example, consider an initial assignment of [1,2], reassigned to [2,3] and then changed to [2,4]. With the current algorithm broker 3 will remain in the assigned replicas until broker 4 is in sync, even though 3 wasn't one of the original assigned replicas nor one of the new assigned replicas after the second reassignment. Broker 3 will try to get and stay in-sync during this time, wasting network bandwidth and CPU on both broker 3 and the leader.

More complex scenarios exist where such "intermediate replicas" become the leader.

To solve this problem it is necessary to compute a set of replicas to be removed from the assigned replicas when a reassignment changes.

The set drop of replicas to be removed will be computed as follows:

  1. At the point a new reassignment is created we compute the value top = len(original_assignment)

    1. We check top >= min.insync.replicas, logging a warning and ignoring the reassignment if the check fails.

    2. Otherwise we store this in the /admin/reassignments/$topic/$partition znode.
  2. Then, initially, and each time the reassignment changes (i.e. in KafkaController.onPartitionReassignment()):
    1. Let assigned be the currently assigned replicas, i.e. assigned = ControllerContext.partitionReplicaAssignment(tp)
    2. Define a sorted list of out-of-sync replicas: osr = assigned -- isr
    3. Create a list of current replicas existing = List(leader) ++ isr ++ osr
    4. Remove duplicates from existing (keeping the first occurrence in the list)

    5. Then, the set of replicas which can be dropped is: drop = assigned -- existing[0:top-1]
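
A minimal self-contained sketch of this computation in Scala (the function name and parameters are illustrative, not the actual controller code):

Code Block
language scala
// Hedged sketch of the drop computation described above.
def computeDrop(top: Int, leader: Int, assigned: Seq[Int], isr: Seq[Int]): Set[Int] = {
  val osr = assigned.filterNot(isr.contains)        // step 2.b: out-of-sync replicas, in assigned order
  val existing = (leader +: (isr ++ osr)).distinct  // steps 2.c-2.d: leader, then ISR, then OSR, deduplicated
  assigned.toSet -- existing.take(top).toSet        // step 2.e: keep the first `top` replicas, drop the rest
}

// In the [1,2] -> [2,3] -> [2,4] example (top = 2), when the change to [2,4] arrives,
// assuming leader = 1 and replica 3 has caught up (assigned = [1,2,3], isr = [1,2,3]):
// computeDrop(2, 1, Seq(1, 2, 3), Seq(1, 2, 3)) == Set(3), dropping the intermediate replica 3.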

Notes:

  • Since top > 0 it is impossible to drop the leader, and thus it's impossible that a leader election will be needed at this point.
  • By the assumption top >= min.insync.replicas it's impossible to shrink the ISR to a point where producers are blocked, assuming the ISR was already sufficient.

  • If the controller knew how far behind the ISR each of the OSR replicas was, OSR could be sorted, which would result in dropping the furthest-behind replicas. But the controller doesn't know this.

 In order to actually drop those replicas:

  1. Make the transitions in the RSM: -> Offline -> ReplicaDeletionStarted -> ReplicaDeletionSuccessful -> NonExistentReplica

  2. Update the assigned replicas in memory and in ZK

Compatibility, Deprecation, and Migration Plan

As described above, compatibility with /admin/reassign_partitions is maintained, so existing software will continue working. The only difference a client operating on /admin/reassign_partitions would observe is a slight increase in latency, due to the round trips needed to create the new znode (/admin/reassign_partitions_extra) and possible conflict resolution. The newly introduced znode /admin/reassign_cancel is used solely for cancelling/rolling back current reassignments still in flight in /admin/reassign_partitions

This compatibility behavior could be dropped in some future version of Kafka, if that was desirable.

...