Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Reverted from v. 15

...

Code Block
public class QuorumInfo {
    private final intInteger leaderId;
    private final long leaderEpoch;
    private final long highWatermark;
    private final OptionalLong highWatermarkUpdateTimeMs;
    private final List<ReplicaState> voters;
    private final List<ReplicaState> observers;

    QuorumInfo(Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
        this.leaderId = leaderId;
        this.voters = voters;
        this.observers = observers;
    }

    public intInteger leaderId() {
        return leaderId;
    }

    public longList<ReplicaState> leaderEpochvoters() {
        return leaderEpochvoters;
    }

    public longList<ReplicaState> highWatermarkobservers() {
        return highWatermarkobservers;
    }

    @Override
    public OptionalLongboolean highWatermarkUpdateTimeMsequals(Object o) {
        return highWatermarkUpdateTimeMsif (this == o) return true;
    }

     public List<ReplicaState>if voters()o {
== null || getClass() !=    o.getClass()) return votersfalse;
     }

   QuorumInfo publicthat List<ReplicaState>= observers(QuorumInfo) {o;
        return observers;
leaderId.equals(that.leaderId)
     }

    @Override
    public boolean equals(Object o) {&& voters.equals(that.voters)
        if (this == o) return true&& observers.equals(that.observers);
    }

    if@Override
 (o == null || getClass() != o.getClass()) return false;public int hashCode() {
        QuorumInfo that = (QuorumInfo) oreturn Objects.hash(leaderId, voters, observers);
    }

    return leaderId.equals(that.leaderId)@Override
    public String toString() {
     && voters.equals(that.voters)
  return "QuorumInfo(" +
        && observers.equals(that.observers);
    }

    @Override"leaderId=" + leaderId +
    public int hashCode() {
     ",   return Objects.hash(leaderId,voters=" + voters, observers);+
    }

     @Override
   ", publicobservers=" String+ toString()observers {+
        return "QuorumInfo(" +
  ')';
    }

    public static "leaderId=" + leaderId +class ReplicaState {
        private final int replicaId;
  ", voters=" + voters +
  private final long logEndOffset;
       ", observers="private +final observersOptionalLong +lastFetchTimeMs;
        private final OptionalLong  ')'lastCaughtUpTimeMs;

    }

    public static class ReplicaState() {
        private final int replicaId  this(0, 0, OptionalLong.empty(), OptionalLong.empty());
        private}

 final long logEndOffset;
     ReplicaState(
   private final OptionalLong lastFetchTimeMs;
      int replicaId,
 private final OptionalLong lastCaughtUpTimeMs;

        ReplicaState() {long logEndOffset,
            this(0, 0, OptionalLong.empty(), OptionalLong.empty());
OptionalLong lastFetchTimeMs,
            OptionalLong }lastCaughtUpTimeMs

        ) ReplicaState({
            intthis.replicaId = replicaId,;
            longthis.logEndOffset = logEndOffset,;
            OptionalLongthis.lastFetchTimeMs = lastFetchTimeMs,;
            OptionalLongthis.lastCaughtUpTimeMs = lastCaughtUpTimeMs;
        ) {}

        /**
    this.replicaId = replicaId;
   * Return the ID for this replica.
         * @return The ID for this.logEndOffset = logEndOffset;
 replica
         */
      this.lastFetchTimeMs = lastFetchTimeMs;  public int replicaId() {
            this.lastCaughtUpTimeMs = lastCaughtUpTimeMsreturn replicaId;
        }

        /**
         * Return the IDlogEndOffset known by the leader for this replica.
         * @return The IDlogEndOffset for this replica
         */
        public intlong replicaIdlogEndOffset() {
            return replicaIdlogEndOffset;
        }

        /**
         * Return the logEndOffsetlastFetchTime knownin bymilliseconds the leader for this replica.
         * @return The logEndOffset for this replica value of the lastFetchTime if known, empty otherwise
         */
        public longOptionalLong logEndOffsetlastFetchTimeMs() {
            return logEndOffsetlastFetchTimeMs;
        }

        /**
         * Return the lastFetchTimelastCaughtUpTime in milliseconds for this replica.
         * @return The value of the lastFetchTimelastCaughtUpTime if known, empty otherwise
         */
        public OptionalLong lastFetchTimeMslastCaughtUpTimeMs() {
            return lastFetchTimeMslastCaughtUpTimeMs;
        }

        /**@Override
        public *boolean Return the lastCaughtUpTime in milliseconds for this replica.
equals(Object o) {
            if  * @return The value of the lastCaughtUpTime if known, empty otherwise
(this == o) return true;
            if (o == null  */
    || getClass() != o.getClass()) return false;
    public OptionalLong lastCaughtUpTimeMs() {
     ReplicaState that = (ReplicaState) o;
   return lastCaughtUpTimeMs;
        }

return replicaId == that.replicaId
        @Override
        public&& boolean equals(Object o) {logEndOffset == that.logEndOffset
            if (this == o) return true;&& lastFetchTimeMs.equals(that.lastFetchTimeMs)
            if (o == null || getClass() != o.getClass()) return false;
&& lastCaughtUpTimeMs.equals(that.lastCaughtUpTimeMs);
        }

        @Override
      ReplicaState  thatpublic =int hashCode(ReplicaState) o;{
            return Objects.hash(replicaId == that.replicaId, logEndOffset, lastFetchTimeMs, lastCaughtUpTimeMs);
        }

        &&@Override
 logEndOffset == that.logEndOffset
     public String toString() {
        && lastFetchTimeMs.equals(that.lastFetchTimeMs)
   return "ReplicaState(" +
           && lastCaughtUpTimeMs.equals(that.lastCaughtUpTimeMs);
    "replicaId=" + replicaId +
 }

        @Override
       ", publiclogEndOffset=" int+ hashCode()logEndOffset {+
            return Objects.hash(replicaId, logEndOffset    ", lastFetchTimeMs, lastCaughtUpTimeMs);=" + lastFetchTimeMs +
        }

        @Override
    ", lastCaughtUpTimeMs=" + lastCaughtUpTimeMs +
    public String toString() {
         ')';
   return "ReplicaState(" +
   }
    }
}


Code Block
public class DescribeMetadataQuorumOptions extends AbstractOptions<DescribeMetadataQuorumOptions> {

}

DescribeMetadataQuorum Handler in the Admin Client

Code Block
  /**
  "replicaId=" + replicaId +
* Describes the state of the metadata quorum.
     * <p>
   ", logEndOffset=" +* logEndOffsetThis +
is a convenience method for {@link #describeMetadataQuorum(DescribeMetadataQuorumOptions)} with default options.
     * See ", lastFetchTimeMs=" + lastFetchTimeMs +the overload for more details.
     *
     * @return the {@link DescribeMetadataQuorumResult}  ", lastCaughtUpTimeMs=" + lastCaughtUpTimeMs +containing the result
     */
    default DescribeMetadataQuorumResult describeMetadataQuorum() {
        return  ')'describeMetadataQuorum(new DescribeMetadataQuorumOptions());
        }
    }
}
Code Block
public class DescribeMetadataQuorumOptions extends AbstractOptions<DescribeMetadataQuorumOptions> {

}

DescribeMetadataQuorum Handler in the Admin Client

Code Block
  /**
     * Describes the state of the metadata quorum.
     * <p>
     * This is a convenience method for {@link #describeMetadataQuorum(DescribeMetadataQuorumOptions)} with default options.The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
     * See the overloadreturned for more details.{@code DescribeMetadataQuorumResult}:
     * <ul>
     * @return the <li>{@link DescribeMetadataQuorumResult} containing the resultorg.apache.kafka.common.errors.ClusterAuthorizationException}
     */
   If default DescribeMetadataQuorumResult describeMetadataQuorum() {
        return describeMetadataQuorum(new DescribeMetadataQuorumOptions());
    }

    /**
     * Describes the state of the metadata quorum.the authenticated user didn't have {@code DESCRIBE} access to the cluster.</li>
     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *   If the request timed out before the controller could describe the metadata quorum.</li>
     * <p></ul>
     *
 The following exceptions can be* anticipated@param whenoptions callingThe {@code@link get()DescribeMetadataQuorumOptions} onto use thewhen futuresdescribing obtainedthe fromquorum.
     * @return the returned {@code@link DescribeMetadataQuorumResult}: containing the result
     * <ul>/
    DescribeMetadataQuorumResult *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
     *   If the authenticated user didn't have {@code DESCRIBE} access to the cluster.</li>
     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *   If the request timed out before the controller could describe the metadata quorum.</li>
     * </ul>
     *
     * @param options The {@link DescribeMetadataQuorumOptions} to use when describing the quorum.
     * @return the {@link DescribeMetadataQuorumResult} containing the result
     */
    DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options);

Proposed change in the DescribeQuorum Response:

describeMetadataQuorum(DescribeMetadataQuorumOptions options);

Proposed change in the DescribeQuorum Response:

Code Block
  "apiKey": 55,
  "type": "response",
  
Code Block
  "apiKey": 55,
  "type": "response",
  "name": "DescribeQuorumResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "TopicsDescribeQuorumResponse",
  "typevalidVersions": "[]TopicData0-1",
      "versionsflexibleVersions": "0+",
  "fields": [
      { "name": "TopicNameErrorCode", "type": "string", "versions": "0+"int16", "entityTypeversions": "topicName0+",
        "about": "The top topiclevel error namecode." },
      { "name": "PartitionsTopics", "type": "[]PartitionDataTopicData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndexTopicName", "type": "int32string", "versions": "0+",
 "entityType": "topicName",
        "about": "The partitiontopic indexname." },
      {  {"name": "Partitions", "nametype": "ErrorCode",[]PartitionData",
        "typeversions": "int160+", "versionsfields": "0+"},[
        { "name": "LeaderIdPartitionIndex", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown. partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderEpochLeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The latest known ID of the current leader or -1 if the leader epochis unknown."},
        { "name": "HighWatermarkLeaderEpoch", "type": "int64int32", "versions": "0+"},
        { "name": "CurrentVoters", "type": "[]ReplicaState", "versions "about": "0+" The latest known leader epoch"},
        { "name": "ObserversHighWatermark", "type": "[]ReplicaStateint64", "versions": "0+" },
        { "name": "HighWatermarkUpdateTimeMsCurrentVoters", "type": "int64[]ReplicaState", "defaultversions": -1, "tag": 0,
          "taggedVersions": "1+", "ignorable": true, "about": "The wall clock time that the high watermark was last updated on the leader" },
        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
         ]}
    ]}],
  "commonStructs": [
      { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown"},
      { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The last known leader wall clock time when a follower fetched from the leader. This is reported as -1 both for the current leader and if it is unknown for a voter"},
      { "name": "LastCaughtUpTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The leader wall clock append time of the offset for which the follower made the most recent fetch request. This is reported as the current time for the leader and -1 if unknown for a voter"}
    ]}
  ]

...

  1. LastFetchTimestamp
    This metric will be reported for each voter. This is a good approximation of the “liveness” of the voters and can be used to detect a network partition in the quorum.
    This information is already known to the leader for all voters and only needs to be added to the response.

  2. LastCaughtUpTimestamp
    This metric will be reported for each voter. This is akin to the metric used to track lag for replicas in ISR and it measures the approximate lag between the leader and the replica based on the offsets requested in the fetch requests and when they were made. The metric gives a measure of the last time a replica caught up to the leader.
    To compute this metric, the Replica state maintains a few bits of information about fetch requests as they are received. Some of this information is not tracked by the Raft layer but it can be easily added in. The cost to track this information is minimal and it only requires some additional bookkeeping during pre-existing processing for a fetch.

    NOTE: Given the leader is always caught up to itself, the Last Caught Up Time for the leader will be the leader's wall clock time when it responded to the DescribeQuorum request.

...

Compatibility, Deprecation, and Migration Plan

...