Master KIP

KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum (Accepted)

Status

Current state: accepted

...

JIRA: KAFKA-8345

Release: controller-side changes in 2.4, command line changes in 2.6

Motivation

Currently, users initiate replica reassignment by writing directly to a ZooKeeper node named /admin/reassign_partitions.

...

Code Block
languagejava
/**
 * Change the reassignments for one or more partitions.
 * Providing an empty Optional (e.g. via {@link Optional#empty()}) will <b>cancel</b> the reassignment for the associated partition.
 *
 * @param reassignments   The reassignments to add, modify, or remove.
 * @param options         The options to use.
 * @return                The result.
 */
public AlterPartitionReassignmentsResult alterPartitionReassignments(
         Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
         AlterPartitionReassignmentsOptions options);

/**
 * A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map)}.
 */
public class NewPartitionReassignment {
    private final List<Integer> targetReplicas;

    /**
     * @throws IllegalArgumentException if no replicas are supplied
     */
    public NewPartitionReassignment(List<Integer> targetReplicas)

    public List<Integer> targetReplicas()
}

class AlterPartitionReassignmentsResult {
  Map<TopicPartition, KafkaFuture<Void>> values();
  Future<Void> all(); // Throws an exception if any reassignment was rejected
}

class AlterPartitionReassignmentsOptions extends AbstractOptions<AlterPartitionReassignmentsOptions> {
  // contains timeoutMs
}

/**
 * List some of the current partition reassignments.
 * 
 * @param partitions     The partitions to show reassignments for. Must be non-null.
 * @param options        The options to use.
 */
public ListPartitionReassignmentsResult listPartitionReassignments(
      Set<TopicPartition> partitions,
      ListPartitionReassignmentsOptions options);

/**
 * List all of the current partition reassignments.
 * 
 * @param options        The options to use.
 */
public ListPartitionReassignmentsResult listPartitionReassignments(
      ListPartitionReassignmentsOptions options);

abstract ListPartitionReassignmentsResult listPartitionReassignments(Optional<Set<TopicPartition>> partitions,
                                                                     ListPartitionReassignmentsOptions options)

class ListPartitionReassignmentsOptions extends AbstractOptions<ListPartitionReassignmentsOptions> {
  // contains timeoutMs
}

class ListPartitionReassignmentsResult {
  private final KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments;
}

/**
 * A partition reassignment, which has been listed via {@link AdminClient#listPartitionReassignments()}.
 */
public class PartitionReassignment {
    /**
     * The brokers which this partition currently resides on.
     */
    private final List<Integer> replicas;

    /**
     * The brokers that we are adding this partition to as part of a reassignment.
     */
    private final List<Integer> addingReplicas;

    /**
     * The brokers that we are removing this partition from as part of a reassignment.
     */
    private final List<Integer> removingReplicas;
}
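The Optional-based contract of alterPartitionReassignments can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: the class ReassignmentMapSketch, its alter method, and the use of plain strings such as "foo-0" in place of TopicPartition are hypothetical stand-ins, not part of the proposed API. It shows that a present value adds or replaces a reassignment, while an empty Optional cancels one.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class ReassignmentMapSketch {
    // Hypothetical stand-in for controller state: partition name -> target replicas.
    private final Map<String, List<Integer>> inProgress = new HashMap<>();

    /**
     * Applies each entry independently. A present value adds or replaces the
     * reassignment; an empty Optional cancels it. Returns an error name per
     * partition, using "NONE" for success.
     */
    public Map<String, String> alter(Map<String, Optional<List<Integer>>> reassignments) {
        Map<String, String> results = new TreeMap<>();
        for (Map.Entry<String, Optional<List<Integer>>> entry : reassignments.entrySet()) {
            Optional<List<Integer>> target = entry.getValue();
            if (target.isPresent()) {
                inProgress.put(entry.getKey(), List.copyOf(target.get()));
                results.put(entry.getKey(), "NONE");
            } else if (inProgress.remove(entry.getKey()) != null) {
                results.put(entry.getKey(), "NONE");
            } else {
                // Cancelling a partition that is not being reassigned is an error.
                results.put(entry.getKey(), "NO_REASSIGNMENT_IN_PROGRESS");
            }
        }
        return results;
    }

    /** Returns a snapshot of the reassignments that are still in progress. */
    public Map<String, List<Integer>> inProgress() {
        return Map.copyOf(inProgress);
    }

    public static void main(String[] args) {
        ReassignmentMapSketch controller = new ReassignmentMapSketch();

        Map<String, Optional<List<Integer>>> start = new HashMap<>();
        start.put("foo-0", Optional.of(List.of(1, 2, 3)));
        System.out.println(controller.alter(start));

        Map<String, Optional<List<Integer>>> cancel = new HashMap<>();
        cancel.put("foo-0", Optional.empty()); // cancels the reassignment started above
        cancel.put("bar-0", Optional.empty()); // nothing to cancel for this partition
        System.out.println(controller.alter(cancel));
    }
}
```

Each partition gets its own result, mirroring the per-partition futures returned by values().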

...

This operation requires ALTER on CLUSTER.

Note that even though we have TimeoutMs, this field will be ignored in the implementation for the time being. Its implementation is tracked in KAFKA-8873.

Code Block
languagejs
{
  "apiKey": 45,
  "type": "request",
  "name": "AlterPartitionReassignmentsRequest",
  "validVersions": "0",
  "fields": [
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The time in ms to wait for the request to complete." },
    { "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+",
      "about": "The topics to reassign.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]ReassignablePartition", "versions": "0+",
        "about": "The partitions to reassign.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "Replicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The replicas to place the partitions on, or null to cancel a pending reassignment for this partition." }
      ]}
    ]}
  ]
}

...

This is the response to AlterPartitionReassignmentsRequest.

Possible top-level errors:

  • REQUEST_TIMED_OUT: if the request timed out.
  • NOT_CONTROLLER: if the node we sent the request to was not the controller.
  • CLUSTER_AUTHORIZATION_FAILED: if we didn't have sufficient permission to perform the alteration.

Possible partition-level errors:

  • INVALID_REPLICA_ASSIGNMENT: when the replica assignment is not valid (brokers are not part of the cluster, negative replica ids, empty assignment).
  • NO_REASSIGNMENT_IN_PROGRESS: when a cancellation was called on a topic/partition which was not in the middle of a reassignment.
  • UNKNOWN_TOPIC_OR_PARTITION: when a topic/partition doesn't exist or a topic deletion is in progress.
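The three conditions listed for INVALID_REPLICA_ASSIGNMENT can be sketched as a single check. The class ReplicaAssignmentCheck and the liveBrokers parameter are hypothetical names chosen for illustration; this is not the controller's actual validation code, and it covers only the conditions named above.

```java
import java.util.List;
import java.util.Set;

public class ReplicaAssignmentCheck {
    /**
     * Returns true when the proposed replica list is non-empty, contains no
     * negative ids, and references only brokers that are part of the cluster.
     */
    static boolean isValid(List<Integer> replicas, Set<Integer> liveBrokers) {
        if (replicas.isEmpty()) {
            return false; // empty assignment
        }
        for (int id : replicas) {
            if (id < 0) {
                return false; // negative replica id
            }
            if (!liveBrokers.contains(id)) {
                return false; // broker is not part of the cluster
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Set<Integer> liveBrokers = Set.of(1, 2, 3);
        System.out.println(isValid(List.of(1, 2), liveBrokers));
        System.out.println(isValid(List.of(1, 4), liveBrokers));
        System.out.println(isValid(List.of(), liveBrokers));
    }
}
```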


Code Block
languagejs
{
  "apiKey": 45,
  "type": "response",
  "name": "AlterPartitionReassignmentsResponse",
  "validVersions": "0",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The error message, or null if there was no error." },
    { "name": "Responses", "type": "[]ReassignableTopicResponse", "versions": "0+",
      "about": "The responses to topics to reassign.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name" },
      { "name": "Partitions", "type": "[]ReassignablePartitionResponse", "versions": "0+",
        "about": "The responses to partitions to reassign", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The partition index." },
          { "name": "ErrorCode", "type": "int16", "versions": "0+",
            "about": "The error code." },
          { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
            "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}

...

It requires DESCRIBE on CLUSTER.

Note that even though we have TimeoutMs, this field will be ignored in the implementation for the time being. Its implementation is tracked in KAFKA-8873.

Code Block
languagejs
{
  "apiKey": 46,
  "type": "request",
  "name": "ListPartitionReassignmentsRequest",
  "validVersions": "0",
  "fields": [
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The time in ms to wait for the request to complete." },
    { "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+",
      "about": "The topics to list partition reassignments for, or null to list everything.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name" },
      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
        "about": "The partitions to list partition reassignments for" }
    ]}
  ]
}
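As a rough illustration of how the Optional<Set<TopicPartition>> argument of listPartitionReassignments could map onto the nullable Topics field, the sketch below groups partitions by topic and returns null when everything should be listed. The TopicPartition record here is a local stand-in, and ListRequestTopics and toTopics are hypothetical names, not the client's real serialization code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

public class ListRequestTopics {
    // Local stand-in for the client's TopicPartition class.
    record TopicPartition(String topic, int partition) {}

    /**
     * Groups the requested partitions by topic name (Name -> PartitionIndexes).
     * Returns null when no partition set is given, meaning "list everything".
     */
    static Map<String, List<Integer>> toTopics(Optional<Set<TopicPartition>> partitions) {
        if (partitions.isEmpty()) {
            return null;
        }
        Map<String, List<Integer>> topics = new TreeMap<>();
        for (TopicPartition tp : partitions.get()) {
            topics.computeIfAbsent(tp.topic(), name -> new ArrayList<>()).add(tp.partition());
        }
        // Sort the indexes so the result does not depend on set iteration order.
        topics.values().forEach(Collections::sort);
        return topics;
    }

    public static void main(String[] args) {
        Set<TopicPartition> requested = Set.of(
                new TopicPartition("foo", 1),
                new TopicPartition("foo", 0),
                new TopicPartition("bar", 2));
        System.out.println(toTopics(Optional.of(requested)));
        System.out.println(toTopics(Optional.empty()));
    }
}
```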

ListPartitionReassignmentsResponse

Possible errors:

  • REQUEST_TIMED_OUT: if the request timed out.
  • NOT_CONTROLLER: if the node we sent the request to is not the controller.
  • UNKNOWN_TOPIC_OR_PARTITION
  • CLUSTER_AUTHORIZATION_FAILED: if we didn't have sufficient permissions.

...

If a partition is not going through a reassignment, its AddingReplicas and RemovingReplicas fields will simply be empty.

If a partition doesn't exist, no response will be returned for it.
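One way to picture the AddingReplicas and RemovingReplicas fields is as set differences between the replicas a partition started with and the replicas it is moving to. That relationship is an assumption made for this sketch, and ReplicaDiff and its methods are hypothetical names; the sketch mainly shows why both lists come out empty when no reassignment is under way.

```java
import java.util.ArrayList;
import java.util.List;

public class ReplicaDiff {
    /** Replicas in the target assignment that the partition does not already have. */
    static List<Integer> addingReplicas(List<Integer> original, List<Integer> target) {
        List<Integer> adding = new ArrayList<>(target);
        adding.removeAll(original);
        return adding;
    }

    /** Replicas the partition has now that are absent from the target assignment. */
    static List<Integer> removingReplicas(List<Integer> original, List<Integer> target) {
        List<Integer> removing = new ArrayList<>(original);
        removing.removeAll(target);
        return removing;
    }

    public static void main(String[] args) {
        List<Integer> original = List.of(1, 2, 3);
        List<Integer> target = List.of(2, 3, 4);
        System.out.println(addingReplicas(original, target));
        System.out.println(removingReplicas(original, target));
        // With identical original and target lists there is nothing to add or remove.
        System.out.println(addingReplicas(original, original));
    }
}
```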

Code Block
languagejs
{
  "apiKey": 46,
  "type": "response",
  "name": "ListPartitionReassignmentsResponse",
  "validVersions": "0",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 on success." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The top-level error message, or null on success." },
    { "name": "Topics", "type": "[]OngoingTopicReassignment", "versions": "0+",
      "about": "The ongoing reassignments for each topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]OngoingPartitionReassignment", "versions": "0+",
          "about": "The ongoing reassignments for each partition.", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "0+",
            "about": "The index of the partition." },
          { "name": "Replicas", "type": "[]int32", "versions": "0+",
            "about": "The current replica set." },
          { "name": "AddingReplicas", "type": "[]int32", "versions": "0+",
            "about": "The set of replicas we are currently adding." },
          { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+",
            "about": "The set of replicas we are currently removing." }
      ]}
    ]}
  ]
}

...

kafka-reassign-partitions.sh is a script to create, inspect, or modify partition reassignments.

Deprecation of --zookeeper

We will deprecate the --zookeeper option in this script.  New operations introduced by this KIP (cancellation, list) will not be supported by the ZK flag.

All old and new operations will be supported through the admin client.  This will be initialized by the --bootstrap-server flag, which already exists.

The --list flag

There will be a new flag, --list, which will list all of the current replica reassignments that are going on in the cluster.  For each reassignment, we will list the replica set, the adding replicas, and the removing replicas.  If there are no ongoing replica reassignments, "No partition reassignments found." will be printed.

The --additional flag

Like the underlying KIP-455 API, the reassign-partitions tool will now work in an incremental fashion.  Any new reassignment will be added on top of the current ones. It will continue to output a rollback JSON as part of any reassignment in order to let the user revert the changes.

However, to avoid confusing users, the tool will decline to create a new reassignment if there is already one in progress.  Users can choose to create additional reassignments even if there is one in progress by specifying the new --additional flag.  This flag is only supported when --bootstrap-server is provided.

There is also one slight behavior change to --execute that is worth noting here.  Previously, when --execute declined to create a new reassignment because one was already in progress, it would still apply the provided throttle, if any.  In a world of concurrent reassignments, this behavior no longer makes sense, and has been removed.  (However, users can still modify the throttle by simply supplying the --additional flag.)
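The interaction between --execute, --additional, and the throttle can be summarized in a few lines. This is a sketch of the decision described above, not the tool's actual code; ExecuteGuard, Decision, decide, and appliesThrottle are hypothetical names.

```java
public class ExecuteGuard {
    enum Decision { START, DECLINE }

    /** Declines a new reassignment when one is in progress, unless --additional was given. */
    static Decision decide(boolean reassignmentInProgress, boolean additional) {
        return (reassignmentInProgress && !additional) ? Decision.DECLINE : Decision.START;
    }

    /** The throttle is applied only when the reassignment actually starts. */
    static boolean appliesThrottle(Decision decision, boolean throttleSupplied) {
        return decision == Decision.START && throttleSupplied;
    }

    public static void main(String[] args) {
        Decision declined = decide(true, false);
        System.out.println(declined + " throttle=" + appliesThrottle(declined, true));

        Decision started = decide(true, true);
        System.out.println(started + " throttle=" + appliesThrottle(started, true));
    }
}
```

Under this model, a declined --execute no longer touches the throttle, while the same command with --additional both starts the reassignment and applies it.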

Cancellation with the tool

Additionally, we will support a --cancel flag.  This flag will create a JSON file that, if applied, will cancel all ongoing partition reassignments.

...