Note this was initially erroneously assigned as KIP-178, which was already taken, and has been reassigned KIP-179.
Status
Current state: Under Discussion
Discussion thread: here (when initially misnumbered as KIP-178) and here (when assigned KIP-179)
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Firstly, the ReassignPartitionsCommand (which is used by the kafka-reassign-partitions.sh tool) talks directly to ZooKeeper. This prevents the tool being used in deployments where only the brokers are exposed to clients (i.e. where the ZooKeeper servers are intentionally not exposed). In addition, there is a general push to refactor/rewrite/replace tools which need ZooKeeper access with equivalents which use the AdminClient API. Thus it is necessary to change the ReassignPartitionsCommand so that it no longer talks to ZooKeeper directly, but via an intermediating broker. Similar work is needed for the kafka-topics.sh tool (which can also change assignments and numbers of partitions and replicas), so common AdminClient and protocol APIs are desirable.
Secondly, ReassignPartitionsCommand currently has no proper facility to report the progress of a reassignment; --verify can be used periodically to check whether the requested assignments have been achieved. It would be useful if the tool could report progress better.
Public Interfaces
New network protocol APIs will be added:
- AlterPartitionCountsRequest and AlterPartitionCountsResponse
- ReassignPartitionsRequest and ReassignPartitionsResponse
- AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse
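The AdminClient API will have new methods added (plus overloads for options):
- alterPartitionCounts(Map<String, PartitionCount> partitionCounts)
- reassignPartitions(Map<TopicPartition, List<Integer>> assignments)
- alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond)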
The options accepted by the kafka-reassign-partitions.sh command will change:
- --zookeeper will be deprecated, with a warning message
- a new --bootstrap-server option will be added
- a new --progress action option will be added
Proposed Changes
Summary of use cases
Use case | command | AdminClient |
---|---|---|
Increase partition count | kafka-topics --alter --topic T --partitions P | alterPartitionCounts() |
Change replication factor | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions() |
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions() |
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | alterInterBrokerThrottleRate(), then reassignPartitions() |
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | alterInterBrokerThrottleRate() |
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | (see KIP-113) |
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J | |
kafka-reassign-partitions.sh and ReassignPartitionsCommand
The --zookeeper option will be retained and will:
- Cause a deprecation warning to be printed to standard error. The message will say that the --zookeeper option will be removed in a future version and that --bootstrap-server is the replacement option.
- Perform the reassignment via ZooKeeper, as currently.
A new --bootstrap-server option will be added and will:
- Perform the reassignment via the given intermediating broker.
Using both --zookeeper and --bootstrap-server in the same command will produce an error message and the tool will exit without doing the intended operation.
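The option rules above can be sketched in Java as follows. `ConnectionOptionCheck` and its `Mode` enum are hypothetical names rather than the real `ReassignPartitionsCommand` internals, and treating "neither option given" as an error is an assumption, since the text does not cover that case.

```java
import java.util.List;

/** Hypothetical sketch: decides whether the tool talks to ZooKeeper or to a broker. */
final class ConnectionOptionCheck {

    enum Mode { ZOOKEEPER, BROKER }

    static Mode check(List<String> args) {
        boolean zookeeper = args.contains("--zookeeper");
        boolean bootstrap = args.contains("--bootstrap-server");
        if (zookeeper && bootstrap) {
            // Both given: report an error and do not perform the operation.
            throw new IllegalArgumentException(
                    "--zookeeper and --bootstrap-server cannot be used together");
        }
        if (zookeeper) {
            // Still supported, but deprecated: warn on standard error.
            System.err.println("Warning: --zookeeper is deprecated and will be removed in a "
                    + "future version. Use --bootstrap-server instead.");
            return Mode.ZOOKEEPER;
        }
        if (bootstrap) {
            return Mode.BROKER;
        }
        // Assumption: giving neither option is treated as a usage error.
        throw new IllegalArgumentException("one of --zookeeper or --bootstrap-server is required");
    }
}
```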
It is anticipated that a future version of Kafka would remove support for the --zookeeper option.
A new --progress action option will be added. This will only be supported when used with --bootstrap-server. If used with --zookeeper the command will produce an error message and the tool will exit without doing the intended operation. --progress will report on the synchronisation of each of the partitions and brokers in the reassignment given via the --reassignment-json-file option.
For example:
```shell
# If the following command is used to start a reassignment
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --execute
# then the following command will print the progress of
# that reassignment, then exit immediately
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --progress
```
That might print something like the following:
```
Topic     Partition  Broker  Status
-------------------------------------
my_topic  0          0       In sync
my_topic  0          1       Behind: 10456 messages behind
asdf      0          1       Unknown topic
my_topic  42         1       Unknown partition
my_topic  0          42      Unknown broker
my_topic  1          0       Broker does not host this partition
```
The implementation of --progress will make use of the describeReplicaLogDirs() method from KIP-113 to find the lag of the syncing replica.
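As an illustration of how the Status column in the sample output could be produced, the sketch below maps what the tool knows about one (topic, partition, broker) row to the printed text. `ProgressStatus` and `ReplicaState` are hypothetical, and the sketch assumes the offset lag has already been fetched (for example with `describeReplicaLogDirs()`); only the mapping and formatting are shown.

```java
import java.util.Locale;

/** Hypothetical sketch of the --progress Status column; not the tool's actual code. */
final class ProgressStatus {

    /** What the tool is assumed to know about one (topic, partition, broker) row. */
    static final class ReplicaState {
        final boolean topicKnown;
        final boolean partitionKnown;
        final boolean brokerKnown;
        final boolean brokerHostsReplica;
        final long offsetLag; // messages behind the leader (assumed source: describeReplicaLogDirs())

        ReplicaState(boolean topicKnown, boolean partitionKnown, boolean brokerKnown,
                     boolean brokerHostsReplica, long offsetLag) {
            this.topicKnown = topicKnown;
            this.partitionKnown = partitionKnown;
            this.brokerKnown = brokerKnown;
            this.brokerHostsReplica = brokerHostsReplica;
            this.offsetLag = offsetLag;
        }
    }

    /** Checks run from the coarsest (topic) to the finest (lag), matching the sample output. */
    static String status(ReplicaState s) {
        if (!s.topicKnown) return "Unknown topic";
        if (!s.partitionKnown) return "Unknown partition";
        if (!s.brokerKnown) return "Unknown broker";
        if (!s.brokerHostsReplica) return "Broker does not host this partition";
        if (s.offsetLag == 0) return "In sync";
        return "Behind: " + s.offsetLag + " messages behind";
    }

    /** Formats one output row with fixed-width columns. */
    static String row(String topic, int partition, int broker, ReplicaState s) {
        return String.format(Locale.ROOT, "%-10s %-9d %-6d %s", topic, partition, broker, status(s));
    }
}
```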
Internally, the ReassignPartitionsCommand will be refactored to support the above changes to the options. An interface will abstract the commands currently issued directly to ZooKeeper. There will be an implementation which makes the current calls to ZooKeeper, and another implementation which uses the AdminClient API described below.
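A minimal sketch of that refactoring, assuming a hypothetical `ReassignmentBackend` interface: the command logic depends only on the interface, so a ZooKeeper-based and an AdminClient-based implementation can be swapped in. To keep the sketch self-contained, an in-memory stand-in replaces both real implementations, and partitions are keyed by plain "topic-partition" strings instead of `TopicPartition`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical abstraction over where reassignment commands are sent. */
interface ReassignmentBackend {
    void startReassignment(Map<String, List<Integer>> assignments);

    boolean reassignmentInProgress();
}

/** In-memory stand-in for the ZooKeeper-based or AdminClient-based implementation. */
final class InMemoryReassignmentBackend implements ReassignmentBackend {
    private final Map<String, List<Integer>> pending = new HashMap<>();

    @Override
    public void startReassignment(Map<String, List<Integer>> assignments) {
        if (!pending.isEmpty()) {
            throw new IllegalStateException("a reassignment is already in progress");
        }
        pending.putAll(assignments);
    }

    @Override
    public boolean reassignmentInProgress() {
        return !pending.isEmpty();
    }

    /** Simulates the controller finishing the reassignment. */
    void complete() {
        pending.clear();
    }
}

/** Command logic that depends only on the interface, so either backend can be plugged in. */
final class ReassignCommandCore {
    private final ReassignmentBackend backend;

    ReassignCommandCore(ReassignmentBackend backend) {
        this.backend = backend;
    }

    void execute(Map<String, List<Integer>> assignments) {
        backend.startReassignment(assignments);
    }

    boolean verifyComplete() {
        return !backend.reassignmentInProgress();
    }
}
```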
In all other respects, the public API of ReassignPartitionsCommand will not be changed.
AdminClient: alterPartitionCounts()
This API supports the use case of changing the partition count via kafka-topics.sh --alter --partitions ...
Notes:
- This API is synchronous in the sense that the client can assume that the partition count has been changed (or the request was rejected) once they have obtained the result for the topic from the AlterPartitionCountsResult.
```java
/**
 * <p>Change the partition count of the topics given as the keys of {@code counts}
 * according to the corresponding values. Currently it is only possible to increase
 * the partition count.</p>
 */
public AlterPartitionCountsResult alterPartitionCounts(Map<String, PartitionCount> counts,
                                                       AlterPartitionCountsOptions options);

public AlterPartitionCountsResult alterPartitionCounts(Map<String, PartitionCount> counts);
```
Where:
```java
/** Describes a change in a topic's partition count. */
public class PartitionCount {

    private int partitionCount;
    private List<List<Integer>> assignments;

    private PartitionCount(int partitionCount) { ... }

    /**
     * <p>Increase the partition count for a topic to the given {@code newCount}.
     * The assignment of new replicas to brokers will be decided by the broker
     * but may subsequently be moved using {@link AdminClient#reassignPartitions(Map)}.</p>
     */
    public static PartitionCount increasePartitionCount(int newCount) { ... }

    /**
     * <p>Increase the partition count for a topic to the given {@code newCount},
     * assigning the new partitions according to the given {@code newAssignments}.
     * The length of {@code newAssignments} should equal {@code newCount - oldCount}, since
     * the assignment of existing partitions is not changed.
     * Each inner list of {@code newAssignments} should have a length equal to
     * the topic's replication factor.
     * The first broker id in each inner list is the "preferred replica".</p>
     *
     * <p>For example, suppose a topic currently has a replication factor of 2 and
     * has 3 partitions. The number of partitions can be increased to 4
     * (with broker 1 being the preferred replica for the new partition) using a
     * {@code PartitionCount} constructed like this:</p>
     *
     * <pre><code>PartitionCount.increasePartitionCount(4, Arrays.asList(Arrays.asList(1, 2)))</code></pre>
     */
    public static PartitionCount increasePartitionCount(int newCount, List<List<Integer>> newAssignments) { ... }
}

public class AlterPartitionCountsOptions {

    public AlterPartitionCountsOptions() { ... }

    public Integer timeoutMs() { ... }

    public AlterPartitionCountsOptions timeoutMs(Integer timeoutMs) { ... }

    public boolean validateOnly() { ... }

    /**
     * Validate the request only: do not actually change any partition counts.
     */
    public AlterPartitionCountsOptions validateOnly(boolean validateOnly) { ... }
}

public class AlterPartitionCountsResult {

    // package access constructor

    public Map<String, KafkaFuture<Void>> values() { ... }

    public KafkaFuture<Void> all() { ... }
}
```
Network Protocol: AlterPartitionCountsRequest and AlterPartitionCountsResponse
The AlterPartitionCountsRequest is used to change the partition count for a batch of topics, and is the basis for the AdminClient.alterPartitionCounts() method.
The request can be sent to any broker.
The request will require the ALTER operation on the Topic resource.
The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.
After validating the request the broker calls AdminUtils.addPartitions(), which ultimately updates the topic partition assignment znode (/brokers/topics/${topic}).
```
AlterPartitionCountsRequest => [topic_partition_count] timeout
  topic_partition_count => topic partition_count
    topic => STRING
    partition_count => count [assignment]
      count => INT32
      assignment => [INT32]
  timeout => INT32
```
Where
Field | Description |
---|---|
topic | the name of a topic |
count | the new partition count |
assignment | a list of assigned brokers (one list for each new partition) |
timeout | The maximum time to await a response in ms. |
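To make the schema concrete, here is a sketch that serializes the request body with Kafka's classic fixed-width encodings: big-endian integers, an INT32 element count before each array, and an INT16 length before each UTF-8 STRING. The request header and API key are omitted, the class name is hypothetical, and treating an empty `assignment` array as "let the broker choose" is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/** Hypothetical encoder for the proposed AlterPartitionCountsRequest body. */
final class AlterPartitionCountsRequestEncoder {

    /** One topic_partition_count entry: the new count plus one broker list per new partition. */
    static final class PartitionCount {
        final int count;
        final List<List<Integer>> assignment;

        PartitionCount(int count, List<List<Integer>> assignment) {
            this.count = count;
            this.assignment = assignment;
        }
    }

    static ByteBuffer encode(Map<String, PartitionCount> topics, int timeoutMs) {
        // First pass: work out the exact size so the buffer can be allocated once.
        int size = 4; // INT32 length of the topic_partition_count array
        for (Map.Entry<String, PartitionCount> e : topics.entrySet()) {
            size += 2 + e.getKey().getBytes(StandardCharsets.UTF_8).length; // topic STRING
            size += 4; // count INT32
            size += 4; // INT32 length of the assignment array
            for (List<Integer> brokers : e.getValue().assignment) {
                size += 4 + 4 * brokers.size(); // inner [INT32] array
            }
        }
        size += 4; // timeout INT32

        // Second pass: write the fields in schema order. ByteBuffer is big-endian by default.
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(topics.size());
        for (Map.Entry<String, PartitionCount> e : topics.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) name.length);
            buf.put(name);
            buf.putInt(e.getValue().count);
            buf.putInt(e.getValue().assignment.size());
            for (List<Integer> brokers : e.getValue().assignment) {
                buf.putInt(brokers.size());
                for (int broker : brokers) {
                    buf.putInt(broker);
                }
            }
        }
        buf.putInt(timeoutMs);
        buf.flip();
        return buf;
    }
}
```

For the `increasePartitionCount(4, [[1, 2]])` example on a topic named `t`, this yields a 31-byte body.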
The response provides an error code and message for each of the topics present in the request.
```
AlterPartitionCountsResponse => throttle_time_ms [topic_partition_count_error]
  throttle_time_ms => INT32
  topic_partition_count_error => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING
```
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | the name of a topic in the request |
error_code | an error code for that topic |
error_message | more detailed information about any error for that topic |
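On the client side, each `topic_partition_count_error` entry would presumably complete the matching future in `AlterPartitionCountsResult`. The sketch below uses `java.util.concurrent.CompletableFuture` in place of `KafkaFuture` and a generic `RuntimeException` in place of Kafka's per-error exception types; both substitutions, and the class names, are assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: turns per-topic response errors into per-topic futures. */
final class AlterPartitionCountsResultSketch {

    static final class TopicError {
        final short errorCode;
        final String errorMessage; // NULLABLE_STRING

        TopicError(short errorCode, String errorMessage) {
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    final Map<String, CompletableFuture<Void>> values = new HashMap<>();

    AlterPartitionCountsResultSketch(Map<String, TopicError> response) {
        for (Map.Entry<String, TopicError> e : response.entrySet()) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            TopicError error = e.getValue();
            if (error.errorCode == 0) {
                future.complete(null); // NONE: the partition count was changed
            } else {
                String detail = error.errorMessage != null ? error.errorMessage : "(no message)";
                future.completeExceptionally(new RuntimeException(
                        "topic " + e.getKey() + " failed with error " + error.errorCode + ": " + detail));
            }
            values.put(e.getKey(), future);
        }
    }

    /** Succeeds only if every topic succeeded, like the proposed all(). */
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(values.values().toArray(new CompletableFuture<?>[0]));
    }
}
```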
Anticipated errors:
- TOPIC_AUTHORIZATION_FAILED (29) The user lacked Alter on the topic
- POLICY_VIOLATION (44) The request violated the configured policy
- INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
- INVALID_PARTITIONS (37) If the partition count was <= the current partition count for the topic
- INVALID_REQUEST (42) If duplicate topics appeared in the request, or the size of the assignment list did not equal the number of new partitions, or if the size of any of the lists contained in the assignment list was not equal to the topic replication factor
- NONE (0) The topic partition count was changed successfully
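The size checks behind INVALID_PARTITIONS and INVALID_REQUEST can be summarized in a few lines. This is a hypothetical broker-side sketch for a single topic (`PartitionCountValidator` is not an existing Kafka class); it omits the authorization, policy and duplicate-topic checks, and assumes an empty assignment means the broker chooses the placement.

```java
import java.util.List;

/** Hypothetical sketch of the per-topic partition-count checks; returns an error code. */
final class PartitionCountValidator {
    static final short NONE = 0;
    static final short INVALID_PARTITIONS = 37;
    static final short INVALID_REQUEST = 42;

    /**
     * @param currentCount      the topic's current number of partitions
     * @param replicationFactor the topic's replication factor
     * @param newCount          the requested partition count
     * @param assignment        one broker list per new partition, or empty (assumed: broker chooses)
     */
    static short validate(int currentCount, int replicationFactor, int newCount,
                          List<List<Integer>> assignment) {
        if (newCount <= currentCount) {
            return INVALID_PARTITIONS; // only increases are allowed
        }
        if (assignment.isEmpty()) {
            return NONE;
        }
        if (assignment.size() != newCount - currentCount) {
            return INVALID_REQUEST; // one list per new partition
        }
        for (List<Integer> brokers : assignment) {
            if (brokers.size() != replicationFactor) {
                return INVALID_REQUEST; // each list must match the replication factor
            }
        }
        return NONE;
    }
}
```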
AdminClient: reassignPartitions()
This API will support a number of use cases:
- Changing the partition assignment, via kafka-reassign-partitions.sh
- Changing the replication factor, via kafka-reassign-partitions.sh
Notes:
- This API is asynchronous in the sense that, once the client has obtained the result for a partition from the ReassignPartitionsResult, it can assume only that the reassignment has started (or that the request was rejected), not that the reassignment is complete.
- The describeReplicaLogDirs() method from KIP-113 can be used to determine progress.
- A call to reassignPartitions() with the validateOnly option can be used to determine whether a reassignment is currently running, and therefore whether the last reassignment has finished.
- The API doesn't directly support setting a throttle itself. A prior set of calls to alterInterBrokerThrottle() can be used to set a throttle.
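Using validateOnly to detect a running reassignment, as the notes above describe, might look like the polling loop below. The probe is a hypothetical `Supplier<Short>` standing for "send reassignPartitions() with validateOnly=true and return the error code", and because the KIP does not yet assign PARTITION_REASSIGNMENT_IN_PROGRESS a numeric code, a placeholder value is used.

```java
import java.util.function.Supplier;

/** Hypothetical sketch: waits until no reassignment is running, using validate-only probes. */
final class ReassignmentWaiter {

    /** Placeholder: the KIP proposes this error but does not give it a numeric code. */
    static final short PARTITION_REASSIGNMENT_IN_PROGRESS = -100;

    /**
     * @param validateOnlyProbe stands for "call reassignPartitions() with validateOnly=true
     *                          and return the resulting error code"
     * @return the first error code that is not PARTITION_REASSIGNMENT_IN_PROGRESS
     */
    static short awaitNoReassignment(Supplier<Short> validateOnlyProbe, int maxAttempts,
                                     long backoffMs) throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            short code = validateOnlyProbe.get();
            if (code != PARTITION_REASSIGNMENT_IN_PROGRESS) {
                return code; // the previous reassignment has finished (or another error occurred)
            }
            Thread.sleep(backoffMs);
        }
        throw new IllegalStateException(
                "reassignment still in progress after " + maxAttempts + " attempts");
    }
}
```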
```java
/**
 * <p>Assign the partitions given as the keys of the given {@code assignments} to the corresponding
 * list of brokers. This can be used to change the replica assignment or change the topic's replication factor.
 * The first broker in each list is the one which holds the "preferred replica".</p>
 *
 * <p>To change the replication factor for a topic there must be a key for each partition in
 * the {@code assignments} map, and the corresponding lists of brokers must all have the same
 * length, which will become the new replication factor.</p>
 *
 * <p>If only a subset of the partitions of a particular topic are present in {@code assignments},
 * the change is taken to be a reassignment of replicas to brokers, and each list must have
 * the same length as the current topic replication factor.</p>
 *
 * <h3>Throttling</h3>
 * <p>Inter-broker reassignment and/or increasing the replication factor causes significant
 * inter-broker traffic and can take a long time, since the replica data must be copied to
 * brokers. It may be necessary to impose a throttle on inter-broker traffic for the duration
 * of the reassignment so that client-broker traffic is not adversely affected.</p>
 *
 * <h3>Preferred replica</h3>
 * <p>When brokers are configured with <code>auto.leader.rebalance.enable=true</code>, the broker
 * with the preferred replica will be elected leader automatically.
 * <code>kafka-preferred-replica-election.sh</code> provides a manual trigger for this
 * election when <code>auto.leader.rebalance.enable=false</code>.</p>
 */
public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments);

public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments,
                                                   ReassignPartitionsOptions options);
```
Where:
```java
public class ReassignPartitionsOptions {

    public boolean validateOnly() { ... }

    /**
     * Validate the request only: do not actually trigger replica reassignment.
     */
    public ReassignPartitionsOptions validateOnly(boolean validateOnly) { ... }

    public long timeoutMs() { ... }

    /**
     * Set a timeout for the starting of the reassignment.
     * Note this timeout does not include the time taken to actually
     * move replicas between brokers.
     */
    public ReassignPartitionsOptions timeoutMs(long timeoutMs) { ... }
}

public class ReassignPartitionsResult {

    public Map<TopicPartition, KafkaFuture<Void>> values();

    public KafkaFuture<Void> all();
}
```
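The two replication-factor rules in the reassignPartitions() Javadoc can be expressed as a check over one topic's entries. The sketch below is hypothetical (`ReassignmentShapeCheck` is not a Kafka class, and it takes a per-topic `Map<Integer, List<Integer>>` rather than `TopicPartition` keys); it returns the replication factor the topic would end up with, or throws if the broker lists break the rules.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the replication-factor rules for one topic. */
final class ReassignmentShapeCheck {

    static int resultingReplicationFactor(int partitionCount, int currentReplicationFactor,
                                          Map<Integer, List<Integer>> assignments) {
        if (assignments.isEmpty()) {
            throw new IllegalArgumentException("no partitions given");
        }
        int newFactor = -1;
        for (Map.Entry<Integer, List<Integer>> e : assignments.entrySet()) {
            int partition = e.getKey();
            if (partition < 0 || partition >= partitionCount) {
                throw new IllegalArgumentException("unknown partition " + partition);
            }
            int length = e.getValue().size();
            if (newFactor == -1) {
                newFactor = length;
            } else if (length != newFactor) {
                throw new IllegalArgumentException("broker lists have different lengths");
            }
        }
        // Keys are unique and in range, so equal sizes mean every partition is present.
        boolean allPartitionsPresent = assignments.size() == partitionCount;
        if (!allPartitionsPresent && newFactor != currentReplicationFactor) {
            throw new IllegalArgumentException(
                    "a partial reassignment must keep the replication factor " + currentReplicationFactor);
        }
        return newFactor;
    }
}
```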
Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse
A ReassignPartitionsRequest initiates the movement of replicas between brokers, and is the basis of the AdminClient.reassignPartitions() method.
Notes:
- The request can be sent to any broker.
- The request requires the Alter operation on the CLUSTER resource, since it can require significant inter-broker communication.
- The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.
- After validating the request the broker writes reassignment JSON to the /admin/reassign_partitions znode.
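The JSON written to /admin/reassign_partitions would presumably take the same `{"version":1,"partitions":[...]}` shape that kafka-reassign-partitions.sh accepts via --reassignment-json-file. The hypothetical `ReassignmentJson` helper below builds that string from a per-topic map; it skips JSON escaping on the assumption that legal Kafka topic names only contain `[a-zA-Z0-9._-]`.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

/** Hypothetical sketch: builds reassignment JSON from topic -> partition -> replicas. */
final class ReassignmentJson {

    static String toJson(Map<String, Map<Integer, List<Integer>>> byTopic) {
        StringJoiner partitions = new StringJoiner(",", "[", "]");
        // TreeMaps give a deterministic topic and partition order.
        for (Map.Entry<String, Map<Integer, List<Integer>>> t : new TreeMap<>(byTopic).entrySet()) {
            for (Map.Entry<Integer, List<Integer>> p : new TreeMap<>(t.getValue()).entrySet()) {
                StringJoiner replicas = new StringJoiner(",", "[", "]");
                for (int broker : p.getValue()) {
                    replicas.add(Integer.toString(broker));
                }
                partitions.add("{\"topic\":\"" + t.getKey() + "\",\"partition\":" + p.getKey()
                        + ",\"replicas\":" + replicas + "}");
            }
        }
        return "{\"version\":1,\"partitions\":" + partitions + "}";
    }
}
```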
```
ReassignPartitionsRequest => [topic_reassignments] timeout validate_only
  topic_reassignments => topic [partition_reassignments]
    topic => STRING
    partition_reassignments => partition_id [broker]
      partition_id => INT32
      broker => INT32
  timeout => INT32
  validate_only => BOOLEAN
```
Where
Field | Description |
---|---|
topic | the name of a topic |
partition_id | a partition of that topic |
broker | a broker id |
timeout | The maximum time to await a response in ms. |
validate_only | when true: validate the request, but don't actually reassign the partitions |
A ReassignPartitionsResponse describes which partitions in the request will be moved, and what was wrong with the request for those partitions which will not be moved.
```
ReassignPartitionsResponse => throttle_time_ms [reassign_partition_result]
  throttle_time_ms => INT32
  reassign_partition_result => topic [partition_error]
    topic => STRING
    partition_error => partition_id error_code error_message
      partition_id => INT32
      error_code => INT16
      error_message => NULLABLE_STRING
```
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | a topic name from the request |
partition_id | a partition id for that topic, from the request |
error_code | an error code for that topic partition |
error_message | more detailed information about any error for that topic partition |
Anticipated errors:
- CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
- POLICY_VIOLATION (44) The request violated the configured policy
- INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
- UNKNOWN_MEMBER_ID (25) If any broker ids in the partition_reassignments included an unknown broker id
- INVALID_REQUEST (42) If duplicate topics appeared in the request
- PARTITION_REASSIGNMENT_IN_PROGRESS (new) If the reassignment cannot be started because a reassignment is currently running (i.e. the /admin/reassign_partitions znode exists)
- INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_reassignments doesn't exist or is incompatible with the requested number of partitions and/or replication factor. The error_message would contain further information.
- NONE (0) The reassignment has started
AdminClient: alterInterBrokerThrottle()
This API will support the following use cases:
- A previous call to reassignPartitions() has been made and the user wants to apply or change the throttle.
- A previous call to reassignPartitions() is complete and the user wants to remove the throttle.
Notes:
- The throttle can be queried via describeConfigs().
- The throttle cannot be set via alterConfigs() because: 1) alterConfigs doesn't support changing broker configs and 2) throttles are a special-case DynamicConfig. The throttle is broker-specific and different brokers can have different rates applied.
The following methods will be added to AdminClient to support changing the inter-broker transfer throttle.
```java
/**
 * Alter the throttle of inter-broker replication for the given broker to the given rate.
 * @param broker The broker
 * @param leaderRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
 *     to be used for inter-broker replication when the broker is the leader.
 * @param followerRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
 *     to be used for inter-broker replication when the broker is the follower.
 */
AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker,
                                                            long leaderRateBytesPerSecond,
                                                            long followerRateBytesPerSecond);

AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker,
                                                            long leaderRateBytesPerSecond,
                                                            long followerRateBytesPerSecond,
                                                            AlterInterBrokerThrottleOptions options);
```
Where:
```java
public class AlterInterBrokerThrottleOptions {

    public Integer timeoutMs() { ... }

    public AlterInterBrokerThrottleOptions timeoutMs(Integer timeoutMs) { ... }
}

public class AlterInterBrokerThrottleResult {

    public KafkaFuture<Void> all();
}
```
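For orientation, the existing `--throttle` option of kafka-reassign-partitions.sh works by setting the per-broker dynamic configs `leader.replication.throttled.rate` and `follower.replication.throttled.rate`. Assuming the broker would store the two rates passed to `alterInterBrokerThrottleRate()` the same way, the mapping is simple. `ThrottleConfigs` is a hypothetical helper, and rejecting non-positive rates is an assumed reading of the "out of range" INVALID_REQUEST case.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: maps the two throttle rates onto broker dynamic config entries. */
final class ThrottleConfigs {

    static Map<String, String> forRates(long leaderRateBytesPerSecond, long followerRateBytesPerSecond) {
        if (leaderRateBytesPerSecond <= 0 || followerRateBytesPerSecond <= 0) {
            // Assumed to correspond to INVALID_REQUEST (42); the exact valid range is not specified.
            throw new IllegalArgumentException("throttle rates must be positive");
        }
        Map<String, String> configs = new LinkedHashMap<>();
        configs.put("leader.replication.throttled.rate", Long.toString(leaderRateBytesPerSecond));
        configs.put("follower.replication.throttled.rate", Long.toString(followerRateBytesPerSecond));
        return configs;
    }
}
```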
Network Protocol: AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse
An AlterInterBrokerThrottleRequest changes the leader and follower throttle rates for inter-broker transfers from/to the broker receiving the request.
```
AlterInterBrokerThrottleRequest => leader_rate follower_rate timeout
  leader_rate => INT64
  follower_rate => INT64
  timeout => INT32
```
Where:
Field | Description |
---|---|
leader_rate | The approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the leader |
follower_rate | The approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the follower |
timeout | The maximum time to await a response in ms. |
The request requires the Alter operation on the CLUSTER resource, since it can affect significant inter-broker communication.
An AlterInterBrokerThrottleResponse just acknowledges the success or otherwise of an AlterInterBrokerThrottleRequest.
```
AlterInterBrokerThrottleResponse => error_code error_message throttle_time_ms
  error_code => INT16
  error_message => NULLABLE_STRING
  throttle_time_ms => INT32
```
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
error_code | an error code for the request |
error_message | more detailed information about any error |
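The response is small enough to decode by hand. Assuming the same classic encodings as the other schemas (a NULLABLE_STRING is an INT16 length, -1 for null, followed by UTF-8 bytes), a hypothetical decoder for the response body (header omitted) could look like this:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: decodes the AlterInterBrokerThrottleResponse body. */
final class AlterInterBrokerThrottleResponseDecoder {

    static final class Response {
        final short errorCode;
        final String errorMessage; // null when the NULLABLE_STRING length is -1
        final int throttleTimeMs;

        Response(short errorCode, String errorMessage, int throttleTimeMs) {
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
            this.throttleTimeMs = throttleTimeMs;
        }
    }

    static Response decode(ByteBuffer buf) {
        short errorCode = buf.getShort();     // error_code => INT16
        short messageLength = buf.getShort(); // NULLABLE_STRING length prefix
        String errorMessage = null;
        if (messageLength >= 0) {
            byte[] bytes = new byte[messageLength];
            buf.get(bytes);
            errorMessage = new String(bytes, StandardCharsets.UTF_8);
        }
        int throttleTimeMs = buf.getInt();    // throttle_time_ms => INT32
        return new Response(errorCode, errorMessage, throttleTimeMs);
    }
}
```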
Anticipated errors:
- CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
- INVALID_REQUEST (42) Leader rate or follower rate was out of range
- NONE (0) The request was successful
Compatibility, Deprecation, and Migration Plan
Existing users of the kafka-reassign-partitions.sh tool will receive a deprecation warning when they use the --zookeeper option. The option will be removed in a future version of Kafka. If this KIP is introduced in version 1.0.0, the removal could happen in 2.0.0.
Rejected Alternatives
One alternative is to do nothing: Let the ReassignPartitionsCommand continue to communicate with ZooKeeper directly.
Another alternative is to do exactly this KIP, but without the deprecation of --zookeeper. That would have a higher long-term maintenance burden, and would prevent any future plans to, for example, provide cluster technologies other than ZooKeeper.
An alterTopics() AdminClient API, mirroring the existing createTopics() API, was considered, but:
- Some calls to alterTopics() (such as increasing the partition count) would have been synchronous, while others (such as moving replicas between brokers) would have been long running and thus asynchronous. This made for an API whose synchronousness depended on the arguments.
- createTopics() allows topic configs to be specified, whereas alterConfigs() is already provided to change topic configs, so it wasn't an exact mirror.
Just providing reassignPartitions() was considered, with changes to partition count inferred from partitions present in the assignments argument. This would require the caller to provide an assignment of partitions to brokers, whereas currently it's possible to increase the partition count without specifying an assignment. It also suffered from the synchronous/asynchronous API problem.
Similarly, an alterReplicationFactors() method, separate from reassignPartitions(), was considered, but both require a partition-to-brokers assignment, and both are implemented in the same way (by writing to the /admin/reassign_partitions znode), so there didn't seem much point in making an API which distinguished them.