...
AlterPartitionCountsRequest and AlterPartitionCountsResponse
AlterReplicationFactorsRequest and AlterReplicationFactorsResponse
ReassignPartitionsRequest and ReassignPartitionsResponse
AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse
The AdminClient API will have new methods added (plus overloads for options):
alterPartitionCounts(Map<String, Integer> partitionCounts)
alterReplicationFactors(Map<String, Short> replicationFactors)
reassignPartitions(Map<TopicPartition, List<Integer>>)
The options accepted by the kafka-reassign-partitions.sh command will change:
...
Partition reassignment is a long-running operation, and the ReassignPartitionsResult indicates only that the reassignment has been started, not that it has completed. The describeReplicaDir() method from KIP-113 can be used to determine progress.
AdminClient: alterInterBrokerThrottle()
AdminClient to support changing the inter-broker transfer throttle....
Code Block |
---|
public class AlterInterBrokerThrottleOptions {
    public Integer timeoutMs() { ... }
    public AlterInterBrokerThrottleOptions timeoutMs(Integer timeoutMs) { ... }
}
public class AlterInterBrokerThrottleResult {
    public KafkaFuture<Void> all() { ... }
} |
Notes:
- The throttled rate is implemented as a DynamicConfig on the broker, but this is an implementation detail, so it wouldn't be appropriate to set it via alterConfigs(). In any case, it's currently not possible to change broker configs via alterConfigs(). The throttle is broker-specific and different brokers can have different rates applied.
...
- CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
- INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
- UNKNOWN_MEMBER_ID (25) If any broker ids in the partition_assignment included an unknown broker id
- INVALID_REQUEST (42) If duplicate topics appeared in the request
- PARTITION_REASSIGNMENT_IN_PROGRESS (new)
- INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
- NONE (0) The reassignment has started
Network Protocol: AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse
AlterInterBrokerThrottleRequest changes the leader and follower throttle rates for inter-broker transfers from/to the broker receiving the request.
No Format |
---|
AlterInterBrokerThrottleRequest => leader_rate follower_rate timeout
leader_rate => INT64
follower_rate => INT64
timeout => INT32 |
Where:
Field | Description |
---|---|
leader_rate | The approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the leader |
follower_rate | The approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the follower |
timeout | The maximum time to await a response, in ms |
The request requires the ClusterAction operation on the CLUSTER resource, since it can affect significant inter-broker communication.
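The request body layout above can be illustrated with a minimal sketch that serializes the three fields in schema order. The class and method names are hypothetical and are not part of this proposal or of Kafka; the standard request header is deliberately omitted, so this shows only the field order and widths.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper for illustration only; not part of the KIP or of Kafka. */
final class AlterInterBrokerThrottleRequestSketch {

    /** Writes leader_rate (INT64), follower_rate (INT64), timeout (INT32) in that order. */
    static ByteBuffer encode(long leaderRate, long followerRate, int timeoutMs) {
        // 8 + 8 + 4 = 20 bytes. ByteBuffer is big-endian by default,
        // which matches the network byte order of the Kafka protocol.
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Long.BYTES + Integer.BYTES);
        buf.putLong(leaderRate);
        buf.putLong(followerRate);
        buf.putInt(timeoutMs);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }
}
```

For instance, `encode(1000L, 2000L, 30000)` yields a 20-byte buffer whose first eight bytes hold the leader rate.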
AlterInterBrokerThrottleResponse just acknowledges the success or otherwise of an AlterInterBrokerThrottleRequest.
No Format |
---|
AlterInterBrokerThrottleResponse => error_code error_message throttle_time_ms
error_code => INT16
error_message => NULLABLE_STRING
throttle_time_ms => INT32 |
Where:
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
error_code | an error code for the request |
error_message | more detailed information about any error |
Anticipated errors:
- CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
- INVALID_REQUEST (42) Leader rate or follower rate was out of range
- NONE (0) The request was successful
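As a rough illustration of how a client might read this response body, the sketch below decodes the three fields in schema order and maps the anticipated error codes to their names. The class name, field names and `errorName` helper are hypothetical; the sketch assumes the usual Kafka encoding of NULLABLE_STRING (an INT16 length, with -1 meaning null, followed by UTF-8 bytes) and omits the response header.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical decoder for illustration only; not part of the KIP or of Kafka. */
final class AlterInterBrokerThrottleResponseSketch {
    final short errorCode;
    final String errorMessage; // null when the broker sent no message
    final int throttleTimeMs;

    private AlterInterBrokerThrottleResponseSketch(short errorCode, String errorMessage, int throttleTimeMs) {
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
        this.throttleTimeMs = throttleTimeMs;
    }

    /** Reads error_code, error_message, throttle_time_ms in the order given by the schema. */
    static AlterInterBrokerThrottleResponseSketch decode(ByteBuffer buf) {
        short errorCode = buf.getShort();
        short length = buf.getShort(); // NULLABLE_STRING length prefix; -1 means null
        String message = null;
        if (length >= 0) {
            byte[] bytes = new byte[length];
            buf.get(bytes);
            message = new String(bytes, StandardCharsets.UTF_8);
        }
        int throttleTimeMs = buf.getInt();
        return new AlterInterBrokerThrottleResponseSketch(errorCode, message, throttleTimeMs);
    }

    /** Names only the error codes listed as anticipated for this response. */
    static String errorName(short code) {
        switch (code) {
            case 0:  return "NONE";
            case 31: return "CLUSTER_AUTHORIZATION_FAILED";
            case 42: return "INVALID_REQUEST";
            default: return "UNEXPECTED(" + code + ")";
        }
    }
}
```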
Summary of use cases
Use case | command | AdminClient |
---|---|---|
Increase partition count | kafka-topics --alter --topic T --partitions P | alterPartitionCounts() |
Change replication factor | kafka-topics --alter --topic T --replication-factor F | alterReplicationFactors() |
Change partition assignment | kafka-topics --alter --topic T --replica-assignment A | reassignPartitions() |
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions() |
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | |
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | |
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | describeReplicaDir() (see KIP-113) |
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J | |
...
- Some calls to alterTopics() (such as increasing the partition count) would have been synchronous, while others (such as moving replicas between brokers) would have been long-running and thus asynchronous. This made for an API whose synchronicity depended on the arguments.
- createTopics() allows topic configs to be specified, whereas alterConfigs() is already provided to change topic configs, so it wasn't an exact mirror.
Just providing reassignPartitions() was considered, with changes to partition count and replication factor inferred from the partitions and brokers present in the assignments argument. This would require the caller to provide an assignment of partitions to brokers, but the AdminClient doesn't provide methods to obtain the information needed to make an informed decision (what is the load on the brokers, how much free disk space do they have, how many partitions are they already hosting), so it was felt better to retain alterPartitionCounts() and alterReplicationFactors() for the time being.
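To make the rejected alternative concrete, the sketch below shows one way partition counts and replication factors could have been inferred from an assignments map. This is only an interpretation: the inference rules (highest partition index plus one, and a replica list size that must agree across a topic's partitions) are assumptions, and `TopicPartition` here is a minimal stand-in rather than Kafka's own class.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical illustration of the rejected design; not part of the KIP or of Kafka. */
final class AssignmentInferenceSketch {

    /** Minimal stand-in for Kafka's TopicPartition, so the sketch is self-contained. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    /** Assumed rule: a topic's partition count is its highest partition index plus one. */
    static Map<String, Integer> inferPartitionCounts(Map<TopicPartition, List<Integer>> assignments) {
        Map<String, Integer> counts = new HashMap<>();
        for (TopicPartition tp : assignments.keySet()) {
            counts.merge(tp.topic, tp.partition + 1, Math::max);
        }
        return counts;
    }

    /** Assumed rule: the replication factor is the replica list size, which must agree per topic. */
    static Map<String, Short> inferReplicationFactors(Map<TopicPartition, List<Integer>> assignments) {
        Map<String, Short> factors = new HashMap<>();
        for (Map.Entry<TopicPartition, List<Integer>> e : assignments.entrySet()) {
            String topic = e.getKey().topic;
            short rf = (short) e.getValue().size();
            Short previous = factors.putIfAbsent(topic, rf);
            if (previous != null && previous != rf) {
                throw new IllegalArgumentException("Inconsistent replica counts for topic " + topic);
            }
        }
        return factors;
    }
}
```

Even in this small form, the caller has to supply a complete broker list for every partition, which is the burden the paragraph above describes.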
...