Master KIP: KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum (Accepted)
Status
Current state: accepted
Discussion thread: here
JIRA:
Release: controller-side changes in 2.4, command line changes in 2.6
Motivation
Currently, users initiate replica reassignment by writing directly to a ZooKeeper node named /admin/reassign_partitions.
As explained in KIP-4, ZooKeeper-based APIs have many problems. For example, there is no way to return a helpful error code if an invalid partition reassignment is proposed. Adding new features over time is difficult when external tools can access and overwrite Kafka's internal metadata structures. Also, ZooKeeper-based APIs inherently lack security and auditability.
...
```java
/**
 * Change the reassignments for one or more partitions.
 * Providing an empty Optional (e.g via {@link Optional#empty()}) will <bold>cancel</bold> the reassignment for the associated partition.
 *
 * @param reassignments   The reassignments to add, modify, or remove.
 * @param options         The options to use.
 * @return                The result.
 */
public AlterPartitionReassignmentsResult alterPartitionReassignments(
        Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments,
        AlterPartitionReassignmentsOptions options);

/**
 * A new partition reassignment, which can be applied via {@link AdminClient#alterPartitionReassignments(Map)}.
 */
public class NewPartitionReassignment {
    private final List<Integer> targetReplicas;

    /**
     * @throws IllegalArgumentException if no replicas are supplied
     */
    public NewPartitionReassignment(List<Integer> targetReplicas);

    public List<Integer> targetReplicas();
}

class AlterPartitionReassignmentsResult {
    Map<TopicPartition, KafkaFuture<Void>> values();
    Future<Void> all();  // Throws an exception if any reassignment was rejected
}

class AlterPartitionReassignmentsOptions extends AbstractOptions<AlterPartitionReassignmentsOptions> {
    // contains timeoutMs
}

/**
 * List some of the current partition reassignments.
 *
 * @param partitions      The partitions to show reassignments for. Must be non-null.
 * @param options         The options to use.
 */
public ListPartitionReassignmentsResult listPartitionReassignments(
        Set<TopicPartition> partitions,
        ListPartitionReassignmentsOptions options);

/**
 * List all of the current partition reassignments.
 *
 * @param options         The options to use.
 */
public ListPartitionReassignmentsResult listPartitionReassignments(
        ListPartitionReassignmentsOptions options);

abstract ListPartitionReassignmentsResult listPartitionReassignments(
        Optional<Set<TopicPartition>> partitions,
        ListPartitionReassignmentsOptions options);

class ListPartitionReassignmentsOptions extends AbstractOptions<ListPartitionReassignmentsOptions> {
    // contains timeoutMs
}

class ListPartitionReassignmentsResult {
    private final KafkaFuture<Map<TopicPartition, PartitionReassignment>> reassignments;
}

/**
 * A partition reassignment, which has been listed via {@link AdminClient#listPartitionReassignments()}.
 */
public class PartitionReassignment {
    /** The brokers which this partition currently resides on. */
    private final List<Integer> replicas;
    /** The brokers that we are adding this partition to as part of a reassignment. */
    private final List<Integer> addingReplicas;
    /** The brokers that we are removing this partition from as part of a reassignment. */
    private final List<Integer> removingReplicas;
}
```
...
This operation requires ALTER on CLUSTER.
Note that although the request has a TimeoutMs field, the implementation will ignore it for the time being. Its implementation is tracked in a separate JIRA issue.
```json
{
  "apiKey": 45,
  "type": "request",
  "name": "AlterPartitionReassignmentsRequest",
  "validVersions": "0",
  "fields": [
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The time in ms to wait for the request to complete." },
    { "name": "Topics", "type": "[]ReassignableTopic", "versions": "0+",
      "about": "The topics to reassign.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]ReassignablePartition", "versions": "0+",
        "about": "The partitions to reassign.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "Replicas", "type": "[]int32", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The replicas to place the partitions on, or null to cancel a pending reassignment for this partition." }
      ]}
    ]}
  ]
}
```
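The null Replicas value in the request plays the same role as an empty Optional in alterPartitionReassignments: both mean "cancel". As a minimal sketch, assuming a client that groups a per-partition map into the per-topic shape of the request, the translation could look like the code below. TopicPartitionKey, PartitionEntry and ReassignmentRequestSketch are hypothetical stand-ins, not Kafka classes; a real client would use TopicPartition and the generated message classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionKey)) {
            return false;
        }
        TopicPartitionKey other = (TopicPartitionKey) o;
        return topic.equals(other.topic) && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical stand-in for one ReassignablePartition entry of the request.
final class PartitionEntry {
    final int partitionIndex;
    final List<Integer> replicas; // null means "cancel the pending reassignment"

    PartitionEntry(int partitionIndex, List<Integer> replicas) {
        this.partitionIndex = partitionIndex;
        this.replicas = replicas;
    }
}

final class ReassignmentRequestSketch {
    // Groups the per-partition map by topic (keeping insertion order) and maps
    // Optional.empty() to a null Replicas field, i.e. a cancellation.
    static Map<String, List<PartitionEntry>> toRequestTopics(
            Map<TopicPartitionKey, Optional<List<Integer>>> reassignments) {
        Map<String, List<PartitionEntry>> topics = new LinkedHashMap<>();
        for (Map.Entry<TopicPartitionKey, Optional<List<Integer>>> e : reassignments.entrySet()) {
            List<Integer> replicas = e.getValue().orElse(null);
            topics.computeIfAbsent(e.getKey().topic, t -> new ArrayList<>())
                  .add(new PartitionEntry(e.getKey().partition, replicas));
        }
        return topics;
    }
}
```

Grouping by topic mirrors the nested Topics/Partitions layout of the schema, so one request can mix new reassignments and cancellations for the same topic.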
...
This is the response to AlterPartitionReassignmentsRequest.
Possible top-level errors:
- REQUEST_TIMED_OUT: if the request timed out.
- NOT_CONTROLLER: if the node we sent the request to was not the controller.
- CLUSTER_AUTHORIZATION_FAILED: If we didn't have sufficient permission to perform the alteration.
Possible partition-level errors:
- INVALID_REPLICA_ASSIGNMENT: if the specified replica assignment is not valid; for example, if it is empty, includes negative or repeated broker IDs, or specifies a broker ID that the controller is not aware of.
- NO_REASSIGNMENT_IN_PROGRESS: if a cancellation was requested for a topic/partition that is not in the middle of a reassignment.
- UNKNOWN_TOPIC_OR_PARTITION: if a topic/partition doesn't exist or a topic deletion is in progress.
```json
{
  "apiKey": 45,
  "type": "response",
  "name": "AlterPartitionReassignmentsResponse",
  "validVersions": "0",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The error message, or null if there was no error." },
    { "name": "Responses", "type": "[]ReassignableTopicResponse", "versions": "0+",
      "about": "The responses to topics to reassign.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name" },
      { "name": "Partitions", "type": "[]ReassignablePartitionResponse", "versions": "0+",
        "about": "The responses to partitions to reassign", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
          "about": "The error message, or null if there was no error." }
      ]}
    ]}
  ]
}
```
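The INVALID_REPLICA_ASSIGNMENT examples listed above (empty list, negative IDs, repeated IDs, unknown brokers) can be captured in a small check. This is a sketch under stated assumptions: ReplicaAssignmentValidator and its validate method are hypothetical, and the controller's actual validation may perform more or different checks.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

final class ReplicaAssignmentValidator {
    // Returns a reason if the proposed assignment should be rejected with
    // INVALID_REPLICA_ASSIGNMENT, or Optional.empty() if it passes these checks.
    static Optional<String> validate(List<Integer> targetReplicas, Set<Integer> knownBrokerIds) {
        if (targetReplicas.isEmpty()) {
            return Optional.of("empty replica assignment");
        }
        Set<Integer> seen = new HashSet<>();
        for (int id : targetReplicas) {
            if (id < 0) {
                return Optional.of("negative broker ID " + id);
            }
            if (!seen.add(id)) {
                return Optional.of("repeated broker ID " + id);
            }
            if (!knownBrokerIds.contains(id)) {
                return Optional.of("unknown broker ID " + id);
            }
        }
        return Optional.empty();
    }
}
```

Returning a reason string rather than a boolean makes it easy to fill the per-partition ErrorMessage field alongside the error code.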
ListPartitionReassignmentsRequest
This RPC lists the currently active partition reassignments. It must be sent to the controller.
It requires DESCRIBE on CLUSTER.
Note that although the request has a TimeoutMs field, the implementation will ignore it for the time being. Its implementation is tracked in a separate JIRA issue.
```json
{
  "apiKey": 46,
  "type": "request",
  "name": "ListPartitionReassignmentsRequest",
  "validVersions": "0",
  "fields": [
    { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
      "about": "The time in ms to wait for the request to complete." },
    { "name": "Topics", "type": "[]ListPartitionReassignmentsTopics", "versions": "0+", "nullableVersions": "0+",
      "about": "The topics to list partition reassignments for, or null to list everything.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name" },
      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
        "about": "The partitions to list partition reassignments for" }
    ]}
  ]
}
```
ListPartitionReassignmentsResponse
Possible errors:
- REQUEST_TIMED_OUT: if the request timed out.
- NOT_CONTROLLER: if the node we sent the request to is not the controller.
- CLUSTER_AUTHORIZATION_FAILED: if we didn't have sufficient permissions.
If the top-level error code is set, no responses will be provided.
If a partition is not going through a reassignment, its AddingReplicas and RemovingReplicas fields will simply be empty.
If a partition doesn't exist, no response will be returned for it.
```json
{
  "apiKey": 46,
  "type": "response",
  "name": "ListPartitionReassignmentsResponse",
  "validVersions": "0",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 on success." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+",
      "about": "The top-level error message, or null on success." },
    { "name": "Topics", "type": "[]OngoingTopicReassignment", "versions": "0+",
      "about": "The ongoing reassignments for each topic.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]OngoingPartitionReassignment", "versions": "0+",
        "about": "The ongoing reassignments for each partition.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The index of the partition." },
        { "name": "Replicas", "type": "[]int32", "versions": "0+",
          "about": "The current replica set." },
        { "name": "AddingReplicas", "type": "[]int32", "versions": "0+",
          "about": "The set of replicas we are currently adding." },
        { "name": "RemovingReplicas", "type": "[]int32", "versions": "0+",
          "about": "The set of replicas we are currently removing." }
      ]}
    ]}
  ]
}
```
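The per-partition rules for ListPartitionReassignmentsResponse (nonexistent partitions are omitted; partitions not being reassigned get empty AddingReplicas and RemovingReplicas) can be sketched as follows. This is an illustration under assumptions: ListReassignmentsSketch and its describe method are hypothetical, partitions are keyed by "topic-partition" strings for brevity, and the top-level error handling is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ListReassignmentsSketch {
    // Hypothetical per-partition entry mirroring the response's partition fields.
    static final class Entry {
        final String partition;
        final List<Integer> replicas;
        final List<Integer> addingReplicas;
        final List<Integer> removingReplicas;

        Entry(String partition, List<Integer> replicas,
              List<Integer> addingReplicas, List<Integer> removingReplicas) {
            this.partition = partition;
            this.replicas = replicas;
            this.addingReplicas = addingReplicas;
            this.removingReplicas = removingReplicas;
        }
    }

    // currentReplicas: replica set of every existing partition.
    // adding / removing: AR and RR of partitions that are currently being reassigned.
    static List<Entry> describe(List<String> requested,
                                Map<String, List<Integer>> currentReplicas,
                                Map<String, List<Integer>> adding,
                                Map<String, List<Integer>> removing) {
        List<Entry> out = new ArrayList<>();
        for (String p : requested) {
            List<Integer> replicas = currentReplicas.get(p);
            if (replicas == null) {
                continue; // the partition does not exist: no entry is returned for it
            }
            // Partitions without an ongoing reassignment get empty AR/RR lists.
            out.add(new Entry(p, replicas,
                    adding.getOrDefault(p, List.of()),
                    removing.getOrDefault(p, List.of())));
        }
        return out;
    }
}
```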
Modified RPCs
LeaderAndIsrRequest
We will add two new fields to LeaderAndIsrRequest named AddingReplicas and RemovingReplicas. These fields will contain, respectively, the replicas that are being added to and removed from a partition's assignment.
These fields allow the controller to propagate reassignment information to the leader/follower brokers, paving the way for some of the improvements outlined in the Future Work section of this KIP. We will not make use of these fields as of this KIP.
```diff
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index b8988351c..d45727078 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -20,6 +20,8 @@
   // Version 1 adds IsNew.
   //
   // Version 2 adds broker epoch and reorganizes the partitions by topic.
+  //
+  // Version 3 adds AddingReplicas and RemovingReplicas.
   "validVersions": "0-3",
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+",
@@ -48,6 +50,10 @@
       "about": "The ZooKeeper version." },
     { "name": "Replicas", "type": "[]int32", "versions": "0+",
       "about": "The replica IDs." },
+    { "name": "AddingReplicas", "type": "[]int32", "versions": "3+",
+      "about": "The replica IDs that we are adding this partition to." },
+    { "name": "RemovingReplicas", "type": "[]int32", "versions": "3+",
+      "about": "The replica IDs that we are removing this partition from." },
     { "name": "IsNew", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
       "about": "Whether the replica should have existed on the broker or not." }
   ]}
```
Other Interfaces
NoReassignmentInProgressException
This exception is thrown when a user tries to cancel a reassignment that does not exist.
```java
package org.apache.kafka.common.errors;

/**
 * Thrown if a request cannot cancel a reassignment because one is not in progress.
 */
public class NoReassignmentInProgressException extends ApiException {
    public NoReassignmentInProgressException(String msg) {
        super(msg);
    }

    public NoReassignmentInProgressException(String msg, Throwable cause) {
        super(msg, cause);
    }
}
```
Proposed Changes
Cancellation
We propose supporting the "cancellation" of an ongoing partition reassignment. It is essentially a rollback of a reassignment.
Concretely, cancelling a reassignment means stopping any ongoing partition movements and restoring the replica set to what it was before the reassignment was initiated.
There is no guarantee that the reverted replica set will be in the same order as it was previously.
kafka-reassign-partitions.sh
kafka-reassign-partitions.sh is a script to create, inspect, or modify partition reassignments.
Deprecation of --zookeeper
We will deprecate the --zookeeper option in this script. New operations introduced by this KIP (cancellation, list) will not be supported by the ZK flag.
All operations, old and new, will be supported through the admin client, which is initialized via the existing --bootstrap-server flag.
The --list flag
There will be a new flag, --list, which will list all of the current replica reassignments that are going on in the cluster. For each reassignment, we will list the replica set, the adding replicas, and the removing replicas. If there are no ongoing replica reassignments, "No partition reassignments found." will be printed.
The --additional flag
Like the underlying KIP-455 API, the reassign-partitions tool will now work in an incremental fashion. Any new reassignment will be added on top of the current ones.
However, to avoid confusing users, the tool will decline to create a new reassignment if there is already one in progress. Users can choose to create additional reassignments even if there is one in progress by specifying the new --additional flag. This flag is only supported when --bootstrap-server is provided.
There is also one slight behavior change to --execute that is worth noting here. Previously, when --execute declined to create a new reassignment because one was already in progress, it would still apply the provided throttle, if any. In a world of concurrent reassignments, this behavior no longer makes sense and has been removed. (However, users can still modify the throttle by supplying the --additional flag.)
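The --execute rules above reduce to a small decision table. The sketch below is illustrative only: ExecuteDecisionSketch and its Outcome values are hypothetical names, not the tool's code, and the throttle is assumed to be applied only when the outcome is SUBMIT.

```java
final class ExecuteDecisionSketch {
    enum Outcome { SUBMIT, DECLINE, UNSUPPORTED }

    // reassignmentInProgress: whether any reassignment is currently ongoing.
    // additional: whether --additional was passed.
    // bootstrapServer: whether --bootstrap-server (rather than --zookeeper) was used.
    static Outcome decide(boolean reassignmentInProgress, boolean additional, boolean bootstrapServer) {
        if (additional && !bootstrapServer) {
            return Outcome.UNSUPPORTED; // --additional requires --bootstrap-server
        }
        if (reassignmentInProgress && !additional) {
            return Outcome.DECLINE; // and, unlike before, no throttle is applied
        }
        return Outcome.SUBMIT; // the reassignment (and any throttle) is applied
    }
}
```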
Cancellation with the tool
Additionally, we will support a --cancel flag. This flag will create a JSON file that, if applied, will cancel all ongoing partition reassignments.
...
```
{
  "version": 2,
  "partitions": { "0": [1, 4, 2, 3] },
  "addingReplicas": { "0": [4] },     # <----- NEW
  "removingReplicas": { "0": [1] }    # <----- NEW
}
```
The znode will now be at version 2 rather than 1. This change is backwards-compatible: older brokers can read the new znode.
Controller Changes
When the controller starts up, it will load all of the currently active reassignments from the `/brokers/topics/[topic]` znodes and the `/admin/reassign_partitions` znode, with the latter taking precedence. This will not impose any additional startup overhead, because the controller already reads these znodes during startup.
The `/admin/reassign_partitions` znode takes precedence because we cannot know for sure whether the controller has already acted on it, so we allow it to overwrite the API-assigned reassignments in the event of a failover.
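The precedence rule amounts to a map merge in which entries from `/admin/reassign_partitions` overwrite those recovered from the topic znodes. A minimal sketch, assuming both sources have already been parsed into maps from a hypothetical "topic-partition" string key to target replicas; StartupReassignmentLoader is not a Kafka class.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StartupReassignmentLoader {
    // fromTopicZnodes: in-flight reassignments recovered from /brokers/topics/[topic].
    // fromAdminZnode: reassignments listed in /admin/reassign_partitions.
    // Admin-znode entries win because the controller cannot tell whether it had
    // already acted on them before the failover.
    static Map<String, List<Integer>> merge(Map<String, List<Integer>> fromTopicZnodes,
                                            Map<String, List<Integer>> fromAdminZnode) {
        Map<String, List<Integer>> merged = new HashMap<>(fromTopicZnodes);
        merged.putAll(fromAdminZnode); // later puts overwrite earlier ones
        return merged;
    }
}
```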
...
The controller will handle the AlterPartitionReassignmentsRequest RPC by modifying the /brokers/topics/[topic] znode. Once the znode is modified, the controller will send out LeaderAndIsrRequests to start all the movements. Once the controller is notified that a new replica has entered the ISR for a particular partition, it will remove it from the "addingReplicas" field in `/brokers/topics/[topic]`. If "addingReplicas" becomes empty, the controller will send a new wave of LeaderAndIsr requests to retire all the old replicas and, if applicable, elect new leaders.
Algorithm
We will always add all the replicas in the "addingReplicas" field before starting to act on the "removingReplicas" field.
```
# Illustrating the rebalance of a single partition.
# R is the current replicas, I is the ISR list, AR is addingReplicas and RR is removingReplicas
R: [1, 2, 3], I: [1, 2, 3], AR: [], RR: []
# Reassignment called via API with targetReplicas=[4,3,2]
R: [1, 4, 3, 2], I: [1, 2, 3], AR: [4], RR: [1] # Controller sends LeaderAndIsr requests
# (We ignore RR until AR is empty)
R: [1, 4, 3, 2], I: [1, 2, 3, 4], AR: [4], RR: [1] # Leader 1 adds 4 to ISR
# The controller realizes that all the replicas in AR have been added and starts work on RR.
# It removes all of RR from R and from the ISR, and sends StopReplica/LeaderAndIsr requests
R: [4, 3, 2], I: [2, 3, 4], AR: [], RR: [] # at this point the reassignment is considered complete
```
When all of the replicas in AR have been added, we remove RR from R and empty out AR/RR in one step.
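The walkthrough can be replayed with a small in-memory model of one partition. This is a sketch under stated assumptions: PartitionReassignmentState is hypothetical, leader election and the actual LeaderAndIsr/StopReplica traffic are omitted, and R is ordered as RR followed by TR only because that reproduces the ordering in the illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of one partition's R (replicas), I (ISR), AR and RR.
final class PartitionReassignmentState {
    List<Integer> replicas;
    Set<Integer> isr;
    List<Integer> adding = new ArrayList<>();
    List<Integer> removing = new ArrayList<>();

    PartitionReassignmentState(List<Integer> replicas) {
        this.replicas = new ArrayList<>(replicas);
        this.isr = new LinkedHashSet<>(replicas);
    }

    // Start a reassignment: AR = TR - R, RR = R - TR, R = RR followed by TR.
    void start(List<Integer> target) {
        adding = new ArrayList<>(target);
        adding.removeAll(replicas);
        removing = new ArrayList<>(replicas);
        removing.removeAll(target);
        Set<Integer> full = new LinkedHashSet<>(removing);
        full.addAll(target);
        replicas = new ArrayList<>(full);
    }

    // Called when the leader adds a replica to the ISR.
    void onIsrExpand(int replica) {
        isr.add(replica);
        // RR is ignored until every replica in AR has joined the ISR; then RR is
        // removed from R and the ISR, and AR/RR are emptied in one step.
        if (isr.containsAll(adding)) {
            replicas.removeAll(removing);
            isr.removeAll(removing);
            adding.clear();
            removing.clear();
        }
    }
}
```

Starting from R = [1, 2, 3] with target [4, 3, 2], start yields R = [1, 4, 3, 2], AR = [4], RR = [1], and onIsrExpand(4) then leaves R = [4, 3, 2] and I = {2, 3, 4}, matching the last line of the illustration.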
If a new reassignment is issued during an ongoing one, we cancel the current one by emptying out both AR and RR, send StopReplica requests to the replicas that were part of the old reassignment but not part of the new one, construct AR/RR from R (as updated by the previous reassignment) and TR (the new target replicas), and start anew.
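One reading of this procedure is sketched below, assuming that "cancelling the current one" reverts R to R - AR (as described for cancellation) and that the replicas to stop are the old AR entries absent from the new target. ReassignmentRestartSketch and Plan are hypothetical names, and this interpretation is an assumption rather than the controller's confirmed logic.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class ReassignmentRestartSketch {
    // Result of replacing an ongoing reassignment with a new target.
    static final class Plan {
        final List<Integer> stopReplicas; // would receive StopReplica requests
        final List<Integer> replicas;     // new R
        final List<Integer> adding;       // new AR
        final List<Integer> removing;     // new RR

        Plan(List<Integer> stopReplicas, List<Integer> replicas,
             List<Integer> adding, List<Integer> removing) {
            this.stopReplicas = stopReplicas;
            this.replicas = replicas;
            this.adding = adding;
            this.removing = removing;
        }
    }

    static Plan restart(List<Integer> replicas, List<Integer> oldAdding, List<Integer> newTarget) {
        // 1. Cancel the current reassignment: R' = R - AR (AR and RR become empty).
        List<Integer> reverted = new ArrayList<>(replicas);
        reverted.removeAll(oldAdding);
        // 2. Replicas the old reassignment was adding that the new one does not need.
        List<Integer> stop = new ArrayList<>(oldAdding);
        stop.removeAll(newTarget);
        // 3. Rebuild AR/RR from R' and the new target, then start anew.
        List<Integer> adding = new ArrayList<>(newTarget);
        adding.removeAll(reverted);
        List<Integer> removing = new ArrayList<>(reverted);
        removing.removeAll(newTarget);
        Set<Integer> full = new LinkedHashSet<>(removing);
        full.addAll(newTarget);
        return new Plan(stop, new ArrayList<>(full), adding, removing);
    }
}
```

For example, if the partition is mid-reassignment with R = [1, 4, 3, 2] and AR = [4], and a new target [5, 3, 2] arrives, the sketch stops replica 4 and restarts with R = [1, 5, 3, 2], AR = [5], RR = [1].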
In general, this algorithm is consistent with current Kafka behavior: other brokers still receive the full replica set, consisting of both the original replicas and the new ones.
...
Essentially, once a cancellation is called we subtract AR from R, empty out both AR and RR, and send StopReplica/LeaderAndIsr requests to cancel the replica movements that have not yet completed.
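The cancellation step (subtract AR from R, then empty AR and RR) is small enough to sketch directly. Assumptions: CancellationSketch is hypothetical, a partition is treated as "not reassigning" when both AR and RR are empty, and NoReassignmentInProgress is a local stand-in for NoReassignmentInProgressException so that the example compiles without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.kafka.common.errors.NoReassignmentInProgressException.
final class NoReassignmentInProgress extends RuntimeException {
    NoReassignmentInProgress(String msg) {
        super(msg);
    }
}

final class CancellationSketch {
    // Returns the reverted replica set R - AR. Clearing AR/RR and sending the
    // StopReplica/LeaderAndIsr requests are left to the caller (not modeled).
    static List<Integer> cancel(List<Integer> replicas, List<Integer> adding, List<Integer> removing) {
        if (adding.isEmpty() && removing.isEmpty()) {
            throw new NoReassignmentInProgress("no reassignment in progress");
        }
        List<Integer> reverted = new ArrayList<>(replicas);
        reverted.removeAll(adding);
        return reverted;
    }
}
```

Cancelling the reassignment from the illustration (R = [1, 4, 3, 2], AR = [4], RR = [1]) yields [1, 3, 2] rather than the original [1, 2, 3], which is why the reverted replica set is not guaranteed to keep its previous order.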
...