...
Note: this KIP was initially assigned the number KIP-178 in error; that number was already taken, so it has been reassigned as KIP-179.
Status
Current state: Under Discussion
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
...
Secondly, ReassignPartitionsCommand currently has no proper facility to report the progress of a reassignment; --verify can be run periodically to check whether the requested assignments have been achieved, but it would be useful if the tool could report progress in more detail.
Public Interfaces
...
New network protocol APIs will be added:
- AlterPartitionCountsRequest and AlterPartitionCountsResponse
- AlterReplicationFactorsRequest and AlterReplicationFactorsResponse
- ReassignPartitionsRequest and ReassignPartitionsResponse
- ReplicaStatusRequest and ReplicaStatusResponse
The AdminClient API will have new methods added (plus overloads for options):
- alterPartitionCounts(Map<String, Integer> partitionCounts)
- alterReplicationFactors(Map<String, Short> replicationFactors)
- reassignPartitions(Map<TopicPartition, List<Integer>>)
- replicaStatus(Collection<Replica> replicas)
The options accepted by the kafka-reassign-partitions.sh command will change:
- --zookeeper will be deprecated, with a warning message
- a new --bootstrap-server option will be added
- a new --progress action option will be added
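A minimal sketch of how the tool could choose between the deprecated --zookeeper path and the new --bootstrap-server path. It is not the actual ReassignPartitionsCommand code: the class ReassignOptionsSketch, the ConnectionMode enum and the warning text are hypothetical, the hand-rolled "--option value" parsing is for illustration only, and treating the two options as mutually exclusive is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: pick the connection path and collect a deprecation warning
// (the caller would print the warnings, e.g. to stderr).
final class ReassignOptionsSketch {

    enum ConnectionMode { ZOOKEEPER, ADMIN_CLIENT }

    static final class Resolved {
        final ConnectionMode mode;
        final String address;
        final List<String> warnings;

        Resolved(ConnectionMode mode, String address, List<String> warnings) {
            this.mode = mode;
            this.address = address;
            this.warnings = warnings;
        }
    }

    static Resolved resolve(String[] args) {
        String zookeeper = null;
        String bootstrapServer = null;
        // Naive scan for "--option value" pairs; other options (e.g. --progress) are ignored here.
        for (int i = 0; i + 1 < args.length; i++) {
            if (args[i].equals("--zookeeper")) {
                zookeeper = args[i + 1];
            } else if (args[i].equals("--bootstrap-server")) {
                bootstrapServer = args[i + 1];
            }
        }
        if (zookeeper != null && bootstrapServer != null) {
            throw new IllegalArgumentException("Only one of --zookeeper and --bootstrap-server may be given");
        }
        List<String> warnings = new ArrayList<>();
        if (bootstrapServer != null) {
            return new Resolved(ConnectionMode.ADMIN_CLIENT, bootstrapServer, warnings);
        }
        if (zookeeper != null) {
            warnings.add("Warning: --zookeeper is deprecated and will be removed in a future version; "
                    + "use --bootstrap-server instead.");
            return new Resolved(ConnectionMode.ZOOKEEPER, zookeeper, warnings);
        }
        throw new IllegalArgumentException("One of --zookeeper or --bootstrap-server is required");
    }
}
```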
Proposed Changes
kafka-reassign-partitions.sh and ReassignPartitionsCommand
The --zookeeper option will be retained and will:
...
In all other respects, the public API of ReassignPartitionsCommand will not be changed.
AdminClient: alterPartitionCounts()
To support kafka-topics.sh --alter --partitions ..., the following methods will be added to AdminClient for changing topics' partition counts...
```java
class AlterPartitionCountsOptions {
    // TODO validateOnly?
    // TODO timeout?
}

class AlterPartitionCountsResult {
    Map<String, KafkaFuture<Void>> values();
}
```
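Because the result exposes one future per topic, a caller can report success or failure topic by topic (AlterReplicationFactorsResult.values() has the same shape). The sketch below is illustrative only: to stay self-contained it uses java.util.concurrent.CompletableFuture in place of KafkaFuture, and the class PerTopicResultSketch and its summarize() helper are hypothetical, not part of the proposal.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Sketch: wait on the per-topic futures of a values()-style result.
// CompletableFuture stands in for KafkaFuture so the example has no Kafka dependency.
final class PerTopicResultSketch {

    /** Returns topic -> "OK", or the failure message, after every future has completed. */
    static Map<String, String> summarize(Map<String, CompletableFuture<Void>> values) {
        Map<String, String> outcome = new TreeMap<>();
        for (Map.Entry<String, CompletableFuture<Void>> e : values.entrySet()) {
            try {
                e.getValue().join(); // blocks until this topic's change completes or fails
                outcome.put(e.getKey(), "OK");
            } catch (CompletionException ex) {
                // A failure for one topic does not hide the results for the others.
                outcome.put(e.getKey(), ex.getCause().getMessage());
            }
        }
        return outcome;
    }
}
```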
AdminClient: alterReplicationFactors()
To support kafka-topics.sh --alter --replication-factor ..., the following methods will be added to AdminClient for changing topics' replication factors...
```java
class AlterReplicationFactorsOptions {
    // TODO validateOnly?
    // TODO timeout?
}

class AlterReplicationFactorsResult {
    Map<String, KafkaFuture<Void>> values();
}
```
AdminClient: reassignPartitions()
To support kafka-topics.sh --alter --replica-assignment ... and kafka-reassign-partitions.sh, the following methods will be added to AdminClient for changing the brokers that host the partitions of a topic...
Partition reassignment is a long-running operation, and the ReassignPartitionsResult indicates only that the reassignment has been started, not that it has been completed. The new replicaStatus() method can be used to check the progress and completion of the reassignment.
TODO quotas
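Since reassignPartitions() only reports that the reassignment has started, a caller has to poll for completion. The following is a minimal sketch of such a loop, not part of the proposal: it assumes a reassignment counts as complete once every replica reports a lag of 0, and it abstracts the replicaStatus() call behind a hypothetical Supplier returning a lag per replica, so it runs without a cluster.

```java
import java.util.Map;
import java.util.function.Supplier;

// Sketch: poll a status source until every replica in the reassignment has caught up.
// Assumption: "complete" means every replica reports a lag of 0.
final class ReassignmentPoller {

    /**
     * @param fetchLags   hypothetical stand-in for AdminClient.replicaStatus():
     *                    replica key -> lag (messages behind the leader)
     * @param maxAttempts give up after this many polls
     * @param sleepMillis pause between polls
     * @return true if every replica reached lag 0 within maxAttempts polls
     */
    static boolean awaitCompletion(Supplier<Map<String, Long>> fetchLags,
                                   int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Map<String, Long> lags = fetchLags.get();
            if (lags.values().stream().allMatch(lag -> lag == 0L)) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag and stop polling
                    return false;
                }
            }
        }
        return false;
    }
}
```

A real caller would probably also bound the total wait time rather than only the number of polls.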
AdminClient: replicaStatus()
The following methods will be added to AdminClient to support the progress reporting functionality:...
```java
public class ReplicaStatusOptions {
}

public class ReplicaStatusResult {
    public KafkaFuture<Map<TopicPartition, List<ReplicaStatus>>> all();
}

/**
 * Represents the replication status of a partition
 * on a particular broker.
 */
public class ReplicaStatus {
    /** The topic this status refers to. */
    public String topic();

    /** The partition this status refers to. */
    public int partition();

    /** The broker this status refers to. */
    public int broker();

    /**
     * The time (as milliseconds since the epoch) that
     * this status data was collected. In general this may
     * be some time before the replicaStatus() request time.
     */
    public long statusTime();

    /**
     * The number of messages that the replica on this broker is behind
     * the leader.
     */
    public long lag();
}
```
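The proposed --progress action could summarise these ReplicaStatus values per partition. A minimal sketch, assuming (as an illustration, not something the proposal specifies) that a partition's progress is shown as the largest lag among its replicas; ReplicaStatusStub is a hypothetical local stand-in with the same accessors as the proposed ReplicaStatus, so the example compiles without Kafka.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local stand-in for the proposed ReplicaStatus class (same accessors).
final class ReplicaStatusStub {
    private final String topic;
    private final int partition;
    private final int broker;
    private final long statusTime;
    private final long lag;

    ReplicaStatusStub(String topic, int partition, int broker, long statusTime, long lag) {
        this.topic = topic;
        this.partition = partition;
        this.broker = broker;
        this.statusTime = statusTime;
        this.lag = lag;
    }

    String topic() { return topic; }
    int partition() { return partition; }
    int broker() { return broker; }
    long statusTime() { return statusTime; }
    long lag() { return lag; }
}

// Sketch of a per-partition progress summary for a --progress style report.
final class ProgressReport {

    /** Maps "topic-partition" to the largest lag among its replicas; 0 means all have caught up. */
    static Map<String, Long> maxLagByPartition(List<ReplicaStatusStub> statuses) {
        Map<String, Long> result = new TreeMap<>();
        for (ReplicaStatusStub s : statuses) {
            result.merge(s.topic() + "-" + s.partition(), s.lag(), Long::max);
        }
        return result;
    }
}
```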
Network Protocol: AlterPartitionCountsRequest and AlterPartitionCountsResponse
AlterPartitionCountsRequest is used to change the partition count for a batch of topics, and is the basis for the AdminClient.alterPartitionCounts() method.

```
AlterPartitionCountsRequest => [topic_partition_count]
  topic_partition_count => topic partition_count
    topic => STRING
    partition_count => INT32
// TODO: validate_only and/or timeout flags?
```
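To make the schema concrete, here is a sketch of how the request body could be serialized, assuming the Kafka protocol's primitive encodings for non-flexible versions: big-endian integers, an array as an INT32 element count followed by its elements, and a non-nullable STRING as an INT16 length followed by UTF-8 bytes. The class name is hypothetical, and the request header, versioning and the still-open validate_only/timeout fields are left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Sketch: serialize the body of
//   AlterPartitionCountsRequest => [topic_partition_count]
//     topic_partition_count => topic(STRING) partition_count(INT32)
final class AlterPartitionCountsEncoder {

    static ByteBuffer encode(Map<String, Integer> partitionCounts) {
        // First pass: compute the exact buffer size.
        int size = 4; // INT32 array length
        for (String topic : partitionCounts.keySet()) {
            size += 2 + topic.getBytes(StandardCharsets.UTF_8).length + 4; // STRING + INT32
        }
        ByteBuffer buf = ByteBuffer.allocate(size); // ByteBuffer is big-endian by default
        buf.putInt(partitionCounts.size());
        for (Map.Entry<String, Integer> e : partitionCounts.entrySet()) {
            byte[] topic = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) topic.length); // topic names are short, so INT16 suffices
            buf.put(topic);
            buf.putInt(e.getValue());
        }
        buf.flip(); // prepare for reading/sending
        return buf;
    }
}
```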
...
Error | Code | Description
---|---|---
CLUSTER_AUTHORIZATION_FAILED | 31 | Authorization failed
INVALID_TOPIC_EXCEPTION | 17 | If the topic doesn't exist
INVALID_PARTITIONS | 37 | If the partition_count was invalid
INVALID_REQUEST | 42 | If duplicate topics appeared in the request
NONE | 0 | The topic partition count was changed successfully
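A broker handling this request would map each topic entry to one of the error codes above. The sketch below assumes two rules the proposal does not spell out: a partition count is invalid unless it is greater than the current count (partitions can be added but not removed), and a topic that appears more than once gets INVALID_REQUEST. The class name is hypothetical, a plain map stands in for the broker's topic metadata, and the authorization check is not modelled.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: per-topic validation of an AlterPartitionCountsRequest.
final class AlterPartitionCountsValidator {
    static final short NONE = 0;
    static final short INVALID_TOPIC_EXCEPTION = 17;
    static final short INVALID_PARTITIONS = 37;
    static final short INVALID_REQUEST = 42;

    /**
     * @param requested     (topic, partition_count) pairs as they appeared in the request
     * @param currentCounts hypothetical stand-in for broker metadata: topic -> current partition count
     * @return topic -> error code
     */
    static Map<String, Short> validate(List<Map.Entry<String, Integer>> requested,
                                       Map<String, Integer> currentCounts) {
        // Count occurrences first so that every copy of a duplicated topic is rejected.
        Map<String, Integer> occurrences = new HashMap<>();
        for (Map.Entry<String, Integer> e : requested) {
            occurrences.merge(e.getKey(), 1, Integer::sum);
        }
        Map<String, Short> errors = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : requested) {
            String topic = e.getKey();
            Integer current = currentCounts.get(topic);
            short code;
            if (occurrences.get(topic) > 1) {
                code = INVALID_REQUEST;
            } else if (current == null) {
                code = INVALID_TOPIC_EXCEPTION;
            } else if (e.getValue() <= current) {
                code = INVALID_PARTITIONS; // assumption: the count may only increase
            } else {
                code = NONE;
            }
            errors.put(topic, code);
        }
        return errors;
    }
}
```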
Network Protocol: AlterReplicationFactorsRequest and AlterReplicationFactorsResponse
AlterReplicationFactorsRequest is used to change the replication factor for a batch of topics, and is the basis for the AdminClient.alterReplicationFactors() method.

```
AlterReplicationFactorsRequest => [topic_replication_factor]
  topic_replication_factor => topic replication_factor
    topic => STRING
    replication_factor => INT16
// TODO: validate_only and/or timeout flags?
```
...
Error | Code | Description
---|---|---
CLUSTER_AUTHORIZATION_FAILED | 31 | Authorization failed
INVALID_TOPIC_EXCEPTION | 17 | If the topic doesn't exist
INVALID_REPLICATION_FACTOR | 38 | If the replication_factor was invalid
INVALID_REQUEST | 42 | If duplicate topics appeared in the request
NONE | 0 | The topic replication factor was changed successfully
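The replication-factor bound is the main check specific to this request. A minimal sketch, assuming (the proposal does not say) that a valid replication factor lies between 1 and the number of brokers available to host replicas, since each replica of a partition must live on a different broker. The class name is hypothetical.

```java
// Sketch of the INVALID_REPLICATION_FACTOR (38) check for one topic entry.
final class ReplicationFactorCheck {
    static final short NONE = 0;
    static final short INVALID_REPLICATION_FACTOR = 38;

    /**
     * @param replicationFactor the requested INT16 replication_factor
     * @param availableBrokers  number of brokers that could host a replica (assumption)
     */
    static short check(short replicationFactor, int availableBrokers) {
        // Each replica of a partition must be on a different broker, so the factor
        // cannot exceed the broker count; a factor below 1 is meaningless.
        if (replicationFactor < 1 || replicationFactor > availableBrokers) {
            return INVALID_REPLICATION_FACTOR;
        }
        return NONE;
    }
}
```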
Network Protocol: ReassignPartitionsRequest and ReassignPartitionsResponse
ReassignPartitionsRequest initiates the movement of replicas between brokers, and is the basis of the AdminClient.reassignPartitions() method.

```
ReassignPartitionsRequest => [reassigned_topic] validate_only
  reassigned_topic => topic [reassigned_partition]
    topic => STRING
    reassigned_partition => partition_id [broker]
      partition_id => INT32
      broker => INT32
  validate_only => BOOLEAN
```
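A sketch of how this nested schema could be serialized, assuming the Kafka protocol's primitive encodings for non-flexible versions (big-endian integers, INT32-prefixed arrays, INT16-prefixed UTF-8 strings, and a BOOLEAN as a single byte 0 or 1) and placing validate_only after the topic array. The encoder class and its nested Map input (topic -> partition_id -> broker list) are hypothetical, and the request header is omitted.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Sketch: serialize the body of ReassignPartitionsRequest from topic -> partition_id -> brokers.
final class ReassignPartitionsEncoder {

    static ByteBuffer encode(Map<String, Map<Integer, List<Integer>>> assignments, boolean validateOnly) {
        // First pass: compute the exact size.
        int size = 4; // INT32 count of reassigned_topic entries
        for (Map.Entry<String, Map<Integer, List<Integer>>> t : assignments.entrySet()) {
            size += 2 + t.getKey().getBytes(StandardCharsets.UTF_8).length; // topic STRING
            size += 4; // INT32 count of reassigned_partition entries
            for (List<Integer> brokers : t.getValue().values()) {
                size += 4 + 4 + 4 * brokers.size(); // partition_id, broker count, broker ids
            }
        }
        size += 1; // validate_only BOOLEAN
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(assignments.size());
        for (Map.Entry<String, Map<Integer, List<Integer>>> t : assignments.entrySet()) {
            byte[] topic = t.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) topic.length);
            buf.put(topic);
            buf.putInt(t.getValue().size());
            for (Map.Entry<Integer, List<Integer>> p : t.getValue().entrySet()) {
                buf.putInt(p.getKey());          // partition_id
                buf.putInt(p.getValue().size()); // [broker] array length
                for (int broker : p.getValue()) {
                    buf.putInt(broker);
                }
            }
        }
        buf.put((byte) (validateOnly ? 1 : 0));
        buf.flip();
        return buf;
    }
}
```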
...
Error | Code | Description
---|---|---
CLUSTER_AUTHORIZATION_FAILED | 31 | Authorization failed
INVALID_TOPIC_EXCEPTION | 17 | If the topic doesn't exist
UNKNOWN_MEMBER_ID | 25 | If any broker ids in the partition_assignment included an unknown broker id
INVALID_REQUEST | 42 | If duplicate topics appeared in the request
PARTITION_REASSIGNMENT_IN_PROGRESS | (new) |
INVALID_REPLICA_ASSIGNMENT | 39 | If a partition, replica or broker id in the partition_assignment doesn't exist or is incompatible with the requested num_partitions and/or replication_factor. The error_message would contain further information.
NONE | 0 | The reassignment has started
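A sketch of how a broker might choose between UNKNOWN_MEMBER_ID and INVALID_REPLICA_ASSIGNMENT for one reassigned partition. It assumes, beyond what the proposal states, that a replica list is invalid if it is empty or names the same broker twice, and that the partition count and known broker ids are available locally. Because the table lists unknown broker ids under both codes, the sketch picks UNKNOWN_MEMBER_ID for that case; authorization and PARTITION_REASSIGNMENT_IN_PROGRESS are left out, and the class name is hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch: validate one reassigned_partition entry.
final class ReassignmentValidator {
    static final short NONE = 0;
    static final short UNKNOWN_MEMBER_ID = 25;
    static final short INVALID_REPLICA_ASSIGNMENT = 39;

    /**
     * @param partitionId    the partition_id being reassigned
     * @param partitionCount current number of partitions of the topic (hypothetical metadata input)
     * @param brokers        the requested replica list
     * @param knownBrokers   ids of brokers registered in the cluster (hypothetical metadata input)
     */
    static short validate(int partitionId, int partitionCount, List<Integer> brokers, Set<Integer> knownBrokers) {
        if (partitionId < 0 || partitionId >= partitionCount || brokers.isEmpty()) {
            return INVALID_REPLICA_ASSIGNMENT; // partition doesn't exist or no replicas given
        }
        Set<Integer> seen = new HashSet<>();
        for (Integer broker : brokers) {
            if (!knownBrokers.contains(broker)) {
                return UNKNOWN_MEMBER_ID;
            }
            if (!seen.add(broker)) {
                return INVALID_REPLICA_ASSIGNMENT; // same broker listed twice
            }
        }
        return NONE;
    }
}
```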
Network Protocol: ReplicaStatusRequest and ReplicaStatusResponse
ReplicaStatusRequest requests information about the progress of a number of replicas. It is not tied to a prior AlterReplicationFactorsRequest or ReassignPartitionsRequest, but is intended as a way of monitoring the progress and completion of those operations.
...
Error | Code | Description
---|---|---
CLUSTER_AUTHORIZATION_FAILED | 31 | Authorization failed. (or the TOPIC?)
INVALID_TOPIC_EXCEPTION | 17 | The topic is not known
INVALID_PARTITIONS | 37 | The partition_id of the given topic is not valid
UNKNOWN_MEMBER_ID | 25 | The given broker id is not known
UNKNOWN_TOPIC_OR_PARTITION | 3 | The given broker is not a follower for the partition identified by topic, partition
NONE | 0 | The status request completed normally
Implementation
AdminClient.replicaStatus() will send the underlying ReplicaStatusRequest to the leader of each given partition. This avoids the need for every broker (since any broker could be the --bootstrap-server) to know the replication status of every replica, which would be inefficient in network I/O and/or memory use.
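Sending each request to the partition leader means the client first has to group the requested partitions by leader, so that it can send one ReplicaStatusRequest per leader broker. A minimal sketch of that grouping step, assuming the client already has a partition-to-leader map from its cached metadata; the class name and the "topic-partition" string keys are hypothetical, and partitions with no known leader are collected separately so the caller could refresh metadata and retry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: plan one status request per leader broker.
final class LeaderGrouping {

    /** Partitions to query per leader, plus partitions whose leader is currently unknown. */
    static final class Plan {
        final Map<Integer, List<String>> byLeader = new TreeMap<>();
        final List<String> leaderUnknown = new ArrayList<>();
    }

    /**
     * @param partitions partitions of interest, e.g. "orders-0" (hypothetical key format)
     * @param leaders    partition -> leader broker id, from the client's metadata
     */
    static Plan group(List<String> partitions, Map<String, Integer> leaders) {
        Plan plan = new Plan();
        for (String partition : partitions) {
            Integer leader = leaders.get(partition);
            if (leader == null) {
                plan.leaderUnknown.add(partition);
            } else {
                plan.byLeader.computeIfAbsent(leader, id -> new ArrayList<>()).add(partition);
            }
        }
        return plan;
    }
}
```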
Compatibility, Deprecation, and Migration Plan
Existing users of kafka-reassign-partitions.sh will receive a deprecation warning when they use the --zookeeper option. The option will be removed in a future version of Kafka; if this KIP is introduced in version 1.0.0, the removal could happen in 2.0.0.
Rejected Alternatives
...