Versions Compared

Comment: Throttle, describeReplicaDir

 

Note this was initially erroneously assigned as KIP-178, which was already taken, and has been reassigned KIP-179.

...

The AdminClient API will have new methods added (plus overloads for options):

The options accepted by kafka-reassign-partitions.sh command will change:

...

No Format
Topic      Partition  Broker  Status
-------------------------------------
my_topic   0          0       In sync
my_topic   0          1       Behind: 10456 messages behind
asdf       0          1       Unknown topic
my_topic   42         1       Unknown partition
my_topic   0          42      Unknown broker
my_topic   1          0       Broker does not host this partition

The implementation of --progress will make use of the describeReplicaDir() method from KIP-113 to find the lag of the syncing replica.
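
As a rough illustration of how the --progress rows shown above might be rendered, the following sketch formats per-replica lag into the same fixed-width columns. The ProgressTable class, its Row type and the statusText() helper are hypothetical names introduced here, not part of the KIP; in a real implementation the lag would come from describeReplicaDir().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: renders --progress rows; not the actual ReassignPartitionsCommand code. */
class ProgressTable {

    /** One output row: a replica of a topic partition on a broker, with its lag in messages. */
    static final class Row {
        final String topic;
        final int partition;
        final int broker;
        final long lag;

        Row(String topic, int partition, int broker, long lag) {
            this.topic = topic;
            this.partition = partition;
            this.broker = broker;
            this.lag = lag;
        }
    }

    /** Maps a lag in messages to the status text used in the sample output. */
    static String statusText(long lag) {
        return lag == 0 ? "In sync" : "Behind: " + lag + " messages behind";
    }

    /** Formats one row using the column widths of the sample output. */
    static String formatRow(Row r) {
        return String.format("%-10s %-10d %-7d %s", r.topic, r.partition, r.broker, statusText(r.lag));
    }

    /** Returns the header line followed by one formatted line per row. */
    static List<String> render(List<Row> rows) {
        List<String> out = new ArrayList<>();
        out.add(String.format("%-10s %-10s %-7s %s", "Topic", "Partition", "Broker", "Status"));
        for (Row r : rows) {
            out.add(formatRow(r));
        }
        return out;
    }
}
```

The error statuses in the sample (unknown topic, partition or broker) are left out of this sketch; they would need an extra status field rather than a lag value.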

Internally, the ReassignPartitionsCommand will be refactored to support the above changes to the options. An interface will abstract the commands currently issued directly to ZooKeeper.

...

Code Block
/**
 * <p>Change the partition count of the topics given as the keys of {@code counts}
 * to the corresponding values. Currently it is only possible to increase 
 * the partition count.</p>
 *
 * <p>The replicas of new partitions will be allocated to the least loaded broker, 
 * but may subsequently be moved using {@link #reassignPartitions(Map)}.</p>
 */
public AlterPartitionCountsResult alterPartitionCounts(Map<String, Integer> counts,
                    AlterPartitionCountsOptions options)
public AlterPartitionCountsResult alterPartitionCounts(Map<String, Integer> counts)

...

Code Block
public class AlterPartitionCountsOptions {
    public AlterPartitionCountsOptions() { ... }
    // TODO validateOnly?
    public Integer timeoutMs() { ... }
    public AlterPartitionCountsOptions timeoutMs(Integer timeoutMs) { ... }
}
 
public class AlterPartitionCountsResult {
    // package access constructor
    Map<String, KafkaFuture<Void>> values() { ... }
    KafkaFuture<Void> all() { ... }
}
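
To make the relationship between values() and all() concrete, here is a minimal sketch of a result class whose all() is derived from the per-topic futures. It uses java.util.concurrent.CompletableFuture as a stand-in for KafkaFuture, and the class name PartitionCountsResultSketch is hypothetical; the KIP does not specify how all() is implemented.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for AlterPartitionCountsResult, using CompletableFuture instead of KafkaFuture. */
class PartitionCountsResultSketch {

    private final Map<String, CompletableFuture<Void>> futures;

    PartitionCountsResultSketch(Map<String, CompletableFuture<Void>> futures) {
        this.futures = futures;
    }

    /** Per-topic outcome: each future completes or fails independently of the others. */
    Map<String, CompletableFuture<Void>> values() {
        return futures;
    }

    /** Completes once every per-topic future has completed; fails if any of them failed. */
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));
    }
}
```

A caller that needs to know which topic failed would inspect values(); all() only reports that at least one topic failed.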

AdminClient:

...

alterReplicationFactors()

To support kafka-topics.sh --alter --replication-factor ... the following methods will be added to AdminClient to support changing topics' replication factors.

Code Block
/**
 * <p>Change the replication factor of the topics given as the keys of 
 * replicationFactors to the corresponding values.</p>
 *
 * <p>New replicas will be allocated to the least loaded broker, 
 * but may subsequently be moved using {@link #reassignPartitions(Map)}.</p>
 */
AlterReplicationFactorsResult alterReplicationFactors(Map<String, Short> replicationFactors)
AlterReplicationFactorsResult alterReplicationFactors(Map<String, Short> replicationFactors, 
                        AlterReplicationFactorsOptions options)

...

Code Block
public class AlterReplicationFactorsOptions {
    public AlterReplicationFactorsOptions() { ... }
    // TODO validateOnly?
    public Integer timeoutMs() { ... }
    public AlterReplicationFactorsOptions timeoutMs(Integer timeoutMs) { ... }
}
public class AlterReplicationFactorsResult {
    // package access constructor
    Map<String, KafkaFuture<Void>> values() { ... }
    KafkaFuture<Void> all() { ... }
}

 

AdminClient: reassignPartitions()

...

Code Block
/**
 * <p>Reassign the partitions given as the keys of the given <code>assignments</code> to the corresponding 
 * list of brokers.
 */
ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments)
ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments, 
                        ReassignPartitionsOptions options)

Where:

Code Block
class ReassignPartitionsOptions {
    boolean validateOnly()
    ReassignPartitionsOptions validateOnly(boolean validateOnly)
    long timeoutMs()
    ReassignPartitionsOptions timeoutMs(long timeoutMs)
}
class ReassignPartitionsResult {
    Map<String, KafkaFuture<Void>> values();
}

Partition reassignment is a long running operation, and the ReassignPartitionsResult indicates only that the reassignment has been started, not that the reassignment has been completed. The new replicaStatus() method can be used to check progress and completion of the reassignment.
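
Because the result only signals that the reassignment has started, a caller has to poll for completion. The loop below is a minimal sketch of that idea under stated assumptions: lagSupplier is a hypothetical callback standing in for a replicaStatus() query that returns the follower's lag in messages, and the class and method names are illustrative only.

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch of polling for reassignment completion; not part of the proposed API. */
class ReassignmentProgressPoller {

    /**
     * Polls lagSupplier until it reports zero lag or maxPolls is exhausted.
     *
     * @return true if the replica caught up within maxPolls polls, false otherwise
     */
    static boolean awaitInSync(LongSupplier lagSupplier, int maxPolls, long pollIntervalMs) {
        for (int i = 0; i < maxPolls; i++) {
            if (lagSupplier.getAsLong() == 0) {
                return true;
            }
            if (i < maxPolls - 1) {
                try {
                    Thread.sleep(pollIntervalMs);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller and give up waiting.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```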

TODO quotas

AdminClient: replicaStatus()

...

 The first broker in each list is the one which holds the "preferred replica".</p>
 *
 * <p>Inter-broker reassignment causes significant inter-broker traffic and can take a long time 
 * in order to copy the replica data to brokers. It may be necessary to impose a quota on 
 * inter-broker traffic for the duration of the reassignment so that client-broker traffic is not
 * adversely affected.</p>
 *
 * <h3>Preferred replica</h3>
 * <p>When brokers are configured with <code>auto.leader.rebalance.enable=true</code>, the broker
 * with the preferred replica will be elected leader automatically. 
 * <code>kafka-preferred-replica-election.sh</code> provides a manual trigger for this 
 * election when <code>auto.leader.rebalance.enable=false</code>.</p>
 */
ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments)
ReassignPartitionsResult reassignPartitions(Map<TopicPartition, List<Integer>> assignments, 
                        ReassignPartitionsOptions options)

Where:

Code Block
class ReassignPartitionsOptions {

    boolean validateOnly()

    /**
     * Validate the request only: Do not actually trigger replica reassignment.
     */
    ReassignPartitionsOptions validateOnly(boolean validateOnly)

    long timeoutMs()

    /**
     * Set a timeout for the starting of the reassignment. 
     * Note this timeout does not include the time taken to actually
     * move replicas between brokers.
     */
    ReassignPartitionsOptions timeoutMs(long timeoutMs)

    long throttle()

    /**
     * Set a throttle, in bytes per second, on the bandwidth used for 
     * inter-broker replica movement.
     */
    ReassignPartitionsOptions throttle(long throttledRateBytesPerSecond)

}
class ReassignPartitionsResult {
    Map<String, KafkaFuture<Void>> values();
}

Partition reassignment is a long running operation, and the ReassignPartitionsResult indicates only that the reassignment has been started, not that the reassignment has been completed. The describeReplicaDir() method from KIP-113 can be used to determine progress.

Code Block
/**
 * Query the replication status of the given partitions. 
 */
public ReplicaStatusResult replicaStatus(Collection<TopicPartitionReplica> replicas)   
public ReplicaStatusResult replicaStatus(Collection<TopicPartitionReplica> replicas,  ReplicaStatusOptions options)

Where:

Code Block
/**
 * Identifies a replica of a topic partition on a broker
 */
public class TopicPartitionReplica {
    public TopicPartitionReplica(String topic, int partition, short broker) { ... }
    public String getTopic() { ... }
    public int getPartition() { ... }
    public short getBroker() { ... }
}

public class ReplicaStatusOptions {
    public ReplicaStatusOptions() { ... }
    public long timeoutMs() { ... }
    public ReplicaStatusOptions timeoutMs(long timeoutMs) { ... }
}

public class ReplicaStatusResult {
    public KafkaFuture<Map<TopicPartitionReplica, List<ReplicaStatus>>> all()
    public Map<TopicPartitionReplica, KafkaFuture<ReplicaStatus>> values()
}

/** 
 * Represents the replication status of a partition 
 * on a particular broker.
 */ 
public class ReplicaStatus {
    
    /** 
     * The time (as milliseconds since the epoch) that 
     * this status data was collected. In general this may
     * be some time before the replicaStatus() request time.
     */
    public long statusTime()
    
    /** 
     * The number of messages that the replica on this broker is behind
     * the leader.
     */
    public long lag()
    
}

AdminClient: alterInterBrokerThrottle()

The following methods will be added to AdminClient to support changing the inter-broker transfer throttle.

Code Block
/**
 * Alter the throttle of inter-broker replication for the given broker to the given rate.
 * @param broker The broker
 * @param leaderRateBytesPerSecond The approximate maximum transfer rate, in bytes per second, 
 *                                 to be used for inter-broker replication when the broker is the leader.
 * @param followerRateBytesPerSecond The approximate maximum transfer rate, in bytes per second, 
 *                                 to be used for inter-broker replication when the broker is the follower.
 */
AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond)
AlterInterBrokerThrottleResult alterInterBrokerThrottleRate(int broker, long leaderRateBytesPerSecond, long followerRateBytesPerSecond, AlterInterBrokerThrottleOptions options)

Where:

Code Block
public class AlterInterBrokerThrottleOptions {
    public Integer timeoutMs() { ... }
    public AlterInterBrokerThrottleOptions timeoutMs(Integer timeoutMs) { ... }
}

public class AlterInterBrokerThrottleResult {
    public KafkaFuture<Void> all();
}
  • The throttled rate is implemented as a DynamicConfig on the broker, but this is an implementation detail so it wouldn't be appropriate to set it via alterConfigs()
  • The throttle is broker-specific and different brokers can have different rates applied.

Network Protocol: AlterPartitionCountsRequest and AlterPartitionCountsResponse

The AlterPartitionCountsRequest is used to change the partition count for a batch of topics, and is the basis for the  AdminClient.alterPartitionCounts() method.

No Format
AlterPartitionCountsRequest => [topic_partition_count] timeout
  topic_partition_count => topic partition_count
    topic => STRING
    partition_count => INT32
  timeout => INT32
  // TODO: validate_only?

Where

Field            Description
topic            the name of a topic
partition_count  the new partition count
timeout          The maximum time to await a response in ms.

The request will require the ALTER operation on the Topic resource.

The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.

The response provides an error code and message for each of the topics present in the request.

No Format
AlterPartitionCountsResponse => throttle_time_ms [topic_partition_count_error]
  topic_partition_count_error => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Where

Field             Description
throttle_time_ms  duration in milliseconds for which the request was throttled
topic             the name of a topic in the request
error_code        an error code for that topic
error_message     more detailed information about any error for that topic

Anticipated errors:

  • TOPIC_AUTHORIZATION_FAILED (29) The user lacked Alter on the topic
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_PARTITIONS (37) If the partition_count was invalid
  • INVALID_REQUEST (42) If duplicate topics appeared in the request.
  • NONE (0) The topic partition count was changed successfully.
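
The response layout above can be read back with a few lines of buffer handling. The sketch below is illustrative only and rests on assumptions not spelled out in this section: that throttle_time_ms is an INT32 (as in ReassignPartitionsResponse), that arrays are prefixed with an INT32 element count, and that STRING and NULLABLE_STRING are an INT16 length followed by UTF-8 bytes, with a length of -1 meaning null, as is usual for Kafka protocol primitives. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical decoder sketch for the AlterPartitionCountsResponse body; not Kafka's actual code. */
class AlterPartitionCountsResponseSketch {

    /** Reads an INT16 length-prefixed UTF-8 string; a negative length is treated as null. */
    static String readNullableString(ByteBuffer buf) {
        short length = buf.getShort();
        if (length < 0) {
            return null;
        }
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Writes the same encoding; included so the decoder can be exercised without a broker. */
    static void writeNullableString(ByteBuffer buf, String value) {
        if (value == null) {
            buf.putShort((short) -1);
            return;
        }
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
    }

    /** Decodes the response body and returns topic -> error_code in response order. */
    static Map<String, Short> decodeErrorCodes(ByteBuffer buf) {
        buf.getInt(); // throttle_time_ms, assumed INT32 and ignored here
        int count = buf.getInt(); // assumed INT32 array length
        Map<String, Short> errors = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            String topic = readNullableString(buf);
            short errorCode = buf.getShort();
            readNullableString(buf); // error_message, skipped in this sketch
            errors.put(topic, errorCode);
        }
        return errors;
    }
}
```

An error code of 0 (NONE) for a topic means its partition count was changed; any other code maps to one of the anticipated errors listed above.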

Network Protocol: AlterReplicationFactorsRequest and AlterReplicationFactorsResponse

...

No Format
AlterReplicationFactorsRequest => [topic_replication_factor] timeout
  topic_replication_factor => topic replication_factor
    topic => STRING
    replication_factor => INT16
  timeout => INT32
  // TODO: validate_only?

Where

Field               Description
topic               topic name
replication_factor  the new replication factor for this topic
timeout             The maximum time to await a response in ms.

The request will require the ClusterAction operation on the CLUSTER resource, since it can require significant inter-broker communication.

The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.

...


The AlterReplicationFactorsRequest is used to change the replication factor for a batch of topics, and is the basis for the AdminClient.alterReplicationFactors() method.

No Format
AlterReplicationFactorsRequest => [topic_replication_factor] timeout

...

No Format
AlterReplicationFactorResponse => throttle_time_ms [topic_replication_factor_error]
  topic_replication_factor_error => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Where

Field             Description
throttle_time_ms  duration in milliseconds for which the request was throttled
topic             the name of a topic in the request
error_code        an error code for that topic
error_message     more detailed information about any error for that topic

Anticipated errors:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_REPLICATION_FACTOR (38) If the replication_factor was invalid
  • INVALID_REQUEST (42) If duplicate topics appeared in the request.
  • NONE (0) The topic replication factor was changed successfully.

Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse

...

No Format
ReassignPartitionsRequest => [reassigned_topic] timeout validate_only
  reassigned_topic => topic [reassigned_partition]
    topic => STRING
    reassigned_partition => partition_id [broker]
      partition_id => INT32
      broker => INT32
  timeout => INT32
  validate_only => BOOLEAN

Where

Field          Description
topic          the name of a topic
partition_id   a partition of that topic
broker         a broker id
timeout        The maximum time to await a response in ms.
validate_only  when true: validate the request, but don't actually reassign the partitions

The request  requires the ClusterAction operation on the CLUSTER resource, since it can require significant inter-broker communication.

The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.
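
To show how the nested arrays in the request schema above flatten onto the wire, here is a minimal encoding sketch. It assumes the usual Kafka primitive encodings (big-endian integers, arrays prefixed with an INT32 count, STRING as an INT16 length plus UTF-8 bytes, BOOLEAN as a single byte); the class name and the map-of-maps input shape are hypothetical choices made for the example.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/** Hypothetical encoder sketch for the ReassignPartitionsRequest body; not Kafka's actual code. */
class ReassignPartitionsRequestSketch {

    /**
     * Encodes topic -> (partition_id -> broker ids), followed by timeout and validate_only.
     * DataOutputStream writes big-endian values, matching network byte order.
     */
    static byte[] encode(Map<String, Map<Integer, List<Integer>>> assignments,
                         int timeoutMs, boolean validateOnly) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(assignments.size()); // [reassigned_topic]
            for (Map.Entry<String, Map<Integer, List<Integer>>> topic : assignments.entrySet()) {
                byte[] name = topic.getKey().getBytes(StandardCharsets.UTF_8);
                out.writeShort(name.length); // topic => STRING
                out.write(name);
                out.writeInt(topic.getValue().size()); // [reassigned_partition]
                for (Map.Entry<Integer, List<Integer>> partition : topic.getValue().entrySet()) {
                    out.writeInt(partition.getKey()); // partition_id
                    out.writeInt(partition.getValue().size()); // [broker]
                    for (int broker : partition.getValue()) {
                        out.writeInt(broker);
                    }
                }
            }
            out.writeInt(timeoutMs);
            out.writeBoolean(validateOnly);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            // Not expected for an in-memory stream.
            throw new UncheckedIOException(e);
        }
    }
}
```

The order of broker ids within each list is preserved, which matters because the first broker in the list holds the preferred replica.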

...



No Format
AlterReplicationFactorResponse => throttle_time_ms [topic_replication_factor_error]
  topic_replication_factor_error => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Where

Field             Description
throttle_time_ms  duration in milliseconds for which the request was throttled
topic             the name of a topic in the request
error_code        an error code for that topic
error_message     more detailed information about any error for that topic

Anticipated errors:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_REPLICATION_FACTOR (38) If the replication_factor was invalid
  • INVALID_REQUEST (42) If duplicate topics appeared in the request.
  • NONE (0) The topic replication factor was changed successfully.

Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse

ReassignPartitionsRequest initiates the movement of replicas between brokers, and is the basis of the AdminClient.reassignPartitions() method.

No Format
ReassignPartitionsRequest => [reassigned_topic] timeout validate_only
  reassigned_topic => topic [reassigned_partition]
    topic => STRING
    reassigned_partition => partition_id [broker]
      partition_id => INT32
      broker => INT32
  timeout => INT32
  validate_only => BOOLEAN

Where

Field          Description
topic          the name of a topic
partition_id   a partition of that topic
broker         a broker id
timeout        The maximum time to await a response in ms.
validate_only  when true: validate the request, but don't actually reassign the partitions

The request  requires the ClusterAction operation on the CLUSTER resource, since it can require significant inter-broker communication.

The request is subject to the CreateTopicPolicy of the broker as configured by the broker's create.topic.policy.class.name config property. This is to ensure that the policy applies to topics modified after creation.

ReassignPartitionsResponse describes which partitions in the request will be moved, and what was wrong with the request for those partitions which will not be moved.

No Format
ReassignPartitionsResponse => throttle_time_ms [reassign_partition_result]
  throttle_time_ms => INT32
  reassign_partition_result => topic partition_id error_code error_message
    topic => STRING
    partition_id => INT32
    error_code => INT16
    error_message => NULLABLE_STRING

Where

Field             Description
throttle_time_ms  duration in milliseconds for which the request was throttled
topic             a topic name from the request
partition_id      a partition id for that topic, from the request
error_code        an error code for that topic partition
error_message     more detailed information about any error for that topic

Anticipated errors:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • UNKNOWN_MEMBER_ID (25) If any broker ids in the partition_assignment included an unknown broker id
  • INVALID_REQUEST (42) If duplicate topics appeared in the request
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new)
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
  • NONE (0) reassignment has started

Summary of use cases

Increase partition count
  • Command: kafka-topics --alter --topic T --partitions P
  • AdminClient: alterPartitionCounts()

Change replication factor
  • Command: kafka-topics --alter --topic T --replication-factor F
  • AdminClient: alterReplicationFactors()

Change partition assignment
  • Command: kafka-topics --alter --topic T --replica-assignment A
  • AdminClient: reassignPartitions()

Change partition assignment
  • Command: kafka-reassign-partitions --execute --reassignment-json-file J
  • AdminClient: reassignPartitions()

Change partition assignment with throttle
  • Command: kafka-reassign-partitions --execute --reassignment-json-file J --throttle R
  • AdminClient: reassignPartitions(validateOnly) // check none in progress
  • AdminClient: alterInterBrokerThrottle()
  • AdminClient: alterConfigs() // (leader|follower).replication.throttled.replicas
  • AdminClient: reassignPartitions()

Change throttled rate
  • Command: kafka-reassign-partitions --execute --reassignment-json-file J --throttle R
  • AdminClient: alterInterBrokerThrottle()

Check progress of a reassignment
  • Command: kafka-reassign-partitions --progress --reassignment-json-file J
  • AdminClient: describeReplicaDir() (see KIP-113)

Check result and clear throttle
  • Command: kafka-reassign-partitions --verify --reassignment-json-file J
  • AdminClient: reassignPartitions(validateOnly) // check none in progress
  • AdminClient: alterConfigs() // (leader|follower).replication.throttled.replicas
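
The "Change partition assignment with throttle" use case combines several AdminClient calls. The sketch below shows one way that sequence might be orchestrated, assuming the calls are issued in the order listed. ReassignmentAdmin is a hypothetical, simplified facade introduced for the example: its method signatures are not the proposed AdminClient signatures, and markThrottledReplicas() stands in for the alterConfigs() call on the (leader|follower).replication.throttled.replicas configs.

```java
/** Hypothetical orchestration sketch for a throttled reassignment; not the actual command code. */
class ThrottledReassignmentSketch {

    /** Simplified, hypothetical facade over the AdminClient calls named in the summary. */
    interface ReassignmentAdmin {
        /** Returns false if the request was rejected, e.g. a reassignment is already in progress. */
        boolean reassignPartitions(boolean validateOnly);

        void alterInterBrokerThrottle(long rateBytesPerSecond);

        /** Stands in for alterConfigs() on (leader|follower).replication.throttled.replicas. */
        void markThrottledReplicas();
    }

    /**
     * Validates first, then applies the throttle and throttled-replica configs,
     * and only then starts the reassignment.
     *
     * @return true if the reassignment was started
     */
    static boolean executeWithThrottle(ReassignmentAdmin admin, long rateBytesPerSecond) {
        if (!admin.reassignPartitions(true)) {
            return false; // nothing was changed on the cluster
        }
        admin.alterInterBrokerThrottle(rateBytesPerSecond);
        admin.markThrottledReplicas();
        return admin.reassignPartitions(false);
    }
}
```

Validating before touching the throttle means a rejected request leaves the broker configuration as it was.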

 


Network Protocol: ReplicaStatusRequest and ReplicaStatusResponse

...

No Format
ReplicaStatusRequest => [replica_status_requests]
  replica_status_requests => topic partition_id broker
    topic => STRING
    partition_id => INT32
    broker => INT32

Where

Field         Description
topic         a topic name
partition_id  a partition id of this topic
broker        a follower broker id for this partition

The request will require the DESCRIBE operation on each of the Topic resources.

...

No Format
ReplicaStatusResponse => [replica_status]
  replica_status => topic partition_id broker error_code status_time lag
    topic => STRING
    partition_id => INT32
    broker => INT32
    error_code => INT16
    status_time => INT64
    lag => INT64

Where

Field         Description
topic         the topic name
partition_id  the partition id of this topic
broker        the follower broker id
error_code    an error code
status_time   the time the status was current
lag           the lag (#messages) of this broker, for this partition

Anticipated errors are:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed. (or the TOPIC?)
  • INVALID_TOPIC_EXCEPTION (17) The topic is not known
  • INVALID_PARTITIONS (37) The partition_id of the given topic is not valid
  • UNKNOWN_MEMBER_ID (25) The given broker id is not known.
  • UNKNOWN_TOPIC_OR_PARTITION (3) The given broker is not a follower for the partition identified by topic, partition.
  • NONE (0) if the status request completed normally.

Implementation

AdminClient.replicaStatus() will send the underlying ReplicaStatusRequest to the leader of the given partition. This avoids requiring every broker (any broker could be the --bootstrap-server) to know the replication status of every replica, which would be inefficient in network IO and/or memory use.
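
Sending each request to the partition leader implies that the client groups the requested replicas by leader before issuing requests. The following is a minimal sketch of that grouping step, assuming the client already has a partition-to-leader mapping from cluster metadata. The class name, the "topic-partition" string keys and the leaders map are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of routing replica status queries to partition leaders. */
class ReplicaStatusRouting {

    /**
     * Groups partition keys (e.g. "my_topic-0") by the id of their leader broker,
     * so that one request can be sent per leader.
     */
    static Map<Integer, List<String>> groupByLeader(Collection<String> partitions,
                                                    Map<String, Integer> leaders) {
        Map<Integer, List<String>> byLeader = new TreeMap<>();
        for (String partition : partitions) {
            Integer leader = leaders.get(partition);
            if (leader == null) {
                continue; // leader unknown: a real client would report an error for this partition
            }
            byLeader.computeIfAbsent(leader, k -> new ArrayList<>()).add(partition);
        }
        return byLeader;
    }
}
```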

Compatibility, Deprecation, and Migration Plan

...