Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Add HighWatermarkUpdateTimeMs field

...

Code Block
/**
 * Describes the state of the metadata quorum: the current leader, its epoch, the high watermark
 * (and when it was last updated), and the replication state of every voter and observer.
 * <p>
 * All fields are {@code final}; the voter and observer lists are stored as given by the caller.
 */
public class QuorumInfo {
    private final int leaderId;
    private final long leaderEpoch;
    private final long highWatermark;
    private final OptionalLong highWatermarkUpdateTimeMs;
    private final List<ReplicaState> voters;
    private final List<ReplicaState> observers;

    /**
     * Creates a fully populated quorum description.
     *
     * @param leaderId                  the ID of the current leader, or -1 if the leader is unknown
     * @param leaderEpoch               the latest known leader epoch
     * @param highWatermark             the current high watermark
     * @param highWatermarkUpdateTimeMs the wall clock time at which the high watermark was last updated on
     *                                  the leader, or empty if unknown
     * @param voters                    the state of each voter
     * @param observers                 the state of each observer
     * @throws NullPointerException if {@code highWatermarkUpdateTimeMs}, {@code voters} or
     *                              {@code observers} is null
     */
    QuorumInfo(int leaderId,
               long leaderEpoch,
               long highWatermark,
               OptionalLong highWatermarkUpdateTimeMs,
               List<ReplicaState> voters,
               List<ReplicaState> observers) {
        this.leaderId = leaderId;
        this.leaderEpoch = leaderEpoch;
        this.highWatermark = highWatermark;
        // Reject nulls up front so equals/hashCode can never throw later.
        this.highWatermarkUpdateTimeMs =
            Objects.requireNonNull(highWatermarkUpdateTimeMs, "highWatermarkUpdateTimeMs");
        this.voters = Objects.requireNonNull(voters, "voters");
        this.observers = Objects.requireNonNull(observers, "observers");
    }

    /**
     * Legacy three-argument constructor, retained so existing callers keep compiling.
     * <p>
     * NOTE(review): the epoch and high watermark are not supplied here, so they are filled with -1 and the
     * high watermark update time is left empty; confirm these sentinels are what callers expect.
     *
     * @param leaderId  the ID of the current leader; must not be null
     * @param voters    the state of each voter
     * @param observers the state of each observer
     * @throws NullPointerException if any argument is null
     */
    QuorumInfo(Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
        this(Objects.requireNonNull(leaderId, "leaderId"), -1L, -1L, OptionalLong.empty(), voters, observers);
    }

    /**
     * Return the ID of the current leader.
     * @return The leader ID, or -1 if the leader is unknown
     */
    public int leaderId() {
        return leaderId;
    }

    /**
     * Return the latest known leader epoch.
     * @return The leader epoch
     */
    public long leaderEpoch() {
        return leaderEpoch;
    }

    /**
     * Return the current high watermark.
     * @return The high watermark
     */
    public long highWatermark() {
        return highWatermark;
    }

    /**
     * Return the wall clock time at which the high watermark was last updated on the leader.
     * @return The update time in milliseconds if known, empty otherwise
     */
    public OptionalLong highWatermarkUpdateTimeMs() {
        return highWatermarkUpdateTimeMs;
    }

    /**
     * Return the replication state of every voter.
     * @return The voters
     */
    public List<ReplicaState> voters() {
        return voters;
    }

    /**
     * Return the replication state of every observer.
     * @return The observers
     */
    public List<ReplicaState> observers() {
        return observers;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        QuorumInfo that = (QuorumInfo) o;
        // leaderId is a primitive, so compare with == rather than equals().
        return leaderId == that.leaderId
            && leaderEpoch == that.leaderEpoch
            && highWatermark == that.highWatermark
            && highWatermarkUpdateTimeMs.equals(that.highWatermarkUpdateTimeMs)
            && voters.equals(that.voters)
            && observers.equals(that.observers);
    }

    @Override
    public int hashCode() {
        return Objects.hash(leaderId, leaderEpoch, highWatermark, highWatermarkUpdateTimeMs, voters, observers);
    }

    @Override
    public String toString() {
        return "QuorumInfo(" +
            "leaderId=" + leaderId +
            ", leaderEpoch=" + leaderEpoch +
            ", highWatermark=" + highWatermark +
            ", highWatermarkUpdateTimeMs=" + highWatermarkUpdateTimeMs +
            ", voters=" + voters +
            ", observers=" + observers +
            ')';
    }

    /**
     * The replication state of a single replica (voter or observer) as known by the leader.
     */
    public static class ReplicaState {
        private final int replicaId;
        private final long logEndOffset;
        private final OptionalLong lastFetchTimeMs;
        private final OptionalLong lastCaughtUpTimeMs;

        /**
         * Creates a placeholder state: replica 0, log end offset 0, and unknown fetch/caught-up times.
         */
        ReplicaState() {
            this(0, 0, OptionalLong.empty(), OptionalLong.empty());
        }

        /**
         * Creates a replica state.
         *
         * @param replicaId          the ID of the replica
         * @param logEndOffset       the log end offset known by the leader for this replica
         * @param lastFetchTimeMs    the last fetch time in milliseconds, or empty if unknown
         * @param lastCaughtUpTimeMs the last caught-up time in milliseconds, or empty if unknown
         * @throws NullPointerException if either optional argument is null
         */
        ReplicaState(
            int replicaId,
            long logEndOffset,
            OptionalLong lastFetchTimeMs,
            OptionalLong lastCaughtUpTimeMs
        ) {
            this.replicaId = replicaId;
            this.logEndOffset = logEndOffset;
            this.lastFetchTimeMs = Objects.requireNonNull(lastFetchTimeMs, "lastFetchTimeMs");
            this.lastCaughtUpTimeMs = Objects.requireNonNull(lastCaughtUpTimeMs, "lastCaughtUpTimeMs");
        }

        /**
         * Return the ID for this replica.
         * @return The ID for this replica
         */
        public int replicaId() {
            return replicaId;
        }

        /**
         * Return the logEndOffset known by the leader for this replica.
         * @return The logEndOffset for this replica
         */
        public long logEndOffset() {
            return logEndOffset;
        }

        /**
         * Return the lastFetchTime in milliseconds for this replica.
         * @return The value of the lastFetchTime if known, empty otherwise
         */
        public OptionalLong lastFetchTimeMs() {
            return lastFetchTimeMs;
        }

        /**
         * Return the lastCaughtUpTime in milliseconds for this replica.
         * @return The value of the lastCaughtUpTime if known, empty otherwise
         */
        public OptionalLong lastCaughtUpTimeMs() {
            return lastCaughtUpTimeMs;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            ReplicaState that = (ReplicaState) o;
            return replicaId == that.replicaId
                && logEndOffset == that.logEndOffset
                && lastFetchTimeMs.equals(that.lastFetchTimeMs)
                && lastCaughtUpTimeMs.equals(that.lastCaughtUpTimeMs);
        }

        @Override
        public int hashCode() {
            return Objects.hash(replicaId, logEndOffset, lastFetchTimeMs, lastCaughtUpTimeMs);
        }

        @Override
        public String toString() {
            return "ReplicaState(" +
                "replicaId=" + replicaId +
                ", logEndOffset=" + logEndOffset +
                ", lastFetchTimeMs=" + lastFetchTimeMs +
                ", lastCaughtUpTimeMs=" + lastCaughtUpTimeMs +
                ')';
        }
    }
}
Code Block
/**
 * Options for {@code describeMetadataQuorum(DescribeMetadataQuorumOptions)}.
 * <p>
 * This class declares no members of its own; every setting it carries is inherited from
 * {@link AbstractOptions}.
 */
public class DescribeMetadataQuorumOptions
        extends AbstractOptions<DescribeMetadataQuorumOptions> {
}

DescribeMetadataQuorum Handler in the Admin Client

Code Block
  /**
  "replicaId=" + replicaId *+
 Describes the state of the metadata quorum.
     * <p>
   ", logEndOffset=" *+ ThislogEndOffset is+
 a convenience method for {@link #describeMetadataQuorum(DescribeMetadataQuorumOptions)} with default options.
     * See the overload for more details.
", lastFetchTimeMs=" + lastFetchTimeMs +
      *
     * @return the {@link DescribeMetadataQuorumResult} containing the result
   ", lastCaughtUpTimeMs=" + lastCaughtUpTimeMs +
       */
    default DescribeMetadataQuorumResult describeMetadataQuorum() {
  ')';
      return describeMetadataQuorum(new DescribeMetadataQuorumOptions()); }
    }
}


Code Block
/**
 * Options for {@code describeMetadataQuorum(DescribeMetadataQuorumOptions)}.
 * <p>
 * This class declares no members of its own; every setting it carries is inherited from
 * {@link AbstractOptions}.
 */
public class DescribeMetadataQuorumOptions
        extends AbstractOptions<DescribeMetadataQuorumOptions> {
}

DescribeMetadataQuorum Handler in the Admin Client

Code Block

    /**
     * Describes the state of the metadata quorum.
     * <p>
     * TheThis followingis exceptionsa canconvenience bemethod anticipatedfor when calling {@code@link get#describeMetadataQuorum(DescribeMetadataQuorumOptions)} onwith the futures obtained fromdefault options.
     * See the overload returnedfor {@code DescribeMetadataQuorumResult}:more details.
     * <ul>
     * @return the <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}DescribeMetadataQuorumResult} containing the result
     */
   If thedefault authenticatedDescribeMetadataQuorumResult user didn't have {@code DESCRIBE} access to the cluster.</li>
describeMetadataQuorum() {
        return  *describeMetadataQuorum(new DescribeMetadataQuorumOptions());
   <li>{@link org.apache.kafka.common.errors.TimeoutException}

    /**
 *   If the* requestDescribes timedthe outstate beforeof the controller could list the cluster links.</li>metadata quorum.
     * </ul><p>
     *
 The following exceptions can *be @paramanticipated optionswhen Thecalling {@link@code DescribeMetadataQuorumOptionsget()} toon usethe whenfutures describing the quorum.obtained from
     * @returnthe thereturned {@link@code DescribeMetadataQuorumResult}:
 containing    the* result<ul>
     */
    DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options);

Proposed change in the DescribeQuorum Response:

<li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
     *   If the authenticated user didn't have {@code DESCRIBE} access to the cluster.</li>
     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *   If the request timed out before the controller could list the cluster links.</li>
     * </ul>
     *
     * @param options The {@link DescribeMetadataQuorumOptions} to use when describing the quorum.
     * @return the {@link DescribeMetadataQuorumResult} containing the result
     */
    DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options);

Proposed change in the DescribeQuorum Response:

Code Block
{
  "apiKey": 55,
  "type": "response",
  "name": "DescribeQuorumResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "HighWatermark", "type": "int64", "versions": "0+"},
        { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "HighWatermarkUpdateTimeMs", "type": "int64", "default": -1, "tag": 0,
          "taggedVersions": "1+", "ignorable": true,
          "about": "The wall clock time that the high watermark was last updated on the leader" }
      ]}
    ]}],
  "commonStructs": [
    { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown"},
      { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The last known leader wall clock time when a follower fetched from the leader. This is reported as -1 both for the current leader and if it is unknown for a voter"},
      { "name": "LastCaughtUpTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The leader wall clock append time of the offset for which the follower made the most recent fetch request. This is reported as the current time for the leader and -1 if unknown for a voter"}
    ]}
  ]
}

...

  1. LastFetchTimestamp
    This metric will be reported for each voter. This is a good approximation of the “liveness” of the voters and can be used to detect a network partition in the quorum.
    This information is already known to the leader for all voters and only needs to be added to the response.

  2. LastCaughtUpTimestamp
    This metric will be reported for each voter. It is akin to the metric used to track lag for replicas in the ISR: it measures the approximate lag between the leader and the replica based on the offsets requested in fetch requests and when those requests were made. The metric indicates the last time a replica caught up to the leader.
    To compute this metric, the replica state maintains a few bits of information about fetch requests as they are received. Some of this information is not currently tracked by the Raft layer, but it can easily be added. The cost of tracking this information is minimal; it only requires some additional bookkeeping during the existing processing of a fetch.

    NOTE: Since the leader is always caught up with itself, the LastCaughtUpTimestamp reported for the leader will be the leader's wall clock time when it responded to the DescribeQuorum request.


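Note: A late addition to this KIP is the HighWatermarkUpdateTimeMs field, which reports the wall clock time at which the high watermark was last updated on the leader (defaulting to -1). It makes it easier for operators to see replication progress.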

Compatibility, Deprecation, and Migration Plan

...