...

Code Block
languagejava
/**
 * Change the reassignments for one or more partitions.
 *
 * @param reassignments   The reassignments to add, modify, or remove.
 * @param options         The options to use.
 * @return                The result.
 */
public AlterPartitionReassignmentsResult alterPartitionReassignments(
         Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
         AlterPartitionReassignmentsOptions options);

/**
 * A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map, AlterPartitionReassignmentsOptions)}.
 */
public class NewPartitionReassignment {
    private final List<Integer> targetBrokers;
    ...
}

class AlterPartitionReassignmentsResult {
  Map<TopicPartition, KafkaFuture<Void>> futures; // maps each partition to the result of its reassignment (success / failure)
  KafkaFuture<Void> all(); // throws an exception if any reassignment was rejected
}

class AlterPartitionReassignmentsOptions extends AbstractOptions<AlterPartitionReassignmentsOptions> {
  // contains timeoutMs
}

/**
 * List some of the current partition reassignments.
 * 
 * @param options        The options to use.
 * @param partitions     The partitions to show reassignments for.  Must be non-null.
 */
ListPartitionReassignmentsResult listPartitionReassignments(
      ListPartitionReassignmentsOptions options,
      Set<TopicPartition> partitions);


/**
 * List all of the current partition reassignments for the given topics.
 * 
 * @param options        The options to use.
 * @param topics         The topics to show reassignments for.  Must be non-null.
 */
ListPartitionReassignmentsResult listPartitionReassignments(
      ListPartitionReassignmentsOptions options,
      Set<String> topics);


/**
 * List all of the current partition reassignments.
 * 
 * @param options        The options to use.
 */
ListPartitionReassignmentsResult listPartitionReassignments(
      ListPartitionReassignmentsOptions options);

class ListPartitionReassignmentsOptions extends AbstractOptions<ListPartitionReassignmentsOptions> {
  // contains timeoutMs
}

class ListPartitionReassignmentsResult {
  private final KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments;
}

/**
 * A partition reassignment, which has been listed via {@link AdminClient#listPartitionReassignments(ListPartitionReassignmentsOptions)}.
 */
public class PartitionReassignment {
    /**
     * The brokers which we want this partition to reside on.
     */
    private final List<Integer> replicas;

    /**
     * The brokers which have in-sync replicas of this partition.
     */
    private final List<Integer> isr;

    /**
     * The brokers that we are adding this partition to as part of a reassignment.
     */
    private final List<Integer> addingReplicas;

    /**
     * The brokers that we are removing this partition from as part of a reassignment.
     */
    private final List<Integer> removingReplicas;
}
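The sketch below illustrates the calling convention above by building the `reassignments` argument. A present `Optional` adds or modifies a reassignment, and an empty one cancels it. This reading of "add, modify, or remove" in the Javadoc is our interpretation. Because the API is only a proposal here, `TopicPartition` and `NewPartitionReassignment` are minimal record stand-ins. `ReassignmentPlans.build` is a hypothetical helper, not part of any client library.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for the proposed client types (illustration only).
record TopicPartition(String topic, int partition) {}

record NewPartitionReassignment(List<Integer> targetBrokers) {
    NewPartitionReassignment {
        targetBrokers = List.copyOf(targetBrokers); // immutable defensive copy
    }
}

// Hypothetical helper; not part of any Kafka client API.
final class ReassignmentPlans {
    private ReassignmentPlans() {}

    /**
     * Builds an argument for alterPartitionReassignments: Optional.of(...) asks for
     * (or replaces) a reassignment, Optional.empty() cancels a pending one.
     * If a partition appears in both inputs, the cancellation wins.
     */
    static Map<TopicPartition, Optional<NewPartitionReassignment>> build(
            Map<TopicPartition, List<Integer>> moves, List<TopicPartition> cancels) {
        Map<TopicPartition, Optional<NewPartitionReassignment>> result = new LinkedHashMap<>();
        moves.forEach((tp, brokers) ->
                result.put(tp, Optional.of(new NewPartitionReassignment(brokers))));
        for (TopicPartition tp : cancels) {
            result.put(tp, Optional.empty());
        }
        return result;
    }
}
```

Passing the resulting map, together with an `AlterPartitionReassignmentsOptions`, to `alterPartitionReassignments` would submit the move and the cancellation in a single call.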

New RPCs

AlterPartitionReassignmentsRequest

This is used to create or cancel a set of partition reassignments.  It must be sent to the controller.

This operation requires ALTER on CLUSTER.

Code Block
languagejs
{
  "apiKey": 45,
  "type": "request",
  "name": "AlterPartitionReassignmentsRequest",
  "validVersions": "0",
  "fields": [
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The time in ms to wait for the request to complete." },
    { "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+",
      "about": "The topics to reassign.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]ReassignablePartition", "versions": "0+",
        "about": "The partitions to reassign.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "Replicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+",
          "about": "The replicas to place the partitions on, or null to cancel a pending reassignment for this partition." }
      ]}
    ]}
  ]
}
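The client-side map must be flattened into the nested `Topics` / `Partitions` layout of this request. The sketch below groups entries by topic. It encodes a cancellation as a `null` `Replicas` list, as the field description specifies. The records mirror the JSON schema, but they are hypothetical Java types, not Kafka's generated message classes. Topics and partitions are sorted only to make the output deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical mirrors of the request schema (not Kafka's generated classes).
record TopicPartition(String topic, int partition) {}
record ReassignablePartition(int partitionIndex, List<Integer> replicas) {} // null replicas = cancel
record ReassignableTopic(String name, List<ReassignablePartition> partitions) {}

final class RequestBuilder {
    private RequestBuilder() {}

    /** Groups per-partition entries by topic; TreeMaps give sorted, deterministic output. */
    static List<ReassignableTopic> toTopics(Map<TopicPartition, Optional<List<Integer>>> reassignments) {
        Map<String, Map<Integer, List<Integer>>> grouped = new TreeMap<>();
        reassignments.forEach((tp, target) ->
                grouped.computeIfAbsent(tp.topic(), t -> new TreeMap<>())
                       // An empty Optional becomes a null Replicas field, i.e. a cancellation.
                       .put(tp.partition(), target.orElse(null)));
        List<ReassignableTopic> topics = new ArrayList<>();
        grouped.forEach((name, parts) -> {
            List<ReassignablePartition> partitions = new ArrayList<>();
            parts.forEach((index, replicas) -> partitions.add(new ReassignablePartition(index, replicas)));
            topics.add(new ReassignableTopic(name, partitions));
        });
        return topics;
    }
}
```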

AlterPartitionReassignmentsResponse

This is the response to AlterPartitionReassignmentsRequest.

Possible errors:

  • REQUEST_TIMED_OUT: if the request timed out.
  • NOT_CONTROLLER: if the node we sent the request to was not the controller.
  • INVALID_REPLICA_ASSIGNMENT: if the specified replica assignment was not valid; for example, if it included negative numbers, repeated numbers, or a broker ID that the controller was not aware of.
  • NO_REASSIGNMENT_IN_PROGRESS: if a cancellation was requested for a partition that was not being reassigned.
  • CLUSTER_AUTHORIZATION_FAILED: if we did not have sufficient permission to perform the alteration.
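The INVALID_REPLICA_ASSIGNMENT conditions listed above can be checked in a single pass: negative IDs, repeated IDs, and broker IDs unknown to the controller. This is a minimal sketch. It assumes the controller already has the set of known broker IDs. `ReplicaAssignmentValidator` is a hypothetical name, and returning a reason string instead of an error code is a simplification.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical validator for a proposed replica list.
final class ReplicaAssignmentValidator {
    private ReplicaAssignmentValidator() {}

    /** Returns null if the assignment is acceptable, otherwise the first problem found. */
    static String validate(List<Integer> replicas, Set<Integer> knownBrokers) {
        Set<Integer> seen = new HashSet<>();
        for (int id : replicas) {
            if (id < 0) return "negative broker ID " + id;
            if (!seen.add(id)) return "repeated broker ID " + id;
            if (!knownBrokers.contains(id)) return "unknown broker ID " + id;
        }
        return null;
    }
}
```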


Code Block
languagejs
{
  "apiKey": 45,
  "type": "response",
  "name": "AlterPartitionReassignmentsResponse",
  "validVersions": "0",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ErrorString", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The top-level error string, or null if there was no error." },
    { "name": "Responses", "type": "[]ReassignableTopicResponse", "versions": "0+",
      "about": "The responses to topics to reassign.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]ReassignablePartitionResponse", "versions": "0+",
        "about": "The responses to partitions to reassign.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code." },
        { "name": "ErrorString", "type": "string", "versions": "0+", "nullableVersions": "0+",
          "about": "The error string, or null if there was no error." }
      ]}
    ]}
  ]
}
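On the client, each `Partitions` entry can complete one per-partition future. As the result class describes, `all()` fails if any partition was rejected. The sketch uses `CompletableFuture` in place of `KafkaFuture` and keys partitions by hypothetical "topic-partition" strings. It also assumes that a non-zero top-level `ErrorCode` fails every partition; the text does not state that rule explicitly.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical client-side interpretation of the response.
final class AlterResultSketch {
    private AlterResultSketch() {}

    /**
     * Completes one future per partition. A non-zero top-level error fails every
     * partition (an assumption of this sketch); otherwise each partition's own
     * error code decides.
     */
    static Map<String, CompletableFuture<Void>> futures(short topLevelError, Map<String, Short> partitionErrors) {
        Map<String, CompletableFuture<Void>> futures = new LinkedHashMap<>();
        partitionErrors.forEach((partition, code) -> {
            short effective = topLevelError != 0 ? topLevelError : code;
            CompletableFuture<Void> f = new CompletableFuture<>();
            if (effective == 0) {
                f.complete(null);
            } else {
                f.completeExceptionally(new IllegalStateException("error code " + effective));
            }
            futures.put(partition, f);
        });
        return futures;
    }

    /** Mirrors all(): completes normally only if every partition succeeded. */
    static CompletableFuture<Void> all(Map<String, CompletableFuture<Void>> futures) {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
    }
}
```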

ListPartitionReassignmentsRequest

This RPC lists the currently active partition reassignments.  It must be sent to the controller.

It requires DESCRIBE on CLUSTER.

Code Block
languagejs
{
  "apiKey": 46,
  "type": "request",
  "name": "ListPartitionReassignmentsRequest",
  "validVersions": "0",
  "fields": [
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The time in ms to wait for the request to complete." },
    { "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+",
      "about": "The topics to list partition reassignments for, or null to list everything.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
        "about": "The partitions to list partition reassignments for." }
    ]}
  ]
}
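The controller can apply this request's filter to its in-memory map of ongoing reassignments. A null `Topics` list selects everything; otherwise only the listed partition indexes of each listed topic are selected. In this sketch the filter is a nullable `Map<String, List<Integer>>` and `TopicPartition` is a stand-in record. Partitions that are not being reassigned are simply left out of the result, which is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a topic/partition key.
record TopicPartition(String topic, int partition) {}

final class ListFilter {
    private ListFilter() {}

    /**
     * Selects ongoing reassignments. A null filter selects everything; otherwise a
     * partition is kept only if its index is listed under its topic and it is
     * currently being reassigned.
     */
    static <V> Map<TopicPartition, V> select(Map<TopicPartition, V> ongoing,
                                             Map<String, List<Integer>> topicsFilter) {
        if (topicsFilter == null) {
            return new LinkedHashMap<>(ongoing);
        }
        Map<TopicPartition, V> selected = new LinkedHashMap<>();
        topicsFilter.forEach((topic, indexes) -> {
            for (int index : indexes) {
                TopicPartition tp = new TopicPartition(topic, index);
                if (ongoing.containsKey(tp)) {
                    selected.put(tp, ongoing.get(tp));
                }
            }
        });
        return selected;
    }
}
```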

ListPartitionReassignmentsResponse

Possible errors:

  • REQUEST_TIMED_OUT: if the request timed out.
  • NOT_CONTROLLER: if the node we sent the request to is not the controller.
  • CLUSTER_AUTHORIZATION_FAILED: if we did not have sufficient permissions.

If the top-level error code is set, no responses will be provided.

Code Block
languagejs
{
  "apiKey": 46,
  "type": "response",
  "name": "ListPartitionReassignmentsResponse",
  "validVersions": "0",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 on success." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The top-level error message, or null on success." },
    { "name": "Topics", "type": "[]OngoingTopicReassignment", "versions": "0+",
      "about": "The ongoing reassignments for each topic.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]OngoingPartitionReassignment", "versions": "0+",
        "about": "The ongoing reassignments for each partition.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The index of the partition." },
        { "name": "Replicas", "type": "[]int32", "versions": "0+",
          "about": "The current replica set." },
        { "name": "Isr", "type": "[]int32", "versions": "0+",
          "about": "The current in-sync replica set." },
        { "name": "AddingReplicas", "type": "[]int32", "versions": "0+",
          "about": "The set of replicas we are currently adding." },
        { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+",
          "about": "The set of replicas we are currently removing." }
      ]}
    ]}
  ]
}

Modified RPCs

...

Code Block
languagejs
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index b8988351c..d45727078 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -20,6 +20,8 @@
   // Version 1 adds IsNew.
   //
   // Version 2 adds broker epoch and reorganizes the partitions by topic.
+  //
+  // Version 3 adds AddingReplicas and RemovingReplicas.
   "validVersions": "0-3",
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+",
@@ -48,6 +50,10 @@
           "about": "The ZooKeeper version." },
         { "name": "Replicas", "type": "[]int32", "versions": "0+",
           "about": "The replica IDs." },
+        { "name": "AddingReplicas", "type": "[]int32", "versions": "3+", "nullableVersions": "3+",
+          "about": "The replica IDs that we are adding this partition to, or null if no replicas are being added." },
+        { "name": "RemovingReplicas", "type": "[]int32", "versions": "3+", "nullableVersions": "3+",
+          "about": "The replica IDs that we are removing this partition from, or null if no replicas are being removed." },
         { "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable": true, 
           "about": "Whether the replica should have existed on the broker or not." }
       ]}

...

We propose supporting the "cancellation" of an ongoing rebalance at all levels of granularity: at the partition level, at the topic level, and globally (stopping all reassignments).
Cancellation is not the same as a revert. To make the notion concrete: cancelling a rebalance means stopping the ongoing partition movements. If a partition reassignment is half done, the new replicas that have already entered the ISR will not be reverted; only the in-flight movements for replicas that are not yet in the ISR will be stopped.
Note that a partition's replication factor can end up higher than configured if a reassignment is stopped half-way. We do not propose a way to guard against or revert this; users are expected to run another reassignment.
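The cancellation rule can be restated with the adding and removing replica sets that this proposal stores per partition: keep every replica except those still being added that have not yet joined the ISR. The sketch below computes the resulting replica list under that reading. It ignores leader election and the LeaderAndIsr requests the controller would send. `CancelSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of cancellation semantics (not controller code).
final class CancelSketch {
    private CancelSketch() {}

    /**
     * Replica list after cancelling: adding replicas that are not yet in the ISR
     * are dropped; everything else, including new replicas that already caught up,
     * is kept. The adding and removing sets are considered cleared afterwards.
     */
    static List<Integer> replicasAfterCancel(List<Integer> replicas, Set<Integer> isr, Set<Integer> adding) {
        List<Integer> kept = new ArrayList<>();
        for (int id : replicas) {
            boolean inFlight = adding.contains(id) && !isr.contains(id);
            if (!inFlight) {
                kept.add(id);
            }
        }
        return kept;
    }
}
```

Take a partition with replicas [1, 4, 5, 2, 3], ISR {1, 2, 3, 4} and adding set {4, 5}. Cancelling yields [1, 4, 2, 3]: replica 4 has caught up and stays. If the original assignment was [1, 2, 3], the partition now has four replicas.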

...

The reassign-partitions tool will work incrementally: any new reassignment will be added on top of the current ones. It will continue to output a rollback JSON as part of any reassignment, so that the user can revert the changes.

Additionally, we will support a --create-cancellation-plan flag. This flag will create a JSON file that, if applied, will cancel all ongoing partition reassignments. This provides users with a quick and easy way to cancel multiple reassignments at once.
The protocol also supports cancellation at the topic level, but we will not use it in this tool, because the tool's JSON schema is not suitable for coarser-grained cancellations. Any further improvements to the tool and its JSON format can be slated for future work.
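In terms of the new RPCs, a plan that cancels everything amounts to one cancellation entry per partition returned by a listing. This sketch builds only that in-memory plan. The JSON file format the tool would write is not specified here, and `CancellationPlan` and the `TopicPartition` record are hypothetical. By construction, any reassignment created after the listing is missing from the plan; this is the race condition discussed next.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a topic/partition key.
record TopicPartition(String topic, int partition) {}

final class CancellationPlan {
    private CancellationPlan() {}

    /** One cancellation (an empty target) per partition that was being reassigned when listed. */
    static Map<TopicPartition, Optional<List<Integer>>> cancelAll(Collection<TopicPartition> ongoing) {
        Map<TopicPartition, Optional<List<Integer>>> plan = new LinkedHashMap<>();
        for (TopicPartition tp : ongoing) {
            plan.put(tp, Optional.empty());
        }
        return plan;
    }
}
```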

Note that there is a slight race condition when using the tool: someone may create a new reassignment between the creation of the cancellation plan and its application. The original cancellation plan would not cancel this new reassignment. In general, we expect system administrators to stop any process or person that is creating new reassignments before cancelling all pending reassignments, so we don't expect this to be a problem in practice. If such processes are not stopped, the creation of new reassignments would negate the effect of cancelling all pending reassignments anyway.

kafka-topics.sh

When describing partitions, kafka-topics.sh will show two additional fields, "addingReplicas" and "removingReplicas". These will list the replicas being added to and removed from the given partition. kafka-topics.sh will get this information from listPartitionReassignments.

...

We will store the reassignment data for each partition in this znode, as that is where we already store the current replicaSet for every partition.
This node will store two new maps, named addingReplicas and removingReplicas. Each key is a partition, and the value is an array of integers denoting the replicas we should add to or remove from that partition, respectively.


If both maps are empty, there is no on-going reassignment for that topic.

Code Block
languagejs
titletopics/[topic] zNode content
{
  "version": 1,
  "partitions": {"0": [1, 4, 2, 3] },
  "addingReplicas": {"0": [4] }, # <----- NEW
  "removingReplicas": {"0": [1] } # <----- NEW
}
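Both maps follow from a partition's current replicas and its target. `addingReplicas` holds target replicas that are not yet assigned, and `removingReplicas` holds current replicas that are not in the target. In this sketch the stored replica list is the current replicas followed by the ones being added. That ordering is an assumption for illustration, as are the names `ZnodeSketch` and `ReassignmentState`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-partition view of the znode data.
record ReassignmentState(List<Integer> replicas, List<Integer> adding, List<Integer> removing) {
    /** Both lists empty means there is no ongoing reassignment. */
    boolean inProgress() {
        return !adding.isEmpty() || !removing.isEmpty();
    }
}

final class ZnodeSketch {
    private ZnodeSketch() {}

    static ReassignmentState start(List<Integer> current, List<Integer> target) {
        List<Integer> adding = new ArrayList<>();
        for (int id : target) {
            if (!current.contains(id)) adding.add(id);
        }
        List<Integer> removing = new ArrayList<>();
        for (int id : current) {
            if (!target.contains(id)) removing.add(id);
        }
        // Assumed ordering: existing replicas first, then the ones being added.
        List<Integer> replicas = new ArrayList<>(current);
        replicas.addAll(adding);
        return new ReassignmentState(replicas, adding, removing);
    }
}
```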


Because we are not making incompatible changes to the znode, we will not be bumping up the inter-broker protocol version.

...

When the controller starts up, it will load all of the currently active reassignments from the `topics/[topic]` znodes and the `admin/reassign_partitions` znode, with the latter taking precedence. This will not impose any additional startup overhead, because the controller already reads these znodes during startup.
The reason for letting `reassign_partitions` take precedence is that we cannot know for sure whether the controller has acted on that znode. We therefore allow it to overwrite the API-assigned reassignments in the event of a failover.
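The precedence rule amounts to a map merge: entries from `admin/reassign_partitions` overwrite those read from the `topics/[topic]` znodes. The sketch keys partitions by hypothetical "topic-partition" strings and stores target replica lists. It does not read ZooKeeper, and `StartupMerge` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the startup precedence rule.
final class StartupMerge {
    private StartupMerge() {}

    /** Reassignments from admin/reassign_partitions win over those in topics/[topic]. */
    static Map<String, List<Integer>> merge(Map<String, List<Integer>> fromTopicZnodes,
                                            Map<String, List<Integer>> fromReassignPartitions) {
        Map<String, List<Integer>> merged = new LinkedHashMap<>(fromTopicZnodes);
        merged.putAll(fromReassignPartitions); // later source overwrites on conflict
        return merged;
    }
}
```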

The controller will handle ListPartitionReassignments by listing the currently active reassignments. It can supply this information directly from its in-memory cache. Note that this in-memory cache of all ongoing reassignments already existed before this KIP.

The controller will handle the AlterPartitionReassignmentsRequest RPC by modifying the /brokers/topics/[topic] znode. Once the znode is modified, the controller will send out LeaderAndIsrRequests to start all the movements.

Once the controller is notified that a new replica has entered the ISR for a particular partition, it will remove it from the "addingReplicas" field in `/topics/[topic]`. If "addingReplicas" becomes empty, the controller will send a new wave of LeaderAndIsr requests to retire all the old replicas and, if applicable, elect new leaders.

...

We will always empty out the "addingReplicas" field before starting to act on the "removingReplicas" field.

Code Block
# Illustrating the rebalance of a single partition.
# R is the current replicas, I is the ISR list, AR is addingReplicas and RR is removingReplicas
R: [1, 2, 3], I: [1, 2, 3], AR: [], RR: []
# Reassignment called via API with targetReplicas=[4,3,2]

R: [1, 4, 3, 2], I: [1, 2, 3], AR: [4], RR: [1] # Controller sends LeaderAndIsr requests 
# (We ignore RR until AR is empty)
R: [1, 4, 3, 2], I: [1, 2, 3, 4], AR: [4], RR: [1] # Leader 1 adds 4 to ISR
# Controller picks up on the change and removes 4 from AR.
# (in-memory) - R: [1, 4, 3, 2], I: [1, 2, 3, 4], AR: [], RR: [1] 
# Controller realizes AR is empty and starts work on RR. Removes all of RR from R and sends LeaderAndIsr requests
R: [4, 3, 2], I: [1, 2, 3, 4], AR: [], RR: [] # at this point the reassignment is considered complete
# Leaders change, 1 retires and 4 starts leading. 4 shrinks the ISR
R: [4, 3, 2], I: [2, 3, 4], AR: [], RR: []
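A small state machine for a single partition can replay the walkthrough above. This is a sketch under simplifying assumptions. It tracks only R, AR and RR, takes the ISR as input from the leader, and appends new replicas to the end of R, whereas the walkthrough shows a different order. It does not model LeaderAndIsr requests or leader election. `PartitionReassignmentSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical per-partition state: R (replicas), AR (adding) and RR (removing). */
final class PartitionReassignmentSketch {
    final List<Integer> replicas;
    final Set<Integer> adding = new LinkedHashSet<>();
    final Set<Integer> removing = new LinkedHashSet<>();

    PartitionReassignmentSketch(List<Integer> replicas) {
        this.replicas = new ArrayList<>(replicas);
    }

    /** API call: record AR and RR, then add the new replicas to R (assumes no reassignment in progress). */
    void reassign(List<Integer> target) {
        for (int id : target) {
            if (!replicas.contains(id)) adding.add(id);
        }
        for (int id : replicas) {
            if (!target.contains(id)) removing.add(id);
        }
        replicas.addAll(adding);
    }

    /** ISR change: drop caught-up replicas from AR; act on RR only once AR is empty. */
    void onIsrChange(Set<Integer> isr) {
        adding.removeAll(isr);
        if (adding.isEmpty() && !removing.isEmpty()) {
            replicas.removeAll(removing);
            removing.clear();
        }
    }

    boolean complete() {
        return adding.isEmpty() && removing.isEmpty();
    }
}
```

Start from R = [1, 2, 3] and reassign to [4, 3, 2]: AR becomes {4} and RR becomes {1}. An ISR update without broker 4 changes nothing. The update that adds 4 empties AR, after which 1 is dropped and R becomes [2, 3, 4].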

...