...

  1. Reassignments, especially for large topics/partitions, are costly. In some cases the performance of the Kafka cluster can be severely impacted when reassignments are kicked off. There should be a fast, clean, safe way to cancel and roll back pending reassignments. E.g. with original replicas [1,2,3] and new replicas [4,5,6], if the reassignment is causing performance impact on leader 1, it should be possible to cancel it immediately, reverting the partition to its original replicas [1,2,3] and dropping the new replicas.
  2. If users need to do a large-scale reassignment, they end up having to do it in smaller batches so that they can abort the overall reassignment sooner if operationally necessary.
  3. Each batch of reassignments takes as long as its slowest partition, and that slowest partition prevents other reassignments from happening. This can happen even when the reassignments are submitted by grouping topic/partitions of similar size into each batch. How to optimally group reassignments into one batch for faster execution and less impact on the cluster is beyond the scope of this KIP.
  4. The ZooKeeper-imposed limit of 1MB on znode size places an upper limit on the number of reassignments that can be done at a given time. Note that in a real production environment it is better to do reassignments in batches, with a reasonable number of reassignments per batch, since a large number of reassignments tends to cause higher producer latency. Between batches, proper staggering and throttling are recommended, as in the usage sketch below.
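
For illustration, per-batch throttling is already supported by the existing reassignment tooling; a usage sketch (the hosts, file name, and throttle value below are placeholders):

No Format
# execute one batch with an inter-broker replication throttle of 50 MB/s
/usr/lib/kafka/bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181/kafka \
  --reassignment-json-file batch1.json --execute --throttle 50000000

# after the batch completes, --verify also removes the throttle
/usr/lib/kafka/bin/kafka-reassign-partitions.sh --zookeeper zkhost:2181/kafka \
  --reassignment-json-file batch1.json --verify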

...

Strictly speaking this is not a change that would affect any public interfaces (since ZooKeeper is not considered a public interface, and the change can be made in a backward-compatible way). However, since some users are known to operate on the /admin/reassign_partitions znode directly, this could break in future versions of Kafka (e.g. as reported in KAFKA-7854).

...

Code Block
languagejs
{"version":1,
 "partitions":[{"topic": "foo1",
                "partition": 0,
		        "replicas": [1,2,4],
                "original_replicas": [1,2,3]
               },
               {"topic": "foo2",
                "partition": 1,
		        "replicas": [57,69,8],
                "original_replicas": [75,96,8]
               }]            
}

For submitting new reassignments while some are still pending, a new /admin/reassign_partitions_extra znode is added. Its JSON format is similar to that of /admin/reassign_partitions:

Code Block
languagejs
{"version":1,
 "partitions":[{"topic": "foo1",
                "partition": 0,
		        "replicas": [1,2,5]
               },
               {"topic": "foo2",
                "partition": 1,
		        "replicas": [7,9,10]
               }]            
}

Proposed Changes


The main idea is to support clean, safe cancellation of pending reassignments in the /admin/reassign_partitions znode in a timely fashion, and to support submitting more reassignments while some reassignments are still in-flight.

When a client submits reassignments, it only needs to submit the "replicas" (new replica assignment) for each topic/partition. Before writing to /admin/reassign_partitions, the currently assigned replicas (original replicas) are read from ZooKeeper and added as "original_replicas" for that topic/partition's reassignment. This "original_replicas" field is later used to roll back the topic/partition's replica assignment during cancellation.
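
A minimal sketch of this server-side augmentation step (all names here are illustrative, not the actual controller/tooling code), assuming the current assignment can be looked up from ZooKeeper:

Code Block
languagescala
// Sketch: fill in "original_replicas" before the reassignment is written to
// /admin/reassign_partitions. All names here are illustrative.
case class PartitionReassignment(
    topic: String,
    partition: Int,
    replicas: Seq[Int],                        // new assignment submitted by the client
    originalReplicas: Option[Seq[Int]] = None) // added server-side, used for rollback

// `lookupCurrentReplicas` stands in for reading the partition's current
// replica list from ZooKeeper (e.g. under /brokers/topics/<topic>).
def withOriginalReplicas(
    submitted: Seq[PartitionReassignment],
    lookupCurrentReplicas: (String, Int) => Seq[Int]): Seq[PartitionReassignment] =
  submitted.map { pr =>
    pr.copy(originalReplicas = Some(lookupCurrentReplicas(pr.topic, pr.partition)))
  }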

To trigger a reassignment cancellation, a new znode /admin/reassign_cancel is created; the controller is informed of the cancellation via a ZooKeeper watch on this znode. The controller then reads the current pending reassignments in /admin/reassign_partitions and re-populates ControllerContext.partitionReplicaAssignment. For each pending topic/partition reassignment, the cancellation/rollback works as below. It is essentially the opposite of performing a reassignment; since we have the "original_replicas" of each pending reassignment in /admin/reassign_partitions and ControllerContext.partitionReplicaAssignment, rolling back is much easier.

RAR = Reassigned replicas
OAR = Original list of replicas for partition
AR = current assigned replicas
1. Set AR to OAR in memory.
2. If the leader is not in OAR, elect a new leader from OAR. If a new leader needs to be elected from OAR, a LeaderAndIsr
request will be sent. If not, the leader epoch will be incremented in ZooKeeper and a LeaderAndIsr request will be sent.
In either case, the LeaderAndIsr request will have AR = OAR. This prevents the leader from adding any replica in
RAR - OAR back into the ISR.
3. Move all replicas in RAR - OAR to the OfflineReplica state. As part of the OfflineReplica state change, we shrink the
ISR to remove RAR - OAR in ZooKeeper and send a LeaderAndIsr request ONLY to the leader to notify it of the shrunk ISR.
After that, we send a StopReplica (delete = false) request to the replicas in RAR - OAR.
4. Move all replicas in RAR - OAR to the NonExistentReplica state. This sends a StopReplica (delete = true) request to
the replicas in RAR - OAR to physically delete the replicas on disk.
5. Update AR in ZooKeeper with OAR.
6. Update the /admin/reassign_partitions path in ZooKeeper to remove this partition.
7. After electing the leader, the replicas and ISR information changes, so resend the update metadata request to every broker.
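
For clarity, the set arithmetic behind steps 1-4 can be sketched in isolation (plain Scala, not the actual controller code):

Code Block
languagescala
// Sketch of the per-partition rollback bookkeeping. RAR/OAR as defined above.
case class RollbackPlan(
    newAssignment: Seq[Int],      // AR is set back to OAR (steps 1 and 5)
    needsLeaderElection: Boolean, // step 2: current leader is not in OAR
    replicasToDrop: Set[Int])     // RAR - OAR: stopped, then deleted (steps 3 and 4)

def planRollback(rar: Seq[Int], oar: Seq[Int], currentLeader: Int): RollbackPlan =
  RollbackPlan(
    newAssignment = oar,
    needsLeaderElection = !oar.contains(currentLeader),
    replicasToDrop = rar.toSet -- oar.toSet)

// Example: RAR = [1,2,4], OAR = [1,2,3], leader = 4
//   => newAssignment = [1,2,3], needsLeaderElection = true, replicasToDrop = Set(4)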

Note that /admin/reassign_cancel will not be deleted automatically after the cancellation is completed. AdminClient CLI options will be added to submit a reassignment cancellation and to remove the cancellation, e.g. the two proposed new options: --cancel and --remove-cancel.

No Format
georgeli@kafka679-sjc1:~$ zkcli -h kafkazk80-sjc1 ls /kafka-sjc1-secpoc/admin/
[u'delete_topics']


georgeli@kafka679-sjc1:~$ /usr/lib/kafka/bin/kafka-reassign-partitions.sh  --zookeeper kafkazk80-sjc1/kafka-sjc1-secpoc --cancel
Rolling back the current pending reassignments Map(test_topic-25 -> Map(replicas -> Buffer(832, 679, 680), original_replicas -> Buffer(832, 679, 681)))
Successfully submitted cancellation of reassignments.

georgeli@kafka679-sjc1:~$ zkcli -h kafkazk80-sjc1 ls /kafka-sjc1-secpoc/admin/
[u'reassign_cancel',
 u'delete_topics']


georgeli@kafka679-sjc1:~ $ /usr/lib/kafka/bin/kafka-reassign-partitions.sh  --zookeeper kafkazk80-sjc1/kafka-sjc1-secpoc --remove-cancel 
Successfully remove the znode /admin/reassign_cancel.


georgeli@kafka679-sjc1:~$ zkcli -h kafkazk80-sjc1 ls /kafka-sjc1-secpoc/admin/
[u'delete_topics']

While the znode /admin/reassign_cancel is present, no new reassignments can be submitted. This is enforced both before writing to /admin/reassign_partitions and at onPartitionReassignment(), with:

No Format
if (zkClient.reassignCancelInPlace()) return // if the ReassignCancelZNode exists, skip reassignment

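A sketch of how this guard could look (the reassignCancelInPlace() helper and ReassignCancelZNode are proposals of this KIP, not existing APIs; the existence check is assumed to be a simple pathExists-style lookup on the new znode):

Code Block
languagescala
// Sketch only. ReassignCancelZNode is the new znode proposed by this KIP.
object ReassignCancelZNode {
  val path = "/admin/reassign_cancel"
}

// `pathExists` stands in for a ZooKeeper existence check (e.g. KafkaZkClient.pathExists).
class ReassignmentGuard(pathExists: String => Boolean) {

  def reassignCancelInPlace(): Boolean = pathExists(ReassignCancelZNode.path)

  // Called both before writing /admin/reassign_partitions and in onPartitionReassignment().
  def maybeStartReassignment(startReassignment: () => Unit): Unit = {
    if (reassignCancelInPlace()) return // cancellation pending: skip new reassignments
    startReassignment()
  }
}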


...