Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Minor corrections

...

In all other respects, the public API of ReassignPartitionsCommand will not be changed.

AdminClient: alterTopics()

Anchor
alterTopics
alterTopics
The following methods will be added to AdminClient to support the ability to reassign partitions:

Code Block
/**
 * Request alteration of the given topics. The request can change the number of 
 * partitions, replication factor and/or the partition assignments. 
 * This can be a long running operation as replicas are migrated between brokers, 
 * therefore the returned result conveys whether the alteration has been 
 * started, not that it is complete. Progress information 
 * can be obtained by calling the lead broker's 
 * {@link #replicaStatus(Collection)}.
 */
public AlterTopicsResult alterTopics(Collection<AlteredTopic> alteredTopics)
public AlterTopicsResult alterTopics(Collection<AlteredTopic> alteredTopics,  AssignPartitionOptionsAlterTopicsOptions options)

Where:

Code Block
public class AlteredTopic {
    public AlteredTopic(String name, int numPartitions, int replicationFactor, Map<Integer,List<Integer>> replicasAssignment) {
        // ...
    }
    /** The name of the topic to alter. */
    public String name();
    /** The new number of partitions, or -1 if the number of partitions should not be changed. */
    public int numPartitions();
    /** The new replication factor, or -1 if the replication factor should not be changed. */
    public intshort replicationFactor();
    /** 
     * The new assignments of partition to brokers, or the empty map 
     * if the broker should assign replicas automatically. 
     */
    Map<Integer,List<Integer>> replicasAssignment();
}

public class AlterTopicsOptions {
    public AlterTopicsOptions validateOnly(boolean validateOnly);
    public boolean validateOnly();
    public AlterTopicsOptions timeoutMs(long timeoutMs);
	public long timeoutMs();
 }

public class AlterTopicsResult {
    // package-access constructor
    /** A mapping of the name of a requested topic to the error for that topic. */
    Map<String, KafkaFuture<Void>> values(); 
    /** Return a future which succeeds if all the topic alterations were accepted. */
	KafkaFuture<Void> all();
}

AdminClient: replicaStatus()

Anchor
replicaStatus
replicaStatus
The following methods will be added to AdminClient to support the progress reporting functionality:

...

replicaStatus() will require Describe on the CLUSTER (or the TOPIC?).

Network Protocol: AlterTopicsRequest and and AlterTopicsResponse

Anchor
AlterTopicsRequest
AlterTopicsRequest
An AlterTopicsRequest will initiate the process of topic alteration/partition reassignment

No Format
AlterTopicsRequest => [alter_topic_requests] validate_only
  alter_topic_requests => topic num_partitions replication_factor [partition_assignment]
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    partition_assignment => partition_id brokers
      partition_id => INT32
      brokers => [INT32]
  validate_only => BOOLEAN
  timeout => INT32

Where

FIELDDESCRIPTION
topic

the topic name

num_partition

the number of partitions. A num_partitions of -1 that would mean "no change"

replication_factor

the replication factor. A  replication_factor of -1 would mean "no change"

partition_id

the partition id

brokers

the ids of the assigned brokers for this partition

validate_only

true to just validate the request, but not actually alter the topics

...

As currently, it will not be possible to have multiple reassignments running concurrently, hence the addition of the PARTITION_REASSIGNMENT_IN_PROGRESS error code.

Network Protocol: ReplicaStatusRequest and and ReplicaStatusResponse

Anchor
ReplicaStatusRequest
ReplicaStatusRequest
ReplicaStatusRequest requests information about the progress of a number of replicas.

...

The AdminClient.replicaStatus() has to be made (an will make the underlying ReplicaStatusRequest sent) to the leader for the given partition. This saves the need for every broker (because any broker could be the --bootstrap-server ) to have knowledge of the replication status of every replica, which would be inefficient in network IO and/or memory use. This means that, in general, the ReassignPartitionsCommand would need to discover other brokers in the cluster (via AdminClient.describeCluster()) and make a separate request to each broker implied by the reassignment in order to report progress.

Compatibility, Deprecation, and Migration Plan

...