...
In all other respects, the public API of ReassignPartitionsCommand
will not be changed.
AdminClient: alterTopics()
Anchor | ||||
---|---|---|---|---|
|
AdminClient
to support the ability to reassign partitions:Code Block |
---|
/** * Request alteration of the given topics. The request can change the number of * partitions, replication factor and/or the partition assignments. * This can be a long running operation as replicas are migrated between brokers, * therefore the returned result conveys whether the alteration has been * started, not that it is complete. Progress information * can be obtained by calling the lead broker's * {@link #replicaStatus(Collection)}. */ public AlterTopicsResult alterTopics(Collection<AlteredTopic> alteredTopics) public AlterTopicsResult alterTopics(Collection<AlteredTopic> alteredTopics, AssignPartitionOptionsAlterTopicsOptions options) |
Where:
Code Block |
---|
public class AlteredTopic { public AlteredTopic(String name, int numPartitions, int replicationFactor, Map<Integer,List<Integer>> replicasAssignment) { // ... } /** The name of the topic to alter. */ public String name(); /** The new number of partitions, or -1 if the number of partitions should not be changed. */ public int numPartitions(); /** The new replication factor, or -1 if the replication factor should not be changed. */ public intshort replicationFactor(); /** * The new assignments of partition to brokers, or the empty map * if the broker should assign replicas automatically. */ Map<Integer,List<Integer>> replicasAssignment(); } public class AlterTopicsOptions { public AlterTopicsOptions validateOnly(boolean validateOnly); public boolean validateOnly(); public AlterTopicsOptions timeoutMs(long timeoutMs); public long timeoutMs(); } public class AlterTopicsResult { // package-access constructor /** A mapping of the name of a requested topic to the error for that topic. */ Map<String, KafkaFuture<Void>> values(); /** Return a future which succeeds if all the topic alterations were accepted. */ KafkaFuture<Void> all(); } |
AdminClient: replicaStatus()
Anchor | ||||
---|---|---|---|---|
|
AdminClient
to support the progress reporting functionality:...
replicaStatus()
will require Describe
on the CLUSTER
(or the TOPIC?).
Network Protocol: AlterTopicsRequest
and and AlterTopicsResponse
Anchor | ||||
---|---|---|---|---|
|
AlterTopicsRequest
will initiate the process of topic alteration/partition reassignmentNo Format |
---|
AlterTopicsRequest => [alter_topic_requests] validate_only alter_topic_requests => topic num_partitions replication_factor [partition_assignment] topic => STRING num_partitions => INT32 replication_factor => INT16 partition_assignment => partition_id brokers partition_id => INT32 brokers => [INT32] validate_only => BOOLEAN timeout => INT32 |
Where
FIELD | DESCRIPTION |
---|---|
topic | the topic name |
num_partition | the number of partitions. A |
replication_factor | the replication factor. A |
partition_id | the partition id |
brokers | the ids of the assigned brokers for this partition |
validate_only | true to just validate the request, but not actually alter the topics |
...
As currently, it will not be possible to have multiple reassignments running concurrently, hence the addition of the PARTITION_REASSIGNMENT_IN_PROGRESS
error code.
Network Protocol: ReplicaStatusRequest
and and ReplicaStatusResponse
Anchor | ||||
---|---|---|---|---|
|
ReplicaStatusRequest
requests information about the progress of a number of replicas....
The AdminClient.replicaStatus()
has to be made (an will make the underlying ReplicaStatusRequest
sent) to the leader for the given partition. This saves the need for every broker (because any broker could be the --bootstrap-server
) to have knowledge of the replication status of every replica, which would be inefficient in network IO and/or memory use. This means that, in general, the ReassignPartitionsCommand
would need to discover other brokers in the cluster (via AdminClient.describeCluster()
) and make a separate request to each broker implied by the reassignment in order to report progress.
Compatibility, Deprecation, and Migration Plan
...