...

Visually, a Kafka Raft log would look as follows:

Code Block
Kafka Replicated Log:

      LBO  --               high-watermark --      LEO --
            V                               V           V
        ---------------------------------------------------
offset: | x + 1 | ... | y | y + 1 |  ...  |   |  ...  |   |
epoch:  | b     | ... | c | d     |  ...  |   |  ...  |   |
        ---------------------------------------------------

Kafka Snapshot Files:

<topic_name>-<partition_index>/x-a.checkpoint
<topic_name>-<partition_index>/y-c.checkpoint

Legend

  1. LEO - log end offset - the largest offset and epoch that have been written to disk.

  2. high-watermark - the largest offset and epoch that have been replicated to N/2 + 1 replicas.

  3. LBO - log begin offset - the smallest offset in the replicated log. This property used to be called log start offset. It is renamed so that the acronym doesn’t conflict with last stable offset (LSO).
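
The snapshot file names in the diagram can be read as `<offset>-<epoch>.checkpoint` inside the `<topic_name>-<partition_index>` directory. The following is a minimal sketch of that naming scheme under that reading; the `OffsetAndEpoch` class, the helper names, and the example topic name `metadata` are hypothetical and are not part of the proposal.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OffsetAndEpoch:
    """An (offset, epoch) pair identifying a snapshot, as in x-a.checkpoint."""
    offset: int
    epoch: int


def snapshot_path(topic: str, partition: int, snapshot: OffsetAndEpoch) -> str:
    """Build <topic_name>-<partition_index>/<offset>-<epoch>.checkpoint."""
    return f"{topic}-{partition}/{snapshot.offset}-{snapshot.epoch}.checkpoint"


def parse_snapshot_name(file_name: str) -> OffsetAndEpoch:
    """Recover the offset and epoch from a name such as '100-3.checkpoint'."""
    stem, _, suffix = file_name.rpartition(".")
    if suffix != "checkpoint" or not stem:
        raise ValueError(f"not a snapshot file: {file_name}")
    offset, epoch = stem.split("-")
    return OffsetAndEpoch(int(offset), int(epoch))
```

For example, `snapshot_path("metadata", 0, OffsetAndEpoch(100, 3))` yields `metadata-0/100-3.checkpoint`, and `parse_snapshot_name("100-3.checkpoint")` recovers the same pair.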

...

  1. It includes new optional fields for snapshot offset and snapshot epoch.

  2. The LogStartOffset field is renamed to LogBeginOffset.

Code Block
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset has been decided (ABORTED or COMMITTED)." },
        // Field renamed from LogStartOffset to LogBeginOffset 
        { "name": "LogBeginOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log begin offset." },
        // ---------- Start new fields by KIP-595 ----------
        { "name": "NextOffsetAndEpoch", "type": "OffsetAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 0, "fields": [
          { "name": "NextFetchOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the offset that the follower should truncate to"},
          { "name": "NextFetchOffsetEpoch", "type": "int32", "versions": "0+",
            "about": "The epoch of the next offset in case the follower needs to truncate"}
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
        ]},
        // ---------- End new fields by KIP-595 ----------
        // ---------- Start new fields ----------
        { "name": "SnapshotOffsetAndEpoch", "type": "OffsetAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 2, "fields": [
          { "name": "FetchOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the offset that the follower should use for the FetchSnapshot request"},
          { "name": "FetchOffsetEpoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"}
        ]},
        // ---------- End new fields ----------
        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
          "about": "The aborted transactions.",  "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
          "about": "The preferred read replica for the consumer to use on its next fetch request"},
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." }
      ]}
    ]}
  ]
}

Handling Fetch Request

The replica’s handling of Fetch requests will be extended so that, if FetchOffset is less than LogBeginOffset, the leader responds with a SnapshotOffsetAndEpoch identifying the latest snapshot and a NextOffsetAndEpoch for the offset immediately after SnapshotOffsetAndEpoch.
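
The rule above can be sketched as follows. This is only an illustration of the decision, assuming that "the offset right after" the snapshot means the snapshot offset plus one and that the leader can look up the epoch of any offset in its log. The `handle_fetch` function, the `PartitionFetchResult` type, and the `epoch_at` callback are hypothetical names, not part of the proposal.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class OffsetAndEpoch:
    offset: int
    epoch: int


@dataclass(frozen=True)
class PartitionFetchResult:
    """Only the two optional fields discussed here; None means 'not set'."""
    snapshot_offset_and_epoch: Optional[OffsetAndEpoch] = None
    next_offset_and_epoch: Optional[OffsetAndEpoch] = None


def handle_fetch(
    fetch_offset: int,
    log_begin_offset: int,
    latest_snapshot: OffsetAndEpoch,
    epoch_at: Callable[[int], int],  # hypothetical lookup: offset -> epoch
) -> PartitionFetchResult:
    """Decide whether the follower must fetch a snapshot before the log."""
    if fetch_offset < log_begin_offset:
        # The requested offset is no longer in the log: point the follower
        # at the latest snapshot and at the first offset after it.
        next_offset = latest_snapshot.offset + 1
        return PartitionFetchResult(
            snapshot_offset_and_epoch=latest_snapshot,
            next_offset_and_epoch=OffsetAndEpoch(next_offset, epoch_at(next_offset)),
        )
    # Otherwise the regular fetch path applies and neither field is set.
    return PartitionFetchResult()
```

With a log that begins at offset 11 and a latest snapshot at offset 20 with epoch 3, a fetch at offset 5 returns the snapshot (20, 3) and a next offset of 21, while a fetch at offset 15 leaves both fields unset.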

...

Fetching Snapshots

Request Schema

Code Block
{
  "apiKey": #,
  "type": "request",
  "name": "FetchSnapshotRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "0+",
      "about": "The broker ID of the follower, or -1 if this request is from a consumer." },
    { "name": "Topics", "type": "[]TopicSnapshots", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshots", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "SnapshotOffsetAndEpoch", "type": "OffsetAndEpoch",
          "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "Offset", "type": "int64", "versions": "0+",
            "about": "If set, this is the offset that the follower should use for the FetchSnapshot request"},
          { "name": "OffsetEpoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"}
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot." },
        { "name": "MaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this snapshot." }
      ]}
    ]}
  ]
}

Response Schema

Code Block
{
  "apiKey": #,
  "type": "response",
  "name": "FetchSnapshotResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "Topics", "type": "[]TopicSnapshots", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshots", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "MediaType", "type": "string", "versions": "0+",
          "about": "The media type and version of the snapshot. This is only set when the position is 0." },
        { "name": "Continue", "type": "bool", "versions": "0+",
          "about": "True if there is data remaining for fetch." },
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot." },
        { "name": "Bytes", "type": "bytes", "versions": "0+",
          "about": "Snapshot data." }
      ]}
    ]}
  ]
}
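
To illustrate how the Position, MaxBytes, Continue and Bytes fields could work together, here is a minimal sketch of a follower downloading one snapshot in chunks. It assumes that the response Position echoes the position of the first byte in Bytes and that the follower advances by the number of bytes received; neither is stated in the schemas. `SnapshotChunk`, `download_snapshot` and `make_fake_leader` are hypothetical names, and the fake leader stands in for a real FetchSnapshot round trip.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class SnapshotChunk:
    """The per-partition response fields used by the loop below."""
    position: int   # Position
    data: bytes     # Bytes
    has_more: bool  # Continue


def download_snapshot(
    fetch_chunk: Callable[[int, int], SnapshotChunk],  # (position, max_bytes)
    max_bytes: int,
) -> bytes:
    """Request consecutive chunks until the leader reports no more data."""
    buffer = bytearray()
    position = 0
    while True:
        chunk = fetch_chunk(position, max_bytes)
        if chunk.position != position:
            raise ValueError(f"expected position {position}, got {chunk.position}")
        if chunk.has_more and not chunk.data:
            raise ValueError("leader reported more data but sent no bytes")
        buffer.extend(chunk.data)
        position += len(chunk.data)
        if not chunk.has_more:
            return bytes(buffer)


def make_fake_leader(snapshot: bytes) -> Callable[[int, int], SnapshotChunk]:
    """In-memory stand-in for a leader serving a single snapshot."""
    def fetch_chunk(position: int, max_bytes: int) -> SnapshotChunk:
        data = snapshot[position:position + max_bytes]
        return SnapshotChunk(position, data, position + len(data) < len(snapshot))
    return fetch_chunk
```

Calling `download_snapshot(make_fake_leader(b"0123456789"), 4)` issues three requests at positions 0, 4 and 8 and returns the original ten bytes.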

Request Handling

When the leader receives a FetchSnapshot request, it does the following for each partition:

...