Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: "Approved"

Discussion thread: here

JIRA: KAFKA-13888

...

This KIP proposes making the DescribeQuorum API accessible via the admin client and augmenting the DescribeQuorum API response with more information, so that the liveness and lag of the voters in the quorum can be ascertained more accurately.

Public Interfaces

Additional Classes to expose the

...

DescribeMetadataQuorumResult  API to the admin client:

Code Block
/**
 * The result type returned by the admin client's {@code describeMetadataQuorum} call.
 * It wraps a single future that yields the {@link QuorumInfo}.
 */
public class DescribeMetadataQuorumResult {

    private final KafkaFuture<QuorumInfo> quorumInfo;

    /**
     * Creates a result backed by the given future.
     *
     * @param quorumInfo the future that will be handed back by {@link #quorumInfo()}
     */
    DescribeMetadataQuorumResult(KafkaFuture<QuorumInfo> quorumInfo) {
        this.quorumInfo = quorumInfo;
    }

    /**
     * Returns a future containing the QuorumInfo
     *
     * @return the future supplied at construction time
     */
    public KafkaFuture<QuorumInfo> quorumInfo() {
        return quorumInfo;
    }
}


Code Block
/**
 * Describes a quorum: the id of its leader and the replica state of its voters and observers.
 */
public class QuorumInfo {
    private final Integer leaderId;
    private final List<ReplicaState> voters;
    private final List<ReplicaState> observers;

    /**
     * Creates a quorum description. The lists are stored as given (no defensive copy).
     *
     * @param leaderId  the id of the quorum leader
     * @param voters    the replica state of each voter
     * @param observers the replica state of each observer
     */
    public QuorumInfo(Integer leaderId, List<ReplicaState> voters, List<ReplicaState> observers) {
        this.leaderId = leaderId;
        this.voters = voters;
        this.observers = observers;
    }

    /**
     * @return the id of the quorum leader
     */
    public Integer leaderId() {
        return leaderId;
    }

    /**
     * @return the replica state of each voter
     */
    public List<ReplicaState> voters() {
        return voters;
    }

    /**
     * @return the replica state of each observer
     */
    public List<ReplicaState> observers() {
        return observers;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        QuorumInfo that = (QuorumInfo) o;
        // Objects.equals keeps equals() null-tolerant, matching Objects.hash() in hashCode().
        return Objects.equals(leaderId, that.leaderId)
            && Objects.equals(voters, that.voters)
            && Objects.equals(observers, that.observers);
    }

    @Override
    public int hashCode() {
        return Objects.hash(leaderId, voters, observers);
    }

    @Override
    public String toString() {
        return "QuorumInfo(" +
            "leaderId=" + leaderId +
            ", voters=" + voters +
            ", observers=" + observers +
            ')';
    }

    /**
     * The state of a single replica (voter or observer) of the quorum.
     */
    public static class ReplicaState {
        private final int replicaId;
        private final long logEndOffset;
        private final OptionalLong lastFetchTimeMs;
        private final OptionalLong lastCaughtUpTimeMs;

        /**
         * Creates a replica state with id 0, log end offset 0 and both timestamps empty.
         */
        ReplicaState() {
            this(0, 0, OptionalLong.empty(), OptionalLong.empty());
        }

        /**
         * Creates a replica state from explicit values.
         *
         * @param replicaId          the ID of the replica
         * @param logEndOffset       the log end offset of the replica
         * @param lastFetchTimeMs    the last fetch time in milliseconds, or empty if unknown
         * @param lastCaughtUpTimeMs the last caught-up time in milliseconds, or empty if unknown
         */
        ReplicaState(
            int replicaId,
            long logEndOffset,
            OptionalLong lastFetchTimeMs,
            OptionalLong lastCaughtUpTimeMs
        ) {
            this.replicaId = replicaId;
            this.logEndOffset = logEndOffset;
            this.lastFetchTimeMs = lastFetchTimeMs;
            this.lastCaughtUpTimeMs = lastCaughtUpTimeMs;
        }

        /**
         * Return the ID for this replica.
         * @return The ID for this replica
         */
        public int replicaId() {
            return replicaId;
        }

        /**
         * Return the logEndOffset known by the leader for this replica.
         * @return The logEndOffset for this replica
         */
        public long logEndOffset() {
            return logEndOffset;
        }

        /**
         * Return the lastFetchTime in milliseconds for this replica.
         * @return The value of the lastFetchTime if known, empty otherwise
         */
        public OptionalLong lastFetchTimeMs() {
            return lastFetchTimeMs;
        }

        /**
         * Return the lastCaughtUpTime in milliseconds for this replica.
         * @return The value of the lastCaughtUpTime if known, empty otherwise
         */
        public OptionalLong lastCaughtUpTimeMs() {
            return lastCaughtUpTimeMs;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ReplicaState that = (ReplicaState) o;
            return replicaId == that.replicaId
                && logEndOffset == that.logEndOffset
                && lastFetchTimeMs.equals(that.lastFetchTimeMs)
                && lastCaughtUpTimeMs.equals(that.lastCaughtUpTimeMs);
        }

        @Override
        public int hashCode() {
            return Objects.hash(replicaId, logEndOffset, lastFetchTimeMs, lastCaughtUpTimeMs);
        }

        @Override
        public String toString() {
            return "ReplicaState(" +
                "replicaId=" + replicaId +
                ", logEndOffset=" + logEndOffset +
                ", lastFetchTimeMs=" + lastFetchTimeMs +
                ", lastCaughtUpTimeMs=" + lastCaughtUpTimeMs +
                ')';
        }
    }
}


Code Block
/**
 * Options accepted by the admin client's {@code describeMetadataQuorum} call.
 * The class declares no members of its own; everything comes from {@link AbstractOptions}.
 */
public class DescribeMetadataQuorumOptions extends AbstractOptions<DescribeMetadataQuorumOptions> {
}

DescribeMetadataQuorum Handler in the Admin Client

Code Block
  /**
     * Describes the state of the metadata quorum.
     * <p>
     * This is a convenience method for {@link #describeMetadataQuorum(DescribeMetadataQuorumOptions)} with default options.
     * See the overload for more details.
     *
     * @return the {@link DescribeMetadataQuorumResult} containing the result
     */
    default DescribeMetadataQuorumResult describeMetadataQuorum() {
      return lastCaughtUpTimeMs;
 return   }
  }
}
Code Block
public class DescribeQuorumOptions extends AbstractOptions<DescribeQuorumOptions> {
}

DescribeQuorum Handler in the Admin Client

Code Block
describeMetadataQuorum(new DescribeMetadataQuorumOptions());
    }

    /**
     * Describes the state of the metadata quorum.
     * <p>
     * The following exceptions can be anticipated when calling {@code get()} on the futures obtained from
     * the returned {@code DescribeMetadataQuorumResult}:
     * <ul>
     *   <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException}
     *   If the authenticated user didn't have {@code DESCRIBE} access to the cluster.</li>
     *   <li>{@link org.apache.kafka.common.errors.TimeoutException}
     *   If the request timed out before the metadata quorum could be described.</li>
     * </ul>
     *
     * @param options The {@link DescribeMetadataQuorumOptions} to use when describing the quorum.
     * @return the {@link DescribeMetadataQuorumResult} containing the result
     */
    DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options);

Proposed change in the DescribeQuorum Response:

Code Block
  "apiKey": 55,
  "type": "response",
  "name": "DescribeQuorumResponse",
  "validVersions": "0-1",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top level error code."},
    { "name": "Topics", "type": "[]TopicData",
      "versions": "0+", "fields": [
      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]PartitionData",
        "versions": "0+", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+"},
        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
          "about": "The ID of the current leader or -1 if the leader is unknown."},
        { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The latest known leader epoch"},
        { "name": "HighWatermark", "type": "int64", "versions": "0+"},
        { "name": "CurrentVoters", "type": "[]ReplicaState", "versions": "0+" },
        { "name": "Observers", "type": "[]ReplicaState", "versions": "0+" }
      ]}
    ]}],
  "commonStructs": [
      { "name": "ReplicaState", "versions": "0+", "fields": [
      { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId" },
      { "name": "LogEndOffset", "type": "int64", "versions": "0+",
        "about": "The last known log end offset of the follower or -1 if it is unknown"},
      { "name": "LastFetchTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The last known leader wall clock time when a follower fetched from the leader. This is reported as -1 both for the current leader or if it is unknown for a voter"},
      { "name": "LastCaughtUpTimestamp", "type": "int64", "versions": "1+", "ignorable": true, "default": -1,
        "about": "The leader wall clock append time of the offset for which the follower made the most recent fetch request. This is reported as the current time for the leader and -1 if unknown for a voter"}
    ]}
   ]

kafka-metadata-quorum.sh

...