...

AdminClient: alterPartitionCount()

The following methods will be added to AdminClient to support changing topics' partition counts.

...

AdminClient: alterReplicationCount()

The following methods will be added to AdminClient to support changing topics' replication factors.

...

AdminClient: reassignPartitions()

The following methods will be added to AdminClient to support changing the brokers hosting the partitions of a topic.

...

Partition reassignment is a long-running operation, and the ReassignPartitionsResult indicates only that the reassignment has been started, not that it has been completed.
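Since the result only confirms that the reassignment has started, a caller that needs to know when it has finished must poll. The following Java sketch shows one way to do that with exponential backoff and an overall timeout. It is a minimal illustration under assumptions: the completion check is passed in as a plain `BooleanSupplier` (in practice it might be built on replicaStatus()), and the class name `ReassignmentWaiter` and its parameters are hypothetical, not part of the proposed AdminClient API.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: waits for a long-running operation such as a reassignment to finish. */
final class ReassignmentWaiter {
    private ReassignmentWaiter() {}

    /**
     * Calls isComplete until it returns true or timeoutMs elapses, sleeping between
     * attempts with a backoff that doubles from initialBackoffMs up to maxBackoffMs.
     * Returns true if completion was observed, false on timeout or interruption.
     */
    static boolean awaitCompletion(BooleanSupplier isComplete, long timeoutMs,
                                   long initialBackoffMs, long maxBackoffMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        long backoffMs = initialBackoffMs;
        while (true) {
            if (isComplete.getAsBoolean()) {
                return true;
            }
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                return false;
            }
            try {
                // Never sleep past the deadline.
                Thread.sleep(Math.min(backoffMs, remainingMs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
            backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
        }
    }
}
```

For example, `awaitCompletion(check, 600_000, 100, 5_000)` would poll for up to ten minutes, starting at 100 ms between checks and backing off to at most 5 s.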

TODO quotas

AdminClient: replicaStatus()

...

Code Block
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

// Only the public API is shown; method bodies are omitted.

public class ReplicaStatusOptions {
    // No options are defined yet.
}

public class ReplicaStatusResult {
    /** The status of each requested replica, grouped by partition. */
    public KafkaFuture<Map<TopicPartition, List<ReplicaStatus>>> all();
}

/**
 * Represents the replication status of a partition
 * on a particular broker.
 */
public class ReplicaStatus {
    /** The topic this status refers to. */
    public String topic();
    /** The partition this status refers to. */
    public int partition();
    /** The broker this status refers to. */
    public int broker();

    /**
     * The time (as milliseconds since the epoch) at which
     * this status data was collected. In general this may
     * be some time before the replicaStatus() request time.
     */
    public long statusTime();

    /**
     * The number of messages by which the replica on this
     * broker lags behind the leader.
     */
    public long lag();
}
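To illustrate how a caller might interpret these statuses, the sketch below flags replicas that cannot yet be considered caught up. It uses a simplified stand-in record whose accessors mirror the proposed ReplicaStatus methods rather than the real class. The thresholds (`maxLag`, `maxAgeMs`) and the choice to treat stale status data as "not caught up" are illustrative assumptions, motivated by statusTime() possibly predating the request.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Simplified stand-in for the proposed ReplicaStatus class (not the real type);
// its accessors mirror topic(), partition(), broker(), statusTime() and lag().
record ReplicaStatus(String topic, int partition, int broker, long statusTime, long lag) {}

final class ReplicaProgress {
    private ReplicaProgress() {}

    /**
     * Returns the replicas that cannot yet be considered caught up: those whose lag
     * exceeds maxLag, or whose status was collected more than maxAgeMs before nowMs
     * (stale data is treated conservatively as "not caught up").
     */
    static List<ReplicaStatus> notCaughtUp(Collection<ReplicaStatus> statuses,
                                           long maxLag, long maxAgeMs, long nowMs) {
        List<ReplicaStatus> result = new ArrayList<>();
        for (ReplicaStatus s : statuses) {
            boolean stale = nowMs - s.statusTime() > maxAgeMs;
            if (stale || s.lag() > maxLag) {
                result.add(s);
            }
        }
        return result;
    }
}
```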

Authorization

With broker-mediated reassignment it becomes possible to limit the authority to perform reassignment to something finer-grained than "anyone with access to ZooKeeper".

The reasons for reassignment are usually operational: for example, migrating partitions to new brokers when expanding the cluster, or seeking a more balanced assignment (according to some notion of balance). These are cluster-wide concerns, so the authorization check should apply to the cluster as a whole rather than to individual topics. Therefore alterTopics() will require ClusterAction on the CLUSTER.

replicaStatus() will require Describe on the CLUSTER.

Network Protocol: AlterTopicsRequest and AlterTopicsResponse

...

No Format
AlterTopicsRequest => [alter_topic_requests] validate_only timeout
  alter_topic_requests => topic num_partitions replication_factor [partition_assignment]
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    partition_assignment => partition_id brokers
      partition_id => INT32
      brokers => [INT32]
  validate_only => BOOLEAN
  timeout => INT32

Where

Field | Description
topic | the topic name
num_partitions | the number of partitions; a num_partitions of -1 means "no change"
replication_factor | the replication factor; a replication_factor of -1 means "no change"
partition_id | the partition id
brokers | the ids of the brokers assigned to this partition
validate_only | true to only validate the request, without actually altering the topics
timeout | the time, in ms, to wait for the topics to be altered

An empty partition_assignment means that the broker should calculate a suitable assignment itself. Such a broker-calculated assignment is unlikely to be balanced.

It is not necessary to send an AlterTopicsRequest to the leader for a given partition. Any broker will do.
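As a sketch of the wire layout, the following encodes an AlterTopicsRequest body (without the request header) using Kafka's classic primitive encodings: big-endian INT16/INT32, STRING as an INT16 length followed by UTF-8 bytes, arrays as an INT32 count followed by the elements, and BOOLEAN as a single byte. It assumes that timeout follows validate_only at the top level of the request, and the class and record names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

final class AlterTopicsRequestEncoder {
    private AlterTopicsRequestEncoder() {}

    /** One alter_topic_requests entry; -1 means "no change" for the numeric fields. */
    record AlterTopic(String topic, int numPartitions, short replicationFactor,
                      Map<Integer, List<Integer>> partitionAssignment) {}

    /** Encodes the request body (no request header). */
    static byte[] encode(List<AlterTopic> topics, boolean validateOnly, int timeoutMs) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // DataOutputStream writes integers big-endian, matching the Kafka protocol.
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(topics.size());                          // [alter_topic_requests] count
            for (AlterTopic t : topics) {
                writeString(out, t.topic());
                out.writeInt(t.numPartitions());
                out.writeShort(t.replicationFactor());
                out.writeInt(t.partitionAssignment().size());     // [partition_assignment] count
                for (Map.Entry<Integer, List<Integer>> e : t.partitionAssignment().entrySet()) {
                    out.writeInt(e.getKey());                     // partition_id
                    out.writeInt(e.getValue().size());            // [brokers] count
                    for (int broker : e.getValue()) {
                        out.writeInt(broker);
                    }
                }
            }
            out.writeBoolean(validateOnly);                       // one byte: 0 or 1
            out.writeInt(timeoutMs);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    /** STRING: INT16 byte length followed by the UTF-8 bytes (not Java's modified UTF-8). */
    private static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        out.writeShort(utf8.length);
        out.write(utf8);
    }
}
```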

...

 

Network Protocol: AlterPartitionCountsRequest and AlterPartitionCountsResponse


No Format
AlterPartitionCountsRequest => [topic_partition_count]
  topic_partition_count => topic partition_count
    topic => STRING
    partition_count => INT32
  // TODO: validate_only and/or timeout flags?

Where

Field | Description
topic | the name of a topic
partition_count | the new partition count

The request will require the ALTER operation on the Topic resource.

The request is subject to the broker's CreateTopicPolicy, as configured by the create.topic.policy.class.name config property.


No Format
AlterPartitionCountsResponse => throttle_time_ms [topic_partition_count_error]
  throttle_time_ms => INT32
  topic_partition_count_error => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Where

Field | Description
throttle_time_ms | duration in milliseconds for which the request was throttled
topic | the name of a topic in the request
error_code | an error code for that topic
error_message | more detailed information about any error for that topic
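Reading the response back mirrors the request encoding. The sketch below decodes an AlterPartitionCountsResponse body. It assumes throttle_time_ms is an INT32, as in the other responses in this proposal, and that a NULLABLE_STRING uses a length of -1 to encode null, as elsewhere in the Kafka protocol. The decoder and record names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class AlterPartitionCountsResponseDecoder {
    private AlterPartitionCountsResponseDecoder() {}

    record TopicError(String topic, short errorCode, String errorMessage) {}
    record Response(int throttleTimeMs, List<TopicError> errors) {}

    /** Decodes the response body; ByteBuffer reads big-endian by default. */
    static Response decode(ByteBuffer buf) {
        int throttleTimeMs = buf.getInt();            // assumed INT32
        int count = buf.getInt();                     // [topic_partition_count_error] count
        List<TopicError> errors = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            String topic = readNullableString(buf);   // STRING: never null in a valid response
            short errorCode = buf.getShort();
            String message = readNullableString(buf); // NULLABLE_STRING
            errors.add(new TopicError(topic, errorCode, message));
        }
        return new Response(throttleTimeMs, errors);
    }

    /** INT16 length (-1 meaning null) followed by that many UTF-8 bytes. */
    private static String readNullableString(ByteBuffer buf) {
        short len = buf.getShort();
        if (len < 0) {
            return null;
        }
        byte[] utf8 = new byte[len];
        buf.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }
}
```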

Anticipated errors:

  • TOPIC_AUTHORIZATION_FAILED (29) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_PARTITIONS (37) If the partition_count was invalid
  • INVALID_REQUEST (42) If duplicate topics appeared in the request.
  • NONE (0) The topic partition count was changed successfully.
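A broker-side check that produces these per-topic codes might look like the following sketch. It takes the current partition counts as a plain map (a hypothetical input) and assumes that "invalid" for INVALID_PARTITIONS means a count that is not strictly greater than the current one, because Kafka cannot remove partitions from an existing topic. Duplicate topics are reported as INVALID_REQUEST before any other check.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class PartitionCountValidator {
    private PartitionCountValidator() {}

    static final short NONE = 0;
    static final short INVALID_TOPIC_EXCEPTION = 17;
    static final short INVALID_PARTITIONS = 37;
    static final short INVALID_REQUEST = 42;

    record TopicPartitionCount(String topic, int partitionCount) {}

    /** Returns an error code per requested topic, in request order. */
    static Map<String, Short> validate(List<TopicPartitionCount> request,
                                       Map<String, Integer> currentCounts) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new HashSet<>();
        for (TopicPartitionCount t : request) {
            if (!seen.add(t.topic())) {
                duplicates.add(t.topic());
            }
        }
        Map<String, Short> result = new LinkedHashMap<>();
        for (TopicPartitionCount t : request) {
            Integer current = currentCounts.get(t.topic());
            short code;
            if (duplicates.contains(t.topic())) {
                code = INVALID_REQUEST;
            } else if (current == null) {
                code = INVALID_TOPIC_EXCEPTION;      // topic does not exist
            } else if (t.partitionCount() <= current) {
                code = INVALID_PARTITIONS;           // assumption: partitions can only be added
            } else {
                code = NONE;
            }
            result.put(t.topic(), code);
        }
        return result;
    }
}
```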

Network Protocol: AlterReplicationFactorsRequest and AlterReplicationFactorsResponse


No Format
AlterReplicationFactorsRequest => [topic_replication_factor]
  topic_replication_factor => topic replication_factor
    topic => STRING
    replication_factor => INT16
  // TODO: validate_only and/or timeout flags?

Where

Field | Description
topic | the topic name
replication_factor | the new replication factor for this topic

The request will require the ClusterAction operation on the CLUSTER resource, since it can require significant inter-broker communication.

The request is subject to the broker's CreateTopicPolicy, as configured by the create.topic.policy.class.name config property.


No Format
AlterReplicationFactorsResponse => throttle_time_ms [topic_replication_factor_error]
  throttle_time_ms => INT32
  topic_replication_factor_error => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Where

Field | Description
throttle_time_ms | duration in milliseconds for which the request was throttled
topic | the name of a topic in the request
error_code | an error code for that topic
error_message | more detailed information about any error for that topic

Anticipated errors:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_REPLICATION_FACTOR (38) If the replication_factor was invalid
  • INVALID_REQUEST (42) If duplicate topics appeared in the request.
  • NONE (0) The topic replication factor was changed successfully.
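Changing the replication factor means creating or deleting replicas, which is why it can require significant inter-broker communication. The sketch below computes a new replica list for a single partition. The placement rule is a deliberately naive assumption for illustration: shrinking drops replicas from the end so that the first (preferred-leader) replica is kept, and growing appends the lowest-numbered live brokers not already hosting the partition. It ignores balance and rack awareness, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;

final class ReplicationFactorChange {
    private ReplicationFactorChange() {}

    static final short INVALID_REPLICATION_FACTOR = 38;

    /** Carries the error code the broker would report. */
    static final class ReplicationFactorException extends RuntimeException {
        final short errorCode;

        ReplicationFactorException(short errorCode, String message) {
            super(message);
            this.errorCode = errorCode;
        }
    }

    /** Returns the partition's replica list resized to targetFactor (naive placement). */
    static List<Integer> resize(List<Integer> currentReplicas, int targetFactor,
                                SortedSet<Integer> liveBrokers) {
        if (targetFactor < 1 || targetFactor > liveBrokers.size()) {
            throw new ReplicationFactorException(INVALID_REPLICATION_FACTOR,
                "replication factor " + targetFactor + " not in [1, " + liveBrokers.size() + "]");
        }
        List<Integer> result = new ArrayList<>(currentReplicas);
        if (targetFactor <= result.size()) {
            // Shrink: keep the first targetFactor replicas, including the preferred leader.
            return new ArrayList<>(result.subList(0, targetFactor));
        }
        for (int broker : liveBrokers) {            // ascending broker id order
            if (result.size() == targetFactor) {
                break;
            }
            if (!result.contains(broker)) {
                result.add(broker);                 // a new replica must copy the partition's data
            }
        }
        return result;
    }
}
```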

Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse

ReassignPartitionsRequest initiates the movement of replicas between brokers.

No Format
ReassignPartitionsRequest => [reassigned_topic] validate_only
  reassigned_topic => topic [reassigned_partition]
    topic => STRING
    reassigned_partition => partition_id [broker]
      partition_id => INT32
      broker => INT32
  validate_only => BOOLEAN

Where

Field | Description
topic | the name of a topic
partition_id | a partition of that topic
broker | the id of a broker that should host a replica of that partition
validate_only | when true, validate the request but don't actually reassign the partitions

The request requires the ClusterAction operation on the CLUSTER resource, since it can require significant inter-broker communication.

The request is subject to the broker's CreateTopicPolicy, as configured by the create.topic.policy.class.name config property.
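The per-partition checks implied by the anticipated errors for this request can be sketched as follows. The error names come from this proposal; they are modelled as an enum because PARTITION_REASSIGNMENT_IN_PROGRESS has no numeric code yet. Treating an empty broker list, or a broker listed twice, as INVALID_REPLICA_ASSIGNMENT is an assumption, as are the method and parameter names.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class ReassignmentValidator {
    private ReassignmentValidator() {}

    /** Error names as in the proposal; no numeric codes are assumed. */
    enum ReassignError { NONE, UNKNOWN_MEMBER_ID, INVALID_REPLICA_ASSIGNMENT, PARTITION_REASSIGNMENT_IN_PROGRESS }

    /**
     * Validates the target broker list for one partition. reassignmentInProgress
     * reflects the rule that only one reassignment may run at a time.
     */
    static ReassignError validate(List<Integer> targetBrokers, Set<Integer> knownBrokers,
                                  boolean reassignmentInProgress) {
        if (reassignmentInProgress) {
            return ReassignError.PARTITION_REASSIGNMENT_IN_PROGRESS;
        }
        if (targetBrokers.isEmpty()) {
            return ReassignError.INVALID_REPLICA_ASSIGNMENT;   // assumption: at least one replica
        }
        Set<Integer> seen = new HashSet<>();
        for (int broker : targetBrokers) {
            if (!knownBrokers.contains(broker)) {
                return ReassignError.UNKNOWN_MEMBER_ID;
            }
            if (!seen.add(broker)) {
                return ReassignError.INVALID_REPLICA_ASSIGNMENT; // same broker listed twice
            }
        }
        return ReassignError.NONE;
    }
}
```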

ReassignPartitionsResponse describes which partitions in the request will be moved, and what was wrong with the request for those partitions which will not be moved.

No Format
ReassignPartitionsResponse => throttle_time_ms [reassign_partition_result]
  throttle_time_ms => INT32
  reassign_partition_result => topic partition_id error_code error_message
    topic => STRING
    partition_id => INT32
    error_code => INT16
    error_message => NULLABLE_STRING

Where

Field | Description
throttle_time_ms | duration in milliseconds for which the request was throttled
topic | a topic name from the request
partition_id | a partition id for that topic, from the request
error_code | an error code for that topic partition
error_message | more detailed information about any error for that topic partition

Anticipated errors:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • UNKNOWN_MEMBER_ID (25) If any broker id in the request is unknown
  • INVALID_REQUEST (42) If duplicate topics appeared in the request
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new) If a partition reassignment is already in progress
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the request doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
  • NONE (0) The reassignment has started.

The AlterTopicsResponse is defined as follows:

No Format
AlterTopicsResponse => throttle_time_ms [topic_errors]
  throttle_time_ms => INT32
  topic_errors => topic error_code error_message
    topic => STRING
    error_code => INT16
    error_message => NULLABLE_STRING

Where

Field | Description
throttle_time_ms | duration in milliseconds for which the request was throttled
topic | the topic name
error_code | the error code for altering this topic
error_message | detailed error information

Possible values for error_code:

  • CLUSTER_AUTHORIZATION_FAILED (31) Authorization failed
  • INVALID_TOPIC_EXCEPTION (17) If the topic doesn't exist
  • INVALID_PARTITIONS (37) If the num_partitions was invalid
  • INVALID_REPLICATION_FACTOR (38) If the replication_factor was invalid
  • UNKNOWN_MEMBER_ID (25) If any broker ids in the partition_assignment included an unknown broker id
  • INVALID_REQUEST (42) If the request tries to modify both the partition assignment and the number of partitions, or both the partition assignment and the replication factor, of the same topic; or if duplicate topics appeared in the request.
  • PARTITION_REASSIGNMENT_IN_PROGRESS (new) If a partition reassignment is already in progress
  • INVALID_REPLICA_ASSIGNMENT (39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
  • NONE (0) If the request was successful and the alteration/reassignment has been started.
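The INVALID_REQUEST rule for combining an explicit partition_assignment with other changes can be expressed as a small check on a single alter_topic_requests entry. This sketch assumes -1 is the "no change" value for num_partitions and replication_factor, as described for AlterTopicsRequest, and that an empty partition_assignment means the broker chooses the assignment. Duplicate-topic detection is left out, and the names are hypothetical.

```java
import java.util.List;
import java.util.Map;

final class AlterTopicsRequestChecks {
    private AlterTopicsRequestChecks() {}

    static final int NO_CHANGE = -1;
    static final short NONE = 0;
    static final short INVALID_REQUEST = 42;

    /** One alter_topic_requests entry. */
    record AlterTopic(String topic, int numPartitions, short replicationFactor,
                      Map<Integer, List<Integer>> partitionAssignment) {}

    /**
     * Rejects an entry that supplies an explicit partition assignment while also
     * changing the partition count or the replication factor.
     */
    static short check(AlterTopic t) {
        boolean explicitAssignment = !t.partitionAssignment().isEmpty();
        boolean resizes = t.numPartitions() != NO_CHANGE || t.replicationFactor() != NO_CHANGE;
        return explicitAssignment && resizes ? INVALID_REQUEST : NONE;
    }
}
```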

As is currently the case, it will not be possible to run multiple reassignments concurrently, hence the addition of the PARTITION_REASSIGNMENT_IN_PROGRESS error code.

Policy

The existing CreateTopicPolicy can be used to apply a cluster-wide policy on topic configuration at the point of creation via the create.topic.policy.class.name config property. To avoid an obvious loophole, it is necessary to also be able to apply a policy to topic alteration. Maintaining two separate policies in sync is a burden both in terms of class implementation and configuring the policy. It seems unlikely that many use cases would require a different policy for alteration than creation. On the other hand, just applying the CreateTopicPolicy to alterations is undesirable because:

  • Its name doesn't convey that it would be applied to alterations too
  • Its API (specifically its RequestMetadata member class) includes topic configs (i.e. a Map<String, String>), which are part of the API for topic creation but not of the API for topic alteration.
  • It rules out any use case that legitimately needs a different policy for alteration than for creation.

Finding a balance between compatibility with existing deployments, and not opening the loophole is difficult.

The existing create.topic.policy.class.name config would continue to work, and would continue to name an implementation of CreateTopicPolicy. That policy would be applied to alterations automatically. The topic's config would be presented to the validate() method (via the RequestMetadata) even though it's not actually part of the AlterTopicsRequest. The documentation for the interface and config property would be updated.
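A minimal sketch of applying a creation policy to an alteration, assuming simplified stand-ins for CreateTopicPolicy and its RequestMetadata (the real Kafka types differ, for example by using nullable boxed fields and throwing PolicyViolationException). Fields left at -1 are replaced with the topic's current values, so the policy sees the topic as it would look after the alteration, and the topic's existing config is passed in even though the AlterTopicsRequest does not carry it. The adapter and its parameters are hypothetical.

```java
import java.util.List;
import java.util.Map;

final class AlterationPolicyAdapter {
    private AlterationPolicyAdapter() {}

    /** Simplified stand-in for CreateTopicPolicy.RequestMetadata (not the real class). */
    record RequestMetadata(String topic, int numPartitions, short replicationFactor,
                           Map<Integer, List<Integer>> replicasAssignments,
                           Map<String, String> configs) {}

    /** Simplified stand-in for CreateTopicPolicy; rejects a request by throwing. */
    interface TopicPolicy {
        void validate(RequestMetadata metadata);
    }

    /** Runs the creation policy against the topic as it would look after the alteration. */
    static void validateAlteration(TopicPolicy policy, String topic,
                                   int requestedPartitions, short requestedReplicationFactor,
                                   Map<Integer, List<Integer>> requestedAssignment,
                                   int currentPartitions, short currentReplicationFactor,
                                   Map<String, String> currentConfigs) {
        // -1 means "no change", so substitute the topic's current value.
        int partitions = requestedPartitions == -1 ? currentPartitions : requestedPartitions;
        short replicationFactor = requestedReplicationFactor == -1
            ? currentReplicationFactor : requestedReplicationFactor;
        // The existing config is presented even though the alteration request lacks it.
        policy.validate(new RequestMetadata(topic, partitions, replicationFactor,
                                            requestedAssignment, currentConfigs));
    }
}
```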

Network Protocol: ReplicaStatusRequest and ReplicaStatusResponse

ReplicaStatusRequest requests information about the progress of a number of replicas. It is not tied to a prior AlterReplicationFactorsRequest or ReassignPartitionsRequest, but is intended as a way of monitoring the progress and completion of those operations.

No Format
ReplicaStatusRequest => [replica_status_requests]
  replica_status_requests => topic partition_id broker
    topic => STRING
    partition_id => INT32
    broker => INT32

...

Field | Description
topic | a topic name
partition_id | a partition id of this topic
broker | a follower broker id for this partition

The request will require the DESCRIBE operation on each of the Topic resources.
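Because ReplicaStatusRequest is not tied to a prior request, a client monitoring a reassignment has to work out which replicas to ask about. The sketch below derives the replica_status_requests entries from a target assignment, skipping the first replica of each partition on the assumption that it is the preferred leader rather than a follower. The record and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ReplicaStatusQueries {
    private ReplicaStatusQueries() {}

    record Partition(String topic, int partition) {}

    /** One replica_status_requests entry. */
    record Query(String topic, int partitionId, int broker) {}

    /** Builds one query per follower in the target assignment. */
    static List<Query> followerQueries(Map<Partition, List<Integer>> targetAssignment) {
        List<Query> queries = new ArrayList<>();
        for (Map.Entry<Partition, List<Integer>> e : targetAssignment.entrySet()) {
            List<Integer> replicas = e.getValue();
            // Skip index 0 (assumed preferred leader); an empty list yields no queries.
            for (int broker : replicas.subList(Math.min(1, replicas.size()), replicas.size())) {
                queries.add(new Query(e.getKey().topic(), e.getKey().partition(), broker));
            }
        }
        return queries;
    }
}
```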

The response includes replication information for each of the replicas in the request:

...