...

New network protocol APIs will be added:

The AdminClient API will have new methods added (plus overloads for options):

Support will be added for changing broker DynamicConfigs via AdminClient.alterConfigs().

The options accepted by kafka-reassign-partitions.sh command will change:

...

Use case | Command | AdminClient
Increase partition count | kafka-topics --alter --topic T --partitions P | increasePartitionCounts()
Change replication factor | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions()
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions()
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | reassignPartitions(validateOnly) to check none is in progress; alterConfigs() to set (leader|follower).replication.throttled.rate; alterConfigs() to set (leader|follower).replication.throttled.replicas; reassignPartitions()
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | alterConfigs()
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | describeReplicaLogDirs() (see KIP-113)
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J | reassignPartitions(validateOnly) to check none is in progress; alterConfigs() to clear (leader|follower).replication.throttled.replicas

...

In all other respects, the public API of ReassignPartitionsCommand will not be changed.

AdminClient:

...

increasePartitionCounts()

This API supports the use case of increasing the partition count via kafka-topics.sh --alter --partitions ...

...

```java
/**
 * <p>Increase the partition count of the topics given as the keys of {@code counts}
 * according to the corresponding values. Decreasing the partition count is not
 * supported.</p>
 */
public IncreasePartitionCountsResult increasePartitionCounts(Map<String, PartitionCount> counts,
                                                             IncreasePartitionCountsOptions options);
public IncreasePartitionCountsResult increasePartitionCounts(Map<String, PartitionCount> counts);
```

...

```java
/** Describes a change in a topic's partition count. */
public class PartitionCount {
    private int partitionCount;
    private List<List<Integer>> assignments;
    private PartitionCount(int partitionCount) { ... }

    /**
     * <p>Increase the partition count for a topic to the given {@code newCount}.
     * The assignment of new replicas to brokers will be decided by the broker
     * but may subsequently be moved using {@link AdminClient#reassignPartitions(Map)}.</p>
     */
    public static PartitionCount increasePartitionCount(int newCount) { ... }

    /**
     * <p>Increase the partition count for a topic to the given {@code newCount},
     * assigning the new partitions according to the given {@code newAssignments}.
     * The length of {@code newAssignments} should equal {@code newCount - oldCount}, since
     * the assignment of existing partitions is not changed.
     * Each inner list of {@code newAssignments} should have a length equal to
     * the topic's replication factor.
     * The first broker id in each inner list is the "preferred replica".</p>
     *
     * <p>For example, suppose a topic currently has a replication factor of 2 and
     * has 3 partitions. The number of partitions can be increased to 4
     * (with broker 1 being the preferred replica for the new partition) using a
     * {@code PartitionCount} constructed like this:</p>
     *
     * <pre><code>PartitionCount.increasePartitionCount(4, Arrays.asList(Arrays.asList(1, 2)))</code></pre>
     */
    public static PartitionCount increasePartitionCount(int newCount, List<List<Integer>> newAssignments) { ... }
}

public class IncreasePartitionCountsOptions {
    public IncreasePartitionCountsOptions() { ... }
    public Integer timeoutMs() { ... }
    public IncreasePartitionCountsOptions timeoutMs(Integer timeoutMs) { ... }
    public boolean validateOnly() { ... }
    /**
     * Validate the request only: do not actually change any partition counts.
     */
    public IncreasePartitionCountsOptions validateOnly(boolean validateOnly) { ... }
}

public class IncreasePartitionCountsResult {
    // package access constructor
    public Map<String, KafkaFuture<Void>> values() { ... }
    public KafkaFuture<Void> all() { ... }
}
```
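The result class pairs one future per topic (values()) with an aggregate future (all()). The sketch below models the semantics one would expect from that pairing. It uses java.util.concurrent.CompletableFuture as a stand-in for KafkaFuture, the class name IncreasePartitionCountsResultSketch is hypothetical, and the assumption that all() fails if any single topic fails is ours rather than something stated above.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical stand-in for IncreasePartitionCountsResult that uses
 * CompletableFuture in place of KafkaFuture.
 */
final class IncreasePartitionCountsResultSketch {
    private final Map<String, CompletableFuture<Void>> futures;

    IncreasePartitionCountsResultSketch(Map<String, CompletableFuture<Void>> futures) {
        // Copy so the per-topic map cannot change after construction.
        this.futures = Collections.unmodifiableMap(new LinkedHashMap<>(futures));
    }

    /** One future per topic in the request, keyed by topic name. */
    Map<String, CompletableFuture<Void>> values() {
        return futures;
    }

    /**
     * Completes normally once every topic's future has completed normally,
     * and completes exceptionally if any topic's future failed.
     */
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture<?>[0]));
    }
}
```

A caller interested in partial success would inspect values() per topic instead of blocking on all().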

Network Protocol:

...

IncreasePartitionCountsRequest and IncreasePartitionCountsResponse

The IncreasePartitionCountsRequest is used to increase the partition count for a batch of topics, and is the basis for the AdminClient.increasePartitionCounts() method.

The request can be sent to any broker.

...

After validating the request the broker calls AdminUtils.addPartitions() which ultimately updates the topic partition assignment znode (/brokers/topics/${topic}).
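A minimal sketch of the per-topic checks a broker might run before calling AdminUtils.addPartitions(), derived from the rules in the PartitionCount Javadoc: the count may only increase, there must be newCount - oldCount new assignments, and each assignment must have one broker per replica. The class PartitionCountValidator is hypothetical, and the duplicate-broker check is an assumption added for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical broker-side validation for one topic of an IncreasePartitionCountsRequest. */
final class PartitionCountValidator {
    /**
     * Returns null if the change is valid, otherwise a message suitable for error_message.
     * A null newAssignments means the broker chooses the placement itself.
     */
    static String validate(int oldCount, int replicationFactor, Set<Integer> knownBrokers,
                           int newCount, List<List<Integer>> newAssignments) {
        if (newCount <= oldCount) {
            return "new partition count " + newCount + " must exceed current count " + oldCount;
        }
        if (newAssignments == null) {
            return null;
        }
        // Existing partitions keep their assignment, so only the new ones are listed.
        if (newAssignments.size() != newCount - oldCount) {
            return "expected " + (newCount - oldCount) + " assignments but got " + newAssignments.size();
        }
        for (List<Integer> replicas : newAssignments) {
            if (replicas.size() != replicationFactor) {
                return "replica list " + replicas + " does not match replication factor " + replicationFactor;
            }
            // Assumption: a broker may appear at most once per partition.
            if (new HashSet<>(replicas).size() != replicas.size()) {
                return "duplicate broker id in replica list " + replicas;
            }
            for (int broker : replicas) {
                if (!knownBrokers.contains(broker)) {
                    return "unknown broker id " + broker;
                }
            }
        }
        return null;
    }
}
```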

```
IncreasePartitionCountsRequest => [topic_partition_count] timeout
  topic_partition_count => topic partition_count
    topic => STRING
    partition_count => count [assignment]
      count => INT32
      assignment => [INT32]
  timeout => INT32
```

...

Field | Description
topic | The name of a topic.
count | The new partition count.
assignment | A list of assigned brokers (one list for each new partition).
timeout | The maximum time to await a response in ms.
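To make the schema concrete, here is a sketch of encoding the request body with java.nio.ByteBuffer, assuming Kafka's usual primitive encodings: big-endian INT32, STRING as an INT16 length followed by UTF-8 bytes, and arrays as an INT32 element count followed by the elements. The request header is omitted, the encoder class is hypothetical, and encoding "let the broker choose" as an empty assignment array is an assumption, since the schema does not say how that case is represented.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/** Hypothetical encoder for the IncreasePartitionCountsRequest body (no request header). */
final class IncreasePartitionCountsRequestEncoder {
    /**
     * @param counts      topic name -> new partition count
     * @param assignments topic name -> one broker list per new partition;
     *                    a missing topic is encoded as an empty array (assumption)
     */
    static ByteBuffer encode(Map<String, Integer> counts,
                             Map<String, List<List<Integer>>> assignments,
                             int timeoutMs) {
        ByteBuffer buf = ByteBuffer.allocate(sizeOf(counts, assignments));
        buf.putInt(counts.size());                          // [topic_partition_count]: element count
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            byte[] topic = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) topic.length);             // topic: STRING = INT16 length ...
            buf.put(topic);                                 // ... followed by UTF-8 bytes
            buf.putInt(e.getValue());                       // count: INT32
            List<List<Integer>> newPartitions = assignments.getOrDefault(e.getKey(), List.of());
            buf.putInt(newPartitions.size());               // [assignment]: one entry per new partition
            for (List<Integer> replicas : newPartitions) {
                buf.putInt(replicas.size());                // assignment: [INT32]
                for (int broker : replicas) {
                    buf.putInt(broker);
                }
            }
        }
        buf.putInt(timeoutMs);                              // timeout: INT32
        buf.flip();
        return buf;
    }

    private static int sizeOf(Map<String, Integer> counts, Map<String, List<List<Integer>>> assignments) {
        int size = Integer.BYTES + Integer.BYTES;           // topic array count + timeout
        for (String topic : counts.keySet()) {
            size += Short.BYTES + topic.getBytes(StandardCharsets.UTF_8).length;
            size += Integer.BYTES + Integer.BYTES;          // count + assignment array count
            for (List<Integer> replicas : assignments.getOrDefault(topic, List.of())) {
                size += Integer.BYTES * (1 + replicas.size());
            }
        }
        return size;
    }
}
```

For the Javadoc example above (topic T grown to 4 partitions with the new partition on brokers 1 and 2), the body is 31 bytes.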

The response provides an error code and message for each of the topics present in the request.

```
IncreasePartitionCountsResponse => throttle_time_ms [topic_partition_count_error]
  topic_partition_count_error => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING
```
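A client-side decoding sketch for this response body, assuming Kafka's usual encodings, where a NULLABLE_STRING uses an INT16 length of -1 to represent null. The decoder and its nested classes are hypothetical names, and the response header is omitted.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical decoder for the IncreasePartitionCountsResponse body (no response header). */
final class IncreasePartitionCountsResponseDecoder {
    static final class TopicError {
        final String topic;
        final short errorCode;
        final String errorMessage; // null when the broker sent no message

        TopicError(String topic, short errorCode, String errorMessage) {
            this.topic = topic;
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
        }
    }

    static final class Response {
        final int throttleTimeMs;
        final List<TopicError> errors;

        Response(int throttleTimeMs, List<TopicError> errors) {
            this.throttleTimeMs = throttleTimeMs;
            this.errors = Collections.unmodifiableList(errors);
        }
    }

    static Response decode(ByteBuffer buf) {
        int throttleTimeMs = buf.getInt();                 // throttle_time_ms: INT32
        int count = buf.getInt();                          // [topic_partition_count_error]: element count
        List<TopicError> errors = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            String topic = readNullableString(buf);        // topic: STRING (must not be null)
            if (topic == null) {
                throw new IllegalArgumentException("topic must not be null");
            }
            short errorCode = buf.getShort();              // error_code: INT16
            String errorMessage = readNullableString(buf); // error_message: NULLABLE_STRING
            errors.add(new TopicError(topic, errorCode, errorMessage));
        }
        return new Response(throttleTimeMs, errors);
    }

    /** INT16 length followed by UTF-8 bytes; a length of -1 encodes null. */
    private static String readNullableString(ByteBuffer buf) {
        short length = buf.getShort();
        if (length < 0) {
            return null;
        }
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```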

...

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed.
  • POLICY_VIOLATION (44) The request violated the configured policy.
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist.
  • UNKNOWN_MEMBER_ID (25) If any assignment in the request included an unknown broker id.
  • INVALID_REQUEST (42) If duplicate topics appeared in the request.
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new) If the partition count cannot be increased because a reassignment is currently running (i.e. the /admin/reassign_partitions znode exists).
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in an assignment doesn't exist or is incompatible with the requested count and/or the topic's replication factor. The error_message would contain further information.
  • NONE (0) The partition count increase has been initiated.
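A client handling this response needs to map the numeric error_code back to the names listed above. A small lookup sketch follows; the enum name is hypothetical, and PARTITION_REASSIGNMENT_IN_PROGRESS is left out because the proposal has not assigned it a number yet.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical lookup of the error codes anticipated for IncreasePartitionCountsResponse. */
enum IncreasePartitionCountsError {
    NONE(0),
    INVALID_TOPIC_EXCEPTION(17),
    UNKNOWN_MEMBER_ID(25),
    CLUSTER_AUTHORIZATION_FAILED(31),
    INVALID_REPLICA_ASSIGNMENT(39),
    INVALID_REQUEST(42),
    POLICY_VIOLATION(44);
    // PARTITION_REASSIGNMENT_IN_PROGRESS is proposed as a new code without a number, so it is omitted.

    final short code;

    private static final Map<Short, IncreasePartitionCountsError> BY_CODE = new HashMap<>();

    static {
        for (IncreasePartitionCountsError e : values()) {
            BY_CODE.put(e.code, e);
        }
    }

    IncreasePartitionCountsError(int code) {
        this.code = (short) code;
    }

    /** Returns the error for a wire code, rejecting codes this sketch does not know about. */
    static IncreasePartitionCountsError fromCode(short code) {
        IncreasePartitionCountsError e = BY_CODE.get(code);
        if (e == null) {
            throw new IllegalArgumentException("unexpected error code " + code);
        }
        return e;
    }
}
```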

AdminClient:

...

alterConfigs()

This API will support the following use cases:

  1. A previous call to reassignPartitions() has been made and the user wants to apply or change the throttle.
  2. A previous call to reassignPartitions() is complete and the user wants to remove the throttle.

Notes:

  • The throttle can be queried via describeConfigs()
  • Currently the throttle cannot be set via alterConfigs() because: 1) alterConfigs() doesn't support changing broker configs, and 2) throttles are a special-case DynamicConfig. This KIP adds support for altering broker DynamicConfigs via alterConfigs().
  • The throttle is broker-specific and different brokers can have different rates applied.

The following methods will be added to AdminClient to support changing the inter-broker transfer throttle.

```java
/**
 * Alter the throttle of inter-broker replication for the given broker to the given rates.
 * @param broker The id of the broker to throttle.
 * @param leaderRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
 *                                 to be used for inter-broker replication when the broker is the leader.
 * @param followerRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
 *                                   to be used for inter-broker replication when the broker is the follower.
 */
AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond);
AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond,
                                                            AlterInterBrokerThrottleOptions options);
```

Where:

```java
public class AlterInterBrokerThrottleOptions {
    public Integer timeoutMs() { ... }
    public AlterInterBrokerThrottleOptions timeoutMs(Integer timeoutMs) { ... }
}

public class AlterInterBrokerThrottleResult {
    public KafkaFuture<Void> all() { ... }
}
```

Network Protocol: AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse

...

```
AlterInterBrokerThrottleRequest => leader_rate follower_rate timeout
  leader_rate => INT64
  follower_rate => INT64
  timeout => INT32
```

Where:

Field | Description
leader_rate | The approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the leader.
follower_rate | The approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the follower.
timeout | The maximum time to await a response in ms.
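Because the request carries only fixed-width fields, its body is always 20 bytes (two INT64 rates and an INT32 timeout). The sketch below encodes it with java.nio.ByteBuffer. The encoder class is hypothetical, and rejecting negative rates client-side is an assumption the proposal does not state. Since the schema has no broker id field, our reading is that the client sends this request directly to the broker being throttled.

```java
import java.nio.ByteBuffer;

/** Hypothetical encoder for the AlterInterBrokerThrottleRequest body (no request header). */
final class AlterInterBrokerThrottleRequestEncoder {
    static ByteBuffer encode(long leaderRateBytesPerSec, long followerRateBytesPerSec, int timeoutMs) {
        // Assumption: negative rates are meaningless, so reject them before sending.
        if (leaderRateBytesPerSec < 0 || followerRateBytesPerSec < 0) {
            throw new IllegalArgumentException("throttle rates must be non-negative");
        }
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Long.BYTES + Integer.BYTES);
        buf.putLong(leaderRateBytesPerSec)    // leader_rate: INT64
           .putLong(followerRateBytesPerSec)  // follower_rate: INT64
           .putInt(timeoutMs);                // timeout: INT32
        buf.flip();
        return buf;
    }
}
```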

The request requires the Alter operation on the CLUSTER resource, since it can significantly affect inter-broker communication.

...

```
AlterInterBrokerThrottleResponse => error_code error_message throttle_time_ms
  error_code => INT16
  error_message => NULLABLE_STRING
  throttle_time_ms => INT32
```

Where:

Field | Description
throttle_time_ms | Duration in milliseconds for which the request was throttled.
error_code | An error code for the request.
error_message | More detailed information about any error.
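A matching decoding sketch for the response, following the field order in the schema (error_code, error_message, throttle_time_ms) and assuming the usual NULLABLE_STRING encoding, where an INT16 length of -1 means null. The decoder class is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical decoder for the AlterInterBrokerThrottleResponse body (no response header). */
final class AlterInterBrokerThrottleResponseDecoder {
    static final class Response {
        final short errorCode;
        final String errorMessage; // null when the broker sent no message
        final int throttleTimeMs;

        Response(short errorCode, String errorMessage, int throttleTimeMs) {
            this.errorCode = errorCode;
            this.errorMessage = errorMessage;
            this.throttleTimeMs = throttleTimeMs;
        }
    }

    static Response decode(ByteBuffer buf) {
        short errorCode = buf.getShort();              // error_code: INT16
        String errorMessage = readNullableString(buf); // error_message: NULLABLE_STRING
        int throttleTimeMs = buf.getInt();             // throttle_time_ms: INT32
        return new Response(errorCode, errorMessage, throttleTimeMs);
    }

    /** INT16 length followed by UTF-8 bytes; a length of -1 encodes null. */
    private static String readNullableString(ByteBuffer buf) {
        short length = buf.getShort();
        if (length < 0) {
            return null;
        }
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```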

Anticipated errors:

...

already exists, but doesn't currently support changing broker configs. Inter-broker throttling is implemented via DynamicConfigs on the broker:

  • leader.replication.throttled.rate
  • follower.replication.throttled.rate

DynamicConfigs are configs "which have no physical manifestation in the server.properties and can only be set dynamically".

And via topic configs:

  • leader.replication.throttled.replicas
  • follower.replication.throttled.replicas
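The two topic configs take a comma-separated list of partitionId:brokerId pairs. The sketch below derives both values from a topic's current and proposed assignments, under the assumption (our understanding of what kafka-reassign-partitions.sh does) that the leader side throttles every existing replica of a moving partition and the follower side throttles only the replicas being added. The class ThrottledReplicas and its methods are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

/** Hypothetical helper building (leader|follower).replication.throttled.replicas values. */
final class ThrottledReplicas {
    /** Every current replica of each partition whose replica list will change. */
    static String leaderValue(Map<Integer, List<Integer>> current, Map<Integer, List<Integer>> proposed) {
        StringJoiner value = new StringJoiner(",");
        for (Map.Entry<Integer, List<Integer>> e : new TreeMap<>(proposed).entrySet()) {
            List<Integer> existing = current.get(e.getKey());
            if (existing == null || existing.equals(e.getValue())) {
                continue; // unknown or unchanged partition: nothing to throttle
            }
            for (int broker : existing) {
                value.add(e.getKey() + ":" + broker);
            }
        }
        return value.toString();
    }

    /** Replicas in the proposed assignment that do not already hold the partition. */
    static String followerValue(Map<Integer, List<Integer>> current, Map<Integer, List<Integer>> proposed) {
        StringJoiner value = new StringJoiner(",");
        for (Map.Entry<Integer, List<Integer>> e : new TreeMap<>(proposed).entrySet()) {
            List<Integer> existing = current.get(e.getKey());
            if (existing == null) {
                continue;
            }
            for (int broker : e.getValue()) {
                if (!existing.contains(broker)) {
                    value.add(e.getKey() + ":" + broker);
                }
            }
        }
        return value.toString();
    }
}
```

For example, moving partition 1 from brokers [2, 3] to [3, 4] while partition 0 stays on [1, 2] yields a leader value of 1:2,1:3 and a follower value of 1:4.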

The alterConfigs() API will be changed to allow altering DynamicConfigs in order to support the following use cases:

  1. A previous call to reassignPartitions() has been made and the user wants to apply or change the throttle.
  2. A previous call to reassignPartitions() is complete and the user wants to remove the throttle.

This KIP does not propose to support altering broker configs that are not DynamicConfigs.
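The two use cases can be sketched as building the desired set of broker DynamicConfigs. This assumes the extended alterConfigs() keeps the replace-the-whole-set behaviour it already has for topic configs, where entries left out revert to their defaults, so the client starts from the broker's current DynamicConfigs and adds or removes the throttle keys. The class BrokerThrottleConfigs is hypothetical; the config names come from the list of broker DynamicConfigs above.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that builds the broker DynamicConfig set to pass to alterConfigs(). */
final class BrokerThrottleConfigs {
    static final String LEADER_RATE = "leader.replication.throttled.rate";
    static final String FOLLOWER_RATE = "follower.replication.throttled.rate";

    /** Use case 1: apply or change the throttle, keeping any other DynamicConfigs. */
    static Map<String, String> withThrottle(Map<String, String> current,
                                            long leaderBytesPerSec, long followerBytesPerSec) {
        Map<String, String> desired = new TreeMap<>(current);
        desired.put(LEADER_RATE, Long.toString(leaderBytesPerSec));
        desired.put(FOLLOWER_RATE, Long.toString(followerBytesPerSec));
        return desired;
    }

    /** Use case 2: remove the throttle by leaving its keys out of the desired set. */
    static Map<String, String> withoutThrottle(Map<String, String> current) {
        Map<String, String> desired = new TreeMap<>(current);
        desired.remove(LEADER_RATE);
        desired.remove(FOLLOWER_RATE);
        return desired;
    }
}
```

Because the throttle is broker-specific, this would be repeated per broker resource, possibly with different rates.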

This will involve updating the alterConfigs() API documentation.

Network Protocol: AlterConfigsRequest and AlterConfigsResponse

This protocol already exists, but support will be added for altering broker DynamicConfigs.

...

Compatibility, Deprecation, and Migration Plan

...