Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This proposal builds on the proposal in KIP-236, which allows individual partition reassignments to be identified.
Currently the AdminClient has no visibility of the partition reassignments occurring in a Kafka cluster. It would be valuable for this information to be available in the AdminClient because there will eventually be an API for partition reassignment in the AdminClient, and knowing about the current reassignments is important when creating more.
Public Interfaces
AdminClient.listReassignments()
AdminClient { /** * List the current reassignments affecting the given {@code partitions} * or all current reassignments if the given {@code partitions} is null. * This requires describe access to the Cluster. */ ListReassignmentsResult listReassignments(Collection<TopicPartition> partitions); ListReassignmentsResult listReassignments(Collection<TopicPartition> partitions, ListReassignmentsOptions options); }
Where:
public class ListReassignmentsOptions extends AbstractOptions<ListReassignmentsObjects> {} /** The result of {@link AdminClient#listReassignments()} */ public class ListReassignmentsResult { public KafkaFuture<Collection<Reassignment>> all(); public KafkaFuture<Reassignment> get(TopicPartition partition); } /** * Identifies the reassignment of a topic partition to some (new) set * of brokers. */ class Reassignment { // implictly this contains the czxid of the reassignment // but this is never exposed to clients /** The partition being reassigned */ public TopicPartition topicPartition() { ... } public boolean equals(Object other) { ... } // identity based on czxid public int hashCode() { ... } // identity based on czxid public String toString() { ... } }
Network Protocol: ListReassignmentsRequest
A ListReassignmentsRequest
must be sent to the controller of a cluster.
ListReassignmentsRequest => timeout_ms [topic_partition] timeout_ms => INT32 topic_partition => topic [partition_id] topic => STRING partition_id => INT32
Network Protocol: ListReassignmentsResponse
ListReassignmentsResponse => throttle_time_ms, error_code error_message [reassignment] throttle_time_ms => INT32 error_code => INT16 error_message => NULLABLE_STRING reassignment => topic [topic_reassignments] topic => STRING topic_reassignments => partition reassignment_id partition_id => INT32 reassignment_id => INT64
Where:
throttle_time_ms
is the throttle timeerror_code
is an error codeerror_message
is an error messagetopic
is a topic namepartition_id
is a partition idreassignment_id
identifies a reassignment znode
Possible errors include:
CLUSTER_AUTHORIZATION_FAILED
if the client didn't have describe on the clusterNOT_CONTROLLER
if the request was sent to a broker that was not the controllerUNKNOWN_TOPIC_OR_PARTITION
if the partition in the request does not exist
AdminClient.describeReassignments()
AdminClient { /** * Get the status of the given {@code reassignments}, or all reassignments * if the given {@code reassignments} is null. * This requires describe access to the Cluster. */ DescribeReassignmentsResult describeReassignments( Collection<Reassignment> reassignments); DescribeReassignmentsResult describeReassignments( Collection<Reassignment> reassignments, DescribeReassignmentsOptions options); }
Where Reassignment
is one returned from AdminClient.listReassignments()
and the other classes are as follows:
public class DescribeReassignmentsOptions extends AbstractOptions<DescribeReassignmentsOptions> { } /** The result of {@link AdminClient#describeReassignments()} */ public class DescribeReassignmentsResult { /** * Get (a future for) the description of the given reassignment, or * null if the given reassignment was no longer running at time * the controller processed the * {@link AdminClient#describeReassignments()} call. * * If the given {@code reassignment} was not given in the call to * {@link AdminClient#describeReassignments()} the future will throw * NoSuchElementException. */ public KafkaFuture<ReassignmentDescription> get(Reassignment reassignment); /** * The list of current reassignments. This is only useful when * {@link AdminClient#describeReassignments()} was called with a null * {@code reassignments} argument. */ public KafkaFuture<Reassignment> reassignments(); } /** Describes a reassignment */ public class ReassignmentDescription { /** * The reassignment that this description is describing. */ public Reassignment reassignment() { ... } /** * The approximate time (as an offset from the unix epoch) that the reassignment * was started. This will not change over the life of this reassignment. */ public long startTime() { ... } /** * The id of the broker that's currently leading the partition * * It is possible for this value of change over the * life of the reassignment if the leader changes. */ public int currentLeader() { ... } /** * The throttle currently applying to the leader for this partition. * * It is possible for this value of change over the * life of the reassignment if the reassignment is changed, or if the leader changes. */ public long leaderThrottle() { ... } // In the future this might also include information about the throttle(s) // for the reassignment /** * The brokers which will maintain a replica after this reassignment * is complete. The first broker in the list is the preferred leader. * When the preferred broker is in sync it will be elected leader of the partition * if the {@code auto.leader.rebalance.enable} broker config is set, or * when electPreferredLeader() is invoked. * * It is possible for this list of change over the * life of the reassignment if the reassignment is changed. */ public List<Integer> newAssignedBrokers() { ... } /** * A map from newly assigned brokers to the corresponding throttle for that broker. * The keyset of this map is precisely {@link #newAssignedBrokers()}. If * a broker is not throttled, its throttle will be {@link Long#MAX_VALUE}. * * It is possible for this map of change over the * life of the reassignment if the reassignment is changed. */ public Map<Integer, Long> newAssignedThrottles() { ... } }
Having obtained a ReassignmentDescription
a client can determine the LEO of the replicas on each of the newly assigned brokers by calling AdminClient.describeLogDirs()
. It is not possible to include this in the ReassignmentDescription
itself, this this information is not available to the controller.
The start time is provided to determine how long the reassignment has been in progress.
Network Protocol: DescribeReassignmentsRequest
A DescribeReassignmentsRequest
must be sent to the controller of a cluster.
DescribeReassignmentsRequest => [topic_reassignment] timout_ms topic_reassignment => topic reassignment topic => STRING reassignment => partition_id reassigment_id partition_id => INT32 reassignment_id => INT64 timeout_ms => INT32
Network Protocol: DescribeReassignmentsResponse
DescribeReassignmentsResponse => throttle_time_ms error_code error_message [description] throttle_time_ms => INT32 error_code => INT16 error_message => NULLABLE_STRING description => reassignment_id start_time leader_id leader_throttle [new_assigned] start_time => INT64 leader_id => INT32 leader_throttle => INT64 new_assigned => broker_id follower_throttle broker_id => INT32 follower_throttle => INT64
Where:
throttle_time_ms
is the throttle timeerror_code
is an error codeerror_message
is an error messagereassignment_id
identifies the reassignment znode, obtained from a previousListReassignmentsRequest
.start_time
is the time the reassignment startedleader_id
is the id of the broker that is currently leader of the partitionleader_throttle
is the throttle currently applying to the leaderbroker_id
is the id of a broker in the new assignmentfollower_throttle
is the throttle currently applying to the correspondingbroker_id
Possible errors include:
CLUSTER_AUTHORIZATION_FAILED
if the client didn't have describe on the clusterNOT_CONTROLLER
if the request was sent to a broker that was not the controller
Notes:
- If a
reassignment_id
provided in the DescribeReassignmentRequest does not match the current reassignment then that partition will be omitted from the DescribeReassignmentsResponse, rather than being an error.
Proposed Changes
Assuming the work proposed in KIP-236, the controller-side implementation when handling a ListPartitionsRequest
is as follows:
- The controller queries zookeeper to obtain the information about the reassignment for the given partitions from
/admin/reassginments/$topic/$partition
, obtaining the data (which contains the current new assigned partitions), and the zookeeperStat
, which includes theczxid
. We use theczxid
as thereassignment_id
.
The controller-side implementation when handling a DescribePartitionsRequest
is as follows:
- The controller queries zookeeper to obtain the information about the reassignment for the given partitions from
/admin/reassginments/$topic/$partition
, obtaining the data (which contains the current new assigned partitions), and the zookeeperStat
, which includes theczxid
andctime
.- If the znode does not exist, or the
czxid
of the znode does not match thereassignment_id
in the request the client has a reference to a now-completed reassignment and not information about this reassignment is returned. - Otherwise:
- We obtain throttle information from ZooKeeper topic and broker config
- We use the
ctime
as thestart_time.
- If the znode does not exist, or the
Compatibility, Deprecation, and Migration Plan
This is a new API, so has no impact on existing users.
Rejected Alternatives
- Another method on the AdminClient also makes sense: Getting the current `ReassignmentDescription`s for a given `Collection<TopicPartition>` (rather than for a given `Collection<Reassignment>` as proposed here) The subtly with such a method is that it obscures the distinction between two successive reassignments: When invoked successively with the same arguments it might look like the same reassignment was in progress, but in fact the results could be referring to two separate reassignments. The proposed API makes this distinction more clear, but requires obtaining a Reassignment before getting its description. Implementing this KIP does not preclude implementing such a method in the future.
- If we don't want to directly expose ZooKeeper's czxid information in the responses we could change the format of the request data stored in zookeeper to include a UUID allocated on the controller when it first creates the znode. The drawback of this is the overhead of transfering this UUID to/from ZK and the storage overhead in ZK.