...

AlterPartitionCountsRequest
The AlterPartitionCountsRequest changes the partition count for a batch of topics and is the basis for the AdminClient.alterPartitionCounts() method.

The request can be sent to any broker.

The request requires the ALTER operation on the Topic resource.

The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.

After validating the request, the broker calls AdminUtils.addPartitions(), which ultimately updates the topic partition assignment znode (/brokers/topics/${topic}).

No Format
AlterPartitionCountsRequest => [topic_partition_count] timeout
  topic_partition_count => topic partition_count
    topic => STRING
    partition_count => count [assignment]
      count => INT32
      assignment => [INT32]
  timeout => INT32
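
The grammar above can be made concrete with a small encoder. This is only a sketch of the request body (no request header), assuming the classic Kafka wire encoding in which a STRING is an INT16 length followed by UTF-8 bytes and an array is an INT32 element count followed by its elements. The class names AlterPartitionCountsEncoder and PartitionCount are hypothetical and exist only for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka: encodes the request body described by the grammar.
final class AlterPartitionCountsEncoder {

    /** Hypothetical holder for partition_count => count [assignment]. */
    static final class PartitionCount {
        final int count;
        final List<List<Integer>> assignments; // each inner list is one assignment => [INT32]

        PartitionCount(int count, List<List<Integer>> assignments) {
            this.count = count;
            this.assignments = assignments;
        }
    }

    static byte[] encode(Map<String, PartitionCount> topics, int timeoutMs) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes); // DataOutputStream writes big-endian
        try {
            out.writeInt(topics.size());                    // [topic_partition_count]
            for (Map.Entry<String, PartitionCount> entry : topics.entrySet()) {
                byte[] name = entry.getKey().getBytes(StandardCharsets.UTF_8);
                out.writeShort(name.length);                // topic => STRING (INT16 length)
                out.write(name);
                PartitionCount pc = entry.getValue();
                out.writeInt(pc.count);                     // count => INT32
                out.writeInt(pc.assignments.size());        // [assignment]
                for (List<Integer> brokers : pc.assignments) {
                    out.writeInt(brokers.size());           // assignment => [INT32]
                    for (int broker : brokers) {
                        out.writeInt(broker);
                    }
                }
            }
            out.writeInt(timeoutMs);                        // timeout => INT32
            out.flush();
        } catch (IOException e) {
            // Not expected for an in-memory stream.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        PartitionCount pc = new PartitionCount(3, List.of(List.of(1, 2)));
        byte[] body = encode(Map.of("t", pc), 1000);
        System.out.println("encoded body length: " + body.length);
    }
}
```

An empty assignments list encodes as an array of size 0, which is one way to represent the optional [assignment] field being left to the broker.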

...

  • This API is asynchronous in the sense that the client cannot assume that the request is complete (or was rejected) once it has obtained the result for the topic from the ReassignPartitionsResult.
  • The describeReplicaLogDirs() method from KIP-113 can be used to determine progress.
  • A call to reassignPartitions() with the validateOnly option can be used to determine whether a reassignment is currently running, and therefore whether the last reassignment has finished.

  • When the request is complete the throttle should be removed by a call or

    The API doesn't directly support setting a throttle itself. A prior set of calls to alterInterBrokerThrottle() can be used to set a throttle.

Code Block
/**
 * <p>Assign the partitions given as the key of the given <code>assignments</code> to the corresponding 
 * list of brokers. This can be used to change the replica assignment or change the topic's replication factor. 
 * The first broker in each list is the one which holds the "preferred replica".</p>
 *
 * <p>To change the replication factor for a topic there must be a key for each partition in 
 * the {@code assignments} map and the corresponding lists of brokers must all be of the same 
 * length, which will become the new replication factor.</p>
 *
 * <p>If only a subset of the partitions of a particular topic are present in {@code assignments} 
 * the change is taken to be a reassignment of replicas to brokers and each list must have 
 * the same length as the current topic replication factor.</p>
 *
 * <h3>Throttling</h3>
 * <p>Inter-broker reassignment and/or increasing the replication factor requires copying 
 * replica data between brokers, which causes significant inter-broker traffic and can take 
 * a long time. It may be necessary to impose a throttle on inter-broker traffic for the 
 * duration of the reassignment so that client-broker traffic is not adversely affected.</p>
 *
 * <h3>Preferred replica</h3>
 * <p>When brokers are configured with <code>auto.leader.rebalance.enable=true</code>, the broker
 * with the preferred replica will be elected leader automatically. 
 * <code>kafka-preferred-replica-election.sh</code> provides a manual trigger for this 
 * election when <code>auto.leader.rebalance.enable=false</code>.</p>
 */
public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments);
public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments,
                        ReassignPartitionsOptions options);
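
The two rules in the Javadoc above (all partitions present with equal-length lists means a replication factor change; a subset means a reassignment at the current replication factor) can be sketched as a small classifier for a single topic. The class and enum names are hypothetical. Treating a full set of partitions whose lists match the current replication factor as a plain reassignment is an interpretation, not something the text states.

```java
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the assignment rules for one topic; not a Kafka class.
final class ReassignmentKind {

    enum Kind { REPLICATION_FACTOR_CHANGE, REASSIGNMENT, INVALID }

    /**
     * @param assignments    partition id -> ordered broker list (first is the preferred replica)
     * @param partitionCount number of partitions the topic currently has
     * @param currentRf      the topic's current replication factor
     */
    static Kind classify(Map<Integer, List<Integer>> assignments, int partitionCount, int currentRf) {
        if (assignments.isEmpty()) {
            return Kind.INVALID;
        }
        for (int partition : assignments.keySet()) {
            if (partition < 0 || partition >= partitionCount) {
                return Kind.INVALID; // refers to a partition the topic does not have
            }
        }
        int length = assignments.values().iterator().next().size();
        if (length == 0) {
            return Kind.INVALID;
        }
        for (List<Integer> brokers : assignments.values()) {
            if (brokers.size() != length) {
                return Kind.INVALID; // both rules need every list to have the same length
            }
        }
        if (length == currentRf) {
            return Kind.REASSIGNMENT; // replication factor is unchanged
        }
        // A new replication factor is only allowed when every partition is listed.
        boolean coversAllPartitions = assignments.size() == partitionCount;
        return coversAllPartitions ? Kind.REPLICATION_FACTOR_CHANGE : Kind.INVALID;
    }

    public static void main(String[] args) {
        // Topic with 3 partitions and replication factor 2; move only partition 0.
        System.out.println(classify(Map.of(0, List.of(1, 2)), 3, 2));
    }
}
```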

...

Code Block
public class ReassignPartitionsOptions {

    public boolean validateOnly();

    /**
     * Validate the request only: Do not actually trigger replica reassignment.
     */
    public ReassignPartitionsOptions validateOnly(boolean validateOnly);

    public long timeoutMs();

    /**
     * Set a timeout for the starting of the reassignment.
     * Note this timeout does not include the time taken to actually
     * move replicas between brokers.
     */
    public ReassignPartitionsOptions timeoutMs(long timeoutMs);

    public long throttle();

    /**
     * Set a throttle, in bytes per second, on the bandwidth used for
     * inter-broker replica movement for all movements implied by the
     * partition reassignments.
     * Traffic between each broker (pairwise) will be throttled to approximately the given limit.
     * The throttle rate should be at least 1 KB/s.
     * By default no throttle is applied.
     */
    public ReassignPartitionsOptions throttle(long throttledRateBytesPerSecond);
}

public class ReassignPartitionsResult {
    public Map<TopicPartition, KafkaFuture<Void>> values();
    public KafkaFuture<Void> all();
}

...

ReassignPartitionsRequest
ReassignPartitionsRequest initiates the movement of replicas between brokers and is the basis of the AdminClient.reassignPartitions() method.

Notes:

  • The request can be sent to any broker.
  • The request requires the ClusterAction operation on the CLUSTER resource, since it can require significant inter-broker communication.
  • The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.
  • After validating the request, the broker writes the reassignment JSON to the /admin/reassign_partitions znode.

No Format
ReassignPartitionsRequest => [topic_reassignments] timeout validate_only
  topic_reassignments => topic [partition_reassignments]
    topic => STRING
    partition_reassignments => partition_id [broker]
      partition_id => INT32
      broker => INT32
  timeout => INT32
  validate_only => BOOLEAN
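
As an illustration of the last note, the following sketch turns the topic_reassignments of a request into the JSON written to the znode. It assumes the version 1 layout used by the existing kafka-reassign-partitions.sh tool ("version" plus a "partitions" array of topic, partition and replicas). The class name ReassignmentJson is hypothetical, and no string escaping is done on the assumption that topic names contain only characters that are safe in JSON.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Kafka: renders reassignments as znode JSON.
final class ReassignmentJson {

    /** @param reassignments topic -> (partition id -> ordered list of broker ids) */
    static String toJson(Map<String, Map<Integer, List<Integer>>> reassignments) {
        StringJoiner partitions = new StringJoiner(",", "[", "]");
        for (Map.Entry<String, Map<Integer, List<Integer>>> topic : reassignments.entrySet()) {
            for (Map.Entry<Integer, List<Integer>> partition : topic.getValue().entrySet()) {
                StringJoiner replicas = new StringJoiner(",", "[", "]");
                for (int broker : partition.getValue()) {
                    replicas.add(Integer.toString(broker));
                }
                partitions.add("{\"topic\":\"" + topic.getKey() + "\""
                        + ",\"partition\":" + partition.getKey()
                        + ",\"replicas\":" + replicas + "}");
            }
        }
        return "{\"version\":1,\"partitions\":" + partitions + "}";
    }

    public static void main(String[] args) {
        // Move partition 0 of topic "foo" to brokers 1 and 2.
        System.out.println(toJson(Map.of("foo", Map.of(0, List.of(1, 2)))));
    }
}
```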

...

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • POLICY_VIOLATION (44) The request violated the configured policy
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • UNKNOWN_MEMBER_ID (25) If the partition_reassignments included an unknown broker id
  • INVALID_REQUEST (42) If duplicate topics appeared in the request
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new) If the reassignment cannot be started because a reassignment is currently running (i.e. the /admin/reassign_partitions znode exists)
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
  • NONE (0) The reassignment has started
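
A few of the conditions in this list can be sketched as a broker-side check. This is an illustrative simplification, not the proposed implementation: the class, enum and parameter names are hypothetical, authorization and policy checks are left out, a single error is returned for the whole request, and the order in which the conditions are tested is an assumption.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of how some of the listed error conditions might be detected.
final class ReassignmentValidator {

    enum ErrorCode {
        NONE,
        INVALID_REQUEST,
        PARTITION_REASSIGNMENT_IN_PROGRESS,
        INVALID_TOPIC_EXCEPTION,
        UNKNOWN_MEMBER_ID
    }

    static ErrorCode validate(List<String> requestedTopics,
                              Set<Integer> requestedBrokers,
                              Set<String> existingTopics,
                              Set<Integer> knownBrokers,
                              boolean reassignmentZnodeExists) {
        // Duplicate topics in the request.
        if (new HashSet<>(requestedTopics).size() != requestedTopics.size()) {
            return ErrorCode.INVALID_REQUEST;
        }
        // A reassignment is already running (the /admin/reassign_partitions znode exists).
        if (reassignmentZnodeExists) {
            return ErrorCode.PARTITION_REASSIGNMENT_IN_PROGRESS;
        }
        // A named topic does not exist.
        if (!existingTopics.containsAll(requestedTopics)) {
            return ErrorCode.INVALID_TOPIC_EXCEPTION;
        }
        // A broker id in the reassignment is not known to the cluster.
        if (!knownBrokers.containsAll(requestedBrokers)) {
            return ErrorCode.UNKNOWN_MEMBER_ID;
        }
        return ErrorCode.NONE; // the reassignment can be started
    }

    public static void main(String[] args) {
        System.out.println(validate(List.of("foo"), Set.of(1, 2), Set.of("foo"), Set.of(1, 2, 3), false));
    }
}
```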

...