
Note this was initially erroneously assigned as KIP-178, which was already taken, and has been reassigned KIP-179.

Status

Current state: Under Discussion

Discussion thread: here (when initially misnumbered as KIP-178) and here (when assigned KIP-179)

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation


Firstly, the ReassignPartitionsCommand (which is used by the kafka-reassign-partitions.sh tool) talks directly to ZooKeeper. This prevents the tool from being used in deployments where only the brokers are exposed to clients (i.e. where the ZooKeeper servers are intentionally not exposed). In addition, there is a general push to refactor, rewrite or replace tools that need ZooKeeper access with equivalents that use the AdminClient API. The ReassignPartitionsCommand therefore needs to change so that it no longer talks to ZooKeeper directly, but goes via an intermediating broker instead. Similar work is needed for the kafka-topics.sh tool (which can also change assignments and the numbers of partitions and replicas), so common AdminClient and protocol APIs are desirable.

Secondly, ReassignPartitionsCommand currently has no proper facility to report the progress of a reassignment; --verify can be used periodically to check whether the requested assignments have been achieved. It would be useful if the tool could report progress better.

Public Interfaces


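New network protocol APIs will be added:

  • AlterPartitionCountsRequest and AlterPartitionCountsResponse
  • AlterReplicationFactorsRequest and AlterReplicationFactorsResponse
  • ReassignPartitionsRequest and ReassignPartitionsResponse
  • AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse

The AdminClient API will have new methods added (plus overloads for options):

  • alterPartitionCounts()
  • alterReplicationFactors()
  • reassignPartitions()
  • alterInterBrokerThrottleRate()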

The options accepted by kafka-reassign-partitions.sh command will change:

  • --zookeeper will be deprecated, with a warning message
  • a new --bootstrap-server option will be added
  • a new --progress action option will be added

Proposed Changes


kafka-reassign-partitions.sh and ReassignPartitionsCommand

The --zookeeper option will be retained and will:

  1. Cause a deprecation warning to be printed to standard error. The message will say that the --zookeeper option will be removed in a future version and that --bootstrap-server is the replacement option.
  2. Perform the reassignment via ZooKeeper, as currently.

A new --bootstrap-server option will be added and will:

  1. Perform the reassignment via the given intermediating broker.

Using both --zookeeper and --bootstrap-server in the same command will produce an error message and the tool will exit without doing the intended operation.

It is anticipated that a future version of Kafka would remove support for the --zookeeper option.
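The connection-option rules above could be enforced by a check like the one in the following minimal Java sketch. The class, enum and method names are hypothetical. Rejecting a command that gives neither option is an assumption, because the proposal does not say how that case is handled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the --zookeeper / --bootstrap-server rules; not the real tool's code.
final class ConnectionOptionCheck {

    enum Mode { ZOOKEEPER, BOOTSTRAP_SERVER }

    /** The connection mode to use, plus any warnings to print to standard error. */
    static final class Result {
        final Mode mode;
        final List<String> warnings;

        Result(Mode mode, List<String> warnings) {
            this.mode = mode;
            this.warnings = warnings;
        }
    }

    /** Either argument is null when the corresponding option was not given. */
    static Result check(String zookeeper, String bootstrapServer) {
        if (zookeeper != null && bootstrapServer != null) {
            // Using both options is an error: the tool exits without doing anything.
            throw new IllegalArgumentException(
                "--zookeeper and --bootstrap-server cannot be used together");
        }
        if (zookeeper == null && bootstrapServer == null) {
            // Assumption: one of the two options must be given.
            throw new IllegalArgumentException(
                "One of --zookeeper or --bootstrap-server is required");
        }
        List<String> warnings = new ArrayList<>();
        if (zookeeper != null) {
            // --zookeeper still works, but prints a deprecation warning first.
            warnings.add("Warning: the --zookeeper option is deprecated and will be removed"
                + " in a future version. Use --bootstrap-server instead.");
            return new Result(Mode.ZOOKEEPER, warnings);
        }
        return new Result(Mode.BOOTSTRAP_SERVER, warnings);
    }

    public static void main(String[] args) {
        Result result = check("localhost:2181", null);
        result.warnings.forEach(System.err::println);
        System.out.println("Reassigning via " + result.mode);
    }
}
```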

A new --progress action option will be added. It will only be supported when used with --bootstrap-server. If it is used with --zookeeper, the command will produce an error message and the tool will exit without doing the intended operation. --progress will report on the synchronisation of each of the partitions and brokers in the reassignment given via the --reassignment-json-file option.

For example:

# If the following command is used to start a reassignment
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --execute
       
# then the following command will print the progress of
# that reassignment, then exit immediately
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9878 \
  --reassignment-json-file expand-cluster-reassignment.json \
  --progress

That might print something like the following:

Topic      Partition  Broker  Status
-------------------------------------
my_topic   0          0       In sync
my_topic   0          1       Behind: 10456 messages behind
asdf       0          1       Unknown topic
my_topic   42         1       Unknown partition
my_topic   0          42      Unknown broker
my_topic   1          0       Broker does not host this partition

The implementation of --progress will make use of the describeReplicaDir() method from KIP-113 to find the lag of the syncing replica.
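As a rough illustration, each (topic, partition, broker) row of the example output above could be derived as in this sketch. ClusterView is a hypothetical stand-in for cluster metadata. Its lag map stands in for what describeReplicaDir() would report. The order of the checks is an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of how one row of the --progress report could be derived.
final class ProgressStatus {

    /** Assumed in-memory snapshot of the cluster; not a real Kafka type. */
    static final class ClusterView {
        final Map<String, List<List<Integer>>> replicas; // topic -> partition -> replica broker ids
        final Set<Integer> brokers;                       // ids of known brokers
        final Map<String, Long> lag;                      // "topic-partition-broker" -> messages behind

        ClusterView(Map<String, List<List<Integer>>> replicas, Set<Integer> brokers,
                    Map<String, Long> lag) {
            this.replicas = replicas;
            this.brokers = brokers;
            this.lag = lag;
        }
    }

    static String status(ClusterView view, String topic, int partition, int broker) {
        List<List<Integer>> partitions = view.replicas.get(topic);
        if (partitions == null) return "Unknown topic";
        if (partition < 0 || partition >= partitions.size()) return "Unknown partition";
        if (!view.brokers.contains(broker)) return "Unknown broker";
        if (!partitions.get(partition).contains(broker)) return "Broker does not host this partition";
        // Assumption: a hosted replica with no lag entry is treated as caught up.
        long behind = view.lag.getOrDefault(topic + "-" + partition + "-" + broker, 0L);
        return behind == 0 ? "In sync" : "Behind: " + behind + " messages behind";
    }

    public static void main(String[] args) {
        ClusterView view = new ClusterView(
            Map.of("my_topic", List.of(List.of(0, 1), List.of(1))),
            Set.of(0, 1),
            Map.of("my_topic-0-1", 10456L));
        System.out.println(status(view, "my_topic", 0, 1));
    }
}
```

With the snapshot in main, the row for my_topic partition 0 on broker 1 is reported as "Behind: 10456 messages behind", which matches the example output.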

Internally, the ReassignPartitionsCommand will be refactored to support the above changes to the options. An interface will abstract the commands currently issued directly to ZooKeeper.

There will be an implementation which makes the current calls to ZooKeeper, and another implementation which uses the AdminClient API described below.
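The refactoring might take roughly the shape shown below. All type names are hypothetical, because the proposal does not name the interface. The method bodies only return a description. The real implementations would talk to ZooKeeper or call the proposed AdminClient.reassignPartitions().

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the tool talks to a ReassignmentBackend and does not need
// to know whether ZooKeeper or a broker is on the other side.
final class Backends {

    /** Picks the implementation from the connection option that was given (null if absent). */
    static ReassignmentBackend forOptions(String zookeeper, String bootstrapServer) {
        return zookeeper != null
            ? new ZooKeeperBackend(zookeeper)
            : new AdminClientBackend(bootstrapServer);
    }

    public static void main(String[] args) {
        ReassignmentBackend backend = forOptions(null, "localhost:9878");
        System.out.println(backend.startReassignment(Map.of("my_topic-0", List.of(1, 2))));
    }
}

interface ReassignmentBackend {
    /** Starts a reassignment; keys are "topic-partition", values are replica broker ids. */
    String startReassignment(Map<String, List<Integer>> assignment);
}

final class ZooKeeperBackend implements ReassignmentBackend {
    private final String connect;

    ZooKeeperBackend(String connect) { this.connect = connect; }

    @Override
    public String startReassignment(Map<String, List<Integer>> assignment) {
        // A real implementation would make the ZooKeeper calls the tool makes today.
        return "via ZooKeeper " + connect + ": " + assignment.size() + " partition(s)";
    }
}

final class AdminClientBackend implements ReassignmentBackend {
    private final String bootstrapServer;

    AdminClientBackend(String bootstrapServer) { this.bootstrapServer = bootstrapServer; }

    @Override
    public String startReassignment(Map<String, List<Integer>> assignment) {
        // A real implementation would call the proposed AdminClient.reassignPartitions().
        return "via broker " + bootstrapServer + ": " + assignment.size() + " partition(s)";
    }
}
```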

In all other respects, the public API of ReassignPartitionsCommand will not be changed.

AdminClient: alterPartitionCounts()

To support kafka-topics.sh --alter --partitions ..., the following methods will be added to AdminClient for changing the partition counts of topics:

/**
 * <p>Change the partition count of the topics given as the keys of {@code counts}
 * to the corresponding values. Currently it is only possible to increase
 * the partition count.</p>
 *
 * <p>The replicas of new partitions will be allocated to the least loaded broker,
 * but may subsequently be moved using {@link #reassignPartitions(Map)}.</p>
 */
public AlterPartitionCountsResult alterPartitionCounts(Map<String, Integer> counts,
                    AlterPartitionCountsOptions options);
public AlterPartitionCountsResult alterPartitionCounts(Map<String, Integer> counts);

Where:

public class AlterPartitionCountsOptions {
    public AlterPartitionCountsOptions() { ... }
    public Integer timeoutMs() { ... }
    public AlterPartitionCountsOptions timeoutMs(Integer timeoutMs) { ... }
}
 
public class AlterPartitionCountsResult {
    // package access constructor
    public Map<String, KafkaFuture<Void>> values() { ... }
    public KafkaFuture<Void> all() { ... }
}
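One possible placement rule for the new partitions' replicas is sketched below to make "least loaded broker" concrete. It assumes that a broker's load is the number of replicas it hosts and that each partition's replicas must be on distinct brokers. It is an illustration, not the broker's actual assignment algorithm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of "least loaded broker" placement for new partitions.
// Assumption: a broker's load is the number of replicas it hosts.
final class NewPartitionPlacement {

    /**
     * Picks replicas for each new partition, one at a time, always choosing the
     * least loaded broker not already chosen for that partition. Ties go to the
     * lower broker id so the result is deterministic.
     */
    static List<List<Integer>> place(Map<Integer, Integer> replicasPerBroker,
                                     int newPartitions, int replicationFactor) {
        if (replicationFactor > replicasPerBroker.size()) {
            throw new IllegalArgumentException("Replication factor exceeds the number of brokers");
        }
        Map<Integer, Integer> load = new TreeMap<>(replicasPerBroker);
        List<List<Integer>> placement = new ArrayList<>();
        for (int p = 0; p < newPartitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                int broker = leastLoaded(load, replicas);
                replicas.add(broker);
                load.merge(broker, 1, Integer::sum); // account for the replica just placed
            }
            placement.add(replicas);
        }
        return placement;
    }

    /** Lowest-load broker not in {@code exclude}; ties broken by lower broker id. */
    static int leastLoaded(Map<Integer, Integer> load, List<Integer> exclude) {
        return load.entrySet().stream()
            .filter(e -> !exclude.contains(e.getKey()))
            .min(Map.Entry.<Integer, Integer>comparingByValue()
                .thenComparing(Map.Entry.comparingByKey()))
            .orElseThrow()
            .getKey();
    }

    public static void main(String[] args) {
        // Broker 0 hosts 3 replicas, broker 1 hosts 1, broker 2 hosts 2.
        System.out.println(place(Map.of(0, 3, 1, 1, 2, 2), 2, 2));
    }
}
```

Suppose brokers 0, 1 and 2 host 3, 1 and 2 replicas respectively. Adding two partitions with replication factor 2 then gives [[1, 2], [1, 0]].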

AdminClient: alterReplicationFactors()

To support kafka-topics.sh --alter --replication-factor ..., the following methods will be added to AdminClient for changing the replication factors of topics:

/**
 * <p>Change the replication factor of the topics given as the keys of
 * {@code replicationFactors} to the corresponding values.</p>
 *
 * <p>New replicas will be allocated to the least loaded broker,
 * but may subsequently be moved using {@link #reassignPartitions(Map)}.</p>
 */
public AlterReplicationFactorsResult alterReplicationFactors(Map<String, Short> replicationFactors);
public AlterReplicationFactorsResult alterReplicationFactors(Map<String, Short> replicationFactors,
                        AlterReplicationFactorsOptions options);

Where:

public class AlterReplicationFactorsOptions {
    public AlterReplicationFactorsOptions() { ... }
    public Integer timeoutMs() { ... }
    public AlterReplicationFactorsOptions timeoutMs(Integer timeoutMs) { ... }
}

public class AlterReplicationFactorsResult {
    // package access constructor
    public Map<String, KafkaFuture<Void>> values() { ... }
    public KafkaFuture<Void> all() { ... }
}
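A per-partition version of the same idea for replication factor changes is sketched below. It again assumes that load means replica count. Growing a partition appends the least loaded brokers that do not already host it. The proposal does not say which replicas would be removed when shrinking. This sketch assumes replicas are dropped from the end of the list, so the preferred replica (first in the list) survives. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a replication factor change applied to an existing assignment.
// Assumption: a broker's load is the number of replicas it hosts.
final class ReplicationFactorChange {

    /** {@code assignment} lists each partition's replica broker ids, preferred replica first. */
    static List<List<Integer>> resize(List<List<Integer>> assignment, int newFactor,
                                      Map<Integer, Integer> replicasPerBroker) {
        if (newFactor < 1 || newFactor > replicasPerBroker.size()) {
            throw new IllegalArgumentException("Invalid replication factor: " + newFactor);
        }
        Map<Integer, Integer> load = new TreeMap<>(replicasPerBroker);
        List<List<Integer>> result = new ArrayList<>();
        for (List<Integer> current : assignment) {
            List<Integer> replicas = new ArrayList<>(current);
            while (replicas.size() > newFactor) {
                // Assumption: remove the last replica, keeping the preferred (first) one.
                int removed = replicas.remove(replicas.size() - 1);
                load.merge(removed, -1, Integer::sum);
            }
            while (replicas.size() < newFactor) {
                int broker = leastLoaded(load, replicas);
                replicas.add(broker);
                load.merge(broker, 1, Integer::sum);
            }
            result.add(replicas);
        }
        return result;
    }

    /** Lowest-load broker not in {@code exclude}; ties broken by lower broker id. */
    static int leastLoaded(Map<Integer, Integer> load, List<Integer> exclude) {
        return load.entrySet().stream()
            .filter(e -> !exclude.contains(e.getKey()))
            .min(Map.Entry.<Integer, Integer>comparingByValue()
                .thenComparing(Map.Entry.comparingByKey()))
            .orElseThrow()
            .getKey();
    }

    public static void main(String[] args) {
        System.out.println(resize(List.of(List.of(0, 1), List.of(1, 2)), 3,
                                  Map.of(0, 1, 1, 2, 2, 1, 3, 0)));
    }
}
```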

AdminClient: reassignPartitions()

To support kafka-topics.sh --alter --replica-assignment ... and kafka-reassign-partitions.sh, the following methods will be added to AdminClient for changing the brokers that host the partitions of a topic:

/**
 * <p>Reassign the partitions given as the keys of <code>assignments</code> to the corresponding
 * lists of brokers. The first broker in each list is the one which holds the "preferred replica".</p>
 *
 * <p>Inter-broker reassignment causes significant inter-broker traffic and can take a long time
 * to copy the replica data between brokers. It may be necessary to impose a quota on
 * inter-broker traffic for the duration of the reassignment so that client-broker traffic is not
 * adversely affected.</p>
 *
 * <h3>Preferred replica</h3>
 * <p>When brokers are configured with <code>auto.leader.rebalance.enable=true</code>, the broker
 * with the preferred replica will be elected leader automatically.
 * <code>kafka-preferred-replica-election.sh</code> provides a manual trigger for this
 * election when <code>auto.leader.rebalance.enable=false</code>.</p>
 */
public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments);
public ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments,
                        ReassignPartitionsOptions options);

Where:

public class ReassignPartitionsOptions {

    public boolean validateOnly() { ... }

    /**
     * Validate the request only: do not actually trigger replica reassignment.
     */
    public ReassignPartitionsOptions validateOnly(boolean validateOnly) { ... }

    public long timeoutMs() { ... }

    /**
     * Set a timeout for starting the reassignment.
     * Note this timeout does not include the time taken to actually
     * move replicas between brokers.
     */
    public ReassignPartitionsOptions timeoutMs(long timeoutMs) { ... }

    public long throttle() { ... }

    /**
     * Set a throttle, in bytes per second, on the bandwidth used for
     * inter-broker replica movement.
     */
    public ReassignPartitionsOptions throttle(long throttledRateBytesPerSecond) { ... }
}

public class ReassignPartitionsResult {
    public Map<TopicPartition, KafkaFuture<Void>> values() { ... }
}

Partition reassignment is a long-running operation. The ReassignPartitionsResult indicates only that the reassignment has started, not that it has completed. The describeReplicaDir() method from KIP-113 can be used to determine progress.

AdminClient: alterInterBrokerThrottleRate()

The following methods will be added to AdminClient for changing the inter-broker replication throttle:

/**
 * Alter the throttle of inter-broker replication for the given broker to the given rates.
 * @param broker The id of the broker
 * @param leaderRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
 *                                 to be used for inter-broker replication when the broker is the leader.
 * @param followerRateBytesPerSecond The approximate maximum transfer rate, in bytes per second,
 *                                 to be used for inter-broker replication when the broker is the follower.
 */
public AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond,
                        long followerRateBytesPerSecond);
public AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond,
                        long followerRateBytesPerSecond, AlterInterBrokerThrottleOptions options);

Where:

public class AlterInterBrokerThrottleOptions {
    public Integer timeoutMs() { ... }
    public AlterInterBrokerThrottleOptions timeoutMs(Integer timeoutMs) { ... }
}

public class AlterInterBrokerThrottleResult {
    public KafkaFuture<Void> all() { ... }
}

Notes:

  • The throttled rate is implemented as a DynamicConfig on the broker. This is an implementation detail, so it wouldn't be appropriate to set it via alterConfigs(). In any case, it is currently not possible to change broker configs via alterConfigs().
  • The throttle is broker-specific and different brokers can have different rates applied.
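The throttle caps the transfer rate, so it also sets a minimum duration for a reassignment. The hypothetical helper below computes a rough estimate of that minimum. It assumes all of the moved data passes through a single throttled broker. Because the throttle is only an approximate maximum, the result is an estimate rather than a guarantee.

```java
// Hypothetical helper: a rough lower bound on how long a throttled move takes.
final class ThrottleEstimate {

    /**
     * Approximate minimum time, in whole seconds (rounded up), to copy {@code bytesToMove}
     * through a broker throttled to {@code rateBytesPerSecond}.
     */
    static long minimumSeconds(long bytesToMove, long rateBytesPerSecond) {
        if (bytesToMove < 0 || rateBytesPerSecond <= 0) {
            throw new IllegalArgumentException("bytes must be >= 0 and rate must be > 0");
        }
        // Ceiling division: a partial second still counts as a second.
        return (bytesToMove + rateBytesPerSecond - 1) / rateBytesPerSecond;
    }

    public static void main(String[] args) {
        long gib = 1024L * 1024 * 1024;
        long mib = 1024L * 1024;
        System.out.println(minimumSeconds(50 * gib, 10 * mib));
    }
}
```

For example, moving 50 GiB through a broker throttled to 10 MiB/s takes at least about 5120 seconds, roughly 85 minutes.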

Network Protocol: AlterPartitionCountsRequest and AlterPartitionCountsResponse

The AlterPartitionCountsRequest is used to change the partition count for a batch of topics, and is the basis for the AdminClient.alterPartitionCounts() method.

AlterPartitionCountsRequest => [topic_partition_count] timeout
  topic_partition_count => topic partition_count
    topic => STRING
    partition_count => INT32
  timeout => INT32

Where

Field            Description
--------------------------------------------------------------
topic            the name of a topic
partition_count  the new partition count
timeout          the maximum time to await a response, in ms
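On the wire, the request body could be laid out as in this sketch. It assumes Kafka's classic (non-flexible) primitive encodings:

  • an ARRAY is an INT32 element count followed by the elements
  • a STRING is an INT16 length followed by UTF-8 bytes
  • integers are big-endian

The request header is omitted and the class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the AlterPartitionCountsRequest body (header not included).
final class AlterPartitionCountsEncoding {

    static byte[] encode(Map<String, Integer> counts, int timeoutMs) {
        int size = 4 + 4; // array count + timeout
        for (String topic : counts.keySet()) {
            size += 2 + topic.getBytes(StandardCharsets.UTF_8).length + 4;
        }
        ByteBuffer buf = ByteBuffer.allocate(size); // ByteBuffer is big-endian by default
        buf.putInt(counts.size());                  // [topic_partition_count] element count
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            byte[] topic = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) topic.length);      // topic => STRING (length prefix)
            buf.put(topic);
            buf.putInt(e.getValue());                // partition_count => INT32
        }
        buf.putInt(timeoutMs);                      // timeout => INT32
        return buf.array();
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("t", 3);
        StringBuilder hex = new StringBuilder();
        for (byte b : encode(counts, 1000)) hex.append(String.format("%02x", b));
        System.out.println(hex);
    }
}
```

Encoding {t: 3} with a 1000 ms timeout prints 0000000100017400000003000003e8. That is the array count, then the length-prefixed topic name, then the partition count and finally the timeout.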

The request will require the ALTER operation on the Topic resource.

The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.

The response provides an error code and message for each of the topics present in the request.

AlterPartitionCountsResponse => throttle_time_ms [topic_partition_count_error]
  throttle_time_ms => INT32
  topic_partition_count_error => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Where

Field             Description
--------------------------------------------------------------------------
throttle_time_ms  duration in milliseconds for which the request was throttled
topic             the name of a topic in the request
error_code        an error code for that topic
error_message     more detailed information about any error for that topic

Anticipated errors:

  • TOPIC_AUTHORIZATION_FAILED (29) The user lacked Alter on the topic
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_PARTITIONS (37) If the partition_count was invalid
  • INVALID_REQUEST (42) If duplicate topics appeared in the request.
  • NONE (0) The topic partition count was changed successfully.
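A broker could map each topic in the request to the anticipated errors roughly as follows. This is a hypothetical sketch, and the order of the checks is an assumption. It leaves out the authorization and CreateTopicPolicy checks. It returns INVALID_PARTITIONS for any count that is not an increase, following the Javadoc of alterPartitionCounts().

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-topic validation for AlterPartitionCountsRequest. Authorization
// (TOPIC_AUTHORIZATION_FAILED) and the CreateTopicPolicy check are omitted.
final class AlterPartitionCountsValidation {

    static final short NONE = 0;
    static final short INVALID_TOPIC_EXCEPTION = 17;
    static final short INVALID_PARTITIONS = 37;
    static final short INVALID_REQUEST = 42;

    /** {@code request} holds (topic, partition_count) pairs in request order, duplicates included. */
    static Map<String, Short> validate(List<Map.Entry<String, Integer>> request,
                                       Map<String, Integer> currentCounts) {
        Map<String, Integer> occurrences = new HashMap<>();
        for (Map.Entry<String, Integer> e : request) {
            occurrences.merge(e.getKey(), 1, Integer::sum);
        }
        Map<String, Short> errors = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : request) {
            String topic = e.getKey();
            Integer current = currentCounts.get(topic);
            short code;
            if (occurrences.get(topic) > 1) {
                code = INVALID_REQUEST;          // topic appears more than once
            } else if (current == null) {
                code = INVALID_TOPIC_EXCEPTION;  // topic doesn't exist
            } else if (e.getValue() <= current) {
                code = INVALID_PARTITIONS;       // only increases are supported
            } else {
                code = NONE;
            }
            errors.put(topic, code);
        }
        return errors;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> request = List.of(
            Map.entry("a", 5), Map.entry("b", 2), Map.entry("c", 4), Map.entry("a", 6));
        System.out.println(validate(request, Map.of("a", 3, "b", 2)));
    }
}
```

The example in main prints {a=42, b=37, c=17}: a is duplicated, b is not an increase and c doesn't exist.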

Network Protocol: AlterReplicationFactorsRequest and AlterReplicationFactorsResponse

The AlterReplicationFactorsRequest is used to change the replication factor for a batch of topics, and is the basis for the AdminClient.alterReplicationFactors() method.

AlterReplicationFactorsRequest => [topic_replication_factor] timeout
  topic_replication_factor => topic replication_factor
    topic => STRING
    replication_factor => INT16
  timeout => INT32
  // TODO: validate_only?

Where

Field               Description
-----------------------------------------------------------------
topic               the name of a topic
replication_factor  the new replication factor for this topic
timeout             the maximum time to await a response, in ms

The request will require the ClusterAction operation on the CLUSTER resource, since it can require significant inter-broker communication.

The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.

AlterReplicationFactorsResponse => throttle_time_ms [topic_replication_factor_error]
  throttle_time_ms => INT32
  topic_replication_factor_error => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Where

Field             Description
--------------------------------------------------------------------------
throttle_time_ms  duration in milliseconds for which the request was throttled
topic             the name of a topic in the request
error_code        an error code for that topic
error_message     more detailed information about any error for that topic

Anticipated errors:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_REPLICATION_FACTOR (38) If the replication_factor was invalid
  • INVALID_REQUEST (42) If duplicate topics appeared in the request.
  • NONE (0) The topic replication factor was changed successfully.

Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse

ReassignPartitionsRequest initiates the movement of replicas between brokers, and is the basis of the AdminClient.reassignPartitions() method.

ReassignPartitionsRequest => [topic_reassignments] timeout validate_only
  topic_reassignments => topic [partition_reassignments]
    topic => STRING
    partition_reassignments => partition_id [broker]
      partition_id => INT32
      broker => INT32
  timeout => INT32
  validate_only => BOOLEAN

Where

Field          Description
--------------------------------------------------------------------------------
topic          the name of a topic
partition_id   a partition of that topic
broker         a broker id
timeout        the maximum time to await a response, in ms
validate_only  when true, validate the request but don't actually reassign the partitions
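The nested arrays and the BOOLEAN flag could be encoded as sketched here. The sketch assumes Kafka's classic (non-flexible) primitive encodings:

  • an ARRAY is an INT32 element count followed by the elements
  • a STRING is an INT16 length followed by UTF-8 bytes
  • a BOOLEAN is a single byte, 0 or 1
  • integers are big-endian

The request header is omitted. The class name and the topic-to-partition-map input type are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the ReassignPartitionsRequest body (header not included).
// Input: topic -> (partition id -> replica broker ids).
final class ReassignPartitionsEncoding {

    static byte[] encode(Map<String, Map<Integer, List<Integer>>> reassignments,
                         int timeoutMs, boolean validateOnly) {
        int size = 4;                                     // topic_reassignments count
        for (Map.Entry<String, Map<Integer, List<Integer>>> t : reassignments.entrySet()) {
            size += 2 + t.getKey().getBytes(StandardCharsets.UTF_8).length + 4; // topic + partition count
            for (List<Integer> brokers : t.getValue().values()) {
                size += 4 + 4 + 4 * brokers.size();      // partition_id + broker count + brokers
            }
        }
        size += 4 + 1;                                    // timeout + validate_only
        ByteBuffer buf = ByteBuffer.allocate(size);       // big-endian by default
        buf.putInt(reassignments.size());
        for (Map.Entry<String, Map<Integer, List<Integer>>> t : reassignments.entrySet()) {
            byte[] topic = t.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) topic.length);           // topic => STRING
            buf.put(topic);
            buf.putInt(t.getValue().size());              // [partition_reassignments] count
            for (Map.Entry<Integer, List<Integer>> p : t.getValue().entrySet()) {
                buf.putInt(p.getKey());                   // partition_id => INT32
                buf.putInt(p.getValue().size());          // [broker] count
                for (int broker : p.getValue()) {
                    buf.putInt(broker);                   // broker => INT32
                }
            }
        }
        buf.putInt(timeoutMs);                            // timeout => INT32
        buf.put((byte) (validateOnly ? 1 : 0));           // validate_only => BOOLEAN
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] body = encode(Map.of("t", Map.of(0, List.of(1, 2))), 1000, true);
        System.out.println(body.length + " bytes");
    }
}
```

Consider one topic with a single partition moved to brokers 1 and 2. Its body is 32 bytes long.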

The request requires the ClusterAction operation on the CLUSTER resource, since it can require significant inter-broker communication.

The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.

ReassignPartitionsResponse describes which partitions in the request will be moved, and what was wrong with the request for those partitions which will not be moved.

ReassignPartitionsResponse => throttle_time_ms [reassign_partition_result]
  throttle_time_ms => INT32
  reassign_partition_result => topic partition_id error_code error_message
    topic => STRING
    partition_id => INT32
    error_code => INT16
    error_message => NULLABLE_STRING

Where

Field             Description
--------------------------------------------------------------------------
throttle_time_ms  duration in milliseconds for which the request was throttled
topic             a topic name from the request
partition_id      a partition id for that topic, from the request
error_code        an error code for that topic partition
error_message     more detailed information about any error for that topic partition

Anticipated errors:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • UNKNOWN_MEMBER_ID (25) If any broker id in partition_reassignments is unknown
  • INVALID_REQUEST (42) If duplicate topics appeared in the request
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new) If a partition reassignment is already in progress
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in partition_reassignments doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
  • NONE (0) reassignment has started
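The per-partition checks behind these errors might look roughly like the sketch below. It is hypothetical. Outcomes are an enum rather than numeric codes, because PARTITION_REASSIGNMENT_IN_PROGRESS has no code yet. Authorization and duplicate-topic checks are omitted, and the order of the checks is an assumption.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical per-partition validation for ReassignPartitionsRequest; authorization
// and duplicate-topic checks are omitted.
final class ReassignmentValidation {

    enum Outcome { NONE, INVALID_TOPIC_EXCEPTION, UNKNOWN_MEMBER_ID,
                   INVALID_REPLICA_ASSIGNMENT, PARTITION_REASSIGNMENT_IN_PROGRESS }

    /**
     * @param request         topic -> (partition id -> requested replica broker ids)
     * @param partitionCounts current partition count of each existing topic
     * @param liveBrokers     ids of brokers known to the cluster
     * @param inProgress      "topic-partition" keys that are already being reassigned
     */
    static Map<String, Outcome> validate(Map<String, Map<Integer, List<Integer>>> request,
                                         Map<String, Integer> partitionCounts,
                                         Set<Integer> liveBrokers, Set<String> inProgress) {
        Map<String, Outcome> outcomes = new LinkedHashMap<>();
        for (Map.Entry<String, Map<Integer, List<Integer>>> t : request.entrySet()) {
            Integer count = partitionCounts.get(t.getKey());
            for (Map.Entry<Integer, List<Integer>> p : t.getValue().entrySet()) {
                String key = t.getKey() + "-" + p.getKey();
                int partition = p.getKey();
                List<Integer> brokers = p.getValue();
                Outcome outcome;
                if (count == null) {
                    outcome = Outcome.INVALID_TOPIC_EXCEPTION;     // topic doesn't exist
                } else if (partition < 0 || partition >= count) {
                    outcome = Outcome.INVALID_REPLICA_ASSIGNMENT;  // partition doesn't exist
                } else if (!liveBrokers.containsAll(brokers)) {
                    outcome = Outcome.UNKNOWN_MEMBER_ID;           // unknown broker id
                } else if (brokers.isEmpty() || new HashSet<>(brokers).size() != brokers.size()) {
                    outcome = Outcome.INVALID_REPLICA_ASSIGNMENT;  // empty or repeated broker ids
                } else if (inProgress.contains(key)) {
                    outcome = Outcome.PARTITION_REASSIGNMENT_IN_PROGRESS;
                } else {
                    outcome = Outcome.NONE;                        // reassignment can start
                }
                outcomes.put(key, outcome);
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(validate(Map.of("t", Map.of(0, List.of(1, 2))),
                                    Map.of("t", 1), Set.of(1, 2), Set.of()));
    }
}
```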

Network Protocol: AlterInterBrokerThrottleRequest and AlterInterBrokerThrottleResponse

An AlterInterBrokerThrottleRequest changes the leader and follower throttle rates for inter-broker transfers from/to the broker receiving the request.

AlterInterBrokerThrottleRequest => leader_rate follower_rate timeout
  leader_rate => INT64
  follower_rate => INT64
  timeout => INT32

Where:

Field          Description
-------------------------------------------------------------------------------------------
leader_rate    the approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the leader
follower_rate  the approximate maximum rate, in bytes/sec, for inter-broker transfers where the receiving broker is the follower
timeout        the maximum time to await a response, in ms

The request requires the ClusterAction operation on the CLUSTER resource, since it can affect significant inter-broker communication.

An AlterInterBrokerThrottleResponse just acknowledges the success or otherwise of an AlterInterBrokerThrottleRequest.

AlterInterBrokerThrottleResponse => error_code error_message throttle_time_ms
  error_code => INT16
  error_message => NULLABLE_STRING
  throttle_time_ms => INT32

Where

Field             Description
--------------------------------------------------------------------------
throttle_time_ms  duration in milliseconds for which the request was throttled
error_code        an error code for the request
error_message     more detailed information about any error

Anticipated errors:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_REQUEST (42) Leader rate or follower rate was out of range
  • NONE (0) the request was successful

Summary of use cases

  • Increase partition count
    • Command: kafka-topics --alter --topic T --partitions P
    • AdminClient: alterPartitionCounts()
  • Change replication factor
    • Command: kafka-topics --alter --topic T --replication-factor F
    • AdminClient: alterReplicationFactors()
  • Change partition assignment
    • Command: kafka-topics --alter --topic T --replica-assignment A
    • AdminClient: reassignPartitions()
  • Change partition assignment
    • Command: kafka-reassign-partitions --execute --reassignment-json-file J
    • AdminClient: reassignPartitions()
  • Change partition assignment with throttle
    • Command: kafka-reassign-partitions --execute --reassignment-json-file J --throttle R
    • AdminClient, in order: reassignPartitions(validateOnly) to check that no reassignment is in progress; alterInterBrokerThrottleRate(); alterConfigs() to set (leader|follower).replication.throttled.replicas; reassignPartitions()
  • Change throttled rate
    • Command: kafka-reassign-partitions --execute --reassignment-json-file J --throttle R
    • AdminClient: alterInterBrokerThrottleRate()
  • Check progress of a reassignment
    • Command: kafka-reassign-partitions --progress --reassignment-json-file J
    • AdminClient: describeReplicaDir() (see KIP-113)
  • Check result and clear throttle
    • Command: kafka-reassign-partitions --verify --reassignment-json-file J
    • AdminClient, in order: reassignPartitions(validateOnly) to check that no reassignment is in progress; alterConfigs() to clear (leader|follower).replication.throttled.replicas
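The throttled use cases above are multi-step sequences, and their ordering could be sketched as follows. ProposedAdmin is a hypothetical, simplified stand-in for the proposed AdminClient methods, with no futures or options. setThrottledReplicas() stands for the alterConfigs() call on the (leader|follower).replication.throttled.replicas topic configs. The caller supplies the brokers involved (both current and new replicas). RecordingAdmin just records calls so the order can be checked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical orchestration of the throttled-reassignment use cases.
final class ThrottledReassignment {

    /** Starts a throttled reassignment; keys of {@code assignment} are "topic-partition". */
    static void execute(ProposedAdmin admin, Map<String, List<Integer>> assignment,
                        Set<Integer> involvedBrokers, long rateBytesPerSecond) {
        if (admin.reassignmentInProgress()) {
            throw new IllegalStateException("A reassignment is already in progress");
        }
        for (int broker : new TreeSet<>(involvedBrokers)) {
            admin.alterInterBrokerThrottleRate(broker, rateBytesPerSecond, rateBytesPerSecond);
        }
        Set<String> topics = new TreeSet<>();
        for (String key : assignment.keySet()) {
            topics.add(key.substring(0, key.lastIndexOf('-')));
        }
        for (String topic : topics) {
            admin.setThrottledReplicas(topic, true);
        }
        admin.reassignPartitions(assignment);
    }

    /** Returns false while a reassignment is still running; otherwise clears the throttle. */
    static boolean verifyAndClear(ProposedAdmin admin, Set<String> topics) {
        if (admin.reassignmentInProgress()) {
            return false;
        }
        for (String topic : new TreeSet<>(topics)) {
            admin.setThrottledReplicas(topic, false);
        }
        return true;
    }

    public static void main(String[] args) {
        RecordingAdmin admin = new RecordingAdmin();
        execute(admin, Map.of("my_topic-0", List.of(1, 2)), Set.of(0, 1, 2), 10_000_000L);
        verifyAndClear(admin, Set.of("my_topic"));
        admin.calls.forEach(System.out::println);
    }
}

/** Hypothetical, simplified stand-in for the proposed AdminClient calls. */
interface ProposedAdmin {
    boolean reassignmentInProgress();   // reassignPartitions() with validateOnly=true
    void alterInterBrokerThrottleRate(int broker, long leaderRate, long followerRate);
    void setThrottledReplicas(String topic, boolean enabled); // alterConfigs() on throttled.replicas
    void reassignPartitions(Map<String, List<Integer>> assignment);
}

/** Test double that records the order of calls. */
final class RecordingAdmin implements ProposedAdmin {
    final List<String> calls = new ArrayList<>();
    boolean inProgress = false;

    @Override public boolean reassignmentInProgress() { calls.add("check"); return inProgress; }
    @Override public void alterInterBrokerThrottleRate(int broker, long leaderRate, long followerRate) {
        calls.add("throttle " + broker);
    }
    @Override public void setThrottledReplicas(String topic, boolean enabled) {
        calls.add((enabled ? "set " : "clear ") + topic);
    }
    @Override public void reassignPartitions(Map<String, List<Integer>> assignment) {
        calls.add("reassign");
    }
}
```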

 

Compatibility, Deprecation, and Migration Plan


Existing users of kafka-reassign-partitions.sh will receive a deprecation warning when they use the --zookeeper option. The option will be removed in a future version of Kafka: if this KIP is introduced in version 1.0.0, the removal could happen in 2.0.0.

Rejected Alternatives


One alternative is to do nothing: Let the ReassignPartitionsCommand continue to communicate with ZooKeeper directly.

Another alternative is to implement exactly this KIP, but without deprecating --zookeeper. That would have a higher long-term maintenance burden. It would also prevent any future plans to, for example, support cluster coordination technologies other than ZooKeeper.

An alterTopics() AdminClient API, mirroring the existing createTopics() API, was considered, but:

  • Some calls to alterTopics() (such as increasing the partition count) would have been synchronous, while others (such as moving replicas between brokers) would have been long-running and thus asynchronous. This would have made for an API whose synchronicity depended on its arguments.
  • createTopics() allows topic configs to be specified, whereas alterConfigs() is already provided for changing topic configs, so alterTopics() would not have been an exact mirror.

Providing only reassignPartitions() was also considered, with changes to the partition count and replication factor inferred from the partitions and brokers present in the assignments argument. This would require the caller to provide an assignment of partitions to brokers. However, the AdminClient doesn't provide methods to obtain the information needed for an informed decision, such as the load on each broker, how much free disk space it has, or how many partitions it already hosts. It was therefore felt better to retain alterPartitionCounts() and alterReplicationFactors() for the time being.

 
