
Note this was initially erroneously assigned as KIP-178, which was already taken, and has been reassigned KIP-179.


Status

Current state: Withdrawn

Discussion thread: here (when initially misnumbered as KIP-178) and here (when assigned KIP-179)

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation


...

  • The AdminClient API currently lacks any functionality for reassigning partitions: users have to use the kafka-reassign-partitions.sh tool (ReassignPartitionsCommand), which talks directly to ZooKeeper. This prevents the tool from being used in deployments where only the brokers are exposed to clients (i.e. where the ZooKeeper servers are intentionally not exposed). In addition, there is a general push to refactor/rewrite/replace tools which need ZooKeeper access with equivalents which use the AdminClient API.

...

  • ReassignPartitionsCommand currently has no proper facility to

...

  • report progress of a reassignment;

...

  • While --verify can be used periodically to check whether the requested assignments have been achieved, the tool provides no means of knowing how quickly new replicas are catching up. It would be useful if the tool could report progress better.


  • ReassignPartitionsCommand, when used with a replication throttle, requires a --verify invocation when the reassignment has finished in order to remove the throttle. So there exists the possibility that throttles are not removed after reassignment, with negative consequences for the performance of the cluster. It would be better if throttles could be removed automatically.

Public Interfaces

The AdminClient API will have new methods added (plus overloads for options):

  • alterPartitionCounts(Map<String, Integer> partitionCounts)
  • alterReplicationFactors(Map<String, Short> replicationFactors)
  • reassignPartitions(Map<TopicPartition, List<Integer>>)
  • replicaStatus(Collection<TopicPartition> replicas)

The options for reassignPartitions() will support setting a throttle, and a flag for its automatic removal at the end of the reassignment. Likewise the options for changing the throttled rates and replicas will include the ability to have the throttles automatically removed.

New network protocol APIs will be added to support these AdminClient APIs.

The options accepted by the kafka-reassign-partitions.sh command will change:

  • --zookeeper will be deprecated, with a warning message
  • a new --bootstrap-server option will be added
  • a new --progress action option will be added


Proposed Changes

Summary of use cases

Use case | Command | AdminClient
Change replication factor | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions()
Change partition assignment | kafka-reassign-partitions --execute --reassignment-json-file J | reassignPartitions()
Change partition assignment with throttle | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | reassignPartitions() // with throttle option
Change throttled rate | kafka-reassign-partitions --execute --reassignment-json-file J --throttle R | alterInterbrokerThrottledRates(), alterInterbrokerThrottledReplicas() // TODO how to do this conveniently?
Check progress of a reassignment | kafka-reassign-partitions --progress --reassignment-json-file J | describeReplicaLogDirs() (see KIP-113)
Check result and clear throttle | kafka-reassign-partitions --verify --reassignment-json-file J | reassignPartitions(validateOnly) // TODO checks none in progress, doesn't confirm states match

 

kafka-reassign-partitions.sh and ReassignPartitionsCommand

The --zookeeper option will be retained and will:

  1. Cause a deprecation warning to be printed to standard error. The message will say that the --zookeeper option will be removed in a future version and that --bootstrap-server is the replacement option.
  2. Perform the reassignment via ZooKeeper, as currently.

A new --bootstrap-server option will be added and will:

  1. Perform the reassignment via the AdminClient API (described below) using the given broker(s) as bootstrap brokers.
  2. When used with --execute and --throttle, the throttle will be an auto-removed one.

When run with --bootstrap-server it will no longer be necessary to run kafka-reassign-partitions.sh --verify to remove a throttle: this will be done automatically.

Using both --zookeeper and --bootstrap-server in the same command will produce an error message and the tool will exit without doing the intended operation.

It is anticipated that a future version of Kafka would remove support for the --zookeeper option.

A new --progress action option will be added. This will only be supported when used with --bootstrap-server. If used with --zookeeper the command will produce an error message and the tool will exit without doing the intended operation. --progress will report on the synchronisation of each of the partitions and brokers in the reassignment given via the --reassignment-json-file option.

For example:

No Format
# If the following command is used to start a reassignment
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --execute
       
# then the following command will print the progress of
# that reassignment, then exit immediately
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --progress

That might print something like the following:

No Format
Topic      Partition  Broker  Status
-------------------------------------
my_topic   0          0       In sync
my_topic   0          1       Behind: 10456 messages behind
asdf       0          1       Unknown topic
my_topic   42         1       Unknown partition
my_topic   0          42      Unknown broker
my_topic   1          0       Broker does not host this partition

The implementation of --progress will make use of the describeReplicaLogDirs() method from KIP-113 to find the lag of the syncing replica.
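For example, a progress check could be built on that method along the following lines. This is a sketch only, written against the describeReplicaLogDirs() API from KIP-113; the topic, partition, and target broker are taken from the reassignment JSON file, and the bootstrap address matches the examples above:

Code Block
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
import org.apache.kafka.common.TopicPartitionReplica;

public class ProgressCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9878");
        try (AdminClient admin = AdminClient.create(props)) {
            // The (topic, partition, target broker) triple comes from the reassignment JSON.
            TopicPartitionReplica replica = new TopicPartitionReplica("my_topic", 0, 1);
            Map<TopicPartitionReplica, ReplicaLogDirInfo> info =
                    admin.describeReplicaLogDirs(Arrays.asList(replica)).all().get();
            // Lag of the syncing replica on that broker; 0 once it is in sync.
            long lag = info.get(replica).getCurrentReplicaOffsetLag();
            System.out.println(lag == 0 ? "In sync" : "Behind: " + lag + " messages behind");
        }
    }
}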

Internally, the ReassignPartitionsCommand will be refactored to support the above changes to the options. An interface will abstract the commands currently issued directly to ZooKeeper.

...

In all other respects, the public API of ReassignPartitionsCommand will not be changed.

AdminClient: alterPartitionCounts()

The following methods will be added to AdminClient to support changing topics' partition counts.

Code Block
/**
 * Change the partition count of the topics given as the keys of counts
 * to the corresponding values.
 */
AlterPartitionCountsResult alterPartitionCounts(Map<String, Integer> counts)
AlterPartitionCountsResult alterPartitionCounts(Map<String, Integer> counts,
                    AlterPartitionCountsOptions options)

Where:

Code Block
class AlterPartitionCountsOptions {
    // TODO validateOnly?
    // TODO timeout?
}

class AlterPartitionCountsResult {
    Map<String, KafkaFuture<Void>> values();
}
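For illustration, a call to the proposed method might look like the following. This is hypothetical usage, since the API is only proposed here; it assumes an existing AdminClient instance named admin:

Code Block
// Hypothetical usage of the proposed API; `admin` is an existing AdminClient.
Map<String, Integer> counts = Collections.singletonMap("my_topic", 12);
AlterPartitionCountsResult result =
        admin.alterPartitionCounts(counts, new AlterPartitionCountsOptions());
result.values().get("my_topic").get();  // completes when the partition count has been changed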

AdminClient: alterReplicationFactors()

The following methods will be added to AdminClient to support changing topics' replication factors.

Code Block
/**
 * Change the replication factor of the topics given as the keys of 
 * replicationFactors to the corresponding values.
 */
AlterReplicationFactorsResult alterReplicationFactors(Map<String, Short> replicationFactors)
AlterReplicationFactorsResult alterReplicationFactors(Map<String, Short> replicationFactors, 
                        AlterReplicationFactorsOptions options)

Where:

Code Block
class AlterReplicationFactorsOptions {
    // TODO validateOnly?
    // TODO timeout?
}
class AlterReplicationFactorsResult {
    Map<String, KafkaFuture<Void>> values();
}
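Again for illustration, a hypothetical call to the proposed API (note that increasing a replication factor implies copying data to new replicas, so the underlying operation may be long running):

Code Block
// Hypothetical usage of the proposed API; `admin` is an existing AdminClient.
Map<String, Short> factors = Collections.singletonMap("my_topic", (short) 3);
AlterReplicationFactorsResult result =
        admin.alterReplicationFactors(factors, new AlterReplicationFactorsOptions());
result.values().get("my_topic").get();  // completes when the change has been accepted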

 

AdminClient: reassignPartitions()

The following methods will be added to AdminClient to support changing the brokers hosting the partitions of a topic.

Notes:

  • This API is asynchronous in the sense that the client cannot assume that the request is complete (or the request was rejected) once they have obtained the result for the topic from the ReassignPartitionsResult.
  • The describeReplicaLogDirs() method from KIP-113 can be used to determine progress.
  • A call to reassignPartitions() with the validateOnly option can be used to determine whether a reassignment is currently running, and therefore whether the last reassignment has finished.

Code Block
/**
 * <p>Reassign the partitions given as the key of the given <code>assignments</code> to the corresponding
 * list of brokers. The first broker in each list is the one which holds the "preferred replica".</p>
 *
 * <p>Inter-broker reassignment causes significant inter-broker traffic and can take a long time
 * in order to copy the replica data to brokers. The given options can be used to impose a quota on
 * inter-broker traffic for the duration of the reassignment so that client-broker traffic is not
 * adversely affected.</p>
 *
 * <h3>Preferred replica</h3>
 * <p>When brokers are configured with <code>auto.leader.rebalance.enable=true</code>, the broker
 * with the preferred replica will be elected leader automatically.
 * <code>kafka-preferred-replica-election.sh</code> provides a manual trigger for this
 * election when <code>auto.leader.rebalance.enable=false</code>.</p>
 *
 * @param assignments The partition assignments.
 * @param options The options to use when reassigning the partitions
 * @return The ReassignPartitionsResult
 */
public abstract ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments);
public abstract ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments,
                                                            ReassignPartitionsOptions options);

Where:

Code Block
public class ReassignPartitionsOptions extends AbstractOptions<ReassignPartitionsOptions> {

    // Note timeoutMs() inherited from AbstractOptions

    /**
     * Validate the request only: Do not actually trigger replica reassignment.
     */
    public ReassignPartitionsOptions validateOnly(boolean validateOnly)

    public long throttle() { return throttle; }

    /**
     * <p>Set the throttle rate and throttled replicas for the reassignments.
     * The given throttle is in bytes/second and should be at least 1 KB/s.
     * Interbroker replication traffic will be throttled to approximately the given value.
     * Use Long.MAX_VALUE if the reassignment should not be throttled.</p>
     *
     * <p>A positive throttle is equivalent to setting:</p>
     * <ul>
     *     <li>The leader and follower throttled rates to the value given by throttle.</li>
     *     <li>The leader throttled replicas of each topic in the request to include the existing
     *         brokers having replicas of the partitions in the request.</li>
     *     <li>The follower throttled replicas of each topic in the request to include the new
     *         brokers for each partition in that topic.</li>
     * </ul>
     *
     * <p>The value of {@link #autoRemoveThrottle()} will determine whether these
     * throttles will be removed automatically when the reassignment completes.</p>
     *
     * @see AdminClient#alterInterbrokerThrottledRate(int, long, long)
     * @see AdminClient#alterInterbrokerThrottledReplicas(Map)
     */
    public ReassignPartitionsOptions throttle(long throttle) { ... }

    public boolean autoRemoveThrottle() { ... }

    /**
     * True to automatically remove the throttle at the end of the current reassignment.
     */
    public ReassignPartitionsOptions autoRemoveThrottle(boolean autoRemoveThrottle) { ... }
}

public class ReassignPartitionsResult {
    public Map<TopicPartition, KafkaFuture<Void>> values();
    public KafkaFuture<Void> all();
}

Partition reassignment is a long running operation, and the ReassignPartitionsResult indicates only that the reassignment has been started, not that the reassignment has been completed.
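Putting this together, a throttled reassignment via the proposed API might look like the following sketch (hypothetical usage; assumes an existing AdminClient instance named admin):

Code Block
// Hypothetical usage of the proposed API; `admin` is an existing AdminClient.
Map<TopicPartition, List<Integer>> assignments = Collections.singletonMap(
        new TopicPartition("my_topic", 0), Arrays.asList(1, 2, 3));  // broker 1 is preferred
ReassignPartitionsOptions options = new ReassignPartitionsOptions()
        .throttle(10 * 1024 * 1024)   // 10 MB/s inter-broker throttle...
        .autoRemoveThrottle(true);    // ...removed automatically when the reassignment completes
admin.reassignPartitions(assignments, options).all().get();  // started, not necessarily complete

// Later, a validateOnly call indicates whether a reassignment is still in progress:
admin.reassignPartitions(assignments, new ReassignPartitionsOptions().validateOnly(true));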

AdminClient: replicaStatus()

...

Code Block
/**
 * Query the replication status of the given partitions.
 */
public ReplicaStatusResult replicaStatus(Collection<TopicPartition> replicas)
public ReplicaStatusResult replicaStatus(Collection<TopicPartition> replicas, ReplicaStatusOptions options)

Where:

Code Block
public class ReplicaStatusOptions {

}

public class ReplicaStatusResult {
    public KafkaFuture<Map<TopicPartition, List<ReplicaStatus>>> all()
}

/**
 * Represents the replication status of a partition
 * on a particular broker.
 */
public class ReplicaStatus {
    /** The topic about which this is the status of */
    String topic()

    /** The partition about which this is the status of */
    int partition()

    /** The broker about which this is the status of */
    int broker()

    /**
     * The time (as milliseconds since the epoch) that
     * this status data was collected. In general this may
     * be some time before the replicaStatus() request time.
     */
    public long statusTime()

    /**
     * The number of messages that the replica on this broker is behind
     * the leader.
     */
    public long lag()
}
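A hypothetical use of this proposed API, printing how far each replica is behind (again assuming an existing AdminClient instance named admin):

Code Block
// Hypothetical usage of the proposed API; `admin` is an existing AdminClient.
Collection<TopicPartition> partitions = Arrays.asList(new TopicPartition("my_topic", 0));
Map<TopicPartition, List<ReplicaStatus>> statuses =
        admin.replicaStatus(partitions, new ReplicaStatusOptions()).all().get();
for (List<ReplicaStatus> partitionStatuses : statuses.values()) {
    for (ReplicaStatus status : partitionStatuses) {
        System.out.printf("%s-%d on broker %d: %d messages behind%n",
                status.topic(), status.partition(), status.broker(), status.lag());
    }
}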

Authorization

With broker-mediated reassignment it becomes possible to limit the authority to perform reassignment to something finer-grained than "anyone with access to ZooKeeper".

The reasons for reassignment are usually operational: for example, migrating partitions to new brokers when expanding the cluster, or attempting to find a more balanced assignment (according to some notion of balance). These are cluster-wide considerations, and so authority should attach to the reassign operation being performed on the cluster. Therefore alterTopics() will require ClusterAction on the CLUSTER.

replicaStatus() will require Describe on the CLUSTER.

Network Protocol: AlterTopicsRequest and AlterTopicsResponse

...

No Format
AlterTopicsRequest => [alter_topic_requests] validate_only timeout
  alter_topic_requests => topic num_partitions replication_factor [partition_assignment]
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    partition_assignment => partition_id brokers
      partition_id => INT32
      brokers => [INT32]
  validate_only => BOOLEAN
  timeout => INT32

Where

Field | Description
topic | the topic name
num_partitions | the number of partitions. A num_partitions of -1 would mean "no change"
replication_factor | the replication factor. A replication_factor of -1 would mean "no change"
partition_id | the partition id
brokers | the ids of the assigned brokers for this partition
validate_only | true to just validate the request, but not actually alter the topics
timeout | the timeout, in ms, to wait for the topic to be altered

An empty partition_assignment would mean that the broker should calculate a suitable assignment. Such a broker-calculated assignment is unlikely to be balanced.

It is not necessary to send an AlterTopicsRequest to the leader for a given partition. Any broker will do.

...


Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse

ReassignPartitionsRequest initiates the movement of replicas between brokers, and is the basis of the AdminClient.reassignPartitions() method.

Notes:

  • The request must be sent to the controller.
  • The request  requires the Alter operation on the CLUSTER resource, since it can require significant inter-broker communication.
  • The request will be subject to a policy, as described in KIP-201.
No Format
ReassignPartitionsRequest => [topic_reassignments] timeout validate_only throttle remove_throttle
  topic_reassignments => topic [partition_reassignments]
    topic => STRING
    partition_reassignments => partition_id [broker]
      partition_id => INT32
      broker => INT32
  timeout => INT32
  validate_only => BOOLEAN
  throttle => INT64
  remove_throttle => BOOLEAN

Where

Field | Description
topic | the name of a topic
partition_id | a partition of that topic
broker | a broker id
timeout | the maximum time to await a response, in ms
validate_only | when true: validate the request, but don't actually reassign the partitions
throttle | the inter-broker throttle rate, in bytes/second
remove_throttle | when true: automatically remove the throttle when the reassignment completes

Algorithm:

  1. The controller validates the request against the configured authorizer, policy, etc.
  2. The controller computes the set of topics in the request, and writes this as JSON to the new /admin/throttled_replicas_removal znode.
  3. The controller then updates the existing leader.replication.throttled.replicas and follower.replication.throttled.replicas properties of each topic config.
  4. The controller computes the union of 1) the brokers currently hosting replicas of the topic partitions in the request and 2) the brokers assigned to host topic partitions in the request, and writes this as JSON to the new /admin/throttled_rates_removal znode.
  5. The controller then updates the existing leader.replication.throttled.rates and follower.replication.throttled.rates properties of each broker config.
  6. The controller writes the reassignment JSON to the /admin/reassign_partitions znode.

The intent behind this algorithm is that should the controller crash during the update, the reassignment won't have started and the throttles will be removed on controller failover.

The broker will use the same algorithm for determining the values of the topic and broker configs as is currently used in the ReassignPartitionsCommand.
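The essence of the write ordering can be sketched with the raw ZooKeeper client. This is illustration only: the controller's real config-update helpers are not shown, and the JSON arguments are assumed to have been built as described in steps 2 and 4 above.

Code Block
import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

// Sketch: removal metadata and throttle configs are persisted before the
// reassignment itself, so a crash leaves either no reassignment started,
// or one whose throttles can still be cleaned up on controller failover.
void startThrottledReassignment(ZooKeeper zk, byte[] topicsJson, byte[] brokersJson,
        byte[] reassignmentJson) throws KeeperException, InterruptedException {
    zk.create("/admin/throttled_replicas_removal", topicsJson, OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    // ... update leader/follower.replication.throttled.replicas in each topic config ...
    zk.create("/admin/throttled_rates_removal", brokersJson, OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    // ... update leader/follower.replication.throttled.rates in each broker config ...
    zk.create("/admin/reassign_partitions", reassignmentJson, OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}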

ReassignPartitionsResponse describes which partitions in the request will be moved, and what was wrong with the request for those partitions which will not be moved.

No Format
ReassignPartitionsResponse => throttle_time_ms [reassign_partition_result]
  throttle_time_ms => INT32
  reassign_partition_result => topic [partition_error]
    topic => STRING
    partition_error => partition_id error_code error_message
      partition_id => INT32
      error_code => INT16
      error_message => NULLABLE_STRING

Where

Field | Description
throttle_time_ms | duration in milliseconds for which the request was throttled
topic | a topic name from the request
partition_id | a partition id for that topic, from the request
error_code | an error code for that topic partition
error_message | more detailed information about any error for that topic

Anticipated errors:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • POLICY_VIOLATION(44) The request violated the configured policy
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • UNKNOWN_MEMBER_ID (25) If any broker ids in the partition_reassignments included an unknown broker id
  • INVALID_REQUEST (42) If duplicate topics appeared in the request
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new) If the reassignment cannot be started because a reassignment is currently running (i.e. the /admin/reassign_partitions znode exists)
  • INVALID_THROTTLE (new) If the given throttle is <=0.
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
  • NONE (0) reassignment has started

AdminClient: alterInterbrokerThrottledRates()

Code Block
/** 
 * Change the rate at which interbroker replication is throttled, replacing existing throttled rates. 
 * For each broker in the given {@code rates}, the {@code leaderRate} of the corresponding 
 * {@code ThrottledRate} is the throttled rate when the broker is acting as leader and 
 * the {@code followerRate} is the throttled rate when the broker is acting as follower.
 * For the throttled rates to take effect, the given broker must also be present in the
 * list of throttled replicas, which can be set by {@link #alterInterbrokerThrottledReplicas()}.
 * The throttle will be automatically removed at the end of the current reassignment,
 * unless overridden in the given options.
 *
 * The current rates can be obtained from {@link #describeConfigs(Collection)}.
 *
 * @param rates Map from broker id to the throttled rates for that broker.
 * @param options The options.
 */
public abstract AlterInterbrokerThrottledRateResult alterInterbrokerThrottledRate(
        Map<Integer, ThrottledRate> rates, 
        AlterInterbrokerThrottledRateOptions options);

Where:

Code Block
/**
 * The throttled rate for interbroker replication on a particular broker.
 */
public class ThrottledRate {
    public ThrottledRate(long leaderRate, long followerRate) { ... }
    /**
     * The throttled rate when the broker is acting as leader.
     */
    long leaderRate() { ... }
    /**
     * The throttled rate when the broker is acting as follower.
     */
    long followerRate() { ... }
}

public class AlterInterbrokerThrottledRateOptions extends AbstractOptions<AlterInterbrokerThrottledRateOptions> {

    public boolean autoRemoveThrottle() { ... }

    /**
     * True to automatically remove the throttle at the end of the current reassignment.
     */
    public AlterInterbrokerThrottledRateOptions autoRemoveThrottle(boolean autoRemoveThrottle) { ... }
}

public class AlterInterbrokerThrottledRateResult {
    // package-access ctor

    public Map<Integer, KafkaFuture<Void>> values() { ... }

    public KafkaFuture<Void> all() { ... }
}

Network API: AlterInterbrokerThrottledRatesRequest and AlterInterbrokerThrottledRatesResponse

No Format
AlterInterbrokerThrottledRatesRequest => [broker_throttles] remove_throttle timeout validate_only
  broker_throttles => broker_id leader_rate follower_rate
    broker_id => INT32
    leader_rate => INT64
    follower_rate => INT64
  timeout => INT32
  validate_only => BOOLEAN
  remove_throttle => BOOLEAN

Algorithm:

  1. The controller validates the brokers and rates in the request and that the principal has the Alter operation on the CLUSTER resource.
  2. The controller gets the current value of the /admin/throttled_rates_removal znode, forms the union of those brokers with those in the request, and updates the /admin/throttled_rates_removal znode with a JSON representation of this union.
  3. The controller then subtracts the brokers in the request from the current brokers, and removes the leader.replication.throttled.rates and follower.replication.throttled.rates properties from each such broker's config.
  4. The controller then, for each broker in the request, adds the leader.replication.throttled.rates and follower.replication.throttled.rates properties to that broker's config.
  5. The controller then updates the /admin/throttled_rates_removal znode with a JSON representation of the brokers in the request.

The intent behind this algorithm is that should the controller crash during the update, throttles will still be removed on completion of reassignment.


No Format
AlterInterbrokerThrottledRatesResponse => [broker_error]
  broker_error => broker_id error_code error_message
      broker_id => INT32
      error_code => INT16
      error_message => NULLABLE_STRING

Anticipated Errors:

  • NOT_CONTROLLER (41) if the request was sent to a broker that wasn't the controller.
  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_THROTTLE (new) if the throttled rate is <= 0.
  • UNKNOWN_MEMBER_ID(25) if the broker id in the request is not a broker in the cluster

AdminClient: alterInterbrokerThrottledReplicas()

Code Block
/**
 * Set the partitions and brokers subject to the
 * {@linkplain #alterInterbrokerThrottledRate(Map)
 * interbroker throttled rate}.
 * The brokers specified as the {@link ThrottledReplicas#leaders()} corresponding to a
 * topic partition given in {@code replicas} will be subject to the leader throttled rate
 * when acting as the leader for that partition.
 * The brokers specified as the {@link ThrottledReplicas#followers()} corresponding to a
 * topic partition given in {@code replicas} will be subject to the follower throttled rate
 * when acting as the follower for that partition.
 *
 * The throttle will be automatically removed at the end of the current reassignment,
 * unless overridden in the given options.
 *
 * The current throttled replicas can be obtained via {@link #describeConfigs(Collection)} with a
 * ConfigResource with type {@link ConfigResource.Type#TOPIC TOPIC} and name "leader.replication.throttled.replicas"
 * or "follower.replication.throttled.replicas".
 */
public abstract AlterInterbrokerThrottledReplicasResult alterInterbrokerThrottledReplicas(
        Map<TopicPartition, ThrottledReplicas> replicas,
        AlterInterbrokerThrottledReplicasOptions options);

Where:

Code Block
public class ThrottledReplicas {

    public ThrottledReplicas(Collection<Integer> leaders, Collection<Integer> followers) { ... }

    /**
     * The brokers which should be throttled when acting as leader. A null value indicates all brokers in the cluster.
     */
    public Collection<Integer> leaders() { ... }

    /**
     * The brokers which should be throttled when acting as follower. A null value indicates all brokers in the cluster.
     */
    public Collection<Integer> followers() { ... }

}

public class AlterInterbrokerThrottledReplicasOptions extends AbstractOptions<AlterInterbrokerThrottledReplicasOptions> {

    public boolean autoRemoveThrottle() { ... }

    /**
     * True to automatically remove the throttle at the end of the current reassignment.
     */
    public AlterInterbrokerThrottledReplicasOptions autoRemoveThrottle(boolean autoRemoveThrottle) { ... }
}

public class AlterInterbrokerThrottledReplicasResult {
    // package-access ctor
    public Map<TopicPartition, KafkaFuture<Void>> values() { ... }
    public KafkaFuture<Void> all() { ... }
}
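For illustration, the two proposed methods might be used together like this (hypothetical usage; assumes an existing AdminClient instance named admin):

Code Block
// Hypothetical usage of the proposed APIs; `admin` is an existing AdminClient.
// Throttle broker 1 to 10 MB/s in both its leader and follower roles...
admin.alterInterbrokerThrottledRate(
        Collections.singletonMap(1, new ThrottledRate(10_000_000L, 10_000_000L)),
        new AlterInterbrokerThrottledRateOptions()).all().get();
// ...and subject partition my_topic-0 to it: broker 1 when leading, broker 2 when following.
admin.alterInterbrokerThrottledReplicas(
        Collections.singletonMap(new TopicPartition("my_topic", 0),
                new ThrottledReplicas(Arrays.asList(1), Arrays.asList(2))),
        new AlterInterbrokerThrottledReplicasOptions()).all().get();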

 

Network API: AlterInterbrokerThrottledReplicasRequest and AlterInterbrokerThrottledReplicasResponse

No Format
AlterInterbrokerThrottledReplicasRequest => [topic_throttles] remove_throttle timeout validate_only
  topic_throttles => topic [partition_throttles]
    topic => STRING
    partition_throttles => partition_id [broker_id]
      partition_id => INT32
      broker_id => INT32
  timeout => INT32
  validate_only => BOOLEAN
  remove_throttle => BOOLEAN

Algorithm:

  1. The controller validates the partitions and brokers in the request and that the principal has the Alter operation on the CLUSTER resource.
  2. The controller gets the current value of the /admin/throttled_replicas_removal znode, forms the union of those topics with those in the request, and updates the /admin/throttled_replicas_removal znode with a JSON representation of this union.
  3. The controller then subtracts the topics in the request from the current topics, and removes the leader.replication.throttled.replicas and follower.replication.throttled.replicas properties from each such topic's config.
  4. The controller then, for each topic in the request, adds the leader.replication.throttled.replicas and follower.replication.throttled.replicas properties to that topic's config.
  5. The controller then updates the /admin/throttled_replicas_removal znode with a JSON representation of the topics in the request.

The intent behind this algorithm is that should the controller crash during the update, throttles will still be removed on completion of reassignment.


No Format
AlterInterbrokerThrottledReplicasResponse => [topic_errors]
  topic_errors => topic [partition_errors]
    topic => STRING
    partition_errors => partition_id error_code error_message
      partition_id => INT32
      error_code => INT16
      error_message => NULLABLE_STRING

Anticipated errors:

  • NOT_CONTROLLER (41) if the request was sent to a broker that wasn't the controller.
  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • UNKNOWN_TOPIC_OR_PARTITION (3) if a partition in the request is not known in the cluster.
  • UNKNOWN_MEMBER_ID(25) if the broker id in the request is not a broker in the cluster

On Controller Failover

The algorithms presented above, using the new znodes, are constructed so that, should the controller fail, ZooKeeper is not left in an inconsistent state in which throttles that should have been removed automatically remain in place at the end of the reassignment. The recovery algorithm on election of a new controller is as follows:

  1. If the /admin/reassign_partitions znode exists, we assume a reassignment is on-going and do nothing.
  2. Otherwise, if the /admin/reassign_partitions znode does not exist, we proceed to remove the throttles, as detailed in the "Throttle removal" section below.

On reassignment completion

When reassignment is complete:

  1. The /admin/reassign_partitions znode gets removed.
  2. We remove the throttles, as detailed in the "Throttle removal" section below.

Throttle removal

The algorithm for removing the throttled replicas is:

  1. If /admin/throttled_replicas_removal is set:
    1. For each of the topics listed in that znode, remove the (leader|follower).replication.throttled.replicas properties from that topic's config.
    2. Remove the /admin/throttled_replicas_removal znode.

The symmetric algorithm is used for /admin/throttled_rates_removal, only with broker configs.
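A sketch of that removal pass, using the same raw ZooKeeper client as the earlier sketch (illustration only; topicsIn() and clearThrottledReplicasConfig() are hypothetical helpers standing in for the controller's JSON parsing and config updates):

Code Block
// Sketch of the throttle-removal pass; not actual controller code.
void removeThrottledReplicas(ZooKeeper zk) throws KeeperException, InterruptedException {
    if (zk.exists("/admin/throttled_replicas_removal", false) != null) {
        byte[] json = zk.getData("/admin/throttled_replicas_removal", false, null);
        for (String topic : topicsIn(json)) {      // hypothetical JSON helper
            clearThrottledReplicasConfig(topic);   // drops (leader|follower).replication.throttled.replicas
        }
        zk.delete("/admin/throttled_replicas_removal", -1);  // -1 matches any znode version
    }
    // A symmetric pass over /admin/throttled_rates_removal clears the broker rate configs.
}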

Compatibility, Deprecation, and Migration Plan

Existing users of the kafka-reassign-partitions.sh will receive a deprecation warning when they use the --zookeeper option. The option will be removed in a future version of Kafka. If this KIP is introduced in version 1.0.0 the removal could happen in 2.0.0.


Rejected Alternatives

Implementing AdminClient.alterConfigs() for (dynamic) broker configs was considered as a way of implementing throttle management, but this would not support the auto-removal feature.

Not supporting passing a throttle in AdminClient.reassignPartitions() (and just using the APIs for altering throttles) was considered, but:

  • Being able to specify a throttle at the same time as starting the reassignment is very convenient.
  • Race conditions are possible if the API requires throttles to be set up before reassignment starts. What if reassignPartitions() doesn't get called, or none of the partitions in the call can be reassigned?

One alternative is to do nothing: Let the ReassignPartitionsCommand continue to communicate with ZooKeeper directly.

Another alternative is to do exactly this KIP, but without the deprecation of --zookeeper. That would have a higher long-term maintenance burden, and would frustrate any future plans to, for example, support cluster technologies other than ZooKeeper.

An alterTopics() AdminClient API, mirroring the existing createTopics() API, was considered, but:

  • Some calls to alterTopics() (such as increasing the partition count) would have been synchronous, while others (such as moving replicas between brokers) would have been long running and thus asynchronous. This made for an API whose synchrony depended on the arguments.
  • createTopics() allows topic configs to be specified, whereas alterConfigs() is already provided to change topic configs, so it wasn't an exact mirror.

Just providing reassignPartitions() was considered, with changes to partition count inferred from partitions present in the assignments argument. This would require the caller to provide an assignment of partitions to brokers, whereas currently it's possible to increase the partition count without specifying an assignment. It also suffered from the synchronous/asynchronous API problem.

Similarly, an alterReplicationFactors() method, separate from reassignPartitions(), was considered, but both require a partition-to-brokers assignment, and both are implemented in the same way (by writing to the /admin/reassign_partitions znode), so there didn't seem much point in making an API which distinguished them.

Algorithms making use of the ZooKeeper "multi" feature for atomic update of multiple znodes were considered. It wasn't clear that these would be better than the algorithms presented above.

No Format
AlterTopicsResponse => throttle_time_ms [topic_errors]
  throttle_time_ms => INT32
  topic_errors => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Where

Field | Description
throttle_time_ms | duration in milliseconds for which the request was throttled
topic | the topic name
error_code | the error code for altering this topic
error_message | detailed error information

Possible values for error_code:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_PARTITIONS (37) If the num_partitions was invalid
  • INVALID_REPLICATION_FACTOR (38) If the replication_factor was invalid
  • UNKNOWN_MEMBER_ID (25) If any broker ids in the partition_assignment included an unknown broker id
  • INVALID_REQUEST (42) If trying to modify the partition assignment and the number of partitions or the partition assignment and the replication factor in the same request. Or if duplicate topics appeared in the request.
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new)
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
  • NONE (0) If the request was successful and the alteration/reassignment has been started.

As currently, it will not be possible to have multiple reassignments running concurrently, hence the addition of the PARTITION_REASSIGNMENT_IN_PROGRESS error code.

Policy

The existing CreateTopicPolicy can be used to apply a cluster-wide policy on topic configuration at the point of creation via the create.topic.policy.class.name config property. To avoid an obvious loophole, it is necessary to also be able to apply a policy to topic alteration. Maintaining two separate policies in sync is a burden both in terms of class implementation and configuring the policy. It seems unlikely that many use cases would require a different policy for alteration than creation. On the other hand, just applying the CreateTopicPolicy to alterations is undesirable because:

  • Its name doesn't convey that it would be applied to alterations too
  • Its API (specifically its RequestMetadata member class) includes topic configs (i.e. Map<String, String>) which is not part of the API for topic alteration even though it is part of the API for topic creation.
  • It prevents any use cases which legitimately did need to apply a different policy for alteration than creation.

Finding a balance between compatibility with existing deployments, and not opening the loophole is difficult.

The existing create.topic.policy.class.name config would continue to work, and would continue to name an implementation of CreateTopicPolicy. That policy would be applied to alterations automatically. The topic's config would be presented to the validate() method (via the RequestMetadata) even though it's not actually part of the AlterTopicsRequest. The documentation for the interface and config property would be updated.
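To make that concrete, here is a sketch of a CreateTopicPolicy implementation as the interface exists today (the partition cap is an arbitrary example value). Under this proposal the same validate() method would also be invoked for alterations, with the topic's current config supplied in the RequestMetadata:

Code Block
import java.util.Map;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.server.policy.CreateTopicPolicy;

// Example policy, configured via create.topic.policy.class.name, that caps partition counts.
// Under this KIP it would be applied to AlterTopicsRequest as well as to topic creation.
public class MaxPartitionsPolicy implements CreateTopicPolicy {
    private static final int MAX_PARTITIONS = 100;  // arbitrary example limit

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void validate(RequestMetadata requestMetadata) throws PolicyViolationException {
        Integer numPartitions = requestMetadata.numPartitions();
        if (numPartitions != null && numPartitions > MAX_PARTITIONS) {
            throw new PolicyViolationException(
                    "Topics may have at most " + MAX_PARTITIONS + " partitions");
        }
    }

    @Override
    public void close() { }
}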

Network Protocol: ReplicaStatusRequest and ReplicaStatusResponse

...

No Format
ReplicaStatusRequest => [replica_status_requests]
  replica_status_requests => topic partition_id broker
    topic => STRING
    partition_id => INT32
    broker => INT32

Where

Field | Description
topic | a topic name
partition_id | a partition id of this topic
broker | a follower broker id for this partition

 

...

No Format
ReplicaStatusResponse => [replica_status]
  replica_status => topic partition_id broker error_code status_time lag
    topic => STRING
    partition_id => INT32
    broker => INT32
    error_code => INT16
    status_time => INT64
    lag => INT64

Where

Field | Description
topic | the topic name
partition_id | the partition id of this topic
broker | the follower broker id
error_code | an error code
status_time | the time the status was current
lag | the lag (#messages) of this broker, for this partition

Anticipated errors are:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed. (or the TOPIC?)
  • INVALID_TOPIC_EXCEPTION (17) The topic is not known
  • INVALID_PARTITIONS (37) The partition_id of the given topic is not valid
  • UNKNOWN_MEMBER_ID (25) The given broker id is not known.
  • UNKNOWN_TOPIC_OR_PARTITION (3) The given broker is not a follower for the partition identified by topic, partition.
  • NONE (0) if the status request completed normally.

Implementation

The AdminClient.replicaStatus() will make the underlying ReplicaStatusRequest to the leader for the given partition. This saves the need for every broker (because any broker could be the --bootstrap-server) to have knowledge of the replication status of every replica, which would be inefficient in network IO and/or memory use.
