Comment: Remove alterReplicationFactor. Change alterPartitionCount

...

...

...

This API supports the use case of changing the partition count via kafka-topics.sh --alter --partitions ...

Notes:

  • This API is synchronous in the sense that the client can assume that the partition count has been changed (or the request was rejected) once they have obtained the result for the topic from the AlterPartitionCountsResult.
Code Block
languagejava
linenumberstrue
/**
 * <p>Change the partition count of the topics given as the keys of {@code counts}
 * according to the corresponding values. Currently it is only possible to increase 
 * the partition count.</p>
 *
 * <p>The replicas of new partitions will be allocated to the least loaded broker, 
 * but may subsequently be moved using {@link #reassignPartitions(Map)}.</p>
 */
public AlterPartitionCountsResult alterPartitionCounts(Map<String, PartitionCount> counts,
                    AlterPartitionCountsOptions options)
public AlterPartitionCountsResult alterPartitionCounts(Map<String, PartitionCount> counts)

Where:

Code Block
languagejava
linenumberstrue
/** Describes a change in a topic's partition count. */
public class PartitionCount {
    private int partitionCount;
    private List<List<Integer>> assignments;
    private PartitionCount(int partitionCount) { ... }

    /**
     * <p>Increase the partition count for a topic to the given {@code newCount}.
     * The assignment of new replicas to brokers will be decided by the broker,
     * but may subsequently be moved using {@link AdminClient#reassignPartitions(Map)}.</p>
     */
    public static PartitionCount increasePartitionCount(int newCount) { ... }

    /** 
     * <p>Increase the partition count for a topic to the given {@code newCount} 
     * assigning the new partitions according to the given {@code newAssignments}.
     * The length of {@code newAssignments} should equal {@code newCount - oldCount}, since 
     * the assignments of existing partitions are not changed. 
     * Each inner list of {@code newAssignments} should have a length equal to 
     * the topic's replication factor. 
     * The first broker id in each inner list is the "preferred replica".</p>
     *
     * <p>For example, suppose a topic currently has a replication factor of 2, and 
     * has 3 partitions. The number of partitions can be increased to 4 
     * (with broker 1 being the preferred replica for the new partition) using a 
     * {@code PartitionCount} constructed like this:</p>
     *
     * <pre><code>PartitionCount.increasePartitionCount(4, Arrays.asList(Arrays.asList(1, 2)))</code></pre>
     *
     */
    public static PartitionCount increasePartitionCount(int newCount, List<List<Integer>> newAssignments) { ... }
}
    

public class AlterPartitionCountsOptions {
    public AlterPartitionCountsOptions() { ... }
    public Integer timeoutMs() { ... }
    public AlterPartitionCountsOptions timeoutMs(Integer timeoutMs) { ... }
    public boolean validateOnly() { ... }
    /**
     * Validate the request only: Do not actually change any partition counts.
     */
    public AlterPartitionCountsOptions validateOnly(boolean validateOnly) { ... }
}
 
public class AlterPartitionCountsResult {
    // package access constructor
    public Map<String, KafkaFuture<Void>> values() { ... }
    public KafkaFuture<Void> all() { ... }
}
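
The constraints that the increasePartitionCount javadoc places on newAssignments can be checked mechanically. The following is a minimal sketch of such a check, not part of the proposed API: NewAssignmentCheck and its check method are hypothetical names introduced only for illustration, and the rule that a broker may appear at most once per partition is an added assumption that the javadoc above does not state.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

/** Hypothetical helper for illustration only; not part of the proposed AdminClient API. */
final class NewAssignmentCheck {

    /**
     * Returns null if newAssignments follows the rules described in the
     * increasePartitionCount javadoc, otherwise a description of the problem.
     */
    static String check(int oldCount, int newCount, int replicationFactor,
                        List<List<Integer>> newAssignments) {
        // Currently it is only possible to increase the partition count.
        if (newCount <= oldCount) {
            return "partition count can only be increased";
        }
        // Only the new partitions are assigned, so one inner list per new partition.
        int expected = newCount - oldCount;
        if (newAssignments.size() != expected) {
            return "expected " + expected + " assignments, got " + newAssignments.size();
        }
        for (List<Integer> replicas : newAssignments) {
            // Each inner list must have one broker id per replica.
            if (replicas.size() != replicationFactor) {
                return "expected " + replicationFactor + " brokers, got " + replicas;
            }
            // Assumption: a broker holds at most one replica of a given partition.
            if (new HashSet<>(replicas).size() != replicas.size()) {
                return "duplicate broker in " + replicas;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // The javadoc example: replication factor 2, going from 3 to 4 partitions.
        System.out.println(check(3, 4, 2, Arrays.asList(Arrays.asList(1, 2))));
        // One assignment supplied where two new partitions are requested.
        System.out.println(check(3, 5, 2, Arrays.asList(Arrays.asList(1, 2))));
    }
}
```

In this sketch the first broker id of each inner list is left uninterpreted; per the javadoc it would become the preferred replica of the new partition.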

AdminClient: reassignPartitions()

This API will support a number of use cases:

  1. Changing the partition assignment, via kafka-reassign-partitions.sh
  2. Changing the replication factor, via kafka-reassign-partitions.sh

Notes:

  • This API is asynchronous in the sense that the client cannot assume that the reassignment is complete once they have obtained the result for a partition from the ReassignPartitionsResult; a successful result means only that the reassignment was accepted and started.
  • The describeReplicaDir() method from KIP-113 can be used to determine progress.
  • When the request is complete (HOW DETERMINED?) the throttle should be removed by a call or calls to alterInterBrokerThrottle() (see below)
Code Block
languagejava
linenumberstrue
/**
 * <p>Assign the partitions given as the key of the given <code>assignments</code> to the corresponding 
 * list of brokers. This can be used to change the replica assignment or change the topic's replication factor. 
 * The first broker in each list is the one which holds the "preferred replica".</p>
 *
 * <p>Inter-broker reassignment causes significant inter-broker traffic and can take a long time 
 * in order to copy the replica data to brokers. It may be necessary to impose a quota on 
 * inter-broker traffic for the duration of the reassignment so that client-broker traffic is not
 * adversely affected.</p>
 *
 * <h3>Preferred replica</h3>
 * <p>When brokers are configured with <code>auto.leader.rebalance.enable=true</code>, the broker
 * with the preferred replica will be elected leader automatically. 
 * <code>kafka-preferred-replica-election.sh</code> provides a manual trigger for this 
 * election when <code>auto.leader.rebalance.enable=false</code>.</p>
 */
public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments)
public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments, 
                        ReassignPartitionsOptions options)

Where:

Code Block
languagejava
linenumberstrue
public class ReassignPartitionsOptions {

    public boolean validateOnly()

    /**
     * Validate the request only: Do not actually trigger replica reassignment.
     */
    public ReassignPartitionsOptions validateOnly(boolean validateOnly)

    public long timeoutMs()

    /**
     * Set a timeout for the starting of the reassignment. 
     * Note this timeout does not include the time taken to actually
     * move replicas between brokers.
     */
    public ReassignPartitionsOptions timeoutMs(long timeoutMs)

    public long throttle()

    /**
     * Set a throttle, in bytes per second, on the bandwidth used for 
     * inter-broker replica movement for all movements implied by the
     * partition reassignments. Traffic between each pair of brokers
     * will be throttled to approximately the given limit.
     * The throttle rate should be at least 1 KB/s. 
     * By default no throttle is applied.
     */
    public ReassignPartitionsOptions throttle(long throttledRateBytesPerSecond)

}
public class ReassignPartitionsResult {
    public Map<TopicPartition, KafkaFuture<Void>> values();
    public KafkaFuture<Void> all();
}

Partition reassignment is a long-running operation, and the ReassignPartitionsResult indicates only that the reassignment has been started, not that it has been completed. The describeReplicaDir() method from KIP-113 can be used to determine progress.
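
Because a reassignment is expressed as a target list of brokers per partition, the same call covers both moving replicas and changing the replication factor. The following is a rough sketch of what one such target list implies for a single partition. ReassignmentSummary and its methods are hypothetical names used only for illustration, and the current assignment is assumed to be known to the caller.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration only; not part of the proposed AdminClient API. */
final class ReassignmentSummary {

    /** Brokers in the target list that do not yet hold a replica, so data must be copied to them. */
    static List<Integer> addedReplicas(List<Integer> current, List<Integer> target) {
        List<Integer> added = new ArrayList<>(target);
        added.removeAll(current);
        return added;
    }

    /** Brokers that hold a replica now but are absent from the target list. */
    static List<Integer> removedReplicas(List<Integer> current, List<Integer> target) {
        List<Integer> removed = new ArrayList<>(current);
        removed.removeAll(target);
        return removed;
    }

    /** The first broker in the target list holds the "preferred replica". */
    static int preferredReplica(List<Integer> target) {
        return target.get(0);
    }

    public static void main(String[] args) {
        List<Integer> current = Arrays.asList(1, 2);   // replication factor 2
        List<Integer> target = Arrays.asList(2, 3, 4); // replication factor 3
        System.out.println("replication factor: " + current.size() + " -> " + target.size());
        System.out.println("preferred replica: " + preferredReplica(target));
        System.out.println("added: " + addedReplicas(current, target));
        System.out.println("removed: " + removedReplicas(current, target));
    }
}
```

The added replicas are the ones that generate inter-broker traffic, which is why a throttle may be worth setting when that list is large.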

...

AdminClient: alterInterBrokerThrottle()

This API will support the following use cases:

  1. A previous call to reassignPartitions() has been made and the user wants to apply or change the throttle.
  2. A previous call to reassignPartitions() is complete and the user wants to remove the throttle.

Notes:

  • The throttle can be queried via describeConfigs()
  • The throttle cannot be set via alterConfigs() because: 1) alterConfigs doesn't support changing broker configs and 2) throttles are a special-case DynamicConfig.
  • The throttle is broker-specific and different brokers can have different rates applied.

...

The following methods will be added to AdminClient to support changing the inter-broker transfer throttle.

...