...


Status

Current state: Withdrawn

Discussion thread: here (when initially misnumbered as KIP-178) and here (when assigned KIP-179)

...

  • The AdminClient API currently lacks any functionality for reassigning partitions: users have to use the kafka-reassign-partitions.sh tool (ReassignPartitionsCommand), which talks directly to ZooKeeper. This prevents the tool from being used in deployments where only the brokers are exposed to clients (i.e. where the ZooKeeper servers are intentionally not exposed). In addition, there is a general push to refactor, rewrite or replace tools which need ZooKeeper access with equivalents which use the AdminClient API.
  • ReassignPartitionsCommand currently has no proper facility to report the progress of a reassignment: while --verify can be used periodically to check whether the requested assignments have been achieved, the tool provides no means of knowing how quickly new replicas are catching up. It would be useful if the tool could report progress better.
  • ReassignPartitionsCommand, when used with a replication throttle, requires a --verify invocation after the reassignment has finished in order to remove the throttle. So there is a possibility that throttles are not removed after reassignment, with negative consequences for the performance of the cluster. It would be better if throttles could be removed automatically.

Public Interfaces

The AdminClient API will have new methods added (plus overloads for options):

The options for reassignPartitions() will support setting a throttle, and a flag for its automatic removal at the end of the reassignment. Likewise, the options for changing the throttled rates and replicas will include the ability to have the throttles removed automatically.

...

The options accepted by the kafka-reassign-partitions.sh command will change:

...

When run with --bootstrap-server, it will no longer be necessary to run kafka-reassign-partitions.sh --verify to remove a throttle: this will be done automatically.

...

Use case | Command | AdminClient
Change replication factor | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions()
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions()
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | reassignPartitions() // with throttle option
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | alterInterbrokerThrottledRate(), alterInterbrokerThrottledReplicas() // TODO how to do this conveniently?
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | describeReplicaLogDirs() (see KIP-113)
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J | reassignPartitions(validateOnly) // check TODO checks none in progress, doesn't confirm states match

 

 kafka-reassign-partitions.sh and ReassignPartitionsCommand

...

  1. Perform the reassignment via the AdminClient API (described below) using the given broker(s) as bootstrap brokers.
  2. When used with --execute and --throttle, the throttle will be removed automatically at the end of the reassignment.

...

reassignPartitions
This API will support a number of use cases:

  1. Changing the partition assignment, via kafka-reassign-partitions.sh
  2. Changing the replication factor, via kafka-reassign-partitions.sh

Notes:

  • This API is asynchronous in the sense that the client cannot assume that the reassignment is complete (or that it was rejected) once it has obtained the result for a partition from the ReassignPartitionsResult.
  • The describeReplicaLogDirs() method from KIP-113 can be used to determine progress.
  • A call to reassignPartitions() with the validateOnly option can be used to determine whether a reassignment is currently running, and therefore whether the last reassignment has finished.
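The notes above imply a client-side pattern: submit the reassignment, then poll until no reassignment is reported as running. Because reassignPartitions() was proposed rather than released, this sketch abstracts the probe as a caller-supplied BooleanSupplier that returns true while a reassignment is still running (for example, hypothetically backed by a validateOnly call). The class name, poll interval and deadline are assumptions for illustration only.

```java
import java.util.function.BooleanSupplier;

/**
 * Sketch only: polls a caller-supplied probe until it reports that no
 * reassignment is running, or until a deadline passes.
 */
final class ReassignmentPoller {

    private ReassignmentPoller() {}

    /**
     * @param inProgress probe returning true while a reassignment is still running
     *                   (hypothetically backed by a validateOnly reassignPartitions() call)
     * @param pollMs     delay between probes, in milliseconds
     * @param timeoutMs  overall deadline, in milliseconds
     * @return true if the probe reported completion before the deadline
     */
    static boolean awaitCompletion(BooleanSupplier inProgress, long pollMs, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (inProgress.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline passed; the reassignment may still be running
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true; // probe reported that no reassignment is in progress
    }
}
```

Returning false on timeout, rather than throwing, leaves the decision about whether to keep waiting (and whether the throttle should stay in place) to the caller.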

...

Code Block
languagejava
linenumberstrue
public class ReassignPartitionsOptions extends AbstractOptions<ReassignPartitionsOptions> {

    // Note timeoutMs() inherited from AbstractOptions

    public boolean validateOnly() { ... }

    /**
     * Validate the request only: Do not actually trigger replica reassignment.
     */
    public ReassignPartitionsOptions validateOnly(boolean validateOnly) { ... }

    public long throttle() {
        return throttle;
    }

    /**
     * <p>Set the throttle rate and throttled replicas for the reassignments.
     * The given throttle is in bytes/second and should be at least 1 KB/s.
     * Interbroker replication traffic will be throttled to approximately the given value.
     * Use Long.MAX_VALUE if the reassignment should not be throttled.</p>
     *
     * <p>A positive throttle is equivalent to setting:</p>
     * <ul>
     *     <li>The leader and follower throttled rates to the given {@code throttle} value.</li>
     *     <li>The leader throttled replicas of each topic in the request to include the existing brokers having
     *     replicas of the partitions in the request.</li>
     *     <li>The follower throttled replicas of each topic in the request to include the new brokers
     *     for each partition in that topic.</li>
     * </ul>
     *
     * <p>The value of {@link #autoRemoveThrottle()} will determine whether these
     * throttles will be removed automatically when the reassignment completes.</p>
     *
     * @see AdminClient#alterInterbrokerThrottledRate(Map, AlterInterbrokerThrottledRateOptions)
     * @see AdminClient#alterInterbrokerThrottledReplicas(Map, AlterInterbrokerThrottledReplicasOptions)
     */
    public ReassignPartitionsOptions throttle(long throttle) { ... }

    public boolean autoRemoveThrottle() { ... }

    /**
     * True to automatically remove the throttle at the end of the current reassignment.
     */
    public ReassignPartitionsOptions autoRemoveThrottle(boolean autoRemoveThrottle) { ... }
}

public class ReassignPartitionsResult {
    public Map<TopicPartition, KafkaFuture<Void>> values();
    public KafkaFuture<Void> all();
}

...

  • The request must be sent to the controller.
  • The request requires the Alter operation on the CLUSTER resource, since it can require significant inter-broker communication.
  • The request will be subject to a policy, as described in KIP-201.
No Format
ReassignPartitionsRequest => [topic_reassignments] timeout validate_only throttle remove_throttle
  topic_reassignments => topic [partition_reassignments]
    topic => STRING
    partition_reassignments => partition_id [broker]
      partition_id => INT32
      broker => INT32
  timeout => INT32
  validate_only => BOOLEAN
  throttle => INT64
  remove_throttle => BOOLEAN

...

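Field | Description
topic | The name of a topic.
partition_id | A partition of that topic.
broker | The id of a broker assigned to host a replica of that partition.
timeout | The maximum time to await a response, in ms.
validate_only | When true: validate the request, but don't actually reassign the partitions.
throttle | The throttle, in bytes/second; see ReassignPartitionsOptions.throttle(long).
remove_throttle | When true: remove the throttle automatically at the end of the reassignment.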
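The request nests partitions under their topic, while ReassignPartitionsResult reports per TopicPartition. A minimal sketch of how a client might fold a flat partition-to-brokers map into the nested topic_reassignments layout, assuming (the method signature is not shown here) that reassignPartitions() accepts such a map. A small stand-in for Kafka's TopicPartition keeps the example self-contained; ReassignmentRequestBuilder and topicReassignments are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

final class ReassignmentRequestBuilder {

    /** Minimal stand-in for org.apache.kafka.common.TopicPartition. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = Objects.requireNonNull(topic);
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            TopicPartition other = (TopicPartition) o;
            return topic.equals(other.topic) && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    private ReassignmentRequestBuilder() {}

    /**
     * Groups (topic-partition -> target brokers) into topic -> (partition_id -> [broker]),
     * mirroring topic_reassignments => topic [partition_reassignments].
     * TreeMaps give a deterministic order, which is convenient but not required.
     */
    static Map<String, Map<Integer, List<Integer>>> topicReassignments(
            Map<TopicPartition, List<Integer>> assignments) {
        Map<String, Map<Integer, List<Integer>>> byTopic = new TreeMap<>();
        for (Map.Entry<TopicPartition, List<Integer>> e : assignments.entrySet()) {
            byTopic.computeIfAbsent(e.getKey().topic, t -> new TreeMap<>())
                   .put(e.getKey().partition, List.copyOf(e.getValue()));
        }
        return byTopic;
    }
}
```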

Algorithm:

  1. The controller validates the request against the configured authorization, policy etc.
  2. The controller computes the set of topics in the request, and writes this as JSON to the new /admin/throttled_replicas_removal znode.
  3. The controller then updates the existing leader.replication.throttled.replicas and follower.replication.throttled.replicas properties of each topic config.
  4. The controller computes the union of 1) the brokers currently hosting replicas of the topic partitions in the request and 2) the brokers assigned to host topic partitions in the request, and writes this as JSON to the new /admin/throttled_rates_removal znode.
  5. The controller then updates the existing leader.replication.throttled.rate and follower.replication.throttled.rate properties of each broker config.
  6. The controller writes the reassignment JSON to the /admin/reassign_partitions znode.

The intent behind this algorithm is that should the controller crash during the update, the reassignment won't have started and the throttles will be removed on controller failover.

The controller will use the same algorithm for determining the values of the topic and broker configs as is currently used in the ReassignPartitionsCommand.
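For reference, a sketch of that computation for a single topic, as we understand ReassignPartitionsCommand to perform it: only partitions whose replica list changes are throttled, the leader throttle lists each moving partition's current replicas, and the follower throttle lists the replicas being added (our reading of "the new brokers" in the ReassignPartitionsOptions.throttle() Javadoc). Values use the partitionId:brokerId,... format of the leader.replication.throttled.replicas and follower.replication.throttled.replicas topic configs. ThrottledReplicaPlanner and its methods are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

/**
 * Sketch: derives throttled-replica config values for one topic from each
 * partition's current and proposed replica lists (partition id -> broker ids).
 */
final class ThrottledReplicaPlanner {

    private ThrottledReplicaPlanner() {}

    /** leader.replication.throttled.replicas: current replicas of every moving partition. */
    static String leaderThrottledReplicas(Map<Integer, List<Integer>> current,
                                          Map<Integer, List<Integer>> proposed) {
        return format(current, proposed, true);
    }

    /** follower.replication.throttled.replicas: brokers that each moving partition is moving to. */
    static String followerThrottledReplicas(Map<Integer, List<Integer>> current,
                                            Map<Integer, List<Integer>> proposed) {
        return format(current, proposed, false);
    }

    private static String format(Map<Integer, List<Integer>> current,
                                 Map<Integer, List<Integer>> proposed,
                                 boolean leaderSide) {
        StringJoiner joined = new StringJoiner(",");
        // Iterate in partition order so the config value is deterministic.
        for (Map.Entry<Integer, List<Integer>> e : new TreeMap<>(proposed).entrySet()) {
            int partition = e.getKey();
            List<Integer> existing = current.getOrDefault(partition, List.of());
            if (existing.equals(e.getValue())) {
                continue; // assignment unchanged: no throttle needed for this partition
            }
            if (leaderSide) {
                for (int broker : existing) {
                    joined.add(partition + ":" + broker);
                }
            } else {
                for (int broker : e.getValue()) {
                    if (!existing.contains(broker)) {
                        joined.add(partition + ":" + broker); // replica being added
                    }
                }
            }
        }
        return joined.toString();
    }
}
```

For example, moving partition 0 from brokers [1, 2] to [1, 4] while partition 1 stays put yields a leader value of 0:1,0:2 and a follower value of 0:4.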

ReassignPartitionsResponse

ReassignPartitionsResponse describes which partitions in the request will be moved, and what was wrong with the request for those partitions which will not be moved.

...

Code Block
languagejava
linenumberstrue
/**
 * Change the rate at which interbroker replication is throttled, replacing existing throttled rates.
 * For each broker in the given {@code rates}, the {@code leaderRate} of the corresponding
 * {@code ThrottledRate} is the throttled rate when the broker is acting as leader, and
 * the {@code followerRate} is the throttled rate when the broker is acting as follower.
 * For the throttled rates to take effect, the given broker must also be present in the
 * list of throttled replicas, which can be set by
 * {@link #alterInterbrokerThrottledReplicas(Map, AlterInterbrokerThrottledReplicasOptions)}.
 * The throttled rates will be automatically removed at the end of the current reassignment,
 * unless overridden in the given options.
 *
 * The current rates can be obtained from {@link #describeConfigs(Collection)}.
 *
 * @param rates Map from broker id to the throttled rates for that broker.
 * @param options The options.
 */
public abstract AlterInterbrokerThrottledRateResult alterInterbrokerThrottledRate(
        Map<Integer, ThrottledRate> rates,
        AlterInterbrokerThrottledRateOptions options);

Where:

Code Block
/**
 * The throttled rate for interbroker replication on a particular broker.
 */
public class ThrottledRate {
    public ThrottledRate(long leaderRate, long followerRate) { ... }

    /**
     * The throttled rate when the broker is acting as leader.
     */
    public long leaderRate() { ... }

    /**
     * The throttled rate when the broker is acting as follower.
     */
    public long followerRate() { ... }
}

public class AlterInterbrokerThrottledRateOptions extends AbstractOptions<AlterInterbrokerThrottledRateOptions> {

    public boolean autoRemoveThrottle() { ... }

    /**
     * True to automatically remove the throttle at the end of the current reassignment.
     */
    public AlterInterbrokerThrottledRateOptions autoRemoveThrottle(boolean autoRemoveThrottle) { ... }
}

public class AlterInterbrokerThrottledRateResult {
    // package-access ctor

    public Map<Integer, KafkaFuture<Void>> values() { ... }

    public KafkaFuture<Void> all() { ... }
}

Network API: AlterInterbrokerThrottledRatesRequest and AlterInterbrokerThrottledRatesResponse

No Format
AlterInterbrokerThrottledRatesRequest => [broker_throttles] remove_throttle timeout validate_only
  broker_throttles => broker_id leader_rate follower_rate
    broker_id => INT32
    leader_rate => INT64
    follower_rate => INT64
  timeout => INT32
  validate_only => BOOLEAN
  remove_throttle => BOOLEAN

Algorithm:

  1. The controller validates the brokers and rates in the request, and that the principal has the Alter operation on the CLUSTER resource.
  2. The controller gets the current value of the /admin/throttled_rates_removal znode, forms the union of those brokers with those in the request, and updates the /admin/throttled_rates_removal znode with a JSON representation of this union.
  3. The controller then subtracts the brokers in the request from the brokers which currently have throttled rates, and removes the leader.replication.throttled.rate and follower.replication.throttled.rate properties from each of the remaining brokers' configs.
  4. The controller then, for each broker in the request, adds the leader.replication.throttled.rate and follower.replication.throttled.rate properties to that broker's config.
  5. The controller then updates the /admin/throttled_rates_removal znode with a JSON representation of the brokers in the request.

The intent behind this algorithm is that should the controller crash during the update, throttles will still be removed on completion of reassignment.
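The ordering is what provides the crash safety: the removal znode is widened before any broker config changes and narrowed only after they are complete, so (provided it was accurate beforehand) it never omits a throttled broker. A minimal in-memory sketch of steps 2 to 5, in which a Map stands in for the broker configs and a Set for the /admin/throttled_rates_removal znode; ZooKeeper I/O, JSON encoding and validation are omitted, and the class and field names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * In-memory sketch of steps 2-5 of the AlterInterbrokerThrottledRates algorithm.
 * Collections stand in for ZooKeeper; no I/O, JSON or validation is performed.
 */
final class ThrottledRateUpdater {

    /** Leader and follower throttled rates for one broker, in bytes/second. */
    static final class Rates {
        final long leaderRate;
        final long followerRate;

        Rates(long leaderRate, long followerRate) {
            this.leaderRate = leaderRate;
            this.followerRate = followerRate;
        }
    }

    /** Stand-in for the (leader|follower).replication.throttled.rate properties, by broker id. */
    final Map<Integer, Rates> brokerThrottles = new HashMap<>();

    /** Stand-in for the contents of the /admin/throttled_rates_removal znode. */
    final Set<Integer> removalMarker = new HashSet<>();

    void alter(Map<Integer, Rates> request) {
        // Step 2: widen the marker first, so it covers brokers about to be throttled.
        removalMarker.addAll(request.keySet());
        // Step 3: clear the throttle on currently throttled brokers not in the request.
        brokerThrottles.keySet().retainAll(request.keySet());
        // Step 4: set the requested rates on each broker in the request.
        brokerThrottles.putAll(request);
        // Step 5: narrow the marker to exactly the brokers now throttled.
        removalMarker.clear();
        removalMarker.addAll(request.keySet());
    }
}
```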


No Format
AlterInterbrokerThrottledRatesResponse => [broker_error]
  broker_error => broker_id error_code error_message
    broker_id => INT32
    error_code => INT16
    error_message => NULLABLE_STRING

Anticipated Errors:

  • NOT_CONTROLLER (41) if the request was sent to a broker that wasn't the controller.
  • CLUSTER_AUTHORIZATION_FAILED (31) if the principal does not have the Alter operation on the CLUSTER resource.
  • INVALID_THROTTLE (new) if a throttled rate is <= 0.
  • UNKNOWN_MEMBER_ID (25) if a broker id in the request is not a broker in the cluster.
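A sketch of the per-broker checks implied by the INVALID_THROTTLE and UNKNOWN_MEMBER_ID errors above; the NOT_CONTROLLER and CLUSTER_AUTHORIZATION_FAILED checks apply to the request as a whole and are not shown. Error names are returned as strings because INVALID_THROTTLE has no assigned code, and the unknown-broker check is assumed to take precedence; ThrottledRateValidator is a hypothetical name.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class ThrottledRateValidator {

    /** Requested leader/follower rates for one broker, in bytes/second. */
    static final class Rates {
        final long leaderRate;
        final long followerRate;

        Rates(long leaderRate, long followerRate) {
            this.leaderRate = leaderRate;
            this.followerRate = followerRate;
        }
    }

    private ThrottledRateValidator() {}

    /**
     * @param clusterBrokers ids of the brokers known to the cluster
     * @param request        broker id -> requested rates
     * @return broker id -> error name, "NONE" for entries that pass validation
     */
    static Map<Integer, String> validate(Set<Integer> clusterBrokers, Map<Integer, Rates> request) {
        Map<Integer, String> errors = new TreeMap<>();
        for (Map.Entry<Integer, Rates> e : request.entrySet()) {
            Rates r = e.getValue();
            if (!clusterBrokers.contains(e.getKey())) {
                errors.put(e.getKey(), "UNKNOWN_MEMBER_ID");
            } else if (r.leaderRate <= 0 || r.followerRate <= 0) {
                errors.put(e.getKey(), "INVALID_THROTTLE");
            } else {
                errors.put(e.getKey(), "NONE");
            }
        }
        return errors;
    }
}
```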

AdminClient: alterInterbrokerThrottledReplicas()

Code Block
languagejava
linenumberstrue
/**
 * Set the partitions and brokers subject to the
 * {@linkplain #alterInterbrokerThrottledRate(Map, AlterInterbrokerThrottledRateOptions)
 * interbroker throttled rate}.
 * The brokers specified as the {@link ThrottledReplicas#leaders()} corresponding to a
 * topic partition given in {@code replicas} will be subject to the leader throttled rate
 * when acting as the leader for that partition.
 * The brokers specified as the {@link ThrottledReplicas#followers()} corresponding to a
 * topic partition given in {@code replicas} will be subject to the follower throttled rate
 * when acting as the follower for that partition.
 * If the given {@code replicas} is null then the throttle will apply to all topic partitions on all brokers.
 * TODO No! It should depend on the current reassignment
 *
 * The throttle will be automatically removed at the end of the current reassignment,
 * unless overridden in the given options.
 *
 * The current throttled replicas can be obtained via {@link #describeConfigs(Collection)} using a
 * ConfigResource of type {@link ConfigResource.Type#TOPIC TOPIC} named after the topic, and reading
 * its "leader.replication.throttled.replicas" and "follower.replication.throttled.replicas" entries.
 */
public abstract AlterInterbrokerThrottledReplicasResult alterInterbrokerThrottledReplicas(
        Map<TopicPartition, ThrottledReplicas> replicas,
        AlterInterbrokerThrottledReplicasOptions options);

...

Code Block
public class ThrottledReplicas {

    public ThrottledReplicas(Collection<Integer> leaders, Collection<Integer> followers) { ... }

    /**
     * The brokers which should be throttled when acting as leader. A null value indicates all brokers in the cluster.
     */
    public Collection<Integer> leaders() { ... }

    /**
     * The brokers which should be throttled when acting as follower. A null value indicates all brokers in the cluster.
     */
    public Collection<Integer> followers() { ... }

}

public class AlterInterbrokerThrottledReplicasOptions extends AbstractOptions<AlterInterbrokerThrottledReplicasOptions> {

    public boolean autoRemoveThrottle() { ... }

    /**
     * True to automatically remove the throttle at the end of the current reassignment.
     */
    public AlterInterbrokerThrottledReplicasOptions autoRemoveThrottle(boolean autoRemoveThrottle) { ... }
}

public class AlterInterbrokerThrottledReplicasResult {
    // package-access ctor

    public Map<TopicPartition, KafkaFuture<Void>> values() { ... }

    public KafkaFuture<Void> all() { ... }
}

 

Network API: AlterInterbrokerThrottledReplicasRequest and AlterInterbrokerThrottledReplicasResponse

...

  1. The controller validates the partitions and brokers in the request, and that the principal has the Alter operation on the CLUSTER resource.
  2. The controller gets the current value of the /admin/throttled_replicas_removal znode, forms the union of those topics with those in the request, and updates the /admin/throttled_replicas_removal znode with a JSON representation of this union.
  3. The controller then subtracts the topics in the request from the topics which currently have throttled replicas, and removes the leader.replication.throttled.replicas and follower.replication.throttled.replicas properties from each of the remaining topics' configs.
  4. The controller then, for each topic in the request, adds the leader.replication.throttled.replicas and follower.replication.throttled.replicas properties to that topic's config.
  5. The controller then updates the /admin/throttled_replicas_removal znode with a JSON representation of the topics in the request.

The intent behind this algorithm is that should the controller crash during the update, throttles will still be removed on completion of reassignment.


No Format
AlterInterbrokerThrottledReplicasResponse => [topic_errors]
  topic_errors => topic [partition_errors]
    topic => STRING
    partition_errors => partition_id error_code error_message
      partition_id => INT32
      error_code => INT16
      error_message => NULLABLE_STRING

Anticipated errors:

  • NOT_CONTROLLER (41) if the request was sent to a broker that wasn't the controller.
  • CLUSTER_AUTHORIZATION_FAILED (31) if the principal does not have the Alter operation on the CLUSTER resource.
  • UNKNOWN_TOPIC_OR_PARTITION (3) if a partition in the request is not known in the cluster.
  • UNKNOWN_MEMBER_ID (25) if a broker id in the request is not a broker in the cluster.

On Controller Failover

The algorithms presented above, using the new znodes, are constructed so that should the controller fail, on election of a new controller ZooKeeper is not left in an inconsistent state where throttles which should be removed automatically are not removed at the end of the reassignment. The recovery algorithm is as follows:

  1. If the /admin/reassign_partitions znode exists, we assume a reassignment is on-going and do nothing.
  2. Otherwise, if the /admin/reassign_partitions znode does not exist, we proceed to remove the throttles, as detailed in the "Throttle removal" section below.

...

When reassignment is complete:

  1. The /admin/reassign_partitions znode gets removed.
  2. We remove the throttles, as detailed in "Throttle removal" section below.

Throttle removal

The algorithm for removing the throttled replicas is:

  1. If the /admin/throttled_replicas_removal znode exists:
    1. For each of the topics listed in that znode:

      1. Remove the (leader|follower).replication.throttled.replicas properties from that topic's config.

    2. Remove the /admin/throttled_replicas_removal znode.

The symmetric algorithm is used for /admin/throttled_rates_removal, removing the (leader|follower).replication.throttled.rate properties from the broker configs instead.
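A minimal in-memory sketch of replica-throttle removal together with the rule from "On Controller Failover". A Map stands in for the topic configs, a nullable Set for the replicas-removal znode (/admin/throttled_replicas_removal), and a boolean for the existence of /admin/reassign_partitions; the broker-config (rates) case would be symmetric. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class ThrottleRemoval {

    static final String LEADER = "leader.replication.throttled.replicas";
    static final String FOLLOWER = "follower.replication.throttled.replicas";

    /** topic -> (config name -> value); stands in for the topic configs in ZooKeeper. */
    final Map<String, Map<String, String>> topicConfigs = new HashMap<>();

    /** Topics listed in the replicas-removal znode; null when the znode does not exist. */
    Set<String> replicasRemovalZnode;

    /** Whether the /admin/reassign_partitions znode exists. */
    boolean reassignmentZnodeExists;

    /** Throttle removal: strip both properties from each listed topic, then delete the marker. */
    void removeThrottledReplicas() {
        if (replicasRemovalZnode == null) {
            return; // nothing recorded for automatic removal
        }
        for (String topic : replicasRemovalZnode) {
            Map<String, String> config = topicConfigs.get(topic);
            if (config != null) {
                config.remove(LEADER);
                config.remove(FOLLOWER);
            }
        }
        // Delete the marker last, so a crash above only causes idempotent removals to be repeated.
        replicasRemovalZnode = null;
    }

    /** On failover, leave throttles alone while a reassignment is on-going. */
    void onControllerFailover() {
        if (!reassignmentZnodeExists) {
            removeThrottledReplicas();
        }
    }
}
```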

...