...

Strictly speaking, this change does not affect any public interface (ZooKeeper is not considered a public interface, and the change can be made in a backward-compatible way). However, some users are known to operate on the /admin/reassign_partitions znode directly, which could break in future versions of Kafka (e.g. as reported in KAFKA-7854), so such operations should be discouraged.


A new znode /admin/cancel_reassignment_in_progress is used to signal the Controller to cancel the current pending reassignments in /admin/reassign_partitions.

For the existing /admin/reassign_partitions znode, "original_replicas" is added to support rollback of the topic partition's assigned replicas to their original state. How "original_replicas" gets populated is discussed in detail later.

Code Block
languagejs
{"version":1,
 "partitions":[{"topic": "foo1",
                "partition": 0,
                "replicas": [1,2,4],
                "original_replicas": [1,2,3]
               },
               {"topic": "foo2",
                "partition": 1,
                "replicas": [7,9,8],
                "original_replicas": [5,6,8]
               }]
}

Note that only the pending reassignments of the current batch can be cancelled. Some reassignments complete almost instantly when the replica set is unchanged (the replicas are already in the ISR) and only the ordering changes, e.g. (1,2,3) => (2,3,1), which changes the preferred leader. To roll back all reassignments in the current batch (including those already completed, not just the pending ones), the client that submitted the reassignment should keep a "rollback" version and submit it as a new reassignment after /admin/reassign_partitions is empty and deleted.
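One way a client might keep such a "rollback" version is sketched below. This is an illustration only: `build_rollback_plan` and the `current_assignment` mapping are hypothetical names, not part of any Kafka tool, and the sketch assumes the client has already read the current replica list of each partition before submitting.

```python
import json


def build_rollback_plan(reassignment, current_assignment):
    """Build a reassignment that restores the replicas seen before submission.

    reassignment: dict in the /admin/reassign_partitions JSON format.
    current_assignment: hypothetical mapping {(topic, partition): [replica ids]}
        captured by the client before it submits the reassignment.
    """
    partitions = []
    for entry in reassignment["partitions"]:
        key = (entry["topic"], entry["partition"])
        partitions.append({
            "topic": entry["topic"],
            "partition": entry["partition"],
            # Copy so later edits to the plan do not alias the caller's lists.
            "replicas": list(current_assignment[key]),
        })
    return {"version": 1, "partitions": partitions}


# An ordering-only change, which may complete before it can be cancelled.
submitted = {"version": 1, "partitions": [
    {"topic": "foo1", "partition": 0, "replicas": [2, 3, 1]},
]}
before = {("foo1", 0): [1, 2, 3]}
rollback = build_rollback_plan(submitted, before)
print(json.dumps(rollback))
```

The resulting JSON would be submitted like any other reassignment once /admin/reassign_partitions has been deleted.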


For a user client submitting a new reassignment, the JSON file format and the public interface remain the same. The client submits a list of topic/partition → replicas (the new replica assignments). Before writing to the /admin/reassign_partitions znode, the controller adds "original_replicas" to support rollback of the topic partition's assigned replicas to their original state. How "original_replicas" gets populated is discussed in detail later.

Proposed Changes

Reassignment Cancellation

The main idea is to support clean, safe, and timely cancellation of pending reassignments in the /admin/reassign_partitions znode, and to support submitting more reassignments while some reassignments are still in flight.

When a client submits reassignments, it only needs to submit the "replicas" (the new replica assignment) of each topic/partition. Before writing to /admin/reassign_partitions, the currently assigned replicas (the original replicas) are read from ZooKeeper and added as "original_replicas" for that topic/partition reassignment. "original_replicas" is then used to roll back the topic/partition replica assignment during cancellation.

For example, after the controller reads the reassignment JSON submitted by the AdminClient, the following will be written to /admin/reassign_partitions:

Code Block
languagejs
{"version":1,
 "partitions":[{"topic": "foo1",
                "partition": 0,
                "replicas": [1,2,4],
                "original_replicas": [1,2,3]
               },
               {"topic": "foo2",
                "partition": 1,
                "replicas": [7,9,8],
                "original_replicas": [5,6,8]
               }]
}

For submitting new reassignments while some are still pending, a new /admin/reassign_partitions_queue znode is added. Its JSON format is the same as /admin/reassign_partitions, without the "original_replicas" field added above.
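The step of adding "original_replicas" before writing to /admin/reassign_partitions can be sketched as follows. This is a minimal illustration, not the controller's implementation: `add_original_replicas` is a hypothetical helper, and `current_assignment` stands in for the replica assignments that would be read from ZooKeeper.

```python
def add_original_replicas(submitted, current_assignment):
    """Return a copy of the submitted reassignment with original_replicas added.

    submitted: dict with "version" and "partitions" (topic, partition, replicas).
    current_assignment: hypothetical mapping {(topic, partition): [replica ids]}
        standing in for the assignment read from ZooKeeper.
    """
    augmented = []
    for entry in submitted["partitions"]:
        key = (entry["topic"], entry["partition"])
        enriched = dict(entry)  # shallow copy; the client's input stays untouched
        enriched["original_replicas"] = list(current_assignment[key])
        augmented.append(enriched)
    return {"version": submitted["version"], "partitions": augmented}


submitted = {"version": 1, "partitions": [
    {"topic": "foo1", "partition": 0, "replicas": [1, 2, 4]},
    {"topic": "foo2", "partition": 1, "replicas": [7, 9, 8]},
]}
current_assignment = {("foo1", 0): [1, 2, 3], ("foo2", 1): [5, 6, 8]}
znode_payload = add_original_replicas(submitted, current_assignment)
```

With these inputs, `znode_payload` has the same shape as the JSON example for /admin/reassign_partitions, and the client-submitted dict is left without "original_replicas".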

...


For ControllerContext.partitionsBeingReassigned, originalReplicas is also added to the ReassignedPartitionsContext class alongside newReplicas:

Code Block
languagejs
case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
                                       var originalReplicas: Seq[Int] = Seq.empty,
                                       val reassignIsrChangeHandler: PartitionReassignmentIsrChangeHandler) {
  // Existing members of the class are unchanged and omitted here.
}


To trigger the reassignment cancellation, a new znode /admin/cancel_reassignment_in_progress is created, and the controller is informed of the cancellation via a ZooKeeper watch on this znode. The controller then reads the current pending reassignments in /admin/reassign_partitions and re-populates ControllerContext.partitionsBeingReassigned. For each pending topic/partition reassignment, the cancellation/rollback works as described below. It is essentially the opposite of a reassignment, and because the "original_replicas" of each topic/partition reassignment are available in /admin/reassign_partitions and ControllerContext.partitionsBeingReassigned, the rollback is much easier.
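To illustrate why having "original_replicas" makes rollback easier, the sketch below derives a rollback target for each pending entry directly from the znode content. It is an assumption-laden illustration, not the controller's logic: `plan_rollback` is a hypothetical helper, and treating the replicas that appear only in the new assignment as the ones to drop is one plausible reading of "the opposite of a reassignment".

```python
def plan_rollback(pending):
    """Map each pending (topic, partition) to its rollback target.

    pending: dict in the /admin/reassign_partitions format, where every
        partition entry carries both "replicas" and "original_replicas".
    """
    plans = {}
    for entry in pending["partitions"]:
        original = entry["original_replicas"]
        new = entry["replicas"]
        plans[(entry["topic"], entry["partition"])] = {
            # Assignment to restore on cancellation.
            "restore_to": list(original),
            # Assumption: replicas introduced by the reassignment are removed.
            "replicas_to_drop": [r for r in new if r not in original],
        }
    return plans


pending = {"version": 1, "partitions": [
    {"topic": "test_topic", "partition": 25,
     "replicas": [1, 2, 4], "original_replicas": [1, 2, 3]},
]}
plans = plan_rollback(pending)
```

For the pending reassignment of test_topic-25 used here, the plan restores replicas 1, 2, 3 and marks replica 4 as no longer needed.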

...

No Format
$ zkcli -h kafka-zk-host1 ls /kafka-cluster/admin/
[u'reassign_partitions',
 u'delete_topics']

# Current pending reassignment(s)
$ zkcli -h kafka-zk-host1 get /kafka-cluster/admin/reassign_partitions
('{"version":1,"partitions":[{"topic":"test_topic","partition":25,"replicas":[1,2,4],"original_replicas":[1,2,3]}]}', ZnodeStat(czxid=17180484637, mzxid=17180484641, ctime=1549498790668, mtime=1549498790680, version=1, cversion=0, aversion=0, ephemeralOwner=0, dataLength=148, numChildren=0, pzxid=17180484637))

$ /usr/lib/kafka/bin/kafka-reassign-partitions.sh  --zookeeper kafka-zk-host1/kafka-cluster --cancel
Rolling back the current pending reassignments Map(test_topic-25 -> Map(replicas -> Buffer(1, 2, 4), original_replicas -> Buffer(1, 2, 3)))
Successfully submitted cancellation of reassignments.

# This is just for illustration purpose.  In reality, the cancellation of reassignments should be pretty quick. 
# The below listing of /admin might not even show cancel_reassignment_in_progress & reassign_partitions
$ zkcli -h kafka-zk-host1 ls /kafka-cluster/admin/
[u'cancel_reassignment_in_progress',
 u'reassign_partitions',
 u'delete_topics']

# After reassignment cancellation is complete.  The ZK node  /admin/cancel_reassignment_in_progress  & /admin/reassign_partitions are gone.
$ zkcli -h kafka-zk-host1 ls /kafka-cluster/admin/
[u'delete_topics']


Planned Future Changes 

New reassignments while existing reassignments in-flight  

The Reassignment Cancellation described above is more straightforward. Submitting new reassignments while existing reassignments are still in flight, however, needs more discussion and consensus, and might be worth doing in another KIP. It is therefore listed under Planned Future Changes; if consensus can be reached on this design, the feature can be delivered in this KIP as well.

To support submitting more reassignments while existing reassignments are still in flight, an extra znode /admin/reassign_partitions_queue is added, which has the same JSON format as /admin/reassign_partitions. Three more options, --generate-queue, --verify-queue, and --execute-queue, will be added to kafka-reassign-partitions.sh. The controller will be informed of the queued reassignments via a ZooKeeper watch. It will get all topic/partitions from /admin/reassign_partitions_queue and add them to /admin/reassign_partitions, then trigger onPartitionReassignment() for those topic/partitions.


The JSON format of the new /admin/reassign_partitions_queue znode is the same as that of /admin/reassign_partitions, e.g.:

Code Block
languagejs
{"version":1,
 "partitions":[{"topic": "foo1",
                "partition": 0,
                "replicas": [1,2,5]
               },
               {"topic": "foo2",
                "partition": 1,
                "replicas": [7,9,10]
               }]
}


If the /admin/reassign_partitions_queue znode already exists, new queued reassignments will be blocked from writing to /admin/reassign_partitions_queue.


If /admin/reassign_partitions_queue contains topic/partitions that also exist in /admin/reassign_partitions (pending reassignments), the conflict is resolved by first cancelling (rolling back) the pending reassignments of those topic/partitions in /admin/reassign_partitions, then submitting the new reassignments from /admin/reassign_partitions_queue to /admin/reassign_partitions. This approach is simpler than the algorithm previously proposed by Tom Bentley to infer the final replica assignments for those duplicate topic/partitions. After a topic/partition is put in /admin/reassign_partitions and ControllerContext.partitionsBeingReassigned to trigger the reassignment, it will be removed from /admin/reassign_partitions_queue, and when /admin/reassign_partitions_queue is empty, the znode will be deleted.
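The conflict resolution described above can be sketched as a small function that splits the work into "cancel first" and "submit next". This is only an illustration under stated assumptions: `resolve_queue` is a hypothetical helper that operates on already-parsed znode JSON, and it does not model the ZooKeeper reads, writes, or watches involved.

```python
def resolve_queue(pending, queued):
    """Split queued reassignments into cancellations and submissions.

    pending: dict in the /admin/reassign_partitions format.
    queued: dict in the /admin/reassign_partitions_queue format.
    Returns (to_cancel, to_submit):
      to_cancel - pending entries whose topic/partition is also queued and
                  must be rolled back before the queued entry is submitted.
      to_submit - all queued entries, in their queued order.
    """
    queued_keys = {(e["topic"], e["partition"]) for e in queued["partitions"]}
    to_cancel = [
        e for e in pending["partitions"]
        if (e["topic"], e["partition"]) in queued_keys
    ]
    to_submit = [dict(e) for e in queued["partitions"]]
    return to_cancel, to_submit


pending = {"version": 1, "partitions": [
    {"topic": "foo1", "partition": 0,
     "replicas": [1, 2, 4], "original_replicas": [1, 2, 3]},
    {"topic": "foo2", "partition": 1,
     "replicas": [7, 9, 8], "original_replicas": [5, 6, 8]},
]}
queued = {"version": 1, "partitions": [
    {"topic": "foo1", "partition": 0, "replicas": [1, 2, 5]},
    {"topic": "foo3", "partition": 0, "replicas": [2, 3, 4]},
]}
to_cancel, to_submit = resolve_queue(pending, queued)
```

In this example only foo1-0 is both pending and queued, so it is the only entry to roll back; foo2-1 keeps running, and both queued entries are then submitted.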

...

As described above, compatibility with /admin/reassign_partitions is maintained, so existing software will continue working. The only difference a client that operates on /admin/reassign_partitions would observe is a slight increase in latency due to the round trips needed to create the new znode (/admin/reassign_partitions_queue) and possible conflict resolution. The newly introduced znode /admin/cancel_reassignment_in_progress is used solely for cancelling (rolling back) the current reassignments still pending in /admin/reassign_partitions.

...