Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: here [Change the link from KAFKA-1 to your own ticket]
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This proposal builds on the proposal in KIP-236, which allows individual partition reassignments to be identified.
Currently the AdminClient has no visibility of the partition reassignments occurring in a Kafka cluster. It would be valuable for this information to be available in the AdminClient: there will eventually be an API for partition reassignment in the AdminClient, and knowing about the current reassignments is important when creating new ones.
Public Interfaces
AdminClient.listReassignments()
```java
public abstract class AdminClient {
    // ... existing members elided ...

    /**
     * List the current reassignments affecting the given {@code partitions},
     * or all current reassignments if the given {@code partitions} is null.
     * This requires Describe access to the Cluster.
     */
    public abstract ListReassignmentsResult listReassignments(Collection<TopicPartition> partitions);

    public abstract ListReassignmentsResult listReassignments(Collection<TopicPartition> partitions,
                                                              ListReassignmentsOptions options);
}
```
Where:
```java
public class ListReassignmentsOptions extends AbstractOptions<ListReassignmentsOptions> { }

/** The result of {@link AdminClient#listReassignments(Collection)}. */
public class ListReassignmentsResult {
    public KafkaFuture<Collection<Reassignment>> all() { ... }
    public KafkaFuture<Reassignment> get(TopicPartition partition) { ... }
}

/**
 * Identifies the reassignment of a topic partition to some (new) set
 * of brokers.
 */
public class Reassignment {
    // Implicitly this contains the czxid of the reassignment,
    // but this is never exposed to clients.

    /** The partition being reassigned. */
    public TopicPartition topicPartition() { ... }

    @Override
    public boolean equals(Object other) { ... } // identity based on czxid

    @Override
    public int hashCode() { ... } // identity based on czxid

    @Override
    public String toString() { ... }
}
```
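To illustrate why `Reassignment` identity is based on the czxid rather than on the partition, here is a minimal self-contained sketch. The `TopicPartition` and `Reassignment` classes below are simplified, hypothetical stand-ins (not the real Kafka classes) and the czxid values are made up; the point is only that two successive reassignments of the same partition compare unequal, while the czxid itself never appears in `toString()`.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class ReassignmentIdentity {

    /** Hypothetical minimal stand-in for Kafka's TopicPartition. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TopicPartition)) return false;
            TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Hypothetical Reassignment: the partition plus a hidden czxid. */
    static final class Reassignment {
        private final TopicPartition topicPartition;
        private final long czxid; // identity; never exposed to callers

        Reassignment(TopicPartition topicPartition, long czxid) {
            this.topicPartition = Objects.requireNonNull(topicPartition);
            this.czxid = czxid;
        }

        public TopicPartition topicPartition() {
            return topicPartition;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Reassignment && ((Reassignment) o).czxid == czxid;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(czxid);
        }

        @Override
        public String toString() {
            return "Reassignment(" + topicPartition + ")"; // czxid deliberately omitted
        }
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("orders", 0);
        // Two successive reassignments of the same partition live in two
        // different znodes, so they have different czxids.
        Reassignment first = new Reassignment(tp, 1001L);
        Reassignment second = new Reassignment(tp, 2002L);
        Set<Reassignment> seen = new HashSet<>();
        seen.add(first);
        seen.add(second);
        System.out.println(first.equals(second)); // false
        System.out.println(seen.size());          // 2
    }
}
```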
Network Protocol: ListReassignmentsRequest
A ListReassignmentsRequest must be sent to the controller of a cluster.
```
ListReassignmentsRequest => timeout_ms [topic_partition]
  timeout_ms => INT32
  topic_partition => topic [partition_id]
    topic => STRING
    partition_id => INT32
```
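As a rough illustration of the wire layout, the sketch below encodes the request body using Kafka's classic (non-flexible) primitive encodings: big-endian INT32, STRING as an INT16 length followed by UTF-8 bytes, and arrays as an INT32 element count. The request header is omitted. Encoding a null partition collection (meaning "all reassignments") as a null array with count -1 is an assumption of this sketch, not something the proposal specifies, and the class name `ListReassignmentsRequestEncoder` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ListReassignmentsRequestEncoder {

    /**
     * Encodes the request body (no request header). A null map is written as
     * a null array (count -1), assumed here to mean "all reassignments".
     */
    static ByteBuffer encode(int timeoutMs, Map<String, List<Integer>> partitionsByTopic) {
        ByteBuffer buf = ByteBuffer.allocate(sizeOf(partitionsByTopic)); // big-endian by default
        buf.putInt(timeoutMs);                                          // timeout_ms
        if (partitionsByTopic == null) {
            buf.putInt(-1);
        } else {
            buf.putInt(partitionsByTopic.size());                       // [topic_partition]
            for (Map.Entry<String, List<Integer>> e : partitionsByTopic.entrySet()) {
                byte[] topic = e.getKey().getBytes(StandardCharsets.UTF_8);
                buf.putShort((short) topic.length);                     // topic (STRING)
                buf.put(topic);
                buf.putInt(e.getValue().size());                        // [partition_id]
                for (int partition : e.getValue()) {
                    buf.putInt(partition);
                }
            }
        }
        buf.flip();
        return buf;
    }

    private static int sizeOf(Map<String, List<Integer>> partitionsByTopic) {
        int size = 4 + 4; // timeout_ms + topic_partition array count
        if (partitionsByTopic != null) {
            for (Map.Entry<String, List<Integer>> e : partitionsByTopic.entrySet()) {
                size += 2 + e.getKey().getBytes(StandardCharsets.UTF_8).length;
                size += 4 + 4 * e.getValue().size();
            }
        }
        return size;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> request = new TreeMap<>();
        request.put("foo", List.of(0, 1));
        System.out.println(encode(30_000, request).remaining()); // 25
        System.out.println(encode(30_000, null).remaining());    // 8
    }
}
```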
Network Protocol: ListReassignmentsResponse
```
ListReassignmentsResponse => throttle_time_ms error_code error_message [reassignment]
  throttle_time_ms => INT32
  error_code => INT16
  error_message => NULLABLE_STRING
  reassignment => topic [topic_reassignments]
    topic => STRING
    topic_reassignments => partition_id reassignment_id
      partition_id => INT32
      reassignment_id => INT64
```
Where:
- throttle_time_ms is the duration in milliseconds for which the request was throttled
- error_code is an error code
- error_message is an error message
- topic is a topic name
- partition_id is a partition id
- reassignment_id identifies a reassignment znode
Possible errors include:
- CLUSTER_AUTHORIZATION_FAILED if the client didn't have Describe access on the cluster
- NOT_CONTROLLER if the request was sent to a broker that was not the controller
- UNKNOWN_TOPIC_OR_PARTITION if the partition in the request does not exist
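A client has to react to these errors differently. The sketch below shows one plausible policy keyed on Kafka's existing numeric error codes; the `Action` enum and `actionFor` method are hypothetical, and the proposal itself does not prescribe client retry behavior.

```java
final class ListReassignmentsErrors {

    /** What a client might do with a given error (hypothetical policy). */
    enum Action { SUCCEED, RETRY_AFTER_FINDING_CONTROLLER, FAIL }

    // Existing Kafka protocol error codes.
    static final short NONE = 0;
    static final short UNKNOWN_TOPIC_OR_PARTITION = 3;
    static final short CLUSTER_AUTHORIZATION_FAILED = 31;
    static final short NOT_CONTROLLER = 41;

    static Action actionFor(short errorCode) {
        switch (errorCode) {
            case NONE:
                return Action.SUCCEED;
            case NOT_CONTROLLER:
                // The controller has moved: refresh cluster metadata to learn
                // the new controller, then resend the request to it.
                return Action.RETRY_AFTER_FINDING_CONTROLLER;
            case CLUSTER_AUTHORIZATION_FAILED: // missing Describe on the Cluster
            case UNKNOWN_TOPIC_OR_PARTITION:   // partition does not exist
            default:
                // Resending the same request is unlikely to help.
                return Action.FAIL;
        }
    }

    public static void main(String[] args) {
        System.out.println(actionFor(NOT_CONTROLLER));               // RETRY_AFTER_FINDING_CONTROLLER
        System.out.println(actionFor(CLUSTER_AUTHORIZATION_FAILED)); // FAIL
    }
}
```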
AdminClient.describeReassignments()
```java
public abstract class AdminClient {
    // ... existing members elided ...

    /**
     * Get the status of the given {@code reassignments}, or of all reassignments
     * if the given {@code reassignments} is null.
     * This requires Describe access to the Cluster.
     */
    public abstract DescribeReassignmentsResult describeReassignments(
            Collection<Reassignment> reassignments);

    public abstract DescribeReassignmentsResult describeReassignments(
            Collection<Reassignment> reassignments, DescribeReassignmentsOptions options);
}
```
Where Reassignment is one returned from AdminClient.listReassignments(), and the other classes are as follows:
```java
public class DescribeReassignmentsOptions extends AbstractOptions<DescribeReassignmentsOptions> { }

/** The result of {@link AdminClient#describeReassignments(Collection)}. */
public class DescribeReassignmentsResult {
    /**
     * Get (a future for) the description of the given reassignment, or
     * null if the given reassignment was no longer running at the time
     * the controller processed the
     * {@link AdminClient#describeReassignments(Collection)} call.
     *
     * If the given {@code reassignment} was not given in the call to
     * {@link AdminClient#describeReassignments(Collection)}, the future will
     * fail with a NoSuchElementException.
     */
    public KafkaFuture<ReassignmentDescription> get(Reassignment reassignment) { ... }

    /**
     * The collection of current reassignments. This is only useful when
     * {@link AdminClient#describeReassignments(Collection)} was called with a null
     * {@code reassignments} argument.
     */
    public KafkaFuture<Collection<Reassignment>> reassignments() { ... }
}

/** Describes a reassignment. */
public class ReassignmentDescription {
    /**
     * The reassignment that this description is describing.
     */
    public Reassignment reassignment() { ... }

    /**
     * The approximate time (in milliseconds since the Unix epoch) that the
     * reassignment was started. This will not change over the life of this reassignment.
     */
    public long startTime() { ... }

    /**
     * The id of the broker that is currently leading the partition.
     *
     * It is possible for this value to change over the
     * life of the reassignment if the leader changes.
     */
    public int currentLeader() { ... }

    /**
     * The throttle currently applying to the leader for this partition.
     *
     * It is possible for this value to change over the life of the
     * reassignment if the reassignment is changed, or if the leader changes.
     */
    public long leaderThrottle() { ... }

    // In the future this might also include information about the throttle(s)
    // for the reassignment

    /**
     * The brokers which will maintain a replica after this reassignment
     * is complete. The first broker in the list is the preferred leader.
     * When the preferred broker is in sync it will be elected leader of the partition
     * if the {@code auto.leader.rebalance.enable} broker config is set, or
     * when electPreferredLeader() is invoked.
     *
     * It is possible for this list to change over the
     * life of the reassignment if the reassignment is changed.
     */
    public List<Integer> newAssignedBrokers() { ... }

    /**
     * A map from newly assigned brokers to the corresponding throttle for that broker.
     * The key set of this map is precisely {@link #newAssignedBrokers()}. If
     * a broker is not throttled, its throttle will be {@link Long#MAX_VALUE}.
     *
     * It is possible for this map to change over the
     * life of the reassignment if the reassignment is changed.
     */
    public Map<Integer, Long> newAssignedThrottles() { ... }
}
```
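The contract of `DescribeReassignmentsResult.get()` distinguishes a reassignment that has finished (the future completes with null) from one that was never requested (the future fails with `NoSuchElementException`). The following sketch models that contract with `java.util.concurrent.CompletableFuture` standing in for `KafkaFuture`, and with a reassignment reduced to its `long` id; `DescribeResultSketch` and its trimmed-down `Description` class are hypothetical.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;

final class DescribeResultSketch {

    /** Hypothetical, trimmed-down ReassignmentDescription. */
    static final class Description {
        final long reassignmentId;
        final long startTime;

        Description(long reassignmentId, long startTime) {
            this.reassignmentId = reassignmentId;
            this.startTime = startTime;
        }
    }

    // One future per requested reassignment, keyed by its (hidden) id.
    private final Map<Long, CompletableFuture<Description>> futures = new HashMap<>();

    DescribeResultSketch(Collection<Long> requestedIds) {
        for (long id : requestedIds) {
            futures.put(id, new CompletableFuture<>());
        }
    }

    /**
     * Completes every future from the controller's response. Reassignments the
     * controller omitted (no longer running) complete with null.
     */
    void complete(Map<Long, Description> response) {
        futures.forEach((id, future) -> future.complete(response.get(id)));
    }

    /** Models DescribeReassignmentsResult.get(Reassignment). */
    CompletableFuture<Description> get(long reassignmentId) {
        CompletableFuture<Description> future = futures.get(reassignmentId);
        if (future == null) {
            // The reassignment was not part of the original describe call.
            return CompletableFuture.failedFuture(
                    new NoSuchElementException("not requested: " + reassignmentId));
        }
        return future;
    }

    public static void main(String[] args) {
        DescribeResultSketch result = new DescribeResultSketch(List.of(1L, 2L));
        // The controller only returned a description for reassignment 1.
        result.complete(Map.of(1L, new Description(1L, 1_700_000_000_000L)));
        System.out.println(result.get(1L).join().startTime);           // 1700000000000
        System.out.println(result.get(2L).join());                     // null
        System.out.println(result.get(3L).isCompletedExceptionally()); // true
    }
}
```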
Having obtained a ReassignmentDescription, a client can determine the log end offset (LEO) of the replicas on each of the newly assigned brokers by calling AdminClient.describeLogDirs(). It is not possible to include this in the ReassignmentDescription itself, because this information is not available to the controller.
The start time is provided so that clients can determine how long the reassignment has been in progress.
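For example, a monitoring tool could compute a reassignment's age from `startTime()`. This minimal sketch assumes the value is in milliseconds since the Unix epoch (consistent with it being derived from the znode `ctime`); the `ReassignmentAge` class and its threshold check are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

final class ReassignmentAge {

    /** How long a reassignment has been running, given its startTime() in epoch ms. */
    static Duration elapsed(long startTimeMs, Instant now) {
        return Duration.between(Instant.ofEpochMilli(startTimeMs), now);
    }

    /** Hypothetical check for reassignments that have been running "too long". */
    static boolean runningLongerThan(long startTimeMs, Instant now, Duration threshold) {
        return elapsed(startTimeMs, now).compareTo(threshold) > 0;
    }

    public static void main(String[] args) {
        long startTimeMs = 1_700_000_000_000L;
        Instant now = Instant.ofEpochMilli(startTimeMs + 90 * 60 * 1000L);
        System.out.println(elapsed(startTimeMs, now));                               // PT1H30M
        System.out.println(runningLongerThan(startTimeMs, now, Duration.ofHours(1))); // true
    }
}
```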
Network Protocol: DescribeReassignmentsRequest
A DescribeReassignmentsRequest must be sent to the controller of a cluster.
```
DescribeReassignmentsRequest => [topic_reassignment] timeout_ms
  topic_reassignment => topic reassignment
    topic => STRING
    reassignment => partition_id reassignment_id
      partition_id => INT32
      reassignment_id => INT64
  timeout_ms => INT32
```
Network Protocol: DescribeReassignmentsResponse
```
DescribeReassignmentsResponse => throttle_time_ms error_code error_message [description]
  throttle_time_ms => INT32
  error_code => INT16
  error_message => NULLABLE_STRING
  description => reassignment_id start_time leader_id leader_throttle [new_assigned]
    reassignment_id => INT64
    start_time => INT64
    leader_id => INT32
    leader_throttle => INT64
    new_assigned => broker_id follower_throttle
      broker_id => INT32
      follower_throttle => INT64
```
Where:
- throttle_time_ms is the duration in milliseconds for which the request was throttled
- error_code is an error code
- error_message is an error message
- reassignment_id identifies the reassignment znode, obtained from a previous ListReassignmentsRequest
- start_time is the time the reassignment started
- leader_id is the id of the broker that is currently leader of the partition
- leader_throttle is the throttle currently applying to the leader
- broker_id is the id of a broker in the new assignment
- follower_throttle is the throttle currently applying to the corresponding broker_id
Possible errors include:
- CLUSTER_AUTHORIZATION_FAILED if the client didn't have Describe access on the cluster
- NOT_CONTROLLER if the request was sent to a broker that was not the controller
Notes:
- If a reassignment_id provided in the DescribeReassignmentsRequest does not match the current reassignment, then that partition will be omitted from the DescribeReassignmentsResponse rather than being reported as an error.
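To make the response layout concrete, the sketch below decodes a DescribeReassignmentsResponse body from a `ByteBuffer`. It assumes Kafka's classic primitive encodings (big-endian integers, NULLABLE_STRING as an INT16 length with -1 for null, arrays as an INT32 count) and takes `reassignment_id` to be an INT64, as in the request. The decoder and its field names are hypothetical; a real implementation would use Kafka's own protocol classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DescribeReassignmentsResponseDecoder {

    static final class Description {
        long reassignmentId;
        long startTime;
        int leaderId;
        long leaderThrottle;
        // broker_id -> follower_throttle, in wire order (first = preferred leader)
        final Map<Integer, Long> newAssigned = new LinkedHashMap<>();
    }

    static final class Response {
        int throttleTimeMs;
        short errorCode;
        String errorMessage; // null when encoded with length -1
        final List<Description> descriptions = new ArrayList<>();
    }

    static Response decode(ByteBuffer buf) {
        Response response = new Response();
        response.throttleTimeMs = buf.getInt();
        response.errorCode = buf.getShort();
        response.errorMessage = readNullableString(buf);
        int count = buf.getInt(); // a negative (null) count yields no descriptions
        for (int i = 0; i < count; i++) {
            Description d = new Description();
            d.reassignmentId = buf.getLong();
            d.startTime = buf.getLong();
            d.leaderId = buf.getInt();
            d.leaderThrottle = buf.getLong();
            int assigned = buf.getInt();
            for (int j = 0; j < assigned; j++) {
                int brokerId = buf.getInt();
                long followerThrottle = buf.getLong();
                d.newAssigned.put(brokerId, followerThrottle);
            }
            response.descriptions.add(d);
        }
        return response;
    }

    private static String readNullableString(ByteBuffer buf) {
        short length = buf.getShort();
        if (length < 0) {
            return null;
        }
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(128);
        buf.putInt(0).putShort((short) 0).putShort((short) -1); // no throttling, no error, null message
        buf.putInt(1);                                          // one description
        buf.putLong(42L).putLong(1_700_000_000_000L).putInt(1).putLong(Long.MAX_VALUE);
        buf.putInt(2);                                          // two newly assigned brokers
        buf.putInt(1).putLong(Long.MAX_VALUE);                  // broker 1, unthrottled
        buf.putInt(3).putLong(10_000_000L);                     // broker 3, throttled
        buf.flip();
        Description d = decode(buf).descriptions.get(0);
        System.out.println(d.reassignmentId + " " + d.newAssigned);
        // prints: 42 {1=9223372036854775807, 3=10000000}
    }
}
```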
Proposed Changes
Assuming the work proposed in KIP-236, the controller-side implementation when handling a ListReassignmentsRequest is as follows:
- The controller queries ZooKeeper to obtain the information about the reassignment for the given partitions from /admin/reassignments/$topic/$partition, obtaining the data (which contains the current newly assigned brokers) and the ZooKeeper Stat, which includes the czxid. We use the czxid as the reassignment_id.
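The ListReassignmentsRequest handling above can be sketched as follows. Since a runnable example cannot depend on a live ZooKeeper ensemble, the sketch uses a hypothetical in-memory map from znode path to the Stat fields it needs (`czxid`, `ctime`); a real controller would read these through its ZooKeeper client. The znode data and the existence check that would produce UNKNOWN_TOPIC_OR_PARTITION are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ListReassignmentsHandler {

    /** Hypothetical stand-in for the znode Stat fields the controller uses. */
    static final class ZNode {
        final long czxid; // creation transaction id; becomes reassignment_id
        final long ctime; // creation time in ms

        ZNode(long czxid, long ctime) {
            this.czxid = czxid;
            this.ctime = ctime;
        }
    }

    /** One entry of the response: topic, partition_id, reassignment_id. */
    static final class Entry {
        final String topic;
        final int partition;
        final long reassignmentId;

        Entry(String topic, int partition, long reassignmentId) {
            this.topic = topic;
            this.partition = partition;
            this.reassignmentId = reassignmentId;
        }
    }

    static final String ROOT = "/admin/reassignments/";

    static String path(String topic, int partition) {
        return ROOT + topic + "/" + partition;
    }

    // Hypothetical in-memory view of ZooKeeper: znode path -> Stat fields.
    private final Map<String, ZNode> zk;

    ListReassignmentsHandler(Map<String, ZNode> zk) {
        this.zk = zk;
    }

    /** requested == null means "list every current reassignment". */
    List<Entry> handle(Map<String, List<Integer>> requested) {
        List<Entry> result = new ArrayList<>();
        if (requested == null) {
            for (Map.Entry<String, ZNode> e : zk.entrySet()) {
                // Kafka topic names cannot contain '/', so splitting is safe.
                String[] parts = e.getKey().substring(ROOT.length()).split("/");
                result.add(new Entry(parts[0], Integer.parseInt(parts[1]), e.getValue().czxid));
            }
            return result;
        }
        for (Map.Entry<String, List<Integer>> e : requested.entrySet()) {
            for (int partition : e.getValue()) {
                ZNode node = zk.get(path(e.getKey(), partition));
                if (node != null) { // no znode: partition is not being reassigned
                    result.add(new Entry(e.getKey(), partition, node.czxid));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, ZNode> zk = new TreeMap<>();
        zk.put(path("orders", 0), new ZNode(4711L, 1_700_000_000_000L));
        ListReassignmentsHandler handler = new ListReassignmentsHandler(zk);
        for (Entry entry : handler.handle(Map.of("orders", List.of(0, 1)))) {
            System.out.println(entry.topic + "-" + entry.partition + " -> " + entry.reassignmentId);
        }
        // prints: orders-0 -> 4711
    }
}
```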
The controller-side implementation when handling a DescribeReassignmentsRequest is as follows:
- The controller queries ZooKeeper to obtain the information about the reassignment for the given partitions from /admin/reassignments/$topic/$partition, obtaining the data (which contains the current newly assigned brokers) and the ZooKeeper Stat, which includes the czxid and ctime.
- If the znode does not exist, or the czxid of the znode does not match the reassignment_id in the request, the client has a reference to a now-completed reassignment and no information about this reassignment is returned.
- Otherwise:
  - We obtain throttle information from ZooKeeper topic and broker config.
  - We use the ctime as the start_time.
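A minimal sketch of the DescribeReassignmentsRequest handling described above, using a hypothetical in-memory map from znode path to Stat fields in place of ZooKeeper. It shows the key decision: a request entry whose znode is missing, or whose czxid differs from the supplied `reassignment_id`, is silently omitted, and `ctime` becomes `start_time`. The leader and throttle lookups are left out, and all class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class DescribeReassignmentsHandler {

    /** Hypothetical stand-in for the znode Stat fields used here. */
    static final class ZNode {
        final long czxid;
        final long ctime;

        ZNode(long czxid, long ctime) {
            this.czxid = czxid;
            this.ctime = ctime;
        }
    }

    /** One (topic, partition_id, reassignment_id) entry from the request. */
    static final class RequestEntry {
        final String topic;
        final int partition;
        final long reassignmentId;

        RequestEntry(String topic, int partition, long reassignmentId) {
            this.topic = topic;
            this.partition = partition;
            this.reassignmentId = reassignmentId;
        }
    }

    /** Trimmed-down description: leader and throttles are omitted. */
    static final class Description {
        final long reassignmentId;
        final long startTime;

        Description(long reassignmentId, long startTime) {
            this.reassignmentId = reassignmentId;
            this.startTime = startTime;
        }
    }

    static List<Description> handle(Map<String, ZNode> zk, List<RequestEntry> request) {
        List<Description> result = new ArrayList<>();
        for (RequestEntry entry : request) {
            ZNode node = zk.get("/admin/reassignments/" + entry.topic + "/" + entry.partition);
            if (node == null || node.czxid != entry.reassignmentId) {
                // The client refers to a reassignment that has completed
                // (possibly superseded by a newer one): omit it, not an error.
                continue;
            }
            // A real controller would also read throttles from topic and broker config here.
            result.add(new Description(entry.reassignmentId, node.ctime));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, ZNode> zk = Map.of("/admin/reassignments/orders/0", new ZNode(4711L, 1_700_000_000_000L));
        List<Description> out = handle(zk, List.of(
                new RequestEntry("orders", 0, 4711L),   // current reassignment
                new RequestEntry("orders", 0, 4000L))); // stale id: omitted
        System.out.println(out.size());         // 1
        System.out.println(out.get(0).startTime); // 1700000000000
    }
}
```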
Compatibility, Deprecation, and Migration Plan
This is a new API, so it has no impact on existing users.
Rejected Alternatives
- Another method on the AdminClient also makes sense: getting the current `ReassignmentDescription`s for a given `Collection<TopicPartition>` (rather than for a given `Collection<Reassignment>` as proposed here). The subtlety with such a method is that it obscures the distinction between two successive reassignments: when invoked successively with the same arguments it might look like the same reassignment was in progress, when in fact the results could refer to two separate reassignments. The proposed API makes this distinction clearer, but requires obtaining a Reassignment before getting its description. Implementing this KIP does not preclude implementing such a method in the future.
- If we don't want to directly expose ZooKeeper's czxid information in the responses, we could change the format of the request data stored in ZooKeeper to include a UUID allocated on the controller when it first creates the znode. The drawbacks of this are the overhead of transferring this UUID to and from ZooKeeper, and the storage overhead in ZooKeeper.