This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.
Status
Current state: "Approved"
Discussion thread: here
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The DescribeQuorum API as defined in KIP-595 is intended to allow the admin client to query the status of the KRaft quorum, including information about voter lag.
...
This KIP proposes making the DescribeQuorum API accessible via the admin client and augmenting the DescribeQuorum API response with more information, so that the liveness and lag of the voters in the quorum can be ascertained more accurately.
Public Interfaces
Additional Classes to expose the DescribeMetadataQuorumResult API to the admin client:
Code Block |
---|
public class DescribeMetadataQuorumResult {
    private final KafkaFuture&lt;QuorumInfo&gt; quorumInfo;

    DescribeMetadataQuorumResult(KafkaFuture&lt;QuorumInfo&gt; quorumInfo) {
        this.quorumInfo = quorumInfo;
    }

    /**
     * Returns a future containing the QuorumInfo
     */
    public KafkaFuture&lt;QuorumInfo&gt; quorumInfo() {
        return quorumInfo;
    }
} |
...
Code Block |
---|
public class QuorumInfo {
    private final Integer leaderId;
    private final List&lt;ReplicaState&gt; voters;
    private final List&lt;ReplicaState&gt; observers;

    QuorumInfo(Integer leaderId, List&lt;ReplicaState&gt; voters, List&lt;ReplicaState&gt; observers) {
        this.leaderId = leaderId;
        this.voters = voters;
        this.observers = observers;
    }

    public Integer leaderId() {
        return leaderId;
    }

    public List&lt;ReplicaState&gt; voters() {
        return voters;
    }

    public List&lt;ReplicaState&gt; observers() {
        return observers;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        QuorumInfo that = (QuorumInfo) o;
        return leaderId.equals(that.leaderId)
            && voters.equals(that.voters)
            && observers.equals(that.observers);
    }

    @Override
    public int hashCode() {
        return Objects.hash(leaderId, voters, observers);
    }

    @Override
    public String toString() {
        return "QuorumInfo(" +
            "leaderId=" + leaderId +
            ", voters=" + voters +
            ", observers=" + observers +
            ')';
    }

    public static class ReplicaState {
        private final int replicaId;
        private final long logEndOffset;
        private final OptionalLong lastFetchTimestamp;
        private final OptionalLong lastCaughtUpTimestamp;

        ReplicaState() {
            this(0, 0, OptionalLong.empty(), OptionalLong.empty());
        }

        ReplicaState(
            int replicaId,
            long logEndOffset,
            OptionalLong lastFetchTimestamp,
            OptionalLong lastCaughtUpTimestamp
        ) {
            this.replicaId = replicaId;
            this.logEndOffset = logEndOffset;
            this.lastFetchTimestamp = lastFetchTimestamp;
            this.lastCaughtUpTimestamp = lastCaughtUpTimestamp;
        }

        /**
         * Return the ID for this replica.
         * @return The ID for this replica
         */
        public int replicaId() {
            return replicaId;
        }

        /**
         * Return the logEndOffset known by the leader for this replica.
         * @return The logEndOffset for this replica
         */
        public long logEndOffset() {
            return logEndOffset;
        }

        /**
         * Return the lastFetchTime in milliseconds for this replica.
         * @return The value of the lastFetchTime if known, empty otherwise
         */
        public OptionalLong lastFetchTimestamp() {
            return lastFetchTimestamp;
        }

        /**
         * Return the lastCaughtUpTime in milliseconds for this replica.
         * @return The value of the lastCaughtUpTime if known, empty otherwise
         */
        public OptionalLong lastCaughtUpTimestamp() {
            return lastCaughtUpTimestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ReplicaState that = (ReplicaState) o;
            return replicaId == that.replicaId
                && logEndOffset == that.logEndOffset
                && lastFetchTimestamp.equals(that.lastFetchTimestamp)
                && lastCaughtUpTimestamp.equals(that.lastCaughtUpTimestamp);
        }

        @Override
        public int hashCode() {
            return Objects.hash(replicaId, logEndOffset, lastFetchTimestamp, lastCaughtUpTimestamp);
        }

        @Override
        public String toString() {
            return "ReplicaState(" +
                "replicaId=" + replicaId +
                ", logEndOffset=" + logEndOffset +
                ", lastFetchTimestamp=" + lastFetchTimestamp +
                ", lastCaughtUpTimestamp=" + lastCaughtUpTimestamp +
                ')';
        }
    }
} |
Code Block |
---|
public class DescribeMetadataQuorumOptions extends AbstractOptions<DescribeMetadataQuorumOptions> { } |
DescribeMetadataQuorum Handler in the Admin Client
Code Block |
---|
/**
 * Describes the state of the metadata quorum.
 * &lt;p&gt;
 * This is a convenience method for {@link #describeMetadataQuorum(DescribeMetadataQuorumOptions)} with default options.
 * See the overload for more details.
 *
 * @return the {@link DescribeMetadataQuorumResult} containing the result
 */
default DescribeMetadataQuorumResult describeMetadataQuorum() {
    return describeMetadataQuorum(new DescribeMetadataQuorumOptions());
}

/**
 * Describes the state of the metadata quorum.
 * &lt;p&gt;
 * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
 * the returned {@code DescribeMetadataQuorumResult}:
 * &lt;ul&gt;
 *   &lt;li&gt;{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
 *   If the authenticated user didn't have {@code DESCRIBE} access to the cluster.&lt;/li&gt;
 *   &lt;li&gt;{@link org.apache.kafka.common.errors.TimeoutException}
 *   If the request timed out before the controller could list the cluster links.&lt;/li&gt;
 * &lt;/ul&gt;
 *
 * @param options The {@link DescribeMetadataQuorumOptions} to use when describing the quorum.
 * @return the {@link DescribeMetadataQuorumResult} containing the result
 */
DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options); |
Proposed change in the DescribeQuorum Response:
Code Block |
---|
{
  "apiKey": 55,
  "type": "response",
  "name": "DescribeQuorumResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code." },
    { "name": "Topics", "type": "[]TopicData", "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+" },
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown." },
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+" },
        { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
      ]}
    ]}
  ],
  "commonStructs": [
    { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown." },
      { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The last known leader wall clock time when a follower fetched from the leader. This is reported as -1 both for the current leader and when it is unknown for a voter." },
      { "name": "LastCaughtUpTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The leader wall clock append time of the offset for which the follower made the most recent fetch request. This is reported as the current time for the leader and -1 when unknown for a voter." }
    ]}
  ]
} |
kafka-metadata-quorum.sh
The output from this API will also be captured in the kafka-metadata-quorum.sh tool. The output of the --describe replication command in the tool, as defined in KIP-595, will change as follows:
Code Block |
---|
> bin/kafka-metadata-quorum.sh --describe replication
ReplicaId   LogEndOffset   Lag   LastFetchTimestamp   LastCaughtUpTimestamp   Status
0           234134         0     tnow                 tnow                    Leader
1           234130         4     t2                   t6                      Follower
2           234100         34    t3                   t7                      Follower
3           234124         10    t4                   t8                      Observer
4           234130         4     t5                   t9                      Observer |
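For illustration, the Lag column above can be derived directly from the response: it is the leader's log end offset minus each replica's log end offset. A standalone sketch, using a simplified stand-in for the ReplicaState accessors defined earlier (this is not the actual tool code):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Simplified stand-in for QuorumInfo.ReplicaState; only the fields needed for lag.
record ReplicaState(int replicaId, long logEndOffset) {}

class QuorumLag {
    // Lag of each replica = leader's log end offset - replica's log end offset.
    static Map<Integer, Long> lagByReplica(int leaderId, List<ReplicaState> states) {
        long leaderLeo = states.stream()
                .filter(s -> s.replicaId() == leaderId)
                .mapToLong(ReplicaState::logEndOffset)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("leader not in replica states"));
        return states.stream()
                .collect(Collectors.toMap(ReplicaState::replicaId, s -> leaderLeo - s.logEndOffset()));
    }
}
```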
Proposed Changes
This KIP proposes exposing the DescribeQuorum API to the admin client and adding two new fields per replica (including voters and observers) to the DescribeQuorum API response.
...
LastFetchTimestamp
This metric will be reported for each voter. This is a good approximation of the “liveness” of the voters and can be used to detect a network partition in the quorum.
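As an example of the liveness check this enables, an operator-side tool could compare LastFetchTimestamp against the current time. The helper and timeout value below are illustrative assumptions for this sketch, not part of the KIP:

```java
import java.util.OptionalLong;

class VoterLiveness {
    // A voter looks alive if it fetched from the leader within the timeout window.
    // The timeout value is caller-chosen; it is not a Kafka configuration defined by this KIP.
    static boolean looksAlive(OptionalLong lastFetchTimestamp, long nowMs, long fetchTimeoutMs) {
        return lastFetchTimestamp.isPresent()
                && nowMs - lastFetchTimestamp.getAsLong() <= fetchTimeoutMs;
    }
}
```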
This information is already known to the leader for all voters and only needs to be added to the response.
LastCaughtUpTimestamp
This metric will be reported for each voter. It is akin to the metric used to track lag for replicas in the ISR: it measures the approximate lag between the leader and the replica, based on the offsets requested in fetch requests and when those requests were made. The metric gives a measure of the last time a replica caught up to the leader.
To compute this metric, the Replica state maintains a few bits of information about fetch requests as they are received. Some of this information is not tracked by the Raft layer but it can be easily added in. The cost to track this information is minimal and it only requires some additional bookkeeping during pre-existing processing for a fetch.
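The bookkeeping above can be sketched roughly as follows. This is an illustrative model, not the actual Raft-layer code, and the names are assumptions: the leader remembers, per follower, the time of the last fetch and the leader log end offset observed at that fetch, and marks the follower caught up either immediately (fetch offset at or past the current leader LEO) or retroactively (fetch offset reached the LEO recorded at the previous fetch).

```java
// Illustrative per-follower state the leader could keep to derive LastCaughtUpTimestamp.
class FollowerState {
    long lastFetchTimestamp = -1;           // wall clock time of the most recent fetch
    long lastFetchLeaderLogEndOffset = -1;  // leader LEO observed when that fetch arrived
    long lastCaughtUpTimestamp = -1;        // last time this follower was fully caught up

    // Called while the leader processes a fetch request from this follower.
    void onFetch(long fetchOffset, long leaderLogEndOffset, long nowMs) {
        if (fetchOffset >= leaderLogEndOffset) {
            // The follower is requesting everything the leader has right now.
            lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, nowMs);
        } else if (fetchOffset >= lastFetchLeaderLogEndOffset) {
            // The follower has caught up to where the leader was at the previous fetch.
            lastCaughtUpTimestamp = Math.max(lastCaughtUpTimestamp, lastFetchTimestamp);
        }
        lastFetchTimestamp = nowMs;
        lastFetchLeaderLogEndOffset = leaderLogEndOffset;
    }
}
```

This is the minimal extra state the KIP refers to: two longs per replica, updated during fetch processing the leader already performs.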
NOTE: Given the leader is always caught up to itself, the Last Caught Up Time for the leader will be the leader's wall clock time when it responded to the DescribeQuorum request.
...
Compatibility, Deprecation, and Migration Plan
The API response is versioned and the newly added fields will bump the message version on the response. This should handle any issues around compatibility.
Rejected Alternatives
Use Existing Fields
An alternative here was to not add in this information and use existing information available in the response to compute voter lag. As discussed in the motivation section, the only information available in the response at the moment is the offset, which does not allow for a very reliable mechanism to ascertain voter lag.
Track Some Other Information
Another possibility here is to add information which allows us to tell the voter lag more accurately. An argument can be made against the accuracy of the lag as measured by the additional fields proposed in this KIP. A more complete view of the voters would be to add in Replica Time/Offset Information
...