Summary of use cases
Use case | Command | AdminClient |
---|---|---|
Increase partition count | kafka-topics --alter --topic T --partitions P | alterPartitionCount() |
Change replication factor | kafka-reassign-partitions --execute --reassignment-json-file J | |
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions() |
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | |
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | |
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | describeReplicaDir() (see KIP-113) |
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J | |
kafka-reassign-partitions.sh and ReassignPartitionsCommand
The --zookeeper option will be retained and will:
- Cause a deprecation warning to be printed to standard error. The message will say that the --zookeeper option will be removed in a future version and that --bootstrap-server is the replacement option.
- Perform the reassignment via ZooKeeper, as currently.
A new --bootstrap-server option will be added and will:
- Perform the reassignment via the given intermediating broker.
Using both --zookeeper and --bootstrap-server in the same command will produce an error message and the tool will exit without doing the intended operation.
It is anticipated that a future version of Kafka would remove support for the --zookeeper option.
A new --progress action option will be added. This will only be supported when used with --bootstrap-server. If used with --zookeeper the command will produce an error message and the tool will exit without doing the intended operation. --progress will report on the synchronisation of each of the partitions and brokers in the reassignment given via the --reassignment-json-file option.
For example:
No Format |
---|
# If the following command is used to start a reassignment
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
--reassignment-json-file expand-cluster-reassignment.json \
--execute
# then the following command will print the progress of
# that reassignment, then exit immediately
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
--reassignment-json-file expand-cluster-reassignment.json \
--progress |
That might print something like the following:
No Format |
---|
Topic      Partition  Broker  Status
-------------------------------------
my_topic   0          0       In sync
my_topic   0          1       Behind: 10456 messages behind
asdf       0          1       Unknown topic
my_topic   42         1       Unknown partition
my_topic   0          42      Unknown broker
my_topic   1          0       Broker does not host this partition |
The implementation of --progress will make use of the describeReplicaDir() method from KIP-113 to find the lag of the syncing replica.
Internally, the ReassignPartitionsCommand will be refactored to support the above changes to the options. An interface will abstract the commands currently issued directly to ZooKeeper.
There will be an implementation which makes the current calls to ZooKeeper, and another implementation which uses the AdminClient API described below.
In all other respects, the public API of ReassignPartitionsCommand will not be changed.
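The option rules and the planned refactoring can be pictured with a small sketch. Everything named here (ReassignmentBackend, ReassignOptionsSketch, the "zookeeper" and "admin-client" labels) is hypothetical and only illustrates the shape of the change. The rule that one of the two connection options must be present is an assumption, not something the proposal states.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical abstraction over the operations the tool issues; not part of the proposal. */
interface ReassignmentBackend {
    String describe();
}

final class ReassignOptionsSketch {

    /** Picks a backend from the raw arguments, applying the option rules described above. */
    static ReassignmentBackend chooseBackend(List<String> args) {
        boolean zk = args.contains("--zookeeper");
        boolean bootstrap = args.contains("--bootstrap-server");
        boolean progress = args.contains("--progress");

        if (zk && bootstrap) {
            throw new IllegalArgumentException(
                "--zookeeper and --bootstrap-server cannot be used together");
        }
        if (!zk && !bootstrap) {
            // Assumption: the tool needs exactly one way to reach the cluster.
            throw new IllegalArgumentException(
                "one of --zookeeper or --bootstrap-server is required");
        }
        if (progress && zk) {
            throw new IllegalArgumentException(
                "--progress is only supported with --bootstrap-server");
        }
        if (zk) {
            // Deprecated path: warn on standard error, then behave as before.
            System.err.println("Warning: --zookeeper is deprecated and will be removed in a "
                + "future version; use --bootstrap-server instead.");
            return () -> "zookeeper";
        }
        return () -> "admin-client";
    }

    public static void main(String[] args) {
        List<String> cli = Arrays.asList("--bootstrap-server", "localhost:9878", "--progress");
        System.out.println(chooseBackend(cli).describe());
    }
}
```

Keeping the choice of backend in one place means the rest of the command only sees the interface, which is the property the refactoring is after.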
AdminClient: alterPartitionCount()
kafka-topics.sh --alter --partitions ...
Notes:
- This API is synchronous in the sense that the client can assume that the partition count has been changed (or the request was rejected) once they have obtained the result for the topic from the AlterPartitionCountsResult.
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* <p>Change the partition count of the topics given as the keys of {@code counts}
* according to the corresponding values. Currently it is only possible to increase
* the partition count.</p>
*/
public AlterPartitionCountsResult alterPartitionCounts(Map<String, PartitionCount> counts,
AlterPartitionCountsOptions options)
public AlterPartitionCountsResult alterPartitionCounts(Map<String, PartitionCount> counts) |
Where:
Code Block | ||||
---|---|---|---|---|
| ||||
/** Describes a change in a topic's partition count. */
public class PartitionCount {
    private int partitionCount;
    private List<List<Integer>> assignments;
    private PartitionCount(int partitionCount) { ... }
    /**
     * <p>Increase the partition count for a topic to the given {@code newCount}.
     * The assignment of new replicas to brokers will be decided by the broker
     * but may subsequently be moved using {@link AdminClient#reassignPartitions(Map)}.</p>
     */
    public static PartitionCount increasePartitionCount(int newCount) { ... }
    /**
     * <p>Increase the partition count for a topic to the given {@code newCount}
     * assigning the new partitions according to the given {@code newAssignments}.
     * The length of {@code newAssignments} should equal {@code newCount - oldCount}, since
     * the assignment of existing partitions is not changed.
     * Each inner list of {@code newAssignments} should have a length equal to
     * the topic's replication factor.
     * The first broker id in each inner list is the "preferred replica".</p>
     *
     * <p>For example, suppose a topic currently has a replication factor of 2, and
     * has 3 partitions. The number of partitions can be increased to 4
     * (with broker 1 being the preferred replica for the new partition) using a
     * {@code PartitionCount} constructed like this:</p>
     *
     * <pre><code>PartitionCount.increasePartitionCount(4, Arrays.asList(Arrays.asList(1, 2)))</code></pre>
     *
     */
    public static PartitionCount increasePartitionCount(int newCount, List<List<Integer>> newAssignments) { ... }
}
public class AlterPartitionCountsOptions {
    public AlterPartitionCountsOptions() { ... }
    public Integer timeoutMs() { ... }
    public AlterPartitionCountsOptions timeoutMs(Integer timeoutMs) { ... }
    public boolean validateOnly() { ... }
    /**
     * Validate the request only: Do not actually change any partition counts.
     */
    public AlterPartitionCountsOptions validateOnly(boolean validateOnly) { ... }
}
public class AlterPartitionCountsResult {
    // package access constructor
    public Map<String, KafkaFuture<Void>> values() { ... }
    public KafkaFuture<Void> all() { ... }
} |
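The size constraints stated in the javadoc for the two-argument increasePartitionCount can be checked mechanically. The following is a minimal sketch of such a check. PartitionIncreaseCheck and its validate method are hypothetical names, and oldCount and replicationFactor are assumed to come from the topic's current metadata. The proposal does not say where, or whether, such a check would run on the client.

```java
import java.util.Arrays;
import java.util.List;

final class PartitionIncreaseCheck {

    /**
     * Returns null if the request satisfies the documented constraints,
     * otherwise a short description of the first problem found.
     */
    static String validate(int oldCount, int replicationFactor,
                           int newCount, List<List<Integer>> newAssignments) {
        if (newCount <= oldCount) {
            return "partition count can only be increased";
        }
        if (newAssignments == null) {
            return null; // no explicit assignment: the broker decides
        }
        int added = newCount - oldCount;
        if (newAssignments.size() != added) {
            return "expected " + added + " assignments but got " + newAssignments.size();
        }
        for (List<Integer> replicas : newAssignments) {
            if (replicas.size() != replicationFactor) {
                return "each assignment must list " + replicationFactor + " brokers";
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // The javadoc example: 3 partitions, replication factor 2, grow to 4.
        List<List<Integer>> one = Arrays.asList(Arrays.asList(1, 2));
        System.out.println(validate(3, 2, 4, one)); // constraints satisfied
        System.out.println(validate(3, 2, 5, one)); // one assignment short
    }
}
```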
Network Protocol: AlterPartitionCountsRequest and AlterPartitionCountsResponse
AlterPartitionCountsRequest is used to change the partition count for a batch of topics, and is the basis for the AdminClient.alterPartitionCounts() method.
The request will require the ALTER operation on the Topic resource.
The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.
No Format |
---|
AlterPartitionCountsRequest => [topic_partition_count] timeout
topic_partition_count => topic partition_count
topic => STRING
partition_count => count [assignment]
count => INT32
assignment => [INT32]
timeout => INT32 |
Where
Field | Description |
---|---|
topic | the name of a topic |
count | the new partition count |
assignment | a list of assigned brokers (one list for each new partition) |
timeout | The maximum time to await a response in ms. |
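To make the nesting of the request concrete, here is a rough sketch of how its body could be laid out in a buffer. It assumes the usual Kafka primitive encodings (a STRING as an INT16 length followed by UTF-8 bytes, an array as an INT32 element count followed by the elements) and omits the request header. AlterPartitionCountsEncodingSketch and TopicChange are hypothetical names, not classes from the proposal.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class AlterPartitionCountsEncodingSketch {

    /** Hypothetical holder for one topic's entry in the request. */
    static final class TopicChange {
        final int count;
        final List<List<Integer>> assignments;

        TopicChange(int count, List<List<Integer>> assignments) {
            this.count = count;
            this.assignments = assignments;
        }
    }

    static ByteBuffer encode(Map<String, TopicChange> changes, int timeoutMs) {
        ByteBuffer buf = ByteBuffer.allocate(1024);   // ample for a small example
        buf.putInt(changes.size());                   // [topic_partition_count]
        for (Map.Entry<String, TopicChange> e : changes.entrySet()) {
            byte[] topic = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) topic.length);       // topic => STRING
            buf.put(topic);
            TopicChange change = e.getValue();
            buf.putInt(change.count);                 // count => INT32
            buf.putInt(change.assignments.size());    // [assignment]
            for (List<Integer> replicas : change.assignments) {
                buf.putInt(replicas.size());          // assignment => [INT32]
                for (int broker : replicas) {
                    buf.putInt(broker);
                }
            }
        }
        buf.putInt(timeoutMs);                        // timeout => INT32
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        TopicChange change = new TopicChange(4, Arrays.asList(Arrays.asList(1, 2)));
        ByteBuffer buf = encode(Collections.singletonMap("my_topic", change), 30000);
        System.out.println(buf.remaining() + " bytes");
    }
}
```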
No Format |
---|
AlterPartitionCountsResponse => throttle_time_ms [topic_partition_count_error]
topic_partition_count_error => topic error_code error_message
topic => STRING
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | the name of a topic in the request |
error_code | an error code for that topic |
error_message | more detailed information about any error for that topic |
Anticipated errors:
- TOPIC_AUTHORIZATION_FAILED (29) The user lacked Alter on the topic
- POLICY_VIOLATION (44) The request violated the configured policy
- INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
- INVALID_PARTITIONS (37) If the partition count was <= the current partition count for the topic.
- INVALID_REQUEST (42) If duplicate topics appeared in the request, or the size of the partitions list did not equal the number of new partitions, or if the size of any of the lists contained in the partitions list was not equal to the topic replication factor
- NONE (0) The topic partition count was changed successfully.
AdminClient: reassignPartitions()
- Changing the partition assignment, via kafka-reassign-partitions.sh
- Changing the replication factor, via kafka-reassign-partitions.sh
Notes:
- This API is asynchronous in the sense that the client cannot assume that the request is complete (or the request was rejected) once they have obtained the result for the topic from the ReassignPartitionsResult.
- The describeReplicaDir() method from KIP-113 can be used to determine progress. A call to reassignPartitions() with the validateOnly option can be used to determine whether a reassignment is currently running, and therefore whether the last reassignment has finished.
- When the request is complete the throttle should be removed by a call or calls to alterInterBrokerThrottle().
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* <p>Assign the partitions given as the key of the given <code>assignments</code> to the corresponding
* list of brokers. This can be used to change the replica assignment or change the topic's replication factor.
* The first broker in each list is the one which holds the "preferred replica".</p>
*
* <p>To change the replication factor for a topic there must be a key for each partition in
* the {@code assignment}s map and the corresponding list of brokers must each be of the same
* length which will become the new replication factor.</p>
*
* <p>If only a subset of the partitions of a particular topic are present in {@code assignments}
* the change is taken to be a reassignment of replicas to brokers and each list must have
* the same length as the current topic replication factor.</p>
*
* <h3>Throttling</h3>
* <p>Inter-broker reassignment and/or increasing the replication factor causes significant
* inter-broker traffic and can take a long time
* in order to copy the replica data to brokers. It may be necessary to impose a throttle on
* inter-broker traffic for the duration of the reassignment so that client-broker traffic is not
* adversely affected.</p>
*
* <h3>Preferred replica</h3>
* <p>When brokers are configured with <code>auto.leader.rebalance.enable=true</code>, the broker
* with the preferred replica will be elected leader automatically.
* <code>kafka-preferred-replica-election.sh</code> provides a manual trigger for this
* election when <code>auto.leader.rebalance.enable=false</code>.</p>
*/
public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments)
public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments,
ReassignPartitionsOptions options)
|
Where:
Code Block | ||||
---|---|---|---|---|
| ||||
public class ReassignPartitionsOptions {
public boolean validateOnly()
/**
* Validate the request only: Do not actually trigger replica reassignment.
*/
public ReassignPartitionsOptions validateOnly(boolean validateOnly)
public long timeoutMs()
/**
* Set a timeout for the starting of the reassignment.
* Note this timeout does not include the time taken to actually
* move replicas between brokers.
*/
public ReassignPartitionsOptions timeoutMs(long timeoutMs)
public long throttle()
/**
* Set a throttle, in bytes per second, on the bandwidth used for
* inter-broker replica movement for all movements implied by the
* partition reassignments.
* Traffic between each broker (pairwise) will be throttled to approximately the given limit
* The throttle rate should be at least 1 KB/s.
* By default no throttle is applied.
*/
public ReassignPartitionsOptions throttle(long throttledRateBytesPerSecond)
}
public class ReassignPartitionsResult {
public Map<TopicPartition, KafkaFuture<Void>> values();
public KafkaFuture<Void> all();
} |
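The javadoc for reassignPartitions() distinguishes a change of replication factor (every partition of the topic present, all broker lists the same length) from a plain reassignment (a subset of partitions, each list as long as the current replication factor). A minimal sketch of that rule for a single topic follows. ReassignmentShapeCheck is a hypothetical name, and the map keys are assumed to be valid partition ids of the topic.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReassignmentShapeCheck {

    /**
     * Returns the replication factor the topic would have after the request,
     * or throws if the broker lists break the documented rules.
     */
    static int resultingReplicationFactor(int partitionCount, int currentRf,
                                          Map<Integer, List<Integer>> assignments) {
        if (assignments.isEmpty()) {
            return currentRf; // nothing requested, nothing changes
        }
        boolean coversWholeTopic = assignments.size() == partitionCount;
        int length = assignments.values().iterator().next().size();
        for (List<Integer> replicas : assignments.values()) {
            if (replicas.size() != length) {
                throw new IllegalArgumentException(
                    "all broker lists must have the same length");
            }
        }
        if (!coversWholeTopic && length != currentRf) {
            throw new IllegalArgumentException(
                "a partial reassignment must keep the current replication factor " + currentRf);
        }
        return length;
    }

    public static void main(String[] args) {
        // Topic with 2 partitions and replication factor 2, raised to 3.
        Map<Integer, List<Integer>> all = new HashMap<>();
        all.put(0, Arrays.asList(1, 2, 3));
        all.put(1, Arrays.asList(2, 3, 1));
        System.out.println(resultingReplicationFactor(2, 2, all));
    }
}
```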
Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse
ReassignPartitionsRequest initiates the movement of replicas between brokers, and is the basis of the AdminClient.reassignPartitions() method.
Notes:
- The request requires the ClusterAction operation on the CLUSTER resource, since it can require significant inter-broker communication.
- The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.
No Format |
---|
ReassignPartitionsRequest => [topic_reassignments] timeout validate_only
topic_reassignments => topic [partition_reassignments]
topic => STRING
partition_reassignments => partition_id [broker]
partition_id => INT32
broker => INT32
timeout => INT32
validate_only => BOOLEAN |
Where
Field | Description |
---|---|
topic | the name of a topic |
partition_id | a partition of that topic |
broker | a broker id |
timeout | The maximum time to await a response in ms. |
validate_only | when true: validate the request, but don't actually reassign the partitions |
ReassignPartitionsResponse describes which partitions in the request will be moved, and what was wrong with the request for those partitions which will not be moved.
No Format |
---|
ReassignPartitionsResponse => throttle_time_ms [reassign_partition_result]
throttle_time_ms => INT32
reassign_partition_result => topic [partition_error]
topic => STRING
partition_error => partition_id error_code error_message
partition_id => INT32
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | a topic name from the request |
partition_id | a partition id for that topic, from the request |
error_code | an error code for that topic partition |
error_message | more detailed information about any error for that topic |
Anticipated errors:
- CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
- POLICY_VIOLATION (44) The request violated the configured policy
- INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
- UNKNOWN_MEMBER_ID (25) If any broker ids in the partition_reassignments included an unknown broker id
- INVALID_REQUEST (42) If duplicate topics appeared in the request
- PARTITION_REASSIGNMENT_IN_PROGRESS (new) If the reassignment cannot be started because a reassignment is currently running.
- INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_reassignments doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
- NONE (0) reassignment has started
Network Protocol: AlterReplicationFactorsRequest and AlterReplicationFactorsResponse
No Format |
---|
AlterReplicationFactorsRequest => [topic_replication_factor] timeout
topic_replication_factor => topic replication_factor
topic => STRING
replication_factor => INT16
timeout => INT32
// TODO: validate_only? |
Where
Field | Description |
---|---|
topic | topic name |
replication_factor | the new replication factor for this topic |
timeout | The maximum time to await a response in ms. |
No Format |
---|
AlterReplicationFactorsResponse => throttle_time_ms [topic_replication_factor_error]
topic_replication_factor_error => topic error_code error_message
topic => STRING
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | the name of a topic in the request |
error_code | an error code for that topic |
error_message | more detailed information about any error for that topic |
Anticipated errors:
- CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
- INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
- INVALID_REPLICATION_FACTOR (38) If the replication_factor was invalid
- INVALID_REQUEST (42) If duplicate topics appeared in the request.
- NONE (0) The topic replication factor was changed successfully.
AdminClient: alterInterBrokerThrottle()
- A previous call to reassignPartitions() has been made and the user wants to apply or change the throttle.
- A previous call to reassignPartitions() is complete and the user wants to remove the throttle.
Notes:
- The throttle can be queried via describeConfigs()
- The throttle cannot be set via alterConfigs() because: 1) alterConfigs doesn't support changing broker configs and 2) throttles are a special-case DynamicConfig.
The throttle is broker-specific and different brokers can have different rates applied.
The following methods will be added to AdminClient to support changing the inter-broker transfer throttle.
Code Block |
---|
/**
* Alter the throttle of inter-broker replication for the given broker to the given rate.
* @param broker The broker
* @param leaderRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
* to be used for inter-broker replication when the broker is the leader.
* @param followerRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
* to be used for inter-broker replication when the broker is the follower.
*/
AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond)
AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond, AlterInterBrokerThrottleOptions options) |
Where:
Code Block |
---|
public class AlterInterBrokerThrottleOptions {
public Integer timeoutMs() { ... }
public AlterInterBrokerThrottleOptions timeoutMs(Integer timeoutMs) { ... }
}
public class AlterInterBrokerThrottleResult {
public KafkaFuture<Void> all();
} |
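To illustrate how a caller might use the proposed methods, here is a minimal sketch. It does not use a real Kafka client: `FakeAdminClient` is a hypothetical in-memory stand-in, `CompletableFuture` stands in for `KafkaFuture`, and treating negative rates as out of range is an assumption made only for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Stand-in sketch for the proposed API; not part of any released Kafka client. */
public class ThrottleSketch {

    /** Mirrors the proposed result type; CompletableFuture stands in for KafkaFuture. */
    static final class AlterInterBrokerThrottleResult {
        private final CompletableFuture<Void> future;

        AlterInterBrokerThrottleResult(CompletableFuture<Void> future) {
            this.future = future;
        }

        CompletableFuture<Void> all() {
            return future;
        }
    }

    /** Hypothetical in-memory fake that records the requested rates per broker id. */
    static final class FakeAdminClient {
        final Map<Integer, long[]> rates = new HashMap<>();

        AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(
                int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond) {
            CompletableFuture<Void> f = new CompletableFuture<>();
            if (leaderRateBytesPerSecond < 0 || followerRateBytesPerSecond < 0) {
                // Assumption: negative rates count as "out of range".
                f.completeExceptionally(new IllegalArgumentException("rate out of range"));
            } else {
                rates.put(broker, new long[] {leaderRateBytesPerSecond, followerRateBytesPerSecond});
                f.complete(null);
            }
            return new AlterInterBrokerThrottleResult(f);
        }
    }

    public static void main(String[] args) {
        FakeAdminClient admin = new FakeAdminClient();
        // The throttle is broker-specific, so the caller issues one call per broker.
        for (int broker : new int[] {1, 2, 3}) {
            admin.alterInterBrokerThrottleRate(broker, 10_000_000L, 10_000_000L).all().join();
        }
        System.out.println("throttled brokers: " + admin.rates.keySet());
    }
}
```

Because each call targets a single broker, a tool that throttles a reassignment would loop over every broker involved and wait on each returned future.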
Network Protocol: AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse
AlterInterBrokerThrottleRequest changes the leader and follower throttle rates for inter-broker transfers from/to the broker receiving the request.
No Format |
---|
AlterInterBrokerThrottleRequest => leader_rate follower_rate timeout
leader_rate => INT64
follower_rate => INT64
timeout => INT32 |
Where:
Field | Description |
---|---|
leader_rate | The approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the leader |
follower_rate | The approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the follower |
timeout | The maximum time to await a response in ms. |
The request requires the ClusterAction operation on the CLUSTER resource, since it can significantly affect inter-broker communication.
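As a rough illustration of the request layout above, the following sketch serializes just the three body fields. The class and method names are hypothetical, the standard Kafka request header is omitted, and the example relies on `ByteBuffer` defaulting to big-endian, which matches the network byte order used by the Kafka protocol.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper; sketches the proposed request body only, without the request header. */
public class AlterInterBrokerThrottleRequestSketch {

    static ByteBuffer encodeBody(long leaderRate, long followerRate, int timeoutMs) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Long.BYTES + Integer.BYTES);
        buf.putLong(leaderRate);   // leader_rate => INT64
        buf.putLong(followerRate); // follower_rate => INT64
        buf.putInt(timeoutMs);     // timeout => INT32
        buf.flip();                // switch the buffer from writing to reading
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer body = encodeBody(10_000_000L, 5_000_000L, 30_000);
        // Two INT64 fields and one INT32 field: 8 + 8 + 4 bytes.
        System.out.println("body size in bytes: " + body.remaining());
    }
}
```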
AlterInterBrokerThrottleResponse simply acknowledges the success or failure of an AlterInterBrokerThrottleRequest.
No Format |
---|
AlterInterBrokerThrottleResponse => error_code error_message throttle_time_ms
error_code => INT16
error_message => NULLABLE_STRING
throttle_time_ms => INT32 |
Where:
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
error_code | an error code for the request |
error_message | more detailed information about any error |
Anticipated errors:
- CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
- INVALID_REQUEST (42) Leader rate or follower rate was out of range
- NONE (0) the request was successful
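The response layout and the anticipated errors can be sketched as a small decoder. This is an illustration only: the class and method names are hypothetical, the response header is omitted, and NULLABLE_STRING is read using the usual Kafka protocol encoding (an INT16 length followed by that many UTF-8 bytes, with a length of -1 meaning null).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper; sketches decoding of the proposed response body only. */
public class AlterInterBrokerThrottleResponseSketch {
    final short errorCode;
    final String errorMessage; // null when the broker sent no message
    final int throttleTimeMs;

    AlterInterBrokerThrottleResponseSketch(short errorCode, String errorMessage, int throttleTimeMs) {
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
        this.throttleTimeMs = throttleTimeMs;
    }

    static AlterInterBrokerThrottleResponseSketch decodeBody(ByteBuffer buf) {
        short errorCode = buf.getShort();   // error_code => INT16
        short length = buf.getShort();      // error_message => NULLABLE_STRING
        String message = null;
        if (length >= 0) {
            byte[] bytes = new byte[length];
            buf.get(bytes);
            message = new String(bytes, StandardCharsets.UTF_8);
        }
        int throttleTimeMs = buf.getInt();  // throttle_time_ms => INT32
        return new AlterInterBrokerThrottleResponseSketch(errorCode, message, throttleTimeMs);
    }

    /** Maps the anticipated error codes listed above to their names. */
    static String describe(short errorCode) {
        switch (errorCode) {
            case 0: return "NONE";
            case 31: return "CLUSTER_AUTHORIZATION_FAILED";
            case 42: return "INVALID_REQUEST";
            default: return "UNEXPECTED(" + errorCode + ")";
        }
    }

    public static void main(String[] args) {
        // Build a successful response body: error_code 0, null message, no throttling.
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putShort((short) 0).putShort((short) -1).putInt(0);
        buf.flip();
        AlterInterBrokerThrottleResponseSketch r = decodeBody(buf);
        System.out.println(describe(r.errorCode) + ", message=" + r.errorMessage);
    }
}
```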
Summary of use cases
Use case | command | AdminClient |
---|---|---|
Increase partition count | kafka-topics --alter --topic T --partitions P | alterPartitionCount() |
Change replication factor | kafka-topics --alter --topic T --replication-factor F | alterReplicationFactors() |
Change partition assignment | kafka-topics --alter --topic T --replica-assignment A | reassignPartitions() |
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions() |
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | |
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | |
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | describeReplicaDir() (see KIP-113) |
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J | |
...
Compatibility, Deprecation, and Migration Plan
...