...

The AdminClient API will have new methods added (plus overloads for options):

The options accepted by the kafka-reassign-partitions.sh command will change:

...

Partition reassignment is a long-running operation: the ReassignPartitionsResult indicates only that the reassignment has started, not that it has completed. The describeReplicaDir() method from KIP-113 can be used to track its progress.

AdminClient: alterInterBrokerThrottle()

The following methods will be added to AdminClient to support changing the inter-broker transfer throttle.

...

Code Block
public class AlterInterBrokerThrottleOptions {
    public Integer timeoutMs() { ... }
    public AlterInterBrokerThrottleOptions timeoutMs(Integer timeoutMs) { ... }
}

public class AlterInterBrokerThrottleResult {
    public KafkaFuture<Void> all() { ... }
}

Notes:

  • The throttled rate is implemented as a DynamicConfig on the broker, but this is an implementation detail, so it wouldn't be appropriate to set it via alterConfigs(). In any case, it is currently not possible to change broker configs via alterConfigs().
  • The throttle is broker-specific and different brokers can have different rates applied.

...

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • UNKNOWN_MEMBER_ID (25) If any broker ids in the partition_assignment included an unknown broker id
  • INVALID_REQUEST (42) If duplicate topics appeared in the request
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new)
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
  • NONE (0) The reassignment has started


Network Protocol: AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse

An AlterInterBrokerThrottleRequest changes the leader and follower throttle rates for inter-broker transfers from/to the broker receiving the request.

No Format
AlterInterBrokerThrottleRequest => leader_rate follower_rate timeout
  leader_rate => INT64
  follower_rate => INT64
  timeout => INT32

Where:

Field | Description
leader_rate | The approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the leader
follower_rate | The approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the follower
timeout | The maximum time to await a response, in ms
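To make the field layout concrete, here is a minimal Java sketch that serializes just the request body (leader_rate, follower_rate, timeout) in schema order. The class name AlterInterBrokerThrottleRequestBody is hypothetical, and the request header (API key, version, correlation id, client id) is deliberately omitted. It relies on the Kafka protocol encoding integers big-endian, which is ByteBuffer's default byte order.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: encodes only the AlterInterBrokerThrottleRequest body.
// The request header is assumed to be written separately and is not shown.
final class AlterInterBrokerThrottleRequestBody {
    final long leaderRate;   // leader_rate INT64, bytes/sec when the receiving broker is the leader
    final long followerRate; // follower_rate INT64, bytes/sec when the receiving broker is the follower
    final int timeoutMs;     // timeout INT32, maximum time to await a response

    AlterInterBrokerThrottleRequestBody(long leaderRate, long followerRate, int timeoutMs) {
        this.leaderRate = leaderRate;
        this.followerRate = followerRate;
        this.timeoutMs = timeoutMs;
    }

    // Writes the fields in schema order; ByteBuffer is big-endian by default.
    ByteBuffer encode() {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Long.BYTES + Integer.BYTES);
        buf.putLong(leaderRate);
        buf.putLong(followerRate);
        buf.putInt(timeoutMs);
        buf.flip(); // make the written bytes readable
        return buf;
    }

    // Reads the fields back in the same order, as a receiving broker would.
    static AlterInterBrokerThrottleRequestBody decode(ByteBuffer buf) {
        long leaderRate = buf.getLong();
        long followerRate = buf.getLong();
        int timeoutMs = buf.getInt();
        return new AlterInterBrokerThrottleRequestBody(leaderRate, followerRate, timeoutMs);
    }

    public static void main(String[] args) {
        ByteBuffer wire = new AlterInterBrokerThrottleRequestBody(10_000_000L, 5_000_000L, 30_000).encode();
        System.out.println(wire.remaining()); // 20
        AlterInterBrokerThrottleRequestBody back = decode(wire);
        System.out.println(back.leaderRate + " " + back.followerRate + " " + back.timeoutMs);
    }
}
```

The body is a fixed 20 bytes (8 + 8 + 4), so a broker can validate its length before parsing.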

The request requires authorization for the ClusterAction operation on the CLUSTER resource, since it can significantly affect inter-broker communication.

An AlterInterBrokerThrottleResponse simply acknowledges whether the AlterInterBrokerThrottleRequest succeeded.

No Format
AlterInterBrokerThrottleResponse => error_code error_message throttle_time_ms
  error_code => INT16
  error_message => NULLABLE_STRING
  throttle_time_ms => INT32

Where:

Field | Description
error_code | An error code for the request
error_message | More detailed information about any error
throttle_time_ms | Duration in milliseconds for which the request was throttled
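The only non-fixed-width field in the response is error_message. The sketch below decodes the response body, assuming the standard Kafka NULLABLE_STRING encoding (an INT16 length followed by that many UTF-8 bytes, with length -1 meaning null). The class name AlterInterBrokerThrottleResponseBody is hypothetical, the response header is assumed to be consumed already, and the encoder exists only so the decoder can be exercised without a broker.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: decodes the AlterInterBrokerThrottleResponse body.
final class AlterInterBrokerThrottleResponseBody {
    final short errorCode;     // error_code INT16
    final String errorMessage; // error_message NULLABLE_STRING, null if absent
    final int throttleTimeMs;  // throttle_time_ms INT32

    AlterInterBrokerThrottleResponseBody(short errorCode, String errorMessage, int throttleTimeMs) {
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
        this.throttleTimeMs = throttleTimeMs;
    }

    // NULLABLE_STRING: INT16 length N, then N UTF-8 bytes; N == -1 encodes null.
    static AlterInterBrokerThrottleResponseBody decode(ByteBuffer buf) {
        short errorCode = buf.getShort();
        short length = buf.getShort();
        String errorMessage = null;
        if (length >= 0) {
            byte[] bytes = new byte[length];
            buf.get(bytes);
            errorMessage = new String(bytes, StandardCharsets.UTF_8);
        }
        int throttleTimeMs = buf.getInt();
        return new AlterInterBrokerThrottleResponseBody(errorCode, errorMessage, throttleTimeMs);
    }

    // Matching encoder, used here only to build test input.
    ByteBuffer encode() {
        byte[] msg = errorMessage == null ? null : errorMessage.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + (msg == null ? 0 : msg.length) + 4);
        buf.putShort(errorCode);
        if (msg == null) {
            buf.putShort((short) -1); // null string marker
        } else {
            buf.putShort((short) msg.length);
            buf.put(msg);
        }
        buf.putInt(throttleTimeMs);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer wire = new AlterInterBrokerThrottleResponseBody((short) 42, "rate out of range", 0).encode();
        AlterInterBrokerThrottleResponseBody r = decode(wire);
        System.out.println(r.errorCode + " " + r.errorMessage + " " + r.throttleTimeMs);
    }
}
```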

Anticipated errors:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_REQUEST (42) Leader rate or follower rate was out of range
  • NONE (0) the request was successful
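A client only needs to distinguish the three anticipated codes above. The following hypothetical helper, ThrottleResponseErrors, maps error_code to those cases and throws on anything other than NONE, preferring the broker's error_message when one is present. Treating any unlisted code as unanticipated is an assumption of this sketch, not something the proposal specifies.

```java
// Hypothetical helper for interpreting AlterInterBrokerThrottleResponse.error_code.
final class ThrottleResponseErrors {

    // The error codes anticipated for AlterInterBrokerThrottleResponse.
    enum ErrorCode {
        NONE((short) 0),
        CLUSTER_AUTHORIZATION_FAILED((short) 31),
        INVALID_REQUEST((short) 42);

        final short code;

        ErrorCode(short code) {
            this.code = code;
        }

        // Looks up the enum constant for a wire value; unlisted codes are rejected.
        static ErrorCode fromCode(short code) {
            for (ErrorCode e : values()) {
                if (e.code == code) {
                    return e;
                }
            }
            throw new IllegalArgumentException("unanticipated error_code " + code);
        }
    }

    // Returns normally for NONE; otherwise throws with the broker's detail, if any.
    static void check(short errorCode, String errorMessage) {
        ErrorCode error = ErrorCode.fromCode(errorCode);
        if (error != ErrorCode.NONE) {
            String detail = errorMessage != null ? errorMessage : "(no error_message)";
            throw new IllegalStateException(error.name() + ": " + detail);
        }
    }

    public static void main(String[] args) {
        check((short) 0, null); // success: no exception
        try {
            check((short) 42, "hypothetical message");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```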

Summary of use cases

Use case | Command | AdminClient
Increase partition count | kafka-topics --alter --topic T --partitions P | alterPartitionCounts()
Change replication factor | kafka-topics --alter --topic T --replication-factor F | alterReplicationFactors()
Change partition assignment | kafka-topics --alter --topic T --replica-assignment A | reassignPartitions()
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions()
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | reassignPartitions(validateOnly) to check none is in progress, then alterInterBrokerThrottle(), then alterConfigs() to set (leader|follower).replication.throttled.replicas, then reassignPartitions()
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | alterInterBrokerThrottle()
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | describeReplicaDir() (see KIP-113)
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J | reassignPartitions(validateOnly) to check none is in progress, then alterConfigs() to clear (leader|follower).replication.throttled.replicas
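The "with throttle" and "--verify" rows combine several AdminClient calls in a fixed order. The Java sketch below shows only that ordering. ReassignmentAdmin, alterTopicConfig() and RecordingAdmin are hypothetical stand-ins with simplified signatures (the real AdminClient signatures are not reproduced here), "*" (all replicas of the topic) is used as a simplification instead of listing the specific throttled replicas, and an empty value stands in for removing a config.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the call sequence behind the throttled reassignment commands.
final class ThrottledReassignmentSketch {

    // Simplified stand-in for the AdminClient methods named in the table.
    interface ReassignmentAdmin {
        // validateOnly == true: check the request (e.g. none in progress) without starting it.
        void reassignPartitions(Map<String, Map<Integer, List<Integer>>> assignments, boolean validateOnly);
        void alterInterBrokerThrottle(int brokerId, long leaderRate, long followerRate);
        // Stands in for alterConfigs() on a topic resource.
        void alterTopicConfig(String topic, String name, String value);
    }

    static final String LEADER_THROTTLED = "leader.replication.throttled.replicas";
    static final String FOLLOWER_THROTTLED = "follower.replication.throttled.replicas";

    // Mirrors "--execute ... --throttle R".
    static void executeWithThrottle(ReassignmentAdmin admin,
                                    Map<String, Map<Integer, List<Integer>>> assignments,
                                    long rate) {
        admin.reassignPartitions(assignments, true);              // 1. check none in progress
        for (int broker : brokersIn(assignments)) {               // 2. throttle is per broker
            admin.alterInterBrokerThrottle(broker, rate, rate);
        }
        for (String topic : assignments.keySet()) {               // 3. mark throttled replicas
            admin.alterTopicConfig(topic, LEADER_THROTTLED, "*");
            admin.alterTopicConfig(topic, FOLLOWER_THROTTLED, "*");
        }
        admin.reassignPartitions(assignments, false);             // 4. start the reassignment
    }

    // Mirrors "--verify": a real admin would fail the validateOnly call if still in progress.
    static void verifyAndClear(ReassignmentAdmin admin,
                               Map<String, Map<Integer, List<Integer>>> assignments) {
        admin.reassignPartitions(assignments, true);
        for (String topic : assignments.keySet()) {
            admin.alterTopicConfig(topic, LEADER_THROTTLED, "");
            admin.alterTopicConfig(topic, FOLLOWER_THROTTLED, "");
        }
    }

    // Collects every broker id appearing in the target assignment, sorted.
    static Set<Integer> brokersIn(Map<String, Map<Integer, List<Integer>>> assignments) {
        Set<Integer> brokers = new TreeSet<>();
        for (Map<Integer, List<Integer>> partitions : assignments.values()) {
            for (List<Integer> replicas : partitions.values()) {
                brokers.addAll(replicas);
            }
        }
        return brokers;
    }

    // Fake that records calls, so the ordering can be inspected without a cluster.
    static final class RecordingAdmin implements ReassignmentAdmin {
        final List<String> calls = new ArrayList<>();

        public void reassignPartitions(Map<String, Map<Integer, List<Integer>>> a, boolean validateOnly) {
            calls.add("reassignPartitions(validateOnly=" + validateOnly + ")");
        }

        public void alterInterBrokerThrottle(int brokerId, long leaderRate, long followerRate) {
            calls.add("alterInterBrokerThrottle(" + brokerId + ")");
        }

        public void alterTopicConfig(String topic, String name, String value) {
            calls.add("alterConfigs(" + topic + ", " + name + ")");
        }
    }

    public static void main(String[] args) {
        RecordingAdmin admin = new RecordingAdmin();
        Map<String, Map<Integer, List<Integer>>> target =
                Map.of("T", Map.of(0, List.of(1, 2), 1, List.of(2, 3)));
        executeWithThrottle(admin, target, 10_000_000L);
        admin.calls.forEach(System.out::println);
    }
}
```

Setting the broker throttles before the throttled-replica configs means the throttle is already in force by the time reassignPartitions() starts moving data.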

...

  • Some calls to alterTopics() (such as increasing the partition count) would have been synchronous, while others (such as moving replicas between brokers) would have been long-running and thus asynchronous. This would have made for an API whose synchronicity depended on its arguments.
  • createTopics() allows topic configs to be specified, whereas alterConfigs() already exists to change topic configs, so alterTopics() would not have been an exact mirror of createTopics().

Providing only reassignPartitions() was also considered, with changes to partition count and replication factor inferred from the partitions and brokers present in the assignments argument. This would require the caller to supply an explicit assignment of partitions to brokers, but the AdminClient doesn't provide methods to obtain the information needed to make an informed decision (what the load on the brokers is, how much free disk space they have, or how many partitions they already host). It was therefore felt better to retain alterPartitionCounts() and alterReplicationFactors() for the time being.

...