...
New network protocol APIs will be added:
IncreasePartitionCountsRequest and IncreasePartitionCountsResponse
ReassignPartitionsRequest and ReassignPartitionsResponse
AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse
The AdminClient API will have new methods added (plus overloads for options):
increasePartitionCounts(Map<String, PartitionCount> partitionCounts)
reassignPartitions(Map<TopicPartition, List<Integer>>)
alterInterBrokerThrottle(long, long)
Support for changing DynamicConfig broker configs via AdminClient.alterConfigs() will be added.
The options accepted by the kafka-reassign-partitions.sh command will change:
...
Use case | Command | AdminClient |
---|---|---|
Increase partition count | kafka-topics --alter --topic T --partitions P | increasePartitionCount() |
Change replication factor | kafka-reassign-partitions --execute --reassignment-json-file J | |
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions() |
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | |
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | |
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | (see KIP-113) |
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J | |
...
In all other respects, the public API of ReassignPartitionsCommand will not be changed.
AdminClient:
...
increasePartitionCount()
kafka-topics.sh --alter --partitions ...
...
Code Block |
---|
/**
 * <p>Increase the partition count of the topics given as the keys of {@code counts}
 * according to the corresponding values. Currently it is only possible to increase
 * the partition count.</p>
 */
public IncreasePartitionCountsResult increasePartitionCounts(Map<String, PartitionCount> counts,
                                                             IncreasePartitionCountsOptions options)
public IncreasePartitionCountsResult increasePartitionCounts(Map<String, PartitionCount> counts) |
...
Code Block |
---|
/** Describes a change in a topic's partition count. */
public class PartitionCount {
    private int partitionCount;
    private List<List<Integer>> assignments;
    private PartitionCount(int partitionCount) { ... }

    /**
     * <p>Increase the partition count for a topic to the given {@code newCount}.
     * The assignment of new replicas to brokers will be decided by the broker
     * but may subsequently be moved using {@link AdminClient#reassignPartitions(Map)}.</p>
     */
    public static PartitionCount increasePartitionCount(int newCount) { ... }

    /**
     * <p>Increase the partition count for a topic to the given {@code newCount}
     * assigning the new partitions according to the given {@code newAssignments}.
     * The length of {@code newAssignments} should equal {@code newCount - oldCount}, since
     * the assignment of existing partitions is not changed.
     * Each inner list of {@code newAssignments} should have a length equal to
     * the topic's replication factor.
     * The first broker id in each inner list is the "preferred replica".</p>
     *
     * <p>For example, suppose a topic currently has a replication factor of 2, and
     * has 3 partitions. The number of partitions can be increased to 4
     * (with broker 1 being the preferred replica for the new partition) using a
     * {@code PartitionCount} constructed like this:</p>
     *
     * <pre><code>PartitionCount.increasePartitionCount(4, Arrays.asList(Arrays.asList(1, 2)))</code></pre>
     */
    public static PartitionCount increasePartitionCount(int newCount, List<List<Integer>> newAssignments) { ... }
}

public class IncreasePartitionCountsOptions {
    public IncreasePartitionCountsOptions() { ... }
    public Integer timeoutMs() { ... }
    public IncreasePartitionCountsOptions timeoutMs(Integer timeoutMs) { ... }
    public boolean validateOnly() { ... }
    /**
     * Validate the request only: Do not actually change any partition counts.
     */
    public IncreasePartitionCountsOptions validateOnly(boolean validateOnly) { ... }
}

public class IncreasePartitionCountsResult {
    // package access constructor
    Map<String, KafkaFuture<Void>> values() { ... }
    KafkaFuture<Void> all() { ... }
} |
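The constraints on {@code newAssignments} stated in the javadoc above can be checked mechanically. The following is a minimal sketch only: the class `NewAssignmentCheck` and its `problems` method are hypothetical names introduced for illustration and are not part of the proposed API, and the actual validation would be performed by the broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of the KIP): checks the documented rules for newAssignments. */
final class NewAssignmentCheck {

    /** Returns a list of rule violations; an empty list means the assignments look consistent. */
    static List<String> problems(int oldCount, int newCount, int replicationFactor,
                                 List<List<Integer>> newAssignments) {
        List<String> problems = new ArrayList<>();
        // Only increases are supported.
        if (newCount <= oldCount) {
            problems.add("newCount " + newCount + " is not greater than current count " + oldCount);
            return problems;
        }
        // One assignment per *new* partition; existing partitions are not listed.
        int expected = newCount - oldCount;
        if (newAssignments.size() != expected) {
            problems.add("expected " + expected + " assignments but got " + newAssignments.size());
        }
        // Each inner list must name exactly replicationFactor brokers.
        for (int i = 0; i < newAssignments.size(); i++) {
            if (newAssignments.get(i).size() != replicationFactor) {
                problems.add("assignment " + i + " has " + newAssignments.get(i).size()
                        + " replicas, expected " + replicationFactor);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // The javadoc example: 3 partitions, replication factor 2, increased to 4.
        System.out.println(problems(3, 4, 2, Arrays.asList(Arrays.asList(1, 2))));
    }
}
```

With the javadoc's example values the returned list is empty; passing a single assignment while increasing from 3 to 5 partitions would report one problem.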
Network Protocol:
...
IncreasePartitionCountsRequest and IncreasePartitionCountsResponse
The IncreasePartitionCountsRequest is used to increase the partition count for a batch of topics, and is the basis for the AdminClient.increasePartitionCounts() method. The request can be sent to any broker.
...
After validating the request, the broker calls AdminUtils.addPartitions(), which ultimately updates the topic partition assignment znode (/brokers/topics/${topic}).
No Format |
---|
IncreasePartitionCountsRequest => [topic_partition_count] timeout
  topic_partition_count => topic partition_count
    topic => STRING
    partition_count => count [assignment]
      count => INT32
      assignment => [INT32]
  timeout => INT32 |
...
Field | Description |
---|---|
topic | the name of a topic |
count | the new partition count |
assignment | a list of assigned brokers (one list for each new partition) |
timeout | The maximum time to await a response in ms. |
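To make the request layout concrete, here is a minimal sketch of how the body could be serialized. It assumes the usual Kafka wire encodings for the primitive types (big-endian integers, STRING as an INT16 length followed by UTF-8 bytes, arrays as an INT32 element count followed by the elements) and omits the request header. The class and method names are hypothetical and chosen only for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the request body encoding; not the actual Kafka implementation. */
final class IncreasePartitionCountsRequestSketch {

    /** One partition_count entry: the new count plus one broker list per new partition. */
    static final class TopicCount {
        final int count;
        final List<List<Integer>> assignment;

        TopicCount(int count, List<List<Integer>> assignment) {
            this.count = count;
            this.assignment = assignment;
        }
    }

    static byte[] encode(Map<String, TopicCount> topics, int timeoutMs) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes); // writes big-endian
        try {
            out.writeInt(topics.size());                     // [topic_partition_count]
            for (Map.Entry<String, TopicCount> entry : topics.entrySet()) {
                byte[] name = entry.getKey().getBytes(StandardCharsets.UTF_8);
                out.writeShort(name.length);                 // topic => STRING
                out.write(name);
                out.writeInt(entry.getValue().count);        // count => INT32
                List<List<Integer>> assignment = entry.getValue().assignment;
                out.writeInt(assignment.size());             // [assignment]
                for (List<Integer> replicas : assignment) {
                    out.writeInt(replicas.size());           // assignment => [INT32]
                    for (int brokerId : replicas) {
                        out.writeInt(brokerId);
                    }
                }
            }
            out.writeInt(timeoutMs);                         // timeout => INT32
        } catch (IOException e) {
            throw new UncheckedIOException(e);               // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }
}
```

Under these assumptions, a request that increases topic "T" to 4 partitions with the single new assignment [1, 2] and a 30000 ms timeout encodes to 31 bytes.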
No Format |
---|
IncreasePartitionCountsResponse => throttle_time_ms [topic_partition_count_error]
  topic_partition_count_error => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING |
...
- CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
- POLICY_VIOLATION (44) The request violated the configured policy
- INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
- UNKNOWN_MEMBER_ID (25) If any broker ids in the partition_reassignments included an unknown broker id
- INVALID_REQUEST (42) If duplicate topics appeared in the request
- PARTITION_REASSIGNMENT_IN_PROGRESS (new) If the reassignment cannot be started because a reassignment is currently running (i.e. the /admin/reassign_partitions znode exists)
- INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
- NONE (0) The reassignment has started
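A client receiving one of these codes needs to map the INT16 value back to a named error. The sketch below shows one way to do that using only the numeric codes listed above. The enum name `ReassignErrorSketch` is hypothetical, and PARTITION_REASSIGNMENT_IN_PROGRESS is left out because no numeric code is assigned to it in the list.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical lookup table for the error codes listed above; not Kafka's own Errors enum. */
enum ReassignErrorSketch {
    NONE(0),
    INVALID_TOPIC_EXCEPTION(17),
    UNKNOWN_MEMBER_ID(25),
    CLUSTER_AUTHORIZATION_FAILED(31),
    INVALID_REPLICA_ASSIGNMENT(39),
    INVALID_REQUEST(42),
    POLICY_VIOLATION(44);
    // PARTITION_REASSIGNMENT_IN_PROGRESS is omitted: the list above gives it no numeric code.

    private static final Map<Short, ReassignErrorSketch> BY_CODE = new HashMap<>();

    static {
        for (ReassignErrorSketch error : values()) {
            BY_CODE.put(error.code, error);
        }
    }

    final short code;

    ReassignErrorSketch(int code) {
        this.code = (short) code;
    }

    /** Returns the matching error, or null if the code is not one of those listed. */
    static ReassignErrorSketch forCode(short code) {
        return BY_CODE.get(code);
    }
}
```

Returning null for unlisted codes is a simplification for this sketch; a real client would more likely map them to a generic unknown-error value.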
AdminClient:
...
alterConfigs()
- A previous call to reassignPartitions() has been made and the user wants to apply or change the throttle.
- A previous call to reassignPartitions() is complete and the user wants to remove the throttle.
Notes:
- The throttle can be queried via describeConfigs()
- The throttle cannot be set via alterConfigs() because: 1) alterConfigs() doesn't support changing broker configs and 2) throttles are a special-case DynamicConfig. The throttle is broker-specific and different brokers can have different rates applied.
The following methods will be added to AdminClient to support changing the inter-broker transfer throttle.
Code Block |
---|
/**
* Alter the throttle of inter-broker replication for the given broker to the given rate.
* @param broker The broker
* @param leaderRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
* to be used for inter-broker replication when the broker is the leader.
* @param followerRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
* to be used for inter-broker replication when the broker is the follower.
*/
AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond)
AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond, AlterInterBrokerThrottleOptions options) |
Where:
Code Block |
---|
public class AlterInterBrokerThrottleOptions {
public Integer timeoutMs() { ... }
public AlterInterBrokerThrottleOptions timeoutMs(Integer timeoutMs) { ... }
}
public class AlterInterBrokerThrottleResult {
public KafkaFuture<Void> all();
} |
Network Protocol: AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse
...
No Format |
---|
AlterInterBrokerThrottleRequest => leader_rate follower_rate timeout
leader_rate => INT64
follower_rate => INT64
timeout => INT32 |
Where:
Field | Description |
---|---|
leader_rate | The approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the leader |
follower_rate | The approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the follower |
timeout | The maximum time to await a response in ms. |
The request requires the Alter operation on the CLUSTER resource, since it can affect significant inter-broker communication.
...
No Format |
---|
AlterInterBrokerThrottleResponse => error_code error_message throttle_time_ms
error_code => INT16
error_message => NULLABLE_STRING
throttle_time_ms => INT32 |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
error_code | an error code for the request |
error_message | more detailed information about any error |
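As an illustration of the response layout, the following sketch decodes the three fields in the order given above. It assumes the usual Kafka encoding of NULLABLE_STRING (an INT16 length, with -1 denoting null, followed by UTF-8 bytes) and omits the response header. The class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of decoding the response body; not the actual Kafka implementation. */
final class AlterInterBrokerThrottleResponseSketch {
    final short errorCode;
    final String errorMessage; // null when the broker sent no message
    final int throttleTimeMs;

    AlterInterBrokerThrottleResponseSketch(short errorCode, String errorMessage, int throttleTimeMs) {
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
        this.throttleTimeMs = throttleTimeMs;
    }

    /** Reads error_code, error_message and throttle_time_ms from a big-endian buffer. */
    static AlterInterBrokerThrottleResponseSketch decode(ByteBuffer buf) {
        short errorCode = buf.getShort();          // error_code => INT16
        short length = buf.getShort();             // NULLABLE_STRING length, -1 means null
        String errorMessage = null;
        if (length >= 0) {
            byte[] utf8 = new byte[length];
            buf.get(utf8);
            errorMessage = new String(utf8, StandardCharsets.UTF_8);
        }
        int throttleTimeMs = buf.getInt();         // throttle_time_ms => INT32
        return new AlterInterBrokerThrottleResponseSketch(errorCode, errorMessage, throttleTimeMs);
    }
}
```

A successful response with no message is therefore 8 bytes long under these assumptions: a zero error code, a length of -1, and the throttle time.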
Anticipated errors:
...
already exists, but doesn't currently support changing broker configs. Inter-broker throttling is implemented via DynamicConfigs on the broker:
leader.replication.throttled.rate
follower.replication.throttled.rate
DynamicConfigs are configs "which have no physical manifestation in the server.properties and can only be set dynamically".
And via topic configs:
leader.replication.throttled.replicas
follower.replication.throttled.replicas
The alterConfigs() API will be changed to allow altering DynamicConfigs in order to support the following use cases:
- A previous call to reassignPartitions() has been made and the user wants to apply or change the throttle.
- A previous call to reassignPartitions() is complete and the user wants to remove the throttle.
This KIP does not propose to support altering broker configs that are not DynamicConfigs.
This will involve changing the API doc.
Network Protocol: AlterConfigsRequest and AlterConfigsResponse
This protocol already exists, but support will be added for altering broker DynamicConfigs.
...
Compatibility, Deprecation, and Migration Plan
...