
...

We would like to provide a well-supported AdminClient API that does not suffer from these problems.   Namely, it should support incremental replica reassignments and cancellation of ongoing reassignments.
This API can be used as a foundation on which to build future improvements.

...

Code Block
languagejava
/**
 * Change the reassignments for one or more partitions.
 *
 * @param reassignments   The reassignments to add, modify, or remove.
 * @param options         The options to use.
 * @return                The result.
 */
public AlterPartitionReassignmentsResult alterPartitionReassignments(
         Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
         AlterPartitionReassignmentsOptions options);

/**
 * A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map)}.
 */
public class NewPartitionReassignment {
    private final List<Integer> targetBrokers;
    ...
}

class AlterPartitionReassignmentsResult {
  Map<TopicPartition, KafkaFuture<Void>> futures; // maps partitions to the results for each partition (success / failure)
  KafkaFuture<Void> all(); // Throws an exception if any reassignment was rejected
}

class AlterPartitionReassignmentsOptions extends AbstractOptions<AlterPartitionReassignmentsOptions> {
  // contains timeoutMs
}

/**
 * List the current partition reassignments for the given topics.
 *
 * @param options        The options to use.
 * @param topics         The topics to show reassignments for.  Must be non-null.
 */
ListPartitionReassignmentsResult listPartitionReassignments(
      ListPartitionReassignmentsOptions options,
      Set<String> topics);

/**
 * List all of the current partition reassignments.
 *
 * @param options        The options to use.
 */
ListPartitionReassignmentsResult listPartitionReassignments(
      ListPartitionReassignmentsOptions options);

class ListPartitionReassignmentsOptions extends AbstractOptions<ListPartitionReassignmentsOptions> {
  // contains timeoutMs
}

class ListPartitionReassignmentsResult {
  private final KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments;
}

/**
 * A partition reassignment, which has been listed via {@link AdminClient#listPartitionReassignments()}.
 */
public class PartitionReassignment {
    /**
     * The brokers which we want this partition to reside on.
     */
    private final List<Integer> targetBrokers;

    /**
     * The brokers which this partition currently resides on.
     */
    private final List<Integer> currentBrokers;
    ...
}
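As a usage sketch (the `TopicPartition` and `NewPartitionReassignment` classes below are simplified local stand-ins for the proposed API classes, and the `adminClient` mentioned in the comment is hypothetical), a caller would build the `reassignments` map with `Optional.of(...)` to start or modify a reassignment and `Optional.empty()` to cancel one:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class ReassignmentMapSketch {
    // Simplified stand-ins for the proposed API classes.
    record TopicPartition(String topic, int partition) {}
    record NewPartitionReassignment(List<Integer> targetBrokers) {}

    static Map<TopicPartition, Optional<NewPartitionReassignment>> buildExample() {
        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
        // Move foo-0 to brokers 4, 5, and 6.
        reassignments.put(new TopicPartition("foo", 0),
                Optional.of(new NewPartitionReassignment(List.of(4, 5, 6))));
        // Cancel any pending reassignment of foo-1.
        reassignments.put(new TopicPartition("foo", 1), Optional.empty());
        return reassignments;
    }

    public static void main(String[] args) {
        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = buildExample();
        // With the real API, this map would be passed to
        // adminClient.alterPartitionReassignments(reassignments, options),
        // which sends a single AlterPartitionReassignmentsRequest to the controller.
        System.out.println(reassignments.get(new TopicPartition("foo", 0)).get().targetBrokers());
    }
}
```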

New RPCs

AlterPartitionReassignmentsRequest

This is used to create or cancel a set of partition reassignments.  It must be sent to the controller.

This operation requires ALTER on CLUSTER.

Code Block
languagejs
{
  "apiKey": 45,
  "type": "request",
  "name": "AlterPartitionReassignmentsRequest",
  "validVersions": "0",
  "fields": [
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The time in ms to wait for the request to complete." },
    { "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+", "nullableVersions": "0+",
      "about": "The topics to reassign, or null to cancel all the pending reassignments for all topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]ReassignablePartition", "versions": "0+", "nullableVersions": "0+",
        "about": "The partitions to reassign, or null to cancel all pending reassignments for this topic.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "Replicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+",
          "about": "The replicas to place the partitions on, or null to cancel a pending reassignment." }
      ]}
    ]}
  ]
}
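For illustration (this example payload is not part of the schema; the topic name and broker IDs are made up), a request that reassigns partition 0 of topic "foo" to brokers 4, 5, and 6, and cancels the pending reassignment of partition 1, could look like:

```js
{
  "TimeoutMs": 60000,
  "Topics": [
    {
      "Name": "foo",
      "Partitions": [
        { "PartitionIndex": 0, "Replicas": [4, 5, 6] },
        { "PartitionIndex": 1, "Replicas": null }
      ]
    }
  ]
}
```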

AlterPartitionReassignmentsResponse

This is the response from AlterPartitionReassignmentsRequest.

Possible errors:

  • REQUEST_TIMED_OUT: if the request timed out.
  • NOT_CONTROLLER: if the node we sent the request to was not the controller.
  • INVALID_REPLICA_ASSIGNMENT: if the specified replica assignment was not valid -- for example, if it included negative numbers, repeated numbers, or specified a broker ID that the controller was not aware of.
  • CLUSTER_AUTHORIZATION_FAILED


Code Block
languagejs
{
  "apiKey": 45,
  "type": "response",
  "name": "AlterPartitionReassignmentsResponse",
  "validVersions": "0",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Responses", "type": "[]ReassignablePartitionResponse", "versions": "0+",
      "about": "The responses to partitions to reassign.", "fields": [
      { "name": "ErrorCode", "type": "int16", "versions": "0+",
        "about": "The error code." },
      { "name": "ErrorString", "type": "string", "versions": "0+", "nullableVersions": "0+",
        "about": "The error string, or null if there was no error." }
    ]}
  ]
}

ListPartitionReassignmentsRequest

This RPC lists the currently active partition reassignments.  It must be sent to the controller.

It requires DESCRIBE on CLUSTER.

Code Block
languagejs
{
  "apiKey": 46,
  "type": "request",
  "name": "ListPartitionReassignmentsRequest",
  "validVersions": "0",
  "fields": [
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The time in ms to wait for the request to complete." },
    { "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+",
      "about": "The topics to list partition reassignments for, or null to list everything.", "fields": [
      { "name": "PartitionIds", "type": "[]int32", "versions": "0+", "nullableVersions": "0+",
        "about": "The partitions to list partition reassignments for, or null to list everything." }
    ]}
  ]
}

ListPartitionReassignmentsResponse

Possible errors:

  • REQUEST_TIMED_OUT: if the request timed out.
  • NOT_CONTROLLER: if the node we sent the request to is not the controller.
  • CLUSTER_AUTHORIZATION_FAILED

If the top-level error code is set, no responses will be provided.

Code Block
languagejs
{
  "apiKey": 46,
  "type": "response",
  "name": "ListPartitionReassignmentsResponse",
  "validVersions": "0",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 on success." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The top-level error message, or null on success." },
    { "name": "Topics", "type": "[]OngoingTopicReassignment", "versions": "0+",
      "about": "The ongoing reassignments for each topic.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]OngoingPartitionReassignment", "versions": "0+",
        "about": "The ongoing reassignments for each partition.", "fields": [
        { "name": "PartitionId", "type": "int32", "versions": "0+",
          "about": "The partition ID." },
        { "name": "CurrentReplicas", "type": "[]int32", "versions": "0+",
          "about": "The replicas which the partition is currently assigned to." },
        { "name": "TargetReplicas", "type": "[]int32", "versions": "0+",
          "about": "The replicas which the partition is being reassigned to." }
      ]}
    ]}
  ]
}

Modified RPCs

LeaderAndIsrRequest

We will add a new field to LeaderAndIsrRequest named TargetReplicas.  This field will contain the replicas which will exist once the partition reassignment is complete.

Code Block
languagejs
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index b8988351c..d45727078 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -20,6 +20,8 @@
   // Version 1 adds IsNew.
   //
   // Version 2 adds broker epoch and reorganizes the partitions by topic.
+  //
+  // Version 3 adds TargetReplicas.
   "validVersions": "0-3",
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+",
@@ -48,6 +50,8 @@
         { "name": "Replicas", "type": "[]int32", "versions": "0+",
           "about": "The replica IDs." },
+        { "name": "TargetReplicas", "type": "[]int32", "versions": "3+", "nullableVersions": "0+",
+          "about": "The replica IDs that we are reassigning the partition to, or null if the partition is not being reassigned." },
         { "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
           "about": "Whether the replica should have existed on the broker or not." }
       ]}


Proposed Changes

Cancellation

We propose supporting the "cancellation" of an on-going rebalance. This is not a revert. To make the notion concrete: a cancellation of a rebalance means stopping on-going partition movements. A partition reassignment which is half done will not have the new replicas that have already entered the ISR reverted; it will only stop the in-flight movements for replicas that are not yet in the ISR.
Note that it is possible for a partition's replication factor to become higher than configured if a reassignment is stopped half-way. We do not propose a way to guard against this or revert it -- users are expected to run another reassignment.
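The effect of cancelling half-way can be sketched as follows (a simplification for illustration only; the method and its inputs are not from the Kafka codebase): the resulting replica set keeps the original replicas plus any target replicas that already joined the ISR, so the replication factor can end up higher than configured.

```java
import java.util.ArrayList;
import java.util.List;

public class CancellationSketch {
    // After a cancellation, in-flight movements stop: target replicas that never
    // joined the ISR are dropped, but target replicas that already joined the
    // ISR are kept (they are not reverted).
    static List<Integer> replicasAfterCancel(List<Integer> originalReplicas,
                                             List<Integer> isr,
                                             List<Integer> targetReplicas) {
        List<Integer> result = new ArrayList<>(originalReplicas);
        for (int target : targetReplicas) {
            if (isr.contains(target) && !result.contains(target)) {
                result.add(target);  // already caught up; kept after the cancel
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Reassigning [1,2,3] -> [4,5,6]; replica 4 has joined the ISR, 5 and 6 have not.
        List<Integer> after = replicasAfterCancel(
                List.of(1, 2, 3), List.of(1, 2, 3, 4), List.of(4, 5, 6));
        System.out.println(after);  // [1, 2, 3, 4] -- replication factor is now 4, not 3
    }
}
```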

kafka-reassign-partitions.sh

kafka-reassign-partitions.sh is a script to create, inspect, or modify partition reassignments.

We will remove the --zookeeper option from this script.  Instead, all operations will be performed through the admin client.  This will be initialized by the --bootstrap-server flag, which already exists.

There will be a new flag, --list, which will list all of the current replica reassignments that are going on in the cluster.  They will be output in the same JSON format that is accepted by --reassignment-json-file.  If there are no ongoing replica reassignments, an empty JSON object will be printed.

Cancellation

The reassign-partitions tool will work in an incremental fashion – any new reassignment will be added on top of the current ones. It will continue to output a rollback JSON as part of any reassignment in order to let the user revert the changes.
Additionally, we will support a --cancel-all flag which will cancel all on-going partition reassignments. This provides users with an intuitive, quick, and easy way to cancel multiple reassignments at once.

kafka-topics.sh

When describing partitions, kafka-topics.sh will show an additional field, "targetReplicas."  This will have information about the target replicas for the given partition.  kafka-topics.sh will get this information from listPartitionReassignments.

Implementation

ZooKeeper Changes

/brokers/topics/[topic]/partitions/[partition]/state

We will store the reassignment data for each partition in this zNode, as that is where we already store the current ISR.
This node will store a new structure named targetReplicas, an array of ints.  If this array is not present or null, there is no ongoing reassignment for this partition.  If it is non-null, it contains the target replicas of the ongoing reassignment for that partition.

Code Block
languagejs
titlepartition/state zNode content
{  
    "version":1,
    "leader":1,
    "leader_epoch":1,
    "controller_epoch":1,
    "isr":[1, 2, 3],
    "targetReplicas":[4, 5, 6] # <-- NEW
}


Once the whole reassignment for a given partition is done, we will first remove the old replicas from that partition's entry in the `/brokers/topics/[topic]` zNode and then clear the targetReplicas field in the /state zNode. This is consistent with our current behavior and illustrated further in an example below.

Additionally, because we are not making incompatible changes to the zNode, we will not be bumping up the inter-broker protocol version.

Algorithm

Code Block
# Illustrating the rebalance of a single partition.
# R is the current replicas, I is the ISR list and T is the targetReplicas -- what we want to reassign the partition to.
R: [1, 2, 3], I: [1, 2, 3], T: []
# Reassignment called
R: [1, 2, 3], I: [1, 2, 3], T: [4, 5, 6]
R: [1, 2, 3], I: [1, 2, 3, 4], T: [4, 5, 6]
R: [1, 2, 3, 4], I: [1, 2, 3, 4], T: [4, 5, 6]
R: [1, 2, 3, 4], I: [1, 2, 3, 4, 5], T: [4, 5, 6]
R: [1, 2, 3, 4, 5], I: [1, 2, 3, 4, 5], T: [4, 5, 6]
R: [1, 2, 3, 4, 5], I: [1, 2, 3, 4, 5, 6], T: [4, 5, 6]
R: [1, 2, 3, 4, 5, 6], I: [1, 2, 3, 4, 5, 6], T: [4, 5, 6]
R: [4, 5, 6], I: [1, 2, 3, 4, 5, 6], T: null
R: [4, 5, 6], I: [4, 5, 6], T: null

Note that we finish the reassignment only at the next-to-last step. We first remove the old replicas from the '/brokers/topics/[topic]' zNode. Then, as part of the 'leader_epoch' bump, we also remove the 'targetReplicas' field in the /state zNode. We send a LeaderAndIsr request to the appropriate leader with the new replicas (4, 5, 6) and the full ISR (1-6). The leader itself then shrinks the ISR to just the replica set. This is all consistent with Kafka's current behavior; the only new addition is clearing the 'targetReplicas' collection.

Keeping 'targetReplicas' intact until the very end opens up the possibility of easily reverting an on-going reassignment in the future (KIP-236 may implement this).
The alternative of iteratively shrinking the replicas and targetReplicas at each step introduces more complexity, more potential edge cases and extra ZK writes. It does not seem to yield many benefits.
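The step-by-step evolution above can be sketched as a small simulation (a simplification for illustration, not the controller's actual code): each target replica joins the ISR and is added to R in turn; once every target is in the ISR, R is set to T, T is cleared, and the leader shrinks the ISR to R.

```java
import java.util.ArrayList;
import java.util.List;

public class ReassignmentAlgorithmSketch {
    // R = current replicas, I = ISR, T = targetReplicas, as in the trace above.
    static List<Integer> R, I, T;

    static void reassign(List<Integer> targets) {
        T = new ArrayList<>(targets);
        for (int target : T) {
            I.add(target);   // the target replica catches up and joins the ISR...
            R.add(target);   // ...and is added to the replica set
        }
        // Every target is now in the ISR: finish the reassignment.
        R = new ArrayList<>(T);   // drop the old replicas from /brokers/topics/[topic]
        T = null;                 // clear targetReplicas during the leader_epoch bump
        I.retainAll(R);           // the leader shrinks the ISR to the replica set
    }

    public static void main(String[] args) {
        R = new ArrayList<>(List.of(1, 2, 3));
        I = new ArrayList<>(List.of(1, 2, 3));
        reassign(List.of(4, 5, 6));
        System.out.println("R=" + R + " I=" + I + " T=" + T);  // R=[4, 5, 6] I=[4, 5, 6] T=null
    }
}
```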

Controller Changes

When the controller starts up, it will load all of the currently active reassignments from the `[partition]/state` zNodes and the `/admin/reassign_partitions` zNode, with the latter taking precedence. This will not impose any additional startup overhead, because the controller needs to read these zNodes anyway to start up.
The reason for having `reassign_partitions` take precedence is to preserve the behavior that existing tools using the ZooKeeper-based API expect.

The controller will handle ListPartitionReassignments by listing the current active reassignments. It will keep an in-memory cache of on-going reassignments that it loads from the zNode and updates on further reassignments.

The controller will handle the AlterPartitionReassignmentsRequest RPC by modifying the specified zNode in /brokers/topics/[topic].  Once the node is modified, the controller will send out LeaderAndIsr requests to start all the movements.
At this point, we preserve the old behavior: tracking when new replicas enter the ISR, reconciling them against the target reassignments, and stopping the reassignment once all of the target replicas have joined the ISR.

Tool Changes

The kafka-reassign-partitions.sh tool will use the new AdminClient APIs to submit the reassignment plan.  It will not use the old ZooKeeper APIs any more.  In order to contact the admin APIs, the tool will accept a --bootstrap-server argument.

When changing the throttle, we will use IncrementalAlterConfigs rather than directly writing to ZooKeeper.

Compatibility, Deprecation, and Migration Plan

/admin/reassign_partitions znode

For compatibility purposes, we will continue to allow assignments to be submitted through the /admin/reassign_partitions node.  Just as with the current code, this will only be possible if there are no current assignments.  In other words, the znode has two states: empty and waiting for a write, and non-empty because there are assignments in progress. Once the znode is non-empty, further writes to it will be ignored.
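The two-state behavior of the znode can be sketched as follows (an illustrative model with made-up names, not broker code): a submitted plan is accepted only when the znode is empty; while reassignments are in progress, further writes are ignored.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReassignZnodeSketch {
    // Models /admin/reassign_partitions: empty (waiting for a write) or
    // non-empty (reassignments in progress, further writes ignored).
    private final Map<String, List<Integer>> pending = new HashMap<>();  // partition -> replicas

    boolean trySubmit(Map<String, List<Integer>> plan) {
        if (!pending.isEmpty()) {
            return false;  // non-empty: the write is ignored
        }
        pending.putAll(plan);
        return true;
    }

    void complete() { pending.clear(); }  // the znode becomes empty again

    public static void main(String[] args) {
        ReassignZnodeSketch znode = new ReassignZnodeSketch();
        System.out.println(znode.trySubmit(Map.of("foo-0", List.of(4, 5, 6))));  // true: accepted
        System.out.println(znode.trySubmit(Map.of("foo-1", List.of(7, 8, 9))));  // false: ignored
        znode.complete();
        System.out.println(znode.trySubmit(Map.of("foo-1", List.of(7, 8, 9))));  // true: accepted
    }
}
```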

Applications using the old API will not be able to cancel in-progress assignments.  They will also not be able to monitor the status of in-progress assignments, except by polling the znode to see when it becomes empty, which indicates that no assignments are ongoing.  To get these benefits, applications will have to be upgraded to the AdminClient API.

The deprecated ZooKeeper-based API will be removed in a future release.

There is one known edge-case with using both APIs which we will intentionally leave unaddressed:

Code Block
1. zNode reassignment involving partitionA and partitionB starts
2. API reassignment involving partitionA starts
3. Controller failover. It will effectively ignore the API reassignment in step 2 and move partitionA to the replicas given by the zNode reassignment

There are ways to handle this edge-case, but we propose ignoring it due to the fact that:

  • it is very unlikely for users to use both APIs at once
  • we will be deprecating the zNode API anyway
  • it is not worth the complexity and performance hit from addressing such a rare and seemingly non-critical edge-case

Reassignments while upgrading versions

In the event of a software upgrade, we may end up with a temporarily-stuck reassignment. Imagine the following scenario:

Code Block
Brokers A, B and C
Controller=B
Broker A gets upgraded to 2.4 (which contains this KIP)
Broker B gets upgraded to 2.4. Controller moves to Broker A
A reassignment happens; Broker A sends a LeaderAndIsr request with TargetReplicas to C, but it is ignored by C, as it does not know about the field yet.
Broker C gets upgraded to 2.4.

At this point, part of the reassignment is "stuck". Partitions with a targetReplica of C will never complete until the controller gets restarted and a new LeaderAndIsr is sent out.

Reassignments while downgrading versions

In the event of a software downgrade, the extra field in the LeaderAndIsr request will be ignored by brokers running the old code and the reassignment will stop.  As soon as the controller gets downgraded, it will send a LeaderAndIsr request with no targetReplicas, which ensures that all brokers stop their reassignments.


Rejected Alternatives

Store the pending replicas in a single ZNode rather than in a per-partition ZNode

...