Summary of use cases

Use case | Command | AdminClient
Increase partition count | kafka-topics --alter --topic T --partitions P | alterPartitionCounts()
Change replication factor | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions()
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions()
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | reassignPartitions(validateOnly) to check that no reassignment is in progress; alterInterBrokerThrottle(); alterConfigs() to set (leader|follower).replication.throttled.replicas; reassignPartitions()
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | alterInterBrokerThrottle()
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | describeReplicaDir() (see KIP-113)
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J | reassignPartitions(validateOnly) to check that no reassignment is in progress; alterConfigs() to clear (leader|follower).replication.throttled.replicas

kafka-reassign-partitions.sh and ReassignPartitionsCommand

The --zookeeper option will be retained and will:

  1. Cause a deprecation warning to be printed to standard error. The message will say that the --zookeeper option will be removed in a future version and that --bootstrap-server is the replacement option.
  2. Perform the reassignment via ZooKeeper, as currently.

A new --bootstrap-server option will be added and will:

  1. Perform the reassignment via the given intermediating broker.

Using both --zookeeper and --bootstrap-server in the same command will produce an error message and the tool will exit without doing the intended operation.

It is anticipated that a future version of Kafka would remove support for the --zookeeper option.
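The rules for the two connection options fit in a few lines. This is a hypothetical sketch; the class, enum and method names are not part of the proposal. The text above does not specify what happens when neither option is given, so rejecting that case is an assumption.

```java
import java.io.PrintStream;

// Hypothetical sketch of the --zookeeper / --bootstrap-server rules; names are illustrative.
final class ConnectionOptionCheck {
    enum Backend { ZOOKEEPER, ADMIN_CLIENT }

    private ConnectionOptionCheck() { }

    static Backend choose(String zookeeper, String bootstrapServer, PrintStream err) {
        if (zookeeper != null && bootstrapServer != null) {
            // Both given: report an error and do not perform the operation.
            throw new IllegalArgumentException(
                "--zookeeper and --bootstrap-server cannot be used together");
        }
        if (zookeeper != null) {
            // Still supported, but a deprecation warning goes to standard error.
            err.println("Warning: the --zookeeper option is deprecated and will be removed in a "
                + "future version. Use --bootstrap-server instead.");
            return Backend.ZOOKEEPER;
        }
        if (bootstrapServer != null) {
            return Backend.ADMIN_CLIENT;
        }
        // Assumption of this sketch: the proposal does not cover the "neither option" case.
        throw new IllegalArgumentException("one of --zookeeper or --bootstrap-server is required");
    }
}
```

In the tool itself, the exception would be reported as the error message before exiting without performing the requested operation.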

A new --progress action option will be added. It will only be supported when used with --bootstrap-server; if it is used with --zookeeper, the command will produce an error message and the tool will exit without doing the intended operation. --progress will report on the synchronisation of each of the partitions and brokers in the reassignment given via the --reassignment-json-file option.

For example:

No Format
# If the following command is used to start a reassignment
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --execute
       
# then the following command will print the progress of
# that reassignment, then exit immediately
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --progress

That might print something like the following:

No Format
Topic      Partition  Broker  Status
-------------------------------------
my_topic   0          0       In sync
my_topic   0          1       Behind: 10456 messages behind
asdf       0          1       Unknown topic
my_topic   42         1       Unknown partition
my_topic   0          42      Unknown broker
my_topic   1          0       Broker does not host this partition

The implementation of --progress will make use of the describeReplicaDir() method from KIP-113 to find the lag of the syncing replica.
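Each row of the sample --progress output reduces to a handful of checks. Below is a minimal sketch. It assumes the lag has already been obtained (for example via describeReplicaDir()) and that the checks run in the order the statuses are listed. The class, method and parameter names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical formatter for one row of --progress output. The lag (messages behind)
// is assumed to be obtained elsewhere, for example from describeReplicaDir().
final class ProgressStatus {
    private ProgressStatus() { }

    static String status(String topic, int partition, int broker,
                         Map<String, Integer> partitionCounts, Set<Integer> knownBrokers,
                         List<Integer> replicas, long messagesBehind) {
        Integer count = partitionCounts.get(topic);
        if (count == null) {
            return "Unknown topic";
        }
        if (partition < 0 || partition >= count) {
            return "Unknown partition";
        }
        if (!knownBrokers.contains(broker)) {
            return "Unknown broker";
        }
        if (!replicas.contains(broker)) {
            return "Broker does not host this partition";
        }
        return messagesBehind == 0 ? "In sync" : "Behind: " + messagesBehind + " messages behind";
    }

    // Pads columns to line up under the header "Topic      Partition  Broker  Status".
    static String row(String topic, int partition, int broker, String status) {
        return String.format("%-11s%-11d%-8d%s", topic, partition, broker, status);
    }
}
```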

Internally, ReassignPartitionsCommand will be refactored to support the above changes to the options. An interface will abstract the commands currently issued directly to ZooKeeper.

There will be an implementation which makes the current calls to ZooKeeper, and another implementation which uses the AdminClient API described below.

In all other respects, the public API of ReassignPartitionsCommand will not be changed.
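A hypothetical sketch of the interface abstraction: the command logic is written once against ReassignmentBackend, and the ZooKeeper-based and AdminClient-based implementations (omitted here) would sit behind it. The names are illustrative, and partitions are keyed by strings such as "my_topic-0" for brevity. The in-memory implementation exists only so the sketch can run. It applies the new assignment immediately rather than moving any data.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical abstraction over the operations the command currently performs against ZooKeeper.
interface ReassignmentBackend {
    // True if a partition reassignment is currently running on the cluster.
    boolean reassignmentInProgress();

    // Current replica list for each "topic-partition" key.
    Map<String, List<Integer>> currentAssignment();

    // Starts moving each "topic-partition" key to the given replica list.
    void startReassignment(Map<String, List<Integer>> proposed);
}

// In-memory stand-in used only to exercise the command logic; not a real implementation.
final class InMemoryBackend implements ReassignmentBackend {
    final Map<String, List<Integer>> assignment = new HashMap<>();
    private boolean inProgress;

    public boolean reassignmentInProgress() { return inProgress; }

    public Map<String, List<Integer>> currentAssignment() { return new HashMap<>(assignment); }

    public void startReassignment(Map<String, List<Integer>> proposed) {
        assignment.putAll(proposed);
        inProgress = true; // never completes in this stand-in
    }
}

// Command logic shared by both backends.
final class ExecuteAction {
    private ExecuteAction() { }

    // Returns false, without starting anything, if a reassignment is already running.
    static boolean execute(ReassignmentBackend backend, Map<String, List<Integer>> proposed) {
        if (backend.reassignmentInProgress()) {
            return false;
        }
        backend.startReassignment(proposed);
        return true;
    }
}
```

Keeping the "reassignment already running" check in ExecuteAction, rather than in each backend, is what lets the ZooKeeper and AdminClient paths behave identically.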

AdminClient: alterPartitionCounts()

This API supports the use case of changing the partition count via kafka-topics.sh --alter --partitions ...

Notes:

  • This API is synchronous in the sense that the client can assume that the partition count has been changed (or the request was rejected) once they have obtained the result for the topic from the AlterPartitionCountsResult.
Code Block
/**
 * <p>Change the partition count of the topics given as the keys of {@code counts}
 * according to the corresponding values. Currently it is only possible to increase
 * the partition count.</p>
 */
public AlterPartitionCountsResult alterPartitionCounts(Map<String, PartitionCount> counts,
                                                       AlterPartitionCountsOptions options) { ... }
public AlterPartitionCountsResult alterPartitionCounts(Map<String, PartitionCount> counts) { ... }

Where:

Code Block
/** Describes a change in a topic's partition count. */
public class PartitionCount {
    private int partitionCount;
    private List<List<Integer>> assignments;
    private PartitionCount(int partitionCount) { ... }

    /**
     * <p>Increase the partition count for a topic to the given {@code newCount}.
     * The assignment of new replicas to brokers will be decided by the broker
     * but may subsequently be moved using {@link AdminClient#reassignPartitions(Map)}.</p>
     */
    public static PartitionCount increasePartitionCount(int newCount) { ... }

    /** 
     * <p>Increase the partition count for a topic to the given {@code newCount} 
     * assigning the new partitions according to the given {@code newAssignments}.
     * The length of {@code newAssignments} should equal {@code newCount - oldCount}, since
     * the assignment of existing partitions is not changed.
     * Each inner list of {@code newAssignments} should have a length equal to 
     * the topic's replication factor. 
     * The first broker id in each inner list is the "preferred replica".</p>
     *
     * <p>For example, suppose a topic currently has a replication factor of 2, and 
     * has 3 partitions. The number of partitions can be increased to 4 
     * (with broker 1 being the preferred replica for the new partition) using a 
     * {@code PartitionCount} constructed like this:</p>
     *
     * <pre><code>PartitionCount.increasePartitionCount(4, Arrays.asList(Arrays.asList(1, 2)))</code></pre>
     *
     */
    public static PartitionCount increasePartitionCount(int newCount, List<List<Integer>> newAssignments) { ... }
}
    

public class AlterPartitionCountsOptions {
    public AlterPartitionCountsOptions() { ... }
    public Integer timeoutMs() { ... }
    public AlterPartitionCountsOptions timeoutMs(Integer timeoutMs) { ... }
    public boolean validateOnly() { ... }

    /**
     * Validate the request only: Do not actually change any partition counts.
     */
    public AlterPartitionCountsOptions validateOnly(boolean validateOnly) { ... }
}

public class AlterPartitionCountsResult {
    // package access constructor
    public Map<String, KafkaFuture<Void>> values() { ... }
    public KafkaFuture<Void> all() { ... }
}
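The constraints documented on increasePartitionCount(int, List<List<Integer>>) can be checked on the client before a request is sent. Below is a minimal sketch with a hypothetical helper class. It assumes the current partition count and replication factor are taken from the topic's metadata.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical client-side check of the constraints documented on
// PartitionCount.increasePartitionCount(int, List<List<Integer>>).
final class PartitionCountCheck {
    private PartitionCountCheck() { }

    // oldCount and replicationFactor describe the topic as it is now.
    // Returns a description of the first violated constraint, or empty if the change looks valid.
    static Optional<String> validate(int oldCount, int replicationFactor, int newCount,
                                     List<List<Integer>> newAssignments) {
        if (newCount <= oldCount) {
            return Optional.of("only increases are supported: " + newCount + " <= " + oldCount);
        }
        if (newAssignments == null) {
            return Optional.empty(); // increasePartitionCount(int): the broker places the new partitions
        }
        if (newAssignments.size() != newCount - oldCount) {
            return Optional.of("expected " + (newCount - oldCount) + " assignments, got "
                + newAssignments.size());
        }
        for (List<Integer> replicas : newAssignments) {
            if (replicas.size() != replicationFactor) {
                return Optional.of("assignment " + replicas + " does not have "
                    + replicationFactor + " replicas");
            }
        }
        return Optional.empty();
    }
}
```

With the javadoc's own example (3 partitions, replication factor 2, increasing to 4 with a single new assignment [1, 2]), validate(3, 2, 4, ...) returns an empty Optional.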

Network Protocol: AlterPartitionCountsRequest and AlterPartitionCountsResponse

The AlterPartitionCountsRequest is used to change the partition count for a batch of topics, and is the basis for the AdminClient.alterPartitionCounts() method.

The request will require the ALTER operation on the Topic resource.

The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property, so that the policy also applies to topics that are modified after their creation.

No Format
AlterPartitionCountsRequest => [topic_partition_count] timeout
  topic_partition_count => topic partition_count
    topic => STRING
    partition_count => count [assignment]
      count => INT32
      assignment => [INT32]
  timeout => INT32

Where

Field | Description
topic | the name of a topic
count | the new partition count
assignment | a list of assigned brokers (one list for each new partition)
timeout | the maximum time to await a response, in ms

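To make the request schema concrete, here is a sketch that encodes just the request body (no request header) with java.nio.ByteBuffer. It assumes Kafka's usual primitive encodings: big-endian integers, STRING as an INT16 length followed by UTF-8 bytes, and an array as an INT32 element count followed by its elements. The encoder class is hypothetical; an actual implementation would presumably use Kafka's own protocol classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical encoder for the AlterPartitionCountsRequest body only (no request header).
final class AlterPartitionCountsEncoder {
    private AlterPartitionCountsEncoder() { }

    // partition_count => count [assignment]
    static final class TopicCount {
        final int count;
        final List<List<Integer>> assignment;

        TopicCount(int count, List<List<Integer>> assignment) {
            this.count = count;
            this.assignment = assignment;
        }
    }

    static ByteBuffer encode(Map<String, TopicCount> topics, int timeoutMs) {
        ByteBuffer buf = ByteBuffer.allocate(sizeOf(topics)); // big-endian by default
        buf.putInt(topics.size());                            // [topic_partition_count]
        for (Map.Entry<String, TopicCount> e : topics.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) name.length).put(name);      // topic => STRING
            buf.putInt(e.getValue().count);                   // count => INT32
            buf.putInt(e.getValue().assignment.size());       // [assignment]
            for (List<Integer> brokers : e.getValue().assignment) {
                buf.putInt(brokers.size());                   // assignment => [INT32]
                for (int broker : brokers) {
                    buf.putInt(broker);
                }
            }
        }
        buf.putInt(timeoutMs);                                // timeout => INT32
        buf.flip();
        return buf;
    }

    private static int sizeOf(Map<String, TopicCount> topics) {
        int size = 4 + 4; // topic array length + timeout
        for (Map.Entry<String, TopicCount> e : topics.entrySet()) {
            size += 2 + e.getKey().getBytes(StandardCharsets.UTF_8).length; // topic
            size += 4 + 4;                                                   // count + assignment array length
            for (List<Integer> brokers : e.getValue().assignment) {
                size += 4 + 4 * brokers.size();
            }
        }
        return size;
    }
}
```

For one topic "t" increased to 4 partitions with the single assignment [1, 2] and a 1000 ms timeout, the encoded body is 31 bytes.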
The response provides an error code and message for each of the topics present in the request.

No Format
AlterPartitionCountsResponse => throttle_time_ms [topic_partition_count_error]
  topic_partition_count_error => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Where

Field | Description
throttle_time_ms | the duration in milliseconds for which the request was throttled
topic | the name of a topic in the request
error_code | an error code for that topic
error_message | more detailed information about any error for that topic

Anticipated errors:

  • TOPIC_AUTHORIZATION_FAILED (29) The user lacked Alter on the topic
  • POLICY_VIOLATION (44) The request violated the configured policy
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_PARTITIONS (37) If the partition count was <= the current partition count for the topic.
  • INVALID_REQUEST (42) If duplicate topics appeared in the request, if the number of assignment lists did not equal the number of new partitions, or if the length of any assignment list was not equal to the topic's replication factor
  • NONE (0) The topic partition count was changed successfully.
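The per-topic checks behind these anticipated errors can be sketched as follows. This is hypothetical broker-side logic. Authorization and policy checks are assumed to happen elsewhere. An empty assignment list is assumed to mean that the broker chooses the placement. Only the second occurrence of a duplicated topic is flagged.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical per-topic validation for AlterPartitionCountsRequest, returning the
// anticipated error codes. TOPIC_AUTHORIZATION_FAILED (29) and POLICY_VIOLATION (44)
// are assumed to be checked separately and are not modelled here.
final class AlterPartitionCountsValidator {
    static final short NONE = 0;
    static final short INVALID_TOPIC_EXCEPTION = 17;
    static final short INVALID_PARTITIONS = 37;
    static final short INVALID_REQUEST = 42;

    // Current state of an existing topic.
    static final class TopicState {
        final int partitions;
        final int replicationFactor;

        TopicState(int partitions, int replicationFactor) {
            this.partitions = partitions;
            this.replicationFactor = replicationFactor;
        }
    }

    private AlterPartitionCountsValidator() { }

    // `seen` collects the topics already validated in this request so duplicates are caught.
    static short validate(String topic, int count, List<List<Integer>> assignment,
                          Map<String, TopicState> cluster, Set<String> seen) {
        if (!seen.add(topic)) {
            return INVALID_REQUEST;             // duplicate topic in the request
        }
        TopicState state = cluster.get(topic);
        if (state == null) {
            return INVALID_TOPIC_EXCEPTION;     // topic doesn't exist
        }
        if (count <= state.partitions) {
            return INVALID_PARTITIONS;          // not an increase
        }
        if (!assignment.isEmpty()) {
            if (assignment.size() != count - state.partitions) {
                return INVALID_REQUEST;         // one list per new partition is required
            }
            for (List<Integer> replicas : assignment) {
                if (replicas.size() != state.replicationFactor) {
                    return INVALID_REQUEST;     // each list must match the replication factor
                }
            }
        }
        return NONE;
    }
}
```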

AdminClient: reassignPartitions()

This API will support a number of use cases:

  1. Changing the partition assignment, via kafka-reassign-partitions.sh
  2. Changing the replication factor, via kafka-reassign-partitions.sh

Notes:

  • This API is asynchronous in the sense that the client cannot assume that the reassignment is complete once it has obtained the result for a partition from the ReassignPartitionsResult. A successful result only means that the reassignment has started; an unsuccessful one means that the request was rejected.
  • The describeReplicaDir() method from KIP-113 can be used to determine progress.
  • A call to reassignPartitions() with the validateOnly option can be used to determine whether a reassignment is currently running, and therefore whether the last reassignment has finished.
  • When the reassignment is complete, the throttle should be removed by one or more calls to alterInterBrokerThrottle().
Code Block
/**
 * <p>Assign each partition given as a key of <code>assignments</code> to the corresponding
 * list of brokers. This can be used to change the replica assignment or change the topic's replication factor.
 * The first broker in each list is the one which holds the "preferred replica".</p>
 *
 * <p>To change the replication factor for a topic there must be a key for each partition in
 * the {@code assignments} map, and the corresponding lists of brokers must all have the same
 * length, which will become the new replication factor.</p>
 *
 * <p>If only a subset of the partitions of a particular topic are present in {@code assignments},
 * the change is taken to be a reassignment of replicas to brokers and each list must have
 * the same length as the current topic replication factor.</p>
 *
 * <h3>Throttling</h3>
 * <p>Inter-broker reassignment and/or increasing the replication factor causes significant
 * inter-broker traffic and can take a long time to copy the replica data to brokers.
 * It may be necessary to impose a throttle on inter-broker traffic for the duration of
 * the reassignment so that client-broker traffic is not adversely affected.</p>
 *
 * <h3>Preferred replica</h3>
 * <p>When brokers are configured with <code>auto.leader.rebalance.enable=true</code>, the broker
 * with the preferred replica will be elected leader automatically.
 * <code>kafka-preferred-replica-election.sh</code> provides a manual trigger for this
 * election when <code>auto.leader.rebalance.enable=false</code>.</p>
 */
public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments) { ... }
public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments,
                                                   ReassignPartitionsOptions options) { ... }

Where:

Code Block
public class ReassignPartitionsOptions {

    public boolean validateOnly() { ... }

    /**
     * Validate the request only: Do not actually trigger replica reassignment.
     */
    public ReassignPartitionsOptions validateOnly(boolean validateOnly) { ... }

    public long timeoutMs() { ... }

    /**
     * Set a timeout for the starting of the reassignment.
     * Note that this timeout does not include the time taken to actually
     * move replicas between brokers.
     */
    public ReassignPartitionsOptions timeoutMs(long timeoutMs) { ... }

    public long throttle() { ... }

    /**
     * Set a throttle, in bytes per second, on the bandwidth used for
     * inter-broker replica movement for all movements implied by the
     * partition reassignments.
     * Traffic between each pair of brokers will be throttled to approximately the given limit.
     * The throttle rate should be at least 1 KB/s.
     * By default no throttle is applied.
     */
    public ReassignPartitionsOptions throttle(long throttledRateBytesPerSecond) { ... }

}

public class ReassignPartitionsResult {
    public Map<TopicPartition, KafkaFuture<Void>> values() { ... }
    public KafkaFuture<Void> all() { ... }
}
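The rules in the reassignPartitions() javadoc determine, per topic, whether a request is a plain reassignment or a replication factor change. Below is a minimal sketch of that interpretation for a single topic. The helper class is hypothetical, and rejecting partition ids outside the topic is an assumption.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical client-side reading of the reassignPartitions() javadoc for one topic:
// returns the replication factor the topic would have afterwards, or throws if the
// proposed assignments break the documented rules.
final class ReplicationFactorCheck {
    private ReplicationFactorCheck() { }

    static int resultingReplicationFactor(int partitionCount, int replicationFactor,
                                          Map<Integer, List<Integer>> assignments) {
        for (int partition : assignments.keySet()) {
            if (partition < 0 || partition >= partitionCount) {
                throw new IllegalArgumentException("unknown partition " + partition);
            }
        }
        Set<Integer> lengths = new HashSet<>();
        for (List<Integer> brokers : assignments.values()) {
            lengths.add(brokers.size());
        }
        if (lengths.size() > 1) {
            throw new IllegalArgumentException("replica lists differ in length: " + lengths);
        }
        int length = lengths.isEmpty() ? replicationFactor : lengths.iterator().next();
        // Keys are distinct and in range, so a full-size map covers every partition.
        boolean everyPartition = assignments.size() == partitionCount;
        if (!everyPartition && length != replicationFactor) {
            throw new IllegalArgumentException(
                "a replication factor change must include every partition of the topic");
        }
        return length;
    }
}
```

For a topic with 2 partitions and replication factor 2, {0: [3, 1]} is a reassignment (result 2), {0: [1, 2, 3], 1: [2, 3, 1]} raises the replication factor to 3, and {0: [1, 2, 3]} on its own is rejected.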

Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse

ReassignPartitionsRequest initiates the movement of replicas between brokers, and is the basis of the AdminClient.reassignPartitions() method.

No Format
ReassignPartitionsRequest => [topic_reassignments] timeout validate_only
  topic_reassignments => topic [partition_reassignments]
    topic => STRING
    partition_reassignments => partition_id [broker]
      partition_id => INT32
      broker => INT32
  timeout => INT32
  validate_only => BOOLEAN

Where

Field | Description
topic | the name of a topic
partition_id | a partition of that topic
broker | a broker id
timeout | the maximum time to await a response, in ms
validate_only | when true, validate the request but do not actually reassign the partitions

The request requires the ClusterAction operation on the CLUSTER resource, since it can require significant inter-broker communication.

The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property, so that the policy also applies to topics that are modified after their creation.

ReassignPartitionsResponse describes which partitions in the request will be moved, and what was wrong with the request for those partitions which will not be moved.

No Format
ReassignPartitionsResponse => throttle_time_ms [reassign_partition_result]
  throttle_time_ms => INT32
  reassign_partition_result => topic [partition_error]
    topic => STRING
    partition_error => partition_id error_code error_message
      partition_id => INT32
      error_code => INT16
      error_message => NULLABLE_STRING

Where

Field | Description
throttle_time_ms | the duration in milliseconds for which the request was throttled
topic | a topic name from the request
partition_id | a partition id for that topic, from the request
error_code | an error code for that topic partition
error_message | more detailed information about any error for that topic partition

Anticipated errors:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • POLICY_VIOLATION(44) The request violated the configured policy
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • UNKNOWN_MEMBER_ID (25) If any broker ids in the partition_reassignments included an unknown broker id
  • INVALID_REQUEST (42) If duplicate topics appeared in the request
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new) If the reassignment cannot be started because a reassignment is currently running.
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and /or replication_factor. The error_message would contain further information.
  • NONE (0) reassignment has started
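
The response layout above can be illustrated with a small decoder. This is a sketch only, assuming the classic (non-flexible) Kafka protocol encoding, in which an ARRAY is an INT32 element count, a STRING is an INT16 length followed by UTF-8 bytes, and a NULLABLE_STRING uses length -1 for null. It reads just the response body (no response header) and collects the partitions whose error_code is not NONE (0). PartitionError and ReassignPartitionsResponseDecoder are hypothetical helpers, not part of this proposal.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// One non-NONE entry from a ReassignPartitionsResponse (hypothetical type, for illustration).
final class PartitionError {
    final String topic;
    final int partitionId;
    final short errorCode;
    final String errorMessage; // null if the broker sent no message

    PartitionError(String topic, int partitionId, short errorCode, String errorMessage) {
        this.topic = topic;
        this.partitionId = partitionId;
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
    }
}

final class ReassignPartitionsResponseDecoder {
    static final short NONE = 0;

    // STRING / NULLABLE_STRING: INT16 length, then UTF-8 bytes; length -1 means null.
    static String readString(ByteBuffer buf, boolean nullable) {
        short len = buf.getShort();
        if (len < 0) {
            if (nullable && len == -1) {
                return null;
            }
            throw new IllegalArgumentException("invalid string length " + len);
        }
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Decodes the response body (big-endian, ByteBuffer's default order) and
    // returns only the partitions whose error_code is not NONE (0).
    static List<PartitionError> failedPartitions(ByteBuffer buf) {
        buf.getInt(); // throttle_time_ms, not needed here
        List<PartitionError> failures = new ArrayList<>();
        int topicCount = buf.getInt(); // ARRAY: INT32 element count
        for (int t = 0; t < topicCount; t++) {
            String topic = readString(buf, false);
            int partitionCount = buf.getInt();
            for (int p = 0; p < partitionCount; p++) {
                int partitionId = buf.getInt();
                short errorCode = buf.getShort();
                String errorMessage = readString(buf, true);
                if (errorCode != NONE) {
                    failures.add(new PartitionError(topic, partitionId, errorCode, errorMessage));
                }
            }
        }
        return failures;
    }
}
```

Returning only the failed partitions keeps the caller's handling simple: an empty list means every requested partition reported NONE.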

AdminClient: alterInterBrokerThrottle()

This API will support the following use cases:

  1. A previous call to reassignPartitions() has been made and the user wants to apply or change the throttle.
  2. A previous call to reassignPartitions() is complete and the user wants to remove the throttle.

Notes:

  • The throttle can be queried via describeConfigs().
  • The throttle cannot be set via alterConfigs() because: 1) alterConfigs() doesn't support changing broker configs and 2) throttles are a special-case DynamicConfig.
  • The throttle is broker-specific, so different brokers can have different rates applied.

The following methods will be added to AdminClient to support changing the inter-broker transfer throttle.

Code Block
/**
 * Alter the throttle of inter-broker replication for the given broker to the given rates.
 * @param broker The id of the broker
 * @param leaderRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
 *                                 to be used for inter-broker replication when the broker is the leader.
 * @param followerRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
 *                                   to be used for inter-broker replication when the broker is the follower.
 */
AlterInterBrokerThrottleResult alterInterBrokerThrottle(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond);
AlterInterBrokerThrottleResult alterInterBrokerThrottle(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond, AlterInterBrokerThrottleOptions options);

Where:

Code Block
public class AlterInterBrokerThrottleOptions {
    public Integer timeoutMs() { ... }
    public AlterInterBrokerThrottleOptions timeoutMs(Integer timeoutMs) { ... }
}

public class AlterInterBrokerThrottleResult {
    public KafkaFuture<Void> all() { ... }
}
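
Because the throttle is broker-specific, applying one rate across a reassignment means one call per affected broker. The following sketch illustrates this under stated assumptions: InterBrokerThrottleAdmin is a hypothetical single-method stand-in for the proposed AdminClient method, and it returns a CompletableFuture rather than an AlterInterBrokerThrottleResult only so that the example compiles without Kafka on the classpath. ThrottleAllBrokers is likewise a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the proposed AdminClient method. CompletableFuture replaces
// KafkaFuture here purely so that the sketch is self-contained.
interface InterBrokerThrottleAdmin {
    CompletableFuture<Void> alterInterBrokerThrottle(int broker,
                                                     long leaderRateBytesPerSecond,
                                                     long followerRateBytesPerSecond);
}

final class ThrottleAllBrokers {
    // The throttle is per broker, so a cluster-wide rate needs one call per broker.
    // The returned future completes when every call has completed, and completes
    // exceptionally if any call fails.
    static CompletableFuture<Void> apply(InterBrokerThrottleAdmin admin,
                                         Collection<Integer> brokers,
                                         long rateBytesPerSecond) {
        List<CompletableFuture<Void>> calls = new ArrayList<>();
        for (int broker : brokers) {
            // Same rate for the leader and follower side; the API allows them to differ.
            calls.add(admin.alterInterBrokerThrottle(broker, rateBytesPerSecond, rateBytesPerSecond));
        }
        return CompletableFuture.allOf(calls.toArray(new CompletableFuture<?>[0]));
    }
}
```

Using allOf means a single failed broker fails the whole operation; whether a caller should then retry that broker or roll back the others is not specified by this proposal.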

Network Protocol: AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse

An AlterInterBrokerThrottleRequest changes the leader and follower throttle rates for inter-broker transfers from/to the broker receiving the request.

No Format
AlterInterBrokerThrottleRequest => leader_rate follower_rate timeout
  leader_rate => INT64
  follower_rate => INT64
  timeout => INT32

Where:

Field | Description
leader_rate | The approximate maximum rate, in bytes per second, for inter-broker transfers where the receiving broker is the leader
follower_rate | The approximate maximum rate, in bytes per second, for inter-broker transfers where the receiving broker is the follower
timeout | The maximum time to await a response, in milliseconds

The request requires the ClusterAction operation on the CLUSTER resource, since it can affect significant inter-broker communication.
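
As an illustration of the wire layout, the following sketch encodes the request body with java.nio.ByteBuffer, whose default big-endian byte order matches the Kafka protocol. It omits the standard request header, and it treats negative values as out of range purely as an assumption, since the proposal does not define the valid range. AlterInterBrokerThrottleRequestBody is a hypothetical helper.

```java
import java.nio.ByteBuffer;

final class AlterInterBrokerThrottleRequestBody {
    // leader_rate (INT64) + follower_rate (INT64) + timeout (INT32)
    static final int SIZE_BYTES = Long.BYTES + Long.BYTES + Integer.BYTES;

    // Encodes only the request body; the standard Kafka request header is not included.
    static ByteBuffer encode(long leaderRate, long followerRate, int timeoutMs) {
        // Assumption: negative values count as "out of range"; the proposal leaves the range unspecified.
        if (leaderRate < 0 || followerRate < 0) {
            throw new IllegalArgumentException("throttle rates must be non-negative");
        }
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("timeout must be non-negative");
        }
        ByteBuffer buf = ByteBuffer.allocate(SIZE_BYTES); // big-endian by default
        buf.putLong(leaderRate);   // leader_rate
        buf.putLong(followerRate); // follower_rate
        buf.putInt(timeoutMs);     // timeout
        buf.flip();                // switch the buffer from writing to reading
        return buf;
    }
}
```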

An AlterInterBrokerThrottleResponse just acknowledges the success or otherwise of an AlterInterBrokerThrottleRequest.

No Format
AlterInterBrokerThrottleResponse => error_code error_message throttle_time_ms
  error_code => INT16
  error_message => NULLABLE_STRING
  throttle_time_ms => INT32

Where:

Field | Description
throttle_time_ms | Duration in milliseconds for which the request was throttled
error_code | An error code for the request
error_message | More detailed information about any error

Anticipated errors:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_REQUEST (42) If the leader rate or follower rate was out of range
  • NONE (0) the request was successful
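
The response can be decoded field by field in schema order. This sketch assumes the same classic encoding (NULLABLE_STRING as an INT16 length, with -1 meaning null) and reads only the response body. AlterInterBrokerThrottleResponseBody and its errorName() mapping are hypothetical and cover just the anticipated error codes for this response.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class AlterInterBrokerThrottleResponseBody {
    final short errorCode;
    final String errorMessage; // null when the broker sent no message
    final int throttleTimeMs;

    private AlterInterBrokerThrottleResponseBody(short errorCode, String errorMessage, int throttleTimeMs) {
        this.errorCode = errorCode;
        this.errorMessage = errorMessage;
        this.throttleTimeMs = throttleTimeMs;
    }

    // Field order follows the schema: error_code, error_message, throttle_time_ms.
    static AlterInterBrokerThrottleResponseBody decode(ByteBuffer buf) {
        short errorCode = buf.getShort();
        short len = buf.getShort(); // NULLABLE_STRING: INT16 length, -1 means null
        String errorMessage = null;
        if (len >= 0) {
            byte[] bytes = new byte[len];
            buf.get(bytes);
            errorMessage = new String(bytes, StandardCharsets.UTF_8);
        } else if (len != -1) {
            throw new IllegalArgumentException("invalid string length " + len);
        }
        int throttleTimeMs = buf.getInt();
        return new AlterInterBrokerThrottleResponseBody(errorCode, errorMessage, throttleTimeMs);
    }

    // Names for the anticipated error codes of this response only.
    String errorName() {
        switch (errorCode) {
            case 0: return "NONE";
            case 31: return "CLUSTER_AUTHORIZATION_FAILED";
            case 42: return "INVALID_REQUEST";
            default: return "UNEXPECTED(" + errorCode + ")";
        }
    }
}
```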

Summary of use cases

Use case | Command | AdminClient
Increase partition count | kafka-topics --alter --topic T --partitions P | alterPartitionCount()
Change replication factor | kafka-topics --alter --topic T --replication-factor F | alterReplicationFactors()
Change partition assignment | kafka-topics --alter --topic T --replica-assignment A | reassignPartitions()
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions()
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | 1. reassignPartitions(validateOnly), to check that no reassignment is in progress; 2. alterInterBrokerThrottle(); 3. alterConfigs(), to set (leader|follower).replication.throttled.replicas; 4. reassignPartitions()
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | alterInterBrokerThrottle()
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | describeReplicaDir() (see KIP-113)
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J | 1. reassignPartitions(validateOnly), to check that no reassignment is in progress; 2. alterConfigs(), to remove (leader|follower).replication.throttled.replicas
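
The alterConfigs() steps in the table set the per-topic (leader|follower).replication.throttled.replicas configs, whose values are comma-separated partitionId:brokerId pairs. The sketch below computes those values for one topic, assuming the policy that, as far as we understand it, kafka-reassign-partitions applies today: leader-side throttling covers every current replica of a moving partition, and follower-side throttling covers only the brokers the partition is moving to. ThrottledReplicas is a hypothetical helper; the resulting strings would be the config values passed to alterConfigs().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ThrottledReplicas {
    // Each map is partition id -> ordered replica list, for a single topic.
    // A partition is "moving" when its proposed replica list differs from the current one.

    // leader.replication.throttled.replicas: every current replica of each moving partition,
    // because the leader (the source of the copy) may be any of them.
    static String leaderValue(Map<Integer, List<Integer>> current, Map<Integer, List<Integer>> proposed) {
        List<String> entries = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> e : new TreeMap<>(proposed).entrySet()) {
            List<Integer> existing = current.get(e.getKey());
            if (existing == null || existing.equals(e.getValue())) {
                continue; // unknown partition, or not moving
            }
            for (int broker : existing) {
                entries.add(e.getKey() + ":" + broker);
            }
        }
        return String.join(",", entries);
    }

    // follower.replication.throttled.replicas: only the brokers that are new for each moving partition.
    static String followerValue(Map<Integer, List<Integer>> current, Map<Integer, List<Integer>> proposed) {
        List<String> entries = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> e : new TreeMap<>(proposed).entrySet()) {
            List<Integer> existing = current.get(e.getKey());
            if (existing == null || existing.equals(e.getValue())) {
                continue; // unknown partition, or not moving
            }
            for (int broker : e.getValue()) {
                if (!existing.contains(broker)) {
                    entries.add(e.getKey() + ":" + broker);
                }
            }
        }
        return String.join(",", entries);
    }
}
```

Iterating over a TreeMap keeps the output ordered by partition id, which makes the config values deterministic and easy to compare when verifying or clearing the throttle later.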

...

Compatibility, Deprecation, and Migration Plan

...