Note this was initially erroneously assigned as KIP-178, which was already taken, and has been reassigned KIP-179.
...
AlterPartitionCountsRequest and AlterPartitionCountsResponse
AlterReplicationFactorsRequest and AlterReplicationFactorsResponse
ReassignPartitionsRequest and ReassignPartitionsResponse
ReplicaStatusRequest and ReplicaStatusResponse
The AdminClient
API will have new methods added (plus overloads for options):
alterPartitionCounts(Map<String, Integer> partitionCounts)
alterReplicationFactors(Map<String, Short> replicationFactors)
reassignPartitions(Map<TopicPartition, List<Integer>>)
replicaStatus(Collection<Replica> replicas)
The options accepted by kafka-reassign-partitions.sh
command will change:
...
No Format |
---|
Topic     Partition  Broker  Status
-------------------------------------
my_topic  0          0       In sync
my_topic  0          1       Behind: 10456 messages behind
asdf      0          1       Unknown topic
my_topic  42         1       Unknown partition
my_topic  0          42      Unknown broker
my_topic  1          0       Broker does not host this partition |
The implementation of --progress
will make use of the describeReplicaDir()
method from KIP-113 to find the lag of the syncing replica.
Internally, the ReassignPartitionsCommand
will be refactored to support the above changes to the options. An interface will abstract the commands that are currently issued directly to ZooKeeper.
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * <p>Change the partition count of the topics given as the keys of {@code counts}
 * to the corresponding values. Currently it is only possible to increase
 * the partition count.</p>
 *
 * <p>The replicas of new partitions will be allocated to the least loaded broker,
 * but may subsequently be moved using {@link #reassignPartitions(Map)}.</p>
 */
public AlterPartitionCountsResult alterPartitionCounts(Map<String, Integer> counts, AlterPartitionCountsOptions options)
public AlterPartitionCountsResult alterPartitionCounts(Map<String, Integer> counts) |
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class AlterPartitionCountsOptions {
public AlterPartitionCountsOptions() { ... }
// TODO validateOnly?
public Integer timeoutMs() { ... }
public AlterPartitionCountsOptions timeoutMs(Integer timeoutMs) { ... }
}
public class AlterPartitionCountsResult {
// package access constructor
Map<String, KafkaFuture<Void>> values() { ... }
KafkaFuture<Void> all() { ... }
} |
AdminClient:
...
alterReplicationFactors()
kafka-topics.sh --alter --replication-factor ...
the following methods will be added to AdminClient
to support changing topics' replication factors.
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * <p>Change the replication factor of the topics given as the keys of
 * replicationFactors to the corresponding values.</p>
 *
 * <p>New replicas will be allocated to the least loaded broker,
 * but may subsequently be moved using {@link #reassignPartitions(Map)}.</p>
 */
AlterReplicationFactorsResult alterReplicationFactors(Map<String, Short> replicationFactors)
AlterReplicationFactorsResult alterReplicationFactors(Map<String, Short> replicationFactors, AlterReplicationFactorsOptions options) |
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class AlterReplicationFactorsOptions {
    public AlterReplicationFactorsOptions() { ... }
    // TODO validateOnly?
    public Integer timeoutMs() { ... }
    public AlterReplicationFactorsOptions timeoutMs(Integer timeoutMs) { ... }
}
public class AlterReplicationFactorsResult {
    // package access constructor
    Map<String, KafkaFuture<Void>> values() { ... }
    KafkaFuture<Void> all() { ... }
} |
AdminClient: reassignPartitions()
...
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * <p>Reassign the partitions given as the keys of the given <code>assignments</code> to the corresponding
 * list of brokers.</p>
 */
ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments)
ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments, ReassignPartitionsOptions options) |
Where:
Code Block | ||||
---|---|---|---|---|
| ||||
class ReassignPartitionsOptions {
boolean validateOnly()
ReassignPartitionsOptions validateOnly(boolean validateOnly)
long timeoutMs()
ReassignPartitionsOptions timeoutMs(long timeoutMs)
}
class ReassignPartitionsResult {
Map<String, KafkaFuture<Void>> values();
} |
Partition reassignment is a long-running operation, and the ReassignPartitionsResult
indicates only that the reassignment has been started, not that the reassignment has been completed. The new replicaStatus()
method can be used to check progress and completion of the reassignment.
TODO quotas
AdminClient: replicaStatus()
...
The first broker in each list is the one which holds the "preferred replica".</p>
*
* <p>Inter-broker reassignment causes significant inter-broker traffic and can take a long time
* in order to copy the replica data to brokers. It may be necessary to impose a quota on
* inter-broker traffic for the duration of the reassignment so that client-broker traffic is not
* adversely affected.</p>
*
* <h3>Preferred replica</h3>
* <p>When brokers are configured with <code>auto.leader.rebalance.enable=true</code>, the broker
* with the preferred replica will be elected leader automatically.
* <code>kafka-preferred-replica-election.sh</code> provides a manual trigger for this
* election when <code>auto.leader.rebalance.enable=false</code>.</p>
*/
ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments)
ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments,
ReassignPartitionsOptions options)
|
Where:
Code Block | ||||
---|---|---|---|---|
| ||||
class ReassignPartitionsOptions {
boolean validateOnly()
/**
* Validate the request only: Do not actually trigger replica reassignment.
*/
ReassignPartitionsOptions validateOnly(boolean validateOnly)
long timeoutMs()
/**
* Set a timeout for the starting of the reassignment.
* Note this timeout does not include the time taken to actually
* move replicas between brokers.
*/
ReassignPartitionsOptions timeoutMs(long timeoutMs)
long throttle()
/**
 * Set a throttle, in bytes per second, on the bandwidth used for
 * inter-broker replica movement.
 */
ReassignPartitionsOptions throttle(long throttledRateBytesPerSecond)
}
class ReassignPartitionsResult {
    Map<String, KafkaFuture<Void>> values();
} |
Code Block |
---|
/**
 * Query the replication status of the given partitions.
 */
public ReplicaStatusResult replicaStatus(Collection<TopicPartitionReplica> replicas)
public ReplicaStatusResult replicaStatus(Collection<TopicPartitionReplica> replicas, ReplicaStatusOptions options) |
Where:
Code Block |
---|
/**
 * Identifies a replica of a topic partition on a broker
 */
public class TopicPartitionReplica {
    public TopicPartitionReplica(String topic, int partition, short broker) { ... }
    public String getTopic() { ... }
    public int getPartition() { ... }
    public short getBroker() { ... }
}
public class ReplicaStatusOptions {
    public ReplicaStatusOptions() { ... }
    public long timeoutMs() { ... }
    public ReplicaStatusOptions timeoutMs(long timeoutMs) { ... }
}
public class ReplicaStatusResult {
    public KafkaFuture<Map<TopicPartitionReplica, List<ReplicaStatus>>> all()
    public Map<TopicPartitionReplica, KafkaFuture<ReplicaStatus>> values()
}
/**
 * Represents the replication status of a partition
 * on a particular broker.
 */
public class ReplicaStatus {
    /**
     * The time (as milliseconds since the epoch) that
     * this status data was collected. In general this may
     * be some time before the replicaStatus() request time.
     */
    public long statusTime()
    /**
     * The number of messages that the replica on this broker is behind
     * the leader.
     */
    public long lag()
} |
Partition reassignment is a long-running operation, and the ReassignPartitionsResult
indicates only that the reassignment has been started, not that the reassignment has been completed. The describeReplicaDir()
method from KIP-113 can be used to determine progress.
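Because the result only signals that the reassignment has started, a caller that needs to wait for completion has to poll. The following is a minimal sketch of such a loop, not part of the proposed API: the class name `ReassignmentProgress`, the method `awaitCaughtUp` and the `lagSupplier` callback are hypothetical, and the supplier stands in for whatever call reports the replica's lag.

```java
import java.util.function.LongSupplier;

public class ReassignmentProgress {
    /**
     * Polls lagSupplier until it reports a lag of 0 or maxPolls polls have been made.
     * lagSupplier is a hypothetical stand-in for a real lag query.
     * Returns true if the replica caught up, false on give-up or interruption.
     */
    public static boolean awaitCaughtUp(LongSupplier lagSupplier, int maxPolls, long pollIntervalMs) {
        for (int attempt = 0; attempt < maxPolls; attempt++) {
            if (lagSupplier.getAsLong() == 0L) {
                return true;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```

A bounded number of polls is used here so that a stalled reassignment cannot block the caller indefinitely.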
AdminClient: alterInterBrokerThrottle()
The following methods will be added to AdminClient
to support changing the inter-broker transfer throttle.
Code Block |
---|
/**
* Alter the throttle of inter-broker replication for the given broker to the given rate.
* @param broker The broker
* @param leaderRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
* to be used for inter-broker replication when the broker is the leader.
* @param followerRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
* to be used for inter-broker replication when the broker is the follower.
*/
AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond)
AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond, AlterInterBrokerThrottleOptions options) |
Where:
Code Block |
---|
public class AlterInterBrokerThrottleOptions {
public Integer timeoutMs() { ... }
public AlterInterBrokerThrottleOptions timeoutMs(Integer timeoutMs) { ... }
}
public class AlterInterBrokerThrottleResult {
public KafkaFuture<Void> all();
} |
- The throttled rate is implemented as a DynamicConfig on the broker, but this is an implementation detail, so it wouldn't be appropriate to set it via alterConfigs().
- The throttle is broker-specific and different brokers can have different rates applied.
Network Protocol: AlterPartitionCountsRequest and AlterPartitionCountsResponse
AlterPartitionCountsRequest
is used to change the partition count for a batch of topics, and is the basis for the AdminClient.alterPartitionCounts()
method.
No Format |
---|
AlterPartitionCountsRequest => [topic_partition_count] timeout
topic_partition_count => topic partition_count
topic => STRING
partition_count => INT32
timeout => INT32
// TODO: validate_only? |
Where
Field | Description |
---|---|
topic | the name of a topic |
partition_count | the new partition count |
timeout | The maximum time to await a response in ms. |
The request will require the ALTER
operation on the Topic
resource.
The request is subject to the CreateTopicPolicy
of the broker as configured by the broker's create.topic.policy.class.name
config property. This is to ensure that the policy applies to topics modified after creation.
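To make the request layout concrete, here is a minimal sketch that serializes the request body as described above. It assumes the usual Kafka protocol primitive encodings (an array is an INT32 element count followed by the elements, a STRING is an INT16 byte length followed by UTF-8 bytes, integers are big-endian) and omits the common request header. The class name `AlterPartitionCountsEncoder` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class AlterPartitionCountsEncoder {
    /** Encodes [topic_partition_count] followed by timeout; request header not included. */
    public static ByteBuffer encode(Map<String, Integer> counts, int timeoutMs) {
        int size = 4; // INT32 array length
        for (String topic : counts.keySet()) {
            // INT16 string length + UTF-8 bytes + INT32 partition_count
            size += 2 + topic.getBytes(StandardCharsets.UTF_8).length + 4;
        }
        size += 4; // INT32 timeout

        ByteBuffer buf = ByteBuffer.allocate(size); // big-endian by default
        buf.putInt(counts.size());
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            byte[] name = entry.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) name.length);
            buf.put(name);
            buf.putInt(entry.getValue());
        }
        buf.putInt(timeoutMs);
        buf.flip();
        return buf;
    }
}
```

For a single topic `t` with a new count of 3 and a 1000 ms timeout, the body is 15 bytes long.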
...
No Format |
---|
AlterPartitionCountsResponse => throttle_time_ms [topic_partition_count_error]
topic_partition_count_error => topic error_code error_message
topic => STRING
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | the name of a topic in the request |
error_code | an error code for that topic |
error_message | more detailed information about any error for that topic |
Anticipated errors:
- TOPIC_AUTHORIZATION_FAILED (29) The user lacked Alter on the topic
- INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
- INVALID_PARTITIONS (37) If the partition_count was invalid
- INVALID_REQUEST (42) If duplicate topics appeared in the request.
- NONE (0) The topic partition count was changed successfully.
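The per-topic error mapping above could be sketched as follows. This is an illustration only, not the broker implementation: the class `PartitionCountValidator` is hypothetical, authorization is not modelled, and it assumes that a requested count which is not strictly greater than the current count is rejected with INVALID_PARTITIONS.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionCountValidator {
    public static final short NONE = 0;
    public static final short INVALID_TOPIC_EXCEPTION = 17;
    public static final short INVALID_PARTITIONS = 37;
    public static final short INVALID_REQUEST = 42;

    /** Returns an error code per requested topic, given the current partition counts. */
    public static Map<String, Short> validate(List<Map.Entry<String, Integer>> requested,
                                              Map<String, Integer> current) {
        // Count occurrences so that duplicated topics can be rejected.
        Map<String, Integer> occurrences = new HashMap<>();
        for (Map.Entry<String, Integer> entry : requested) {
            occurrences.merge(entry.getKey(), 1, Integer::sum);
        }
        Map<String, Short> errors = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : requested) {
            String topic = entry.getKey();
            short code;
            if (occurrences.get(topic) > 1) {
                code = INVALID_REQUEST;
            } else if (!current.containsKey(topic)) {
                code = INVALID_TOPIC_EXCEPTION;
            } else if (entry.getValue() <= current.get(topic)) {
                // Assumption: only strict increases are accepted.
                code = INVALID_PARTITIONS;
            } else {
                code = NONE;
            }
            errors.put(topic, code);
        }
        return errors;
    }
}
```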
Network Protocol: AlterReplicationFactorsRequest and AlterReplicationFactorsResponse
AlterReplicationFactorsRequest
is used to change the replication factor for a batch of topics, and is the basis for the AdminClient.alterReplicationFactors() method.
No Format |
---|
AlterReplicationFactorsRequest => [topic_replication_factor] timeout
topic_replication_factor => topic replication_factor
topic => STRING
replication_factor => INT16
timeout => INT32
// TODO: validate_only? |
Where
Field | Description |
---|---|
topic | topic name |
replication_factor | the new replication factor for this topic |
timeout | The maximum time to await a response in ms. |
The request will require the ClusterAction
operation on the CLUSTER
resource, since it can require significant inter-broker communication.
The request is subject to the CreateTopicPolicy
of the broker as configured by the broker's create.topic.policy.class.name
config property. This is to ensure that the policy applies to topics modified after creation.
No Format |
---|
AlterReplicationFactorsResponse => throttle_time_ms [topic_replication_factor_error]
topic_replication_factor_error => topic error_code error_message
topic => STRING
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | the name of a topic in the request |
error_code | an error code for that topic |
error_message | more detailed information about any error for that topic |
Anticipated errors:
- CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
- INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
- INVALID_REPLICATION_FACTOR (38) If the replication_factor was invalid
- INVALID_REQUEST (42) If duplicate topics appeared in the request.
- NONE (0) The topic replication factor was changed successfully.
Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse
ReassignPartitionsRequest
initiates the movement of replicas between brokers, and is the basis of the AdminClient.reassignPartitions()
method.
No Format |
---|
ReassignPartitionsRequest => [reassigned_topic] timeout validate_only
reassigned_topic => topic [reassigned_partition]
topic => STRING
reassigned_partition => partition_id [broker]
partition_id => INT32
broker => INT32
timeout => INT32
validate_only => BOOLEAN |
Where
Field | Description |
---|---|
topic | the name of a topic |
partition_id | a partition of that topic |
broker | a broker id |
timeout | The maximum time to await a response in ms. |
validate_only | when true: validate the request, but don't actually reassign the partitions |
The request requires the ClusterAction
operation on the CLUSTER
resource, since it can require significant inter-broker communication.
The request is subject to the CreateTopicPolicy
of the broker as configured by the broker's create.topic.policy.class.name
config property. This is to ensure that the policy applies to topics modified after creation.
ReassignPartitionsResponse
describes which partitions in the request will be moved, and what was wrong with the request for those partitions which will not be moved.
No Format |
---|
ReassignPartitionsResponse => throttle_time_ms [reassign_partition_result]
throttle_time_ms => INT32
reassign_partition_result => topic partition_id error_code error_message
topic => STRING
partition_id => INT32
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | a topic name from the request |
partition_id | a partition id for that topic, from the request |
error_code | an error code for that topic partition |
error_message | more detailed information about any error for that topic |
Anticipated errors:
- CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
- INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
- UNKNOWN_MEMBER_ID (25) If any broker ids in the partition_assignment included an unknown broker id
- INVALID_REQUEST (42) If duplicate topics appeared in the request
- PARTITION_REASSIGNMENT_IN_PROGRESS (new)
- INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
- NONE (0) reassignment has started
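As an illustration of how a single partition's broker list might map onto these codes, consider the sketch below. It is an assumption-laden example rather than the proposed broker logic: the class `ReassignmentValidator` is hypothetical, and treating an empty list or a repeated broker id as INVALID_REPLICA_ASSIGNMENT is an assumption not stated in the list above.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReassignmentValidator {
    public static final short NONE = 0;
    public static final short UNKNOWN_MEMBER_ID = 25;
    public static final short INVALID_REPLICA_ASSIGNMENT = 39;

    /** Checks one partition's target broker list against the set of known broker ids. */
    public static short validate(List<Integer> brokers, Set<Integer> knownBrokers) {
        // Assumption: an empty list or a broker listed twice is an invalid assignment.
        if (brokers.isEmpty() || new HashSet<>(brokers).size() != brokers.size()) {
            return INVALID_REPLICA_ASSIGNMENT;
        }
        for (int broker : brokers) {
            if (!knownBrokers.contains(broker)) {
                return UNKNOWN_MEMBER_ID;
            }
        }
        return NONE;
    }
}
```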
Summary of use cases
Use case | Command | AdminClient |
---|---|---|
Increase partition count | kafka-topics --alter --topic T --partitions P | alterPartitionCounts() |
Change replication factor | kafka-topics --alter --topic T --replication-factor F | alterReplicationFactors() |
Change partition assignment | kafka-topics --alter --topic T --replica-assignment A | reassignPartitions() |
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions() |
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | |
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | |
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | describeReplicaDir() (see KIP-113) |
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J | |
Network Protocol: ReplicaStatusRequest and ReplicaStatusResponse
...
No Format |
---|
ReplicaStatusRequest => [replica_status_requests]
replica_status_requests => topic partition_id broker
topic => STRING
partition_id => INT32
broker => INT32 |
Where
Field | Description |
---|---|
topic | a topic name |
partition_id | a partition id of this topic |
broker | a follower broker id for this partition |
The request will require the DESCRIBE
operation on each of the Topic
resources.
...
No Format |
---|
ReplicaStatusResponse => [replica_status]
replica_status => topic partition_id broker error_code status_time lag
topic => STRING
partition_id => INT32
broker => INT32
error_code => INT16
status_time => INT64
lag => INT64 |
Where
Field | Description |
---|---|
topic | the topic name |
partition_id | the partition id of this topic |
broker | the follower broker id |
error_code | an error code |
status_time | the time the status was current |
lag | the lag (#messages) of this broker, for this partition |
Anticipated errors are:
- CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed. (or the TOPIC?)
- INVALID_TOPIC_EXCEPTION (17) The topic is not known
- INVALID_PARTITIONS (37) The partition_id of the given topic is not valid
- UNKNOWN_MEMBER_ID (25) The given broker id is not known.
- UNKNOWN_TOPIC_OR_PARTITION (3) The given broker is not a follower for the partition identified by topic, partition.
- NONE (0) if the status request completed normally.
Implementation
The AdminClient.replicaStatus() method will send the underlying ReplicaStatusRequest to the leader of the given partition. This avoids the need for every broker (since any broker could be the --bootstrap-server) to know the replication status of every replica, which would be inefficient in network I/O and/or memory use.
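One way to picture the client-side routing is to group the requested replicas by the current leader of their partition, so that one request is sent per leader. The sketch below assumes the client already holds a leader lookup keyed by "topic-partition". The class `ReplicaStatusRouting`, its nested `Replica` type and the key format are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public final class ReplicaStatusRouting {
    /** Minimal stand-in for a (topic, partition, broker) replica identifier. */
    public static final class Replica {
        final String topic;
        final int partition;
        final int broker;

        public Replica(String topic, int partition, int broker) {
            this.topic = topic;
            this.partition = partition;
            this.broker = broker;
        }
    }

    /**
     * Groups replicas by the leader broker id of their partition.
     * leaderByPartition maps "topic-partition" to a leader broker id (assumed format).
     */
    public static Map<Integer, List<Replica>> groupByLeader(Collection<Replica> replicas,
                                                            Map<String, Integer> leaderByPartition) {
        Map<Integer, List<Replica>> byLeader = new TreeMap<>();
        for (Replica replica : replicas) {
            Integer leader = leaderByPartition.get(replica.topic + "-" + replica.partition);
            if (leader == null) {
                // Unknown partition: a real client would report an error for this replica.
                continue;
            }
            byLeader.computeIfAbsent(leader, id -> new ArrayList<>()).add(replica);
        }
        return byLeader;
    }
}
```

Each map entry then corresponds to one ReplicaStatusRequest addressed to that leader.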
Compatibility, Deprecation, and Migration Plan
...