...
Table of Contents |
---|
Status
Current state: Under Discussion [One of "Under Discussion", "Accepted", "Rejected"]Withdrawn
Discussion thread: here (when initially misnumbered as KIP-178) and here (when assigned KIP-179)
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).e
Motivation
Describe the problems you are trying to solve.
...
- The AdminClient API currently lacks any functionality for reassigning partitions: Users have to use the
kafka-reassign-partitions.sh
tool (ReassignPartitionsCommand
) which talks directly to ZooKeeper. This prevents the tool being used in deployments where only the brokers are exposed to clients (i.e. where the zookeeper servers are intentionally not exposed). In addition, there is a general push to refactor/rewrite/replace tools which need ZooKeeper access with equivalents which use theAdminClient
API.
...
ReassignPartitionsCommand
currently has no proper facility to report progress of a reassignment; While
--verify
can be used periodically to check whether the request assignments have been achieved the tool provides no means of knowing how quickly new replicas are catching up. It would be useful if the tool could report progress better.
Public Interfaces
Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.
A public interface is any change to the following:
...
Binary log format
...
The network protocol and api behavior
...
Any class in the public packages under clientsConfiguration, especially client configuration
org/apache/kafka/common/serialization
org/apache/kafka/common
org/apache/kafka/common/errors
org/apache/kafka/clients/producer
org/apache/kafka/clients/consumer (eventually, once stable)
...
Monitoring
...
Command line tools and arguments
ReassignPartitionsCommand
, when used with a replication throttle, requires a--verify
invocation when the reassignment has finished in order to remove the throttle. So there exists the possibility that throttles are not removed after reassignment, with negative consequences for the performance of the cluster. It would be better if throttles could be removed automatically.
Public Interfaces
The AdminClient
API will have new methods added (plus overloads for options):
alterInterbrokerThrottledRates(Map<Integer, ThrottledRate> throttledRates)
alterInterbrokerThrottledReplicas(Map<TopicPartition, ThrottledReplicas> replicas)
The options for reassignPartitions()
will support setting a throttle, and a flag for its automatic removal at the end of the reassignment. Likewise the options for changing the throttled rates and replicas will include the ability to have the throttles automatically removed.
...
New network protocol APIs will be added :
AlterPartitionCountsRequest
andAlterPartitionCountsResponse
AlterReplicationFactorsRequest
andAlterReplicationFactorsResponse
ReassignPartitionsRequest
andReassignPartitionsResponse
AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse
The AdminClient
API will have new methods added (plus overloads for options):
alterPartitionCounts(Map<String, Integer> partitionCounts)
alterReplicationFactors(Map<String, Short> replicationFactors)
reassignPartitions(Map<TopicPartition, List<Integer>>)
The options accepted by kafka-reassign-partitions.sh
command will change:
to support these AdminClient APIs
AlterInterbrokerThrottledRatesRequest
andAlterInterbrokerThrottledRatesResponse
AlterInterbrokerThrottledReplicasRequest
andAlterInterbrokerThrottledReplicasResponse
The options accepted by kafka-reassign-partitions.sh
command will change:
----zookeeper
will be deprecated, with a warning message- a new
--bootstrap-server
option will be added - a new
--progress
action option will be added
Proposed Changes
...
When run with --bootstrap-server
it will no longer be necessary to run kafka-reassign-partitions.sh --verify
to remove a throttle: This will be done automatically.
Proposed Changes
Summary of use cases
Use case | command | AdminClient |
---|---|---|
Change replication factor | kafka-reassign-partitions |
...
--execute --reassignment-json-file J |
| |
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions() |
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R |
|
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | alterInterbrokerThrottledRates() alterInterbrokerThrottledReplicas()
|
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | (see KIP-113) |
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J |
// TODO checks none in progress, doesn't confirm states match
|
kafka-reassign-partitions.sh
and ReassignPartitionsCommand
The --zookeeper
option will be retained and will:
- Cause a deprecation warning to be printed to standard error. The message will say that the
--zookeeper
option will be removed in a future version and that--bootstrap-server
is the replacement option. - Perform the reassignment via ZooKeeper, as currently.
A new --bootstrap-server
option will be added and will:
- Perform the reassignment via the AdminClient API (described below) using the given broker(s) as bootstrap brokers.
- When used with
--execute
and--throttle
, the throttle will be an auto-removed one.
Using both --zookeeper
and --bootstrap-server
in the same command will produce an error message and the tool will exit without doing the intended operation.
It is anticipated that a future version of Kafka would remove support for the --zookeeper
option.
A new --progress
action option will be added. This will only be supported when used with --bootstrap-server
. If used with --zookeeper
the command will produce an error message and the tool will exit without doing the intended operation. --progress
will report on the synchronisation of each of the partitions and brokers in the reassignment given via the --reassignment-json-file
option
For example:
No Format |
---|
# If the following command is used to start a reassignment
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
--reassignment-json-file expand-cluster-reassignment.json \
--execute
# then the following command will print the progress of
# that reassignment, then exit immediately
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
--reassignment-json-file expand-cluster-reassignment.json \
--progress |
That might print something like the following:
No Format |
---|
Topic Partition Broker Status
-------------------------------------
my_topic 0 |
The --zookeeper
option will be retained and will:
- Cause a deprecation warning to be printed to standard error. The message will say that the
--zookeeper
option will be removed in a future version and that--bootstrap-server
is the replacement option. - Perform the reassignment via ZooKeeper, as currently.
A new --bootstrap-server
option will be added and will:
- Perform the reassignment via the given intermediating broker.
Using both --zookeeper
and --bootstrap-server
in the same command will produce an error message and the tool will exit without doing the intended operation.
It is anticipated that a future version of Kafka would remove support for the --zookeeper
option.
A new --progress
action option will be added. This will only be supported when used with --bootstrap-server
. If used with --zookeeper
the command will produce an error message and the tool will exit without doing the intended operation. --progress
will report on the synchronisation of each of the partitions and brokers in the reassignment given via the --reassignment-json-file
option
For example:
No Format |
---|
# If the following command is used to start a reassignment
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
--reassignment-json-file expand-cluster-reassignment.json \
--execute
# then the following command will print the progress of
# that reassignment, then exit immediately
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
--reassignment-json-file expand-cluster-reassignment.json \
--progress |
That might print something like the following:
No Format |
---|
Topic Partition Broker Status ------------------------------------- my_topic 0 0 In sync my_topic 0 1 Behind: 10456 messages behind asdf 0 1 Unknown topic my_topic 42 10 UnknownIn partitionsync my_topic 0 421 Behind: 10456 messages behind asdf 0 1 Unknown brokertopic my_topic 42 1 Unknown partition my_topic 0 42 Broker does not host this partitionUnknown broker my_topic 1 0 Broker does not host this partition |
The implementation of --progress
The implementation of --progress
will make use of the describeReplicaDirthe
method from KIP-113 to find the lag of the syncing replica.describeReplicaLogDirs
()
...
In all other respects, the public API of ReassignPartitionsCommand
will not be changed.
AdminClient:
...
reassignPartitions()
Anchor | alterPartitionCount | alterPartitionCount |
---|
kafka-topics.sh --alter --partitions ...
the following methods will be added to AdminClient
to support changing topics' partition counts
|
- This API is asynchronous in the sense that the client cannot assume that the request is complete (or the request was rejected) once they have obtained the result for the topic from the
ReassignPartitionsResult
. - The
method from KIP-113 can be used to determine progress.describeReplicaLogDirs
() A call to
reassignPartitions()
with thevalidateOnly
option can be used to determine whether a reassignment is currently running, and therefore whether the last reassignment has finished.
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* <p>Reassign the partitions given as the key of the given <code>assignments</code> to the corresponding
* list of brokers. The first broker in each list is the one which holds the "preferred replica" | ||||
Code Block | ||||
| ||||
/** * <p>Change the partition count of the topics given as the keys of {@code counts} * to the corresponding values. Currently it is only possible to increase * the partition count.</p> * * <p>The replicas of new partitions will be allocated to the least loaded broker, * but may subsequently be moved using {@link #reassignPartitions(Map)}.</p> */ public AlterPartitionCountsResult alterPartitionCounts(Map<String, Integer> counts, AlterPartitionCountsOptions options) public AlterPartitionCountsResult alterPartitionCounts(Map<String, Integer> counts) |
Where:
Code Block | ||||
---|---|---|---|---|
| ||||
public class AlterPartitionCountsOptions {
public AlterPartitionCountsOptions() { ... }
public Integer timeoutMs() { ... }
public AlterPartitionCountsOptions timeoutMs(Integer timeoutMs) { ... }
}
public class AlterPartitionCountsResult {
// package access constructor
Map<String, KafkaFuture<Void>> values() { ... }
KafkaFuture<Void> all() { ... }
} |
AdminClient: alterReplicationFactors()
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * <p>Change the replication factor of the topics given as the keys of * replicationFactors to the corresponding values.</p> * * <p>New replicas will be allocated to the least loaded broker, * but may subsequently be moved using {@link #reassignPartitions(Map)}.</p> */ AlterReplicationFactorsResult alterReplicationFactors(Map<String, Short> replicationFactors) AlterReplicationFactorsResult alterReplicationFactors(Map<String, Short> replicationFactors, * <p>Inter-broker reassignment causes significant inter-broker traffic and can take a long time * in order to copy the replica data to brokers. The given options can be used impose a quota on * inter-broker traffic for the duration of the reassignment so that client-broker traffic is not * adversely affected.</p> * * <h3>Preferred replica</h3> * <p>When brokers are configured with <code>auto.leader.rebalance.enable=true</code>, the broker * with the preferred replica will be elected leader automatically. * <code>kafka-preferred-replica-election.sh</code> provides a manual trigger for this * election when <code>auto.leader.rebalance.enable=false</code>.</p> * * @param assignments The partition assignments. * @param options The options to use when reassigning the partitions * @return The ReassignPartitionsResult */ public abstract ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments, AlterReplicationFactorsOptions ReassignPartitionsOptions options); |
Where:
Code Block | ||||
---|---|---|---|---|
| ||||
public class ReassignPartitionsOptions AlterReplicationFactorsOptionsextends AbstractOptions<ReassignPartitionsOptions> { public AlterReplicationFactorsOptions// Note timeoutMs() {inherited ... }from AbstractOptions public publicboolean Integer timeoutMsvalidateOnly() { ... } /** public AlterReplicationFactorsOptions* timeoutMs(Integer timeoutMs) { ... } } public class AlterReplicationFactorsResult { Validate the request only: Do not actually trigger replica reassignment. *// package access constructor public ReassignPartitionsOptions validateOnly(boolean validateOnly) Map<String, public KafkaFuture<Void>>long valuesthrottle() { ... } return KafkaFuture<Void> all() { ... } } |
AdminClient: reassignPartitions()
...
Code Block | ||||
---|---|---|---|---|
| ||||
throttle; } /** * <p>Reassign the partitions given* as<p>Set the keythrottle ofrate theand giventhrottled <code>assignments</code>replicas tofor the correspondingreassignments. * list of brokers.* The firstgiven throttle brokeris in eachbytes/second listand isshould thebe oneat whichleast holds the "preferred replica".</p> * * <p>Inter-broker reassignment causes significant inter-broker traffic and can take a long time * in order to copy the replica data to brokers. It may be necessary to impose a quota on * inter-broker traffic for the duration of the reassignment so that client-broker traffic is not * adversely affected.</p> * * <h3>Preferred replica</h3> * <p>When brokers are configured with <code>auto.leader.rebalance.enable=true</code>, the broker * with the preferred replica will be elected leader automatically. * <code>kafka-preferred-replica-election.sh</code> provides a manual trigger for this * election when <code>auto.leader.rebalance.enable=false</code>.</p> */ public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments) public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments, ReassignPartitionsOptions options) |
Where:
Code Block | ||||
---|---|---|---|---|
| ||||
public class ReassignPartitionsOptions { public boolean validateOnly() /**1 KB/s. * Interbroker replication traffic will be throttled to approximately the given value. * Use Long.MAX_VALUE if the reassignment should not be throttled.</p> * * <p>A positive throttle is equivalent to setting:</p> * <ul> * <li>The leader and follower throttled rates to the given value given by throttle.</li> * <li>The leader throttled replicas of each topic in the request to include the existing brokers having * replicas of the partitions in the request.</li> * <li>The follower throttled replicas of each topic in the request to include the new brokers * Validate the request only: Dofor noteach actuallypartition triggerin replicathat reassignmenttopic.</li> * </ul> public ReassignPartitionsOptions validateOnly(boolean validateOnly)* public long timeoutMs() /** * <p>The value of {@link #autoRemoveThrottle()} will determine whether these * Setthrottles awill timeoutbe forremoved theautomatically startingwhen of the reassignment completes. </p> * Note this timeout does not* include the time take to actually@see AdminClient#alterInterbrokerThrottledRate(int, long, long) * move replicas between brokers.@see AdminClient#alterInterbrokerThrottledReplicas(Map) */ public public ReassignPartitionsOptions timeoutMsthrottle(long timeoutMs)throttle) { ... } public longboolean throttleautoRemoveThrottle() { ... } /** * SetTrue ato throttle,automatically inremove bytesthe perthrottle second,at on the bandwidthend usedof forthe * inter-broker replica movementcurrent reassignment. */ public ReassignPartitionsOptions throttleautoRemoveThrottle(longboolean throttledRateBytesPerSecond) } public class autoRemoveThrottle) { ... } } public class ReassignPartitionsResult { public Map<TopicPartition, KafkaFuture<Void>> values(); public KafkaFuture<Void> all(); } |
Partition reassignment is a long running operation, and the ReassignPartitionsResult
indicates only that the reassignment has been started, not that the reassignment has been completed. The describeReplicaDir()
method from KIP-113 can be used to determine progress.
AdminClient: alterInterBrokerThrottle()
...
Code Block |
---|
/**
* Alter the throttle of inter-broker replication for the given broker to the given rate.
* @param broker The broker
* @param leaderRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
* to be used for inter-broker replication when the broker is the leader.
* @param followerRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
* to be used for inter-broker replication when the broker is the follower.
*/
AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond)
AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond, AlterInterBrokerThrottleOptions options) |
Where:
Code Block |
---|
public class AlterInterBrokerThrottleOptions {
public Integer timeoutMs() { ... }
public AlterInterBrokerThrottleOptions timeoutMs(Integer timeoutMs) { ... }
}
public class AlterInterBrokerThrottleResult {
public KafkaFuture<Void> all();
} |
Notes:
- The throttled rate is implemented as a DynamicConfig on the broker, but this is an implementation detail so it wouldn't be appropriate to set it via
alterConfigs()
. In any case it's currently not possible to change broker configs viaalterConfigs()
. The
throttle is broker-specific and different brokers can have different rates applied.
Network Protocol: AlterPartitionCountsRequest and AlterPartitionCountsResponse
...
Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse
Anchor | ||||
---|---|---|---|---|
|
ReassignPartitionsRequest
initiates the movement of replicas between brokers, and is the basis of the AdminClient.reassignPartitions()
methodNotes:
- The request must be sent to the controller.
- The request requires the
Alter
operation on theCLUSTER
resource, since it can require significant inter-broker communication. - The request will be subject to a policy, as described in KIP-201.
No Format |
---|
ReassignPartitionsRequest => [topic_reassignments] timeout validate_only throttle remove_throttle
topic_reassignments => topic [partition_reassignments]
topic => STRING
partition_reassignments => partition_id [broker]
partition_id => INT32
broker => INT32
timeout => INT32
validate_only => BOOLEAN
throttle => INT64
remove_throttle => BOOLEAN |
Where
Field | Description |
---|---|
topic | the name of a topic |
partition_id | a partition of that topic |
broker | a broker id |
timeout | The maximum time to await a response in ms. |
validate_only | when true: validate the request, but don't actually reassign the partitions |
Algorithm:
- The controller validates the request against configured authz, policy etc.
- The controller computes set of topics in the request, and writes this as JSON to the new
/admin/throttled_replicas_removal
znode - The controller then updates the existing
leader.replication.throttled.replicas
andfollower.replication.throttled.replicas
properties of each topic config. - The controller computes the union of 1) the brokers currently hosting replicas of the topic partitions in the request 2) the brokers assigned to host topic partitions in the request, and write this as JSON to the new
/admin/throttled_rates_removal
znode. - The controller then updates the existing
leader.replication.throttled.rates
andfollower.replication.throttled.rates properties
of each broker config. - The controller writes reassignment JSON to the
/admin/reassign_partitions
znode
The intent behind this algorithm is that should the controller crash during the update, the reassignment won't have started and the throttles will be removed on controller failover.
The broker will use the same algorithm for determing the values of the topic and broker configs as is currently used in the ReassignPartitionsCommand
.
Anchor | ||||
---|---|---|---|---|
|
ReassignPartitionsResponse
describes which partitions in the request will be moved, and what was wrong with the request for those partitions which will not be moved.No Format |
---|
ReassignPartitionsResponse => throttle_time_ms [reassign_partition_result]
throttle_time_ms => INT32
reassign_partition_result => topic [partition_error] |
No Format |
AlterPartitionCountsRequest => [topic_partition_count] timeout topic_partition_count => topic partition_count topic => STRING partition_counterror => INT32 timeout partition_id error_code error_message partition_id => INT32 |
Where
Field | Description |
---|---|
topic | the name of a topic |
partition_count | the new partition count |
timeout | The maximum time to await a response in ms. |
The request will require the ALTER
operation on the Topic
resource.
The request is subject to the CreateTopicPolicy
of the broker as configured by the broker's create.topic.policy.class.name
config property. This is to ensure that the policy applies to topics modified after creation.
...
No Format |
---|
AlterPartitionCountsResponse => throttle_time_ms [topic_partition_count_error]
topic_partition_count_error => topic error_code error_message
topic => STRING
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | the name of a topic in the request |
error_code | an error code for that topic |
error_message | more detailed information about any error for that topic |
Anticipated errors:
TOPIC_AUTHORIZATION_FAILED
(29) The user lacked Alter on the topicINVALID_TOPIC_EXCEPTION
(17) If the topic doesn't existINVALID_PARTITIONS
(37) If the num_partitions was invalidINVALID_REQUEST
(42) If duplicate topics appeared in the request.NONE
(0) The topic partition count was changed successfully.
Network Protocol: AlterReplicationFactorsRequest and AlterReplicationFactorsResponse
...
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | a topic name from the request |
partition_id | a partition id for that topic, from the request |
error_code | an error code for that topic partition |
error_message | more detailed information about any error for that topic |
Anticipated errors:
CLUSTER_AUTHORIZATION_FAILED
(31) Authorization failed- POLICY_VIOLATION(44) The request violated the configured policy
INVALID_TOPIC_EXCEPTION
(17) If the topic doesn't existUNKNOWN_MEMBER_ID
(25) If any broker ids in the partition_reassignments included an unknown broker idINVALID_REQUEST
(42) If duplicate topics appeared in the requestPARTITION_REASSIGNMENT_IN_PROGRESS
(new) If the reassignment cannot be started because a reassignment is currently running (i.e. the/admin/reassign_partitions
znode exists)- INVALID_THROTTLE (new) If the given throttle is <=0.
INVALID_REPLICA_ASSIGNMENT
(39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and /or replication_factor. The error_message would contain further information.NONE
(0) reassignment has started
AdminClient: alterInterbrokerThrottledRates()
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Change the rate at which interbroker replication is throttled, replacing existing throttled rates.
* For each broker in the given {@code rates}, the {@code leaderRate} of the corresponding
* {@code ThrottledRate} is the throttled rate when the broker is acting as leader and
* the {@code followerRate} is the throttled rate when the broker is acting as follower.
* For the throttled rates to take effect, the given broker must also be present in the
* list of throttled replicas, which can be set by {@link #alterInterbrokerThrottledReplicas()}.
* The throttle will be automatically removed at the end of the current reassignment,
* unless overridden in the given options.
*
* The current rates can be obtained from {@link #describeConfigs(Collection)}.
*
* @param rates Map from broker id to the throttled rates for that broker.
* @param options The options.
*/
public abstract AlterInterbrokerThrottledRateResult alterInterbrokerThrottledRate(
Map<Integer, ThrottledRate> rates,
AlterInterbrokerThrottledRateOptions options);
|
Where:
Code Block |
---|
/**
* The throttled rate for interbroker replication on a particular broker.
*/
public class ThrottledRate {
public ThrottledRate(long leaderRate, long followerRate) { ... }
/**
* The throttled rate when the broker is acting as leader.
*/
long leaderRate() { ... }
/**
* The throttled rate when the broker is acting as follower.
*/
long followerRate() { ... }
}
public class AlterInterbrokerThrottledRateOptions extends AbstractOptions<AlterInterbrokerThrottledRateOptions> {
public boolean autoRemoveThrottle() { ... }
/**
* True to automatically remove the throttle at the end of the current reassignment.
*/
public AlterInterbrokerThrottledRateOptions autoRemoveThrottle(boolean autoRemoveThrottle) { ... }
}
public class AlterInterbrokerThrottledRateResult {
// package-access ctor
public Map<Integer, KafkaFuture<Void>> values() { ... }
public KafkaFuture<Void> all() { ... }
} |
Network API: AlterInterbrokerThrottledRatesRequest and AlterInterbrokerThrottledRatesResponse
No Format |
---|
AlterInterbrokerThrottledRatesRequest => [broker_throttles] remove_throttle timeout validate_only
broker_throttles => broker_id leader_rate follower_rate
broker_id => INT32
leader_rate => INT64
follower_rate => INT64
timeout => INT32
validate_only => BOOLEAN
remove_throttle => BOOLEAN |
Algorithm:
- The controller validates the brokers and rates in the request and that the principal has
Alter
operation on theCLUSTER
resource. - The controller gets the current value of the /admin/throttled_rates_removal znode, forms the union of those brokers with those in the request and updates the /admin/throttled_rates_removal znode with JSON representation of this union
- The controller then subtracts the brokers in the request from the current brokers and removes the leader.replication.throttled.rates and follower.replication.throttled.rates properties from each broker config
- The controller then, for each broker in the request, adds the leader.replication.throttled.rates and follower.replication.throttled.rates properties to each broker config.
- The controller then updates /admin/throttled_rates_removal znode with JSON representation of brokers in the request.
The intent behind this algorithm is that should the controller crash during the update, throttles will still be removed on completion of reassignment.
No Format |
---|
AlterInterbrokerThrottledRatesResponse => [broker_error]
broker_error => broker_id error_code error_messgae
broker_id => INT32
|
No Format |
---|
AlterReplicationFactorsRequest => [topic_replication_factor] timeout
topic_replication_factor => topic replication_factor
topic => STRING
replication_factor => INT16
timeout => INT32
// TODO: validate_only? |
Where
Field | Description |
---|---|
topic | topic name |
replication_factor | the new replication factor for this topic |
timeout | The maximum time to await a response in ms. |
The request will require the ClusterAction
operation on the CLUSTER
resource, since it can require significant inter-broker communication.
The request is subject to the CreateTopicPolicy
of the broker as configured by the broker's create.topic.policy.class.name
config property. This is to ensure that the policy applies to topics modified after creation.
...
No Format |
---|
AlterReplicationFactorResponse => throttle_time_ms [topic_replication_factor_error]
topic_replication_factor_error => topic error_code error_message
topic => STRING
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | the name of a topic in the request |
error_code | an error code for that topic |
error_message | more detailed information about any error for that topic |
Anticipated errors:
CLUSTER_AUTHORIZATION_FAILED
(31) Authorization failedINVALID_TOPIC_EXCEPTION
(17) If the topic doesn't existINVALID_REPLICATION_FACTOR
(38) If the replication_factor was invalidINVALID_REQUEST
(42) If duplicate topics appeared in the request.NONE
(0) The topic replication factor was changed successfully.
Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse
...
No Format |
---|
ReassignPartitionsRequest => [topic_reassignments] timeout validate_only
topic_reassignments => topic [partition_reassignments]
topic => STRING
partition_reassignments => partition_id [broker]
partition_id => INT32
broker => INT32
timeout => INT32
validate_only => BOOLEAN |
Where
Field | Description |
---|---|
topic | the name of a topic |
partition_id | a partition of that topic |
broker | a broker id |
timeout | The maximum time to await a response in ms. |
validate_only | when true: validate the request, but don't actually reassign the partitions |
The request requires the ClusterAction
operation on the CLUSTER
resource, since it can require significant inter-broker communication.
The request is subject to the CreateTopicPolicy
of the broker as configured by the broker's create.topic.policy.class.name
config property. This is to ensure that the policy applies to topics modified after creation.
...
No Format |
---|
ReassignPartitionsResponse => throttle_time_ms [reassign_partition_result]
throttle_time_ms => INT32
reassign_partition_result => topic partition_id error_code error_message
topic => STRING
partition_id => INT32
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | a topic name from the request |
partition_id | a partition id for that topic, from the request |
error_code | an error code for that topic partition |
error_message | more detailed information about any error for that topic |
Anticipated errors:
CLUSTER_AUTHORIZATION_FAILED
(31) Authorization failedINVALID_TOPIC_EXCEPTION
(17) If the topic doesn't existUNKNOWN_MEMBER_ID
(25) If any broker ids in the partition_assignment included an unknown broker idINVALID_REQUEST
(42) If duplicate topics appeared in the requestPARTITION_REASSIGNMENT_IN_PROGRESS
(new)INVALID_REPLICA_ASSIGNMENT
(39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and /or replication_factor. The error_message would contain further information.NONE
(0) reassignment has started
Network Protocol: AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse
...
No Format |
---|
AlterInterBrokerThrottleRequest => leader_rate follower_rate timeout
leader_rate => INT64
follower_rate => INT64
timeout => INT32 |
Where:
Field | Description |
---|---|
leader_rate | The approximate maximum rate, in byte/sec, for interbroker transfers where the receiving broker is the leader |
follower_rate | The approximate maximum rate, in byte/sec, for interbroker transfers where the receiving broker is the follower |
timeout | The maximum time to await a response in ms. |
The request requires the ClusterAction
operation on the CLUSTER
resource, since it can affect significant inter-broker communication.
...
No Format |
---|
AlterInterBrokerThrottleResponse => error_code error_message throttle_time_ms error_code => INT16 error_message => NULLABLE_STRING throttle_time_ms => INT32 |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
error_code | an error code for the request |
error_message | more detailed information about any error |
...
Anticipated Errors:
- NOT_CONTROLLER (41) if the request was sent to a broker that wasn't the controller.
CLUSTER_AUTHORIZATION_FAILED
(31) Authorization failed- INVALID_REQUESTTHROTTLE (42new) Leader rate or follower rate was out of range
- NONE (0) the request was successful
Summary of use cases
...
- if the throttled rate is <= 0.
UNKNOWN_MEMBER_ID(25) if the broker id in the request is not a broker in the cluster
AdminClient: alterInterbrokerThrottledReplicas()
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Set the partitions and brokers subject to the
* {@linkplain #alterInterbrokerThrottledRate(Map)
* interbroker throttled rate}.
* The brokers specified as the {@link ThrottledReplicas#leaders()} corresponding to a
* topic partition given in {@code replicas} will be subject to the leader throttled rate
* when acting as the leader for that partition.
* The brokers specified as the {@link ThrottledReplicas#followers()} corresponding to a
* topic partition given in {@code repicas} will be subject to the follower throttled rate
* when acting as the follower for that partition.
*
* The throttle will be automatically removed at the end of the current reassignment,
* unless overridden in the given options.
*
* The current throttled replicas can be obtained via {@link #describeConfigs(Collection)} with a
* ConfigResource with type {@link ConfigResource.Type#TOPIC TOPIC} and name "leader.replication.throttled.replicas"
* or "follower.replication.throttled.replicas".
*/
public abstract AlterInterbrokerThrottledReplicasResult alterInterbrokerThrottledReplicas(
Map<TopicPartition, ThrottledReplicas> replicas,
AlterInterbrokerThrottledReplicasOptions options);
|
Where:
Code Block |
---|
public class ThrottledReplicas {
public ThrottledReplicas(Collection<Integer> leaders, Collection<Integer> followers) { ... }
/**
* The brokers which should be throttled when acting as leader. A null value indicates all brokers in the cluster.
*/
public Collection<Integer> leaders() { .. }
/**
* The brokers which should be throttled when acting as follower. A null value indicates all brokers in the cluster.
*/
public Collection<Integer> followers() { ... }
}
public class AlterInterbrokerThrottledReplicasOptions extends AbstractOptions<AlterInterbrokerThrottledReplicasOptions> {
public boolean autoRemoveThrottle() { ... }
/**
* True to automatically remove the throttle at the end of the current reassignment.
*/
public AlterInterbrokerThrottledReplicasOptions autoRemoveThrottle(boolean autoRemoveThrottle) { ... }
}
public class AlterInterbrokerThrottledReplicasResult {
// package-access ctor
public Map<TopicPartition, KafkaFuture<Void>> values() { ... }
public KafkaFuture<Void> all() { ... }
} |
Network API: AlterInterbrokerThrottledReplicasRequest and AlterInterbrokerThrottledReplicasResponse
No Format |
---|
AlterInterbrokerThrottledRatesRequest => [topic_throttles] remove_throttle timeout validate_only
topic_throttles => topic [partition_throttles]
topic => STRING
partition_throttles => partition_id [broker_id]
partition_id => INT32
broker_id => INT32
timeout => INT32
validate_only => BOOLEAN
remove_throttle => BOOLEAN |
Algorithm:
- The controller validates the partitions and brokers in the request and that the principal has
Alter
operation on theCLUSTER
resource.
- The controller gets the current value of the
/admin/throttled_replicas_removal
znode, forms the union of those topics with those in the request and updates the/admin/throttled_rates_removal
znode with JSON representation of this union - The controller then subtracts the topics in the request from the current topics and removes the
leader.replication.throttled.replicas
andfollower.replication.throttled.replicas
properties from each topic config The controller then, for each topic in the request, adds the
leader.replication.throttled.rates
andfollower.replication.throttled.rates
properties to each topic config.- The controller then updates
/admin/throttled_replicas_removal
znode with JSON representation of topics in the request.
The intent behind this algorithm is that should the controller crash during the update, throttles will still be removed on completion of reassignment.
No Format |
---|
AlterInterbrokerThrottledReplicasResponse => [topic_errors]
topic_errors => topic [partition_errors]
topic => STRING
partition_errors => partition_id error_code error_messgae
partition_id => INT32
error_code => INT16
error_message => NULLABLE_STRING |
Anticipated errors:
- NOT_CONTROLLER (41) if the request was sent to a broker that wasn't the controller.
CLUSTER_AUTHORIZATION_FAILED
(31) Authorization failed- UNKNOWN_TOPIC_OR_PARTITION (3) if a partition in the request is not known in the cluster.
UNKNOWN_MEMBER_ID(25) if the broker id in the request is not a broker in the cluster
On Controller Failover
The algorithms presented above, using the new znodes, are constructed so that should the controller fail, on election of a new controller ZooKeeper is not left in an inconsistent state where throttles which should be removed automatically are not removed at the end of the reassignment. The recovery algorithm is as follows:
- If the
/admin/reassign_partitions
znode exists we assume a reassignment is on-going and do nothing. - Otherwise, if the
/admin/reassign_partitions
znode does not exists, we proceed to remove the throttles, as detailed in "Throttle removal" section below.
On reassignment completion
When reassignment is complete:
- The
/admin/reassign_partitions
znode gets removed. - We remove the throttles, as detailed in "Throttle removal" section below.
Throttle removal
The algorithm for removing the throttled replicas is:
- If
/admin/remove_throttled_replicas
is set:For each of the topics listed in that znode:
Remove the
(
...
reassignPartitions(validateOnly)
// check none in progress
alterInterBrokerThrottle()
...
leader|follower).replication.throttled.replicas
...
reassignPartitions()
...
alterInterBrokerThrottle()
...
reassignPartitions(validateOnly)
// check none in progress
alterConfigs()
// (leader|follower).replication.throttled.replicas
properties for that topic config.
- Remove the
/admin/remove_throttled_replicas
znode
The symmetric algorithm is used for /admin/remove_throttled_rates
, only with broker configs.
Compatibility, Deprecation, and Migration Plan
Existing users of the kafka-reassign-partitions.sh
will receive a deprecation warning when they use the --zookeeper
option. The option will be removed in a future version of Kafka. If this KIP is introduced in version 1.0.0 the removal could happen in 2.0.0.
Implementing AdminClient.alterConfigs()
for (dynamic) broker configs was considered as a way of implementing throttle management but this would not support the auto removal feature.
Not supporting passing a throttle in the AdminClient.reassignPartitions() (and just using the APIs for altering throttles) was considered, but:
- Being able to specify a throttle at the same time at starting the reassignment is very convenient.
- Race conditions are possible if the APIs requires throttles set up before reassignment starts. What if reassignPartitions() doesn't get called, or none of the partitions in the call can be be reassigned?
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
...
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
...
- Some calls to
alterTopics()
(such as increasing the partition count) would have been synchronous, while others (such as moving replicas between brokers) would have been long running and thus asynchronous. This made for an API which synchronousness depended on the arguments. createTopics()
allows to specify topic configs, whereasalterConfigs()
is already provided to change topic configs, so it wasn't an exact mirrormirror
Just providing reassignPartitions()
was considered, with changes to partition count inferred from partitions present in the assignments
argument. This would require the caller to provide an assignment of partitions to brokers, whereas currently it's possible to increase the partition count without specifying an assignment. It also suffered from the synchronous/asynchronous API problem.
Similarly a alterReplicationFactors()
method, separate from Just providing reassignPartitions()
was considered, with changes to partition count and replication factor inferred from partitions and brokers present in the assignments
argument. This would require the caller to provide an assignment of partitions to brokers, but the AdminClient doesn't provide methods to obtain the necessary information to make an informed decision (what it the load on the brokers, how much free disk space do they have, or how many partitions are they already hosting), so it was felt better to retain alterPartitionCounts()
and alterReplicationFactors()
for the time beingbut both require a partition to brokers assignment, and both are implemented in the same way (by writing to the /admin/reassign_partitions
znode), so there didn't seem much point in making an API which distinguished them.
Algorithms making use of the ZooKeeper "multi" feature for atomic update of multiple znodes were considered. It wasn't clear that these would be better than the algorithms presented above.