Even though client will receive the new leader information in the ProduceResponse & FetchResponse when leader changes, but same as the existing behaviour of the Kafka Java client, it will request expedited metadata-refresh done asynchronously. Since leadership change will likely affect many partitions, so future requests to such partitions will benefit from this.

For Produce, if new leader info is available in the response along with errors(NOT_LEADER_OR_FOLLOWER or FENCED_LEADER_EPOCH) that advances client's view of the leadership, client would no longer back off up to RETRY_BACKOFF_MS_CONFIG, before retrying the failed batch if new leader information that advances clients' metadata state was received in the ProduceResponse. This immediate retry is appealing as the client is going to retry on a different broker and it is likely to succeed because it is retrying on a newer leader. On the other hand, subsequent retries to the same new LeaderEpoch should still continue to be subject to clients' backoff strategy.


FetchResponse already contains CurrentLeader introduced in KIP-595, which will be used to propagate LeaderId & LeaderEpoch. Additionally NodeEndpoints is introduced to propagate host, port & rack for the current-leaders enumerated across all PartitionDatas. NodeEndpoints is NodeEndpoints is a tagged field, which is a minor optimisation of saving bytes on the network, as it won’t be set always. This doesn’t require a version bumpThe version hasn't been bumped, and taggedVersion is set to existing version( "12+"). This should help Kafka clients to implement these optimisations without fully implementing all the versions from 13-16(16 being the new version if bumped).

Code Block
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-15",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Topic", "type": "string", "versions": "0-12", "ignorable": true, "entityType": "topicName",
        "about": "The topic name." },
      { "name": "TopicId", "type": "uuid", "versions": "13+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log start offset." },
        { "name": "DivergingEpoch", "type": "EpochEndOffset", "versions": "12+", "taggedVersions": "12+", "tag": 0,
          "about": "In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge",
          "fields": [
            { "name": "Epoch", "type": "int32", "versions": "12+", "default": "-1" },
            { "name": "EndOffset", "type": "int64", "versions": "12+", "default": "-1" }
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "12+", "default": "-1", "entityType": "brokerId",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "12+", "default": "-1",
            "about": "The latest known leader epoch"}
        { "name": "SnapshotId", "type": "SnapshotId",
          "versions": "12+", "taggedVersions": "12+", "tag": 2,
          "about": "In the case of fetching an offset less than the LogStartOffset, this is the end offset and epoch that should be used in the FetchSnapshot request.",
          "fields": [
            { "name": "EndOffset", "type": "int64", "versions": "0+", "default": "-1" },
            { "name": "Epoch", "type": "int32", "versions": "0+", "default": "-1" }
        { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true,
          "about": "The aborted transactions.",  "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, "entityType": "brokerId",
          "about": "The preferred read replica for the consumer to use on its next fetch request"},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}
 	// ---------- Start new field ----------  
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "12+", "taggedVersions": "12+", "tag": 0,
      "about": "Endpoints for all current-leaders enumerated in PartitionData.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "12+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node."},
      { "name": "Host", "type": "string", "versions": "12+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "12+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "12+", "nullableVersions": "12+", "ignorable": true, "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
 	// ---------- End new field ----------  
