...
AdminClient: alterPartitionCount()
Anchor | ||||
---|---|---|---|---|
|
AdminClient
to support changing topics' partition counts...
AdminClient: alterReplicationCount()
Anchor | ||||
---|---|---|---|---|
|
AdminClient
to support changing topics' replication factors....
AdminClient: reassignPartitions()
Anchor | ||||
---|---|---|---|---|
|
AdminClient
to support changing the brokers hosting the partitions of a topic...
Partition reassignment is a long running operation, and the ReassignPartitionsResult
indicates only that the reassignment has been started, not that the reassignment has been completed.
TODO quotas
AdminClient: replicaStatus()
...
Code Block |
---|
public class ReplicaStatusOptions { } public class ReplicaStatusResult { public KafkaFuture<Map<TopicPartition, List<ReplicaStatus>>> all() } /** * Represents the replication status of a partition * on a particular broker. */ public class ReplicaStatus { /** The topic about which this is the status of */ String topic() /** The partition about which this is the status of */ int partition() /** The broker about which this is the status of */ int broker() /** * The time (as milliseconds since the epoch) that * this status data was collected. In general this may * be some time before the replicaStatus() request time. */ public long statusTime() /** * The number of messages that the replica on this broker is behind * the leader. */ public long lag() } |
Authorization
With broker-mediated reassignment it becomes possible limit the authority to perform reassignment to something finer-grained than "anyone with access to zookeeper".
The reasons for reassignment are usually operational. For example, migrating partitions to new brokers when expanding the cluster, or attempting to find a more balanced assignment (according to some notion of balance). These are cluster-wide considerations and so authority should be for the reassign operation being performed on the cluster. Therefore alterTopics()
will require ClusterAction
on the CLUSTER
.
replicaStatus()
will require Describe
on the CLUSTER
.
Network Protocol: AlterTopicsRequest
and AlterTopicsResponse
...
No Format |
---|
AlterTopicsRequest => [alter_topic_requests] validate_only
alter_topic_requests => topic num_partitions replication_factor [partition_assignment]
topic => STRING
num_partitions => INT32
replication_factor => INT16
partition_assignment => partition_id brokers
partition_id => INT32
brokers => [INT32]
validate_only => BOOLEAN
timeout => INT32 |
Where
FIELD | DESCRIPTION |
---|---|
topic | the topic name |
num_partition | the number of partitions. A |
replication_factor | the replication factor. A |
partition_id | the partition id |
brokers | the ids of the assigned brokers for this partition |
validate_only | true to just validate the request, but not actually alter the topics |
timeout | the timeout, in ms, to wait for the topic to be altered. |
An empty partition_assignment
would mean that the broker should calculate a suitable assignment. Such broker calculated assignment is unlikely to be balanced.
It is not necessary to send an AlterTopicsRequest
to the leader for a given partition. Any broker will do.
...
Network Protocol: AlterPartitionCountsRequest and AlterPartitionCountsResponse
Anchor | ||||
---|---|---|---|---|
|
No Format |
---|
AlterPartitionCountsRequest => [topic_partition_count]
topic_partition_count => topic partition_count
topic => STRING
partition_count => INT32
// TODO: validate_only and/or timeout flags? |
Where
Field | Description |
---|---|
topic | the name of a topic |
partition_count | the new partition count |
The request will require the ALTER
operation on the Topic
resource.
The request is subject to the CreateTopicPolicy
of the broker as configured by the create.topic.policy.class.name
config property
Anchor | ||||
---|---|---|---|---|
|
No Format |
---|
AlterPartitionCountsResponse => throttle_time_ms [topic_partition_count_error]
topic_partition_count_error => topic error_code error_message
topic => STRING
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | the name of a topic in the request |
error_code | an error code for that topic |
error_message | more detailed information about any error for that topic |
Anticipated errors:
CLUSTER_AUTHORIZATION_FAILED
(31) Authorization failedINVALID_TOPIC_EXCEPTION
(17) If the topic doesn't existINVALID_PARTITIONS
(37) If the num_partitions was invalidINVALID_REQUEST
(42) If duplicate topics appeared in the request.NONE
(0) The topic partition count was changed successfully.
Network Protocol: AlterReplicationFactorsRequest and AlterReplicationFactorsResponse
Anchor | ||||
---|---|---|---|---|
|
No Format |
---|
AlterReplicationFactorsRequest => [topic_replication_factor]
topic_replication_factor => topic replication_factor
topic => STRING
replication_factor => INT16
// TODO: validate_only and/or timeout flags? |
Where
Field | Description |
---|---|
topic | topic name |
replication_factor | the new replication factor for this topic |
The request will require the ClusterAction
operation on the CLUSTER
resource, since it can require significant inter-broker communication.
The request is subject to the CreateTopicPolicy
of the broker as configured by the create.topic.policy.class.name
config property
Anchor | ||||
---|---|---|---|---|
|
No Format |
---|
AlterReplicationFactorResponse => throttle_time_ms [topic_replication_factor_error]
topic_replication_factor_error => topic error_code error_message
topic => STRING
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | the name of a topic in the request |
error_code | an error code for that topic |
error_message | more detailed information about any error for that topic |
Anticipated errors:
CLUSTER_AUTHORIZATION_FAILED
(31) Authorization failedINVALID_TOPIC_EXCEPTION
(17) If the topic doesn't existINVALID_REPLICATION_FACTOR
(38) If the replication_factor was invalidINVALID_REQUEST
(42) If duplicate topics appeared in the request.NONE
(0) The topic replication factor was changed successfully.
Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse
Anchor | ||||
---|---|---|---|---|
|
ReassignPartitionsRequest
initiates the movement of replicas between brokersNo Format |
---|
ReassignPartitionsRequest => [reassigned_topic]
reassigned_topic => topic [reassigned_partition]
topic => STRING
reassigned_partition => partitiod_id [broker]
partition_id => INT32
broker => INT32
validate_only => BOOLEAN |
Where
Field | Description |
---|---|
topic | the name of a topic |
partition_id | a partition of that topic |
broker | a broker id |
validate_only | when true: validate the request, but don't actually reassign the partitions |
The request requires the ClusterAction
operation on the CLUSTER
resource, since it can require significant inter-broker communication.
The request is subject to the CreateTopicPolicy
of the broker as configured by the create.topic.policy.class.name
config property
Anchor | ||||
---|---|---|---|---|
|
ReassignPartitionsResponse
describes which partitions in the request will be moved, and what was wrong with the request for those partitions which will not be moved.No Format |
---|
ReassignPartitionsResponse => throttle_time_ms [reassign_partition_result]
throttle_time_ms => INT32
reassign_partition_result => topic partition_id error_code error_message
topic => STRING
partition_id => INT32
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | a topic name from the request |
partition_id | a partition id for that topic, from the request |
error_code | an error code for that topic partition |
error_message | more detailed information about any error for that topic |
Anticipated errors:
CLUSTER_AUTHORIZATION_FAILED
(31) Authorization failedINVALID_TOPIC_EXCEPTION
(17) If the topic doesn't existUNKNOWN_MEMBER_ID
(25) If any broker ids in the partition_assignment included an unknown broker idINVALID_REQUEST
(42) If duplicate topics appeared in the requestPARTITION_REASSIGNMENT_IN_PROGRESS
(new)INVALID_REPLICA_ASSIGNMENT
(39) If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and /or replication_factor. The error_message would contain further information.NONE
(0) reassignment has started
No Format |
---|
AlterTopicsResponse => throttle_time_ms [topic_errors]
throttle_time_ms => INT32
topic_errors => topic error_code error_message
topic => STRING
error_code => INT16
error_message => NULLABLE_STRING |
Where
Field | Description |
---|---|
throttle_time_ms | duration in milliseconds for which the request was throttled |
topic | the topic name |
error_code | the error code for altering this topic |
error_message | detailed error information |
Possible values for error_code
:
CLUSTER_AUTHORIZATION_FAILED
(31) Authorization failedINVALID_TOPIC_EXCEPTION
(17) If the topic doesn't existINVALID_PARTITIONS
(37) If then
um_partitions
was invalidINVALID_REPLICATION_FACTOR
(38) If ther
eplication_factor
was invalidUNKNOWN_MEMBER_ID
(25) If any broker ids in thep
artition_assignment
included an unknown broker idINVALID_REQUEST
(42) If trying to modify the partition assignment and the number of partitions or the partition assignment and the replication factor in the same request. Or if duplicatetopic
s appeared in the request.PARTITION_REASSIGNMENT_IN_PROGRESS
(new)INVALID_REPLICA_ASSIGNMENT
(39) If a partition, replica or broker id in thepartition_assignment
doesn't exist or is incompatible with the requestednum_partitions
and /orreplication_factor
. Theerror_message
would contain further information.NONE
(0) If the request was successful and the alteration/reassignment has been started.
As currently, it will not be possible to have multiple reassignments running concurrently, hence the addition of the PARTITION_REASSIGNMENT_IN_PROGRESS
error code.
Policy
The existing CreateTopicPolicy
can be used to apply a cluster-wide policy on topic configuration at the point of creation via the create.topic.policy.class.name
config property. To avoid an obvious loophole, it is necessary to also be able to apply a policy to topic alteration. Maintaining two separate policies in sync is a burden both in terms of class implementation and configuring the policy. It seems unlikely that many use cases would require a different policy for alteration than creation. On the other hand, just applying the CreateTopicPolicy
to alterations is undesirable because:
- Its name doesn't convey that it would be applied to alterations too
- Its API (specifically its
RequestMetadata
member class) includes topicconfigs
(i.e.Map<String, String>
) which is not part of the API for topic alteration even though it is part of the API for topic creation. - It prevents any use cases which legitimately did need to apply a different policy for alteration than creation.
Finding a balance between compatibility with existing deployments, and not opening the loophole is difficult.
The existing create.topic.policy.class.name
config would continue to work, and would continue to name an implementation of CreateTopicPolicy
. That policy would be applied to alterations automatically. The topic's config would be presented to the validate()
method (via the RequestMetadata
) even though it's not actually part of the AlterTopicsRequest
. The documentation for the interface and config property would be updated.
Network Protocol: ReplicaStatusRequest
and ReplicaStatusResponse
Anchor | ||||
---|---|---|---|---|
|
ReplicaStatusRequest
requests information about the progress of a number of replicas. It is not tied to a prior AlterReplicationFactorsRequest
or ReassignPartitionsRequest
, but is intended as a way of monitoring the progress and completion of those operations.No Format |
---|
ReplicaStatusRequest => [replica_status_requests] replica_status_requests => topic partition_id broker topic => STRING partition_id => INT32 broker => INT32 |
...
Field | Description |
---|---|
topic | a topic name |
partition_id | a partition id of this topic |
broker | a follower broker id for this partition |
The request will require the DESCRIBE
operation on each of the Topic
resources.
Anchor | ||||
---|---|---|---|---|
|
...