Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Added KIP-951 optimisations to RPC responses

...

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareFetchResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": false,
      "about": "The share session ID." },
    { "name": "Responses", "type": "[]ShareFetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "LastStableOffset", "type": "int64", "versions": "0+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]},
        { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "0+", "nullableVersions": "0+", "ignorable": true,
          "about": "The aborted transactions.", "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "0+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."},
        { "name": "AcquiredRecords", "type": "[]AcquiredRecords", "versions": "0+", "about": "The acquired records.", "fields": [
          {"name": "BaseOffset", "type": "int64", "versions": "0+", "about": "The earliest offset in this batch of acquired records."},
          {"name": "LastOffset", "type": "int64", "versions": "0+", "about": "The last offset of this batch of acquired records."},
          {"name": "DeliveryCount", "type": "int16", "versions": "0+", "about": "The delivery count of this batch of acquired records."}
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "0+", "tag": 0,
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

ShareAcknowledge API

The ShareAcknowledge API is used by share group consumers to acknowledge delivery of records with share-partition leaders.

Request schema

Code Block
{
  "apiKey": NN,
  "type": "request",
  "listeners": ["broker"],
  "name": "ShareAcknowledgeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "SessionId", "type": "int32", "versions": "0+",
      "about": "The share session ID." },
    { "name": "SessionEpoch", "type": "int32", "versions": "0+",
      "about": "The share session epoch, which is used for ordering requests in a session." },
    { "name": "Topics", "type": "[]AcknowledgeTopic", "versions": "0+",
      "about": "The topics containing records to acknowledge.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]AcknowledgePartition", "versions": "0+",
        "about": "The partitions containing records to acknowledge.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "AcknowledgementBatches", "type": "[]AcknowledgementBatch", "versions": "0+",
          "about": "Record batches to acknowledge.", "fields": [
          { "name": "StartOffset", "type": "int64", "versions": "0+",
            "about": "Start offset of batch of records to acknowledge."},
          { "name": "LastOffset", "type": "int64", "versions": "0+",
            "about": "Last offset (inclusive) of batch of records to acknowledge."},
          { "name": "GapOffsets", "type": "[]int64", "versions": "0+",
            "about": "Array of offsets in this range which do not correspond to records."},
          { "name": "AcknowledgeType", "type": "string", "versions": "0+", "default": "accept",
            "about": "The type of acknowledgement, such as accept or release."}
        ]}
      ]}
    ]}
  ]
}

Response schema

Code Block
{
  "apiKey": NN,
  "type": "response",
  "name": "ShareAcknowledgeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": true,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "0+", "default": "0", "ignorable": false,
      "about": "The share session ID." },
    { "name": "Responses", "type": "[]ShareAcknowledgeTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "TopicId", "type": "uuid", "versions": "0+", "ignorable": true, "about": "The unique topic ID"},
      { "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no error." },
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch." }
        ]}
      ]}
    ]},
    { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "16+", "taggedVersions": "0+", "tag": 0,
      "about": "Endpoints for all current leaders enumerated in PartitionData with error NOT_LEADER_OR_FOLLOWER.", "fields": [
      { "name": "NodeId", "type": "int32", "versions": "0+",
        "mapKey": true, "entityType": "brokerId", "about": "The ID of the associated node." },
      { "name": "Host", "type": "string", "versions": "0+",
        "about": "The node's hostname." },
      { "name": "Port", "type": "int32", "versions": "0+",
        "about": "The node's port." },
      { "name": "Rack", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The rack of the node, or null if it has not been assigned to a rack." }
    ]}
  ]
}

Metrics

Further details to follow as the design progresses.

...