...

Note: The extension checkpoint will be used because Kafka already has files with the snapshot extension.

Legend

  1. LEO - log end offset - the largest offset and epoch that has been written to disk.

  2. high-watermark - the largest offset and epoch that has been replicated to N/2 + 1 replicas.

  3. LBO - log begin offset - the smallest offset in the replicated log. This property used to be called log start offset. It is renamed so that its acronym doesn’t conflict with last stable offset (LSO).

In the example above, offset=x, epoch=a does not appear in the replicated log because it is before the LBO (x + 1, b). If the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the LBO (x + 1, b), then the leader handles the request as described in KIP-595. Otherwise, the leader will respond with the offset and epoch of the latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d).

The followers and observers can concurrently fetch both the snapshot and the replicated log. During leader election, followers that violate the invariant in the "Validation of Snapshot and Log" section will send a vote request and response as if they had an empty log.

...

The Kafka Controller is responsible for the format of the snapshot. The snapshot will be stored in the checkpoints directory in the topic partition directory and will have a name of <SnapshotId.Offset>-<SnapshotId.Epoch>.checkpoint. For example, for the topic __cluster_metadata, partition 0, snapshot offset 5120793 and snapshot epoch 2, the full filename will be __cluster_metadata-0/checkpoints/00000000000005120793-00000000000000000002.checkpoint.
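A minimal sketch of how the file name in the example could be built. It assumes that both components are zero-padded to 20 digits, as in the example; the class and method names are hypothetical and are not Kafka's API. `Locale.ROOT` is used so the digits do not depend on the JVM's default locale.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Locale;

// Hypothetical helper (not Kafka's API) that builds a snapshot's path from its SnapshotId.
final class SnapshotFileNames {
    private SnapshotFileNames() {}

    // Assumption: offset and epoch are zero-padded to 20 digits, as in the example above.
    static String fileName(long snapshotOffset, int snapshotEpoch) {
        return String.format(Locale.ROOT, "%020d-%020d.checkpoint", snapshotOffset, snapshotEpoch);
    }

    // <topic>-<partition>/checkpoints/<offset>-<epoch>.checkpoint, relative to the log directory.
    static Path snapshotPath(String topic, int partition, long snapshotOffset, int snapshotEpoch) {
        return Paths.get(topic + "-" + partition, "checkpoints", fileName(snapshotOffset, snapshotEpoch));
    }
}
```

For the example above, `snapshotPath("__cluster_metadata", 0, 5120793L, 2)` yields the path given in the text.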

The snapshot epoch will be used when ordering snapshots and, more importantly, when setting the LastFetchedEpoch in the Fetch request. It is possible for a follower to have a snapshot and an empty log. In this case, the follower will use the epoch of the snapshot when setting the LastFetchedEpoch in the Fetch request.
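The rule above can be sketched as a small function that picks the FetchOffset and LastFetchedEpoch for the next Fetch request. This is a hypothetical illustration: the type and method names are invented, and the fallback for a replica with neither a log nor a snapshot is an assumption, since that case is not covered in this section.

```java
import java.util.Optional;

// Hypothetical helper; the names are illustrative, not Kafka's API.
final class NextFetch {
    record OffsetAndEpoch(long offset, int epoch) {}

    // The two values the follower puts in its next Fetch request.
    record FetchPosition(long fetchOffset, int lastFetchedEpoch) {}

    static FetchPosition nextFetch(Optional<OffsetAndEpoch> lastLogRecord, Optional<OffsetAndEpoch> latestSnapshot) {
        if (lastLogRecord.isPresent()) {
            // Non-empty log: continue right after the last record, using that record's epoch.
            OffsetAndEpoch last = lastLogRecord.get();
            return new FetchPosition(last.offset() + 1, last.epoch());
        }
        if (latestSnapshot.isPresent()) {
            // Snapshot but empty log: use the snapshot's epoch and fetch from the offset after the snapshot.
            OffsetAndEpoch snapshot = latestSnapshot.get();
            return new FetchPosition(snapshot.offset() + 1, snapshot.epoch());
        }
        // Assumption: a replica with neither a log nor a snapshot starts at offset 0 with epoch 0.
        return new FetchPosition(0L, 0);
    }
}
```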

The disk format for snapshot files will be the same as version 2 of the log format. This is the version 2 log format for reference:

...

The broker will delete any snapshot with a latest offset and epoch (SnapshotId) less than the LBO - 1. This means that the question of when to delete a snapshot is the same as when to increase the LBO, which is covered in the section “When To Increase the Log Begin Offset”.
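A minimal sketch of the deletion rule, assuming that "less than the LBO - 1" compares the snapshot's offset with LBO - 1. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical retention check; not Kafka's implementation.
final class SnapshotRetention {
    record SnapshotId(long offset, int epoch) {}

    // Returns every snapshot whose offset is strictly less than logBeginOffset - 1.
    // Snapshots at or after logBeginOffset - 1 are kept.
    static List<SnapshotId> snapshotsToDelete(List<SnapshotId> snapshots, long logBeginOffset) {
        List<SnapshotId> toDelete = new ArrayList<>();
        for (SnapshotId id : snapshots) {
            if (id.offset() < logBeginOffset - 1) {
                toDelete.add(id);
            }
        }
        return toDelete;
    }
}
```

Keeping the snapshot at LBO - 1 is what lets a replica with an LBO greater than 0 still satisfy the invariant in "Validation of Snapshot and Log".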

...

The followers of the __cluster_metadata topic partition need to take the snapshots into account when granting votes and asking for votes (becoming a candidate). Conceptually, followers implement the following algorithm:

  1. Follower sends a fetch request
  2. Leader replies with a snapshot epoch and offset.
  3. Follower pauses fetch requests
  4. Follower fetches a snapshot from the leader
  5. Follower resumes fetch requests by
    1. Setting the LBO to the snapshot offset plus one.
    2. Setting the LEO or FetchOffset in the fetch request to the snapshot's offset plus one
    3. Setting the LastFetchedEpoch in the fetch request to the snapshot's epoch.
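The steps above can be sketched as a small follower-side state holder. This is a hypothetical sketch: the class, field and method names are invented, the initial offsets of 0 are placeholders, and the FetchSnapshot exchange itself (step 4) is not shown.

```java
// Hypothetical follower-side state for the steps above; names are illustrative only.
final class SnapshotBootstrapFollower {
    record SnapshotId(long offset, int epoch) {}

    private boolean fetchPaused = false;
    private SnapshotId pendingSnapshot = null;
    long logBeginOffset = 0;
    long fetchOffset = 0;
    int lastFetchedEpoch = 0;

    // Steps 2 and 3: the leader replied with a snapshot, so stop sending Fetch requests.
    void onSnapshotIdInFetchResponse(SnapshotId snapshotId) {
        pendingSnapshot = snapshotId;
        fetchPaused = true;
    }

    boolean canSendFetch() {
        return !fetchPaused;
    }

    // Step 4 would send FetchSnapshot requests for this id.
    SnapshotId snapshotToFetch() {
        return pendingSnapshot;
    }

    // Step 5: the snapshot has been fetched completely, so resume fetching after it.
    void onSnapshotFetched() {
        if (pendingSnapshot == null) {
            throw new IllegalStateException("no snapshot is being fetched");
        }
        logBeginOffset = pendingSnapshot.offset() + 1;  // 5.1
        fetchOffset = pendingSnapshot.offset() + 1;     // 5.2
        lastFetchedEpoch = pendingSnapshot.epoch();     // 5.3
        pendingSnapshot = null;
        fetchPaused = false;
    }
}
```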

Assume that the snapshot fetched in step 4 has epoch E and offset O. The state of the follower between steps 4 and 5 could be as follows:

  1. Contains a snapshot with epoch E and offset O.
  2. Contains many snapshots older/less than epoch E and offset O.
  3. Contains a replicated log with LEO older/less than epoch E and offset O.

In this case, the follower needs to use the snapshot with epoch E and offset O, not the replicated log, when granting and requesting votes. In the general case, the latest epoch and offset for a replica are those of the latest snapshot or of the replicated log, whichever is later.
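A minimal sketch of picking a replica's latest epoch and offset for voting, and of the "at least as up to date" check a voter applies. It assumes the usual Raft ordering (higher epoch wins, ties broken by the higher offset); the type and method names are hypothetical.

```java
import java.util.Optional;

// Hypothetical vote helpers; not Kafka's implementation.
final class VoteState {
    record OffsetAndEpoch(long offset, int epoch) implements Comparable<OffsetAndEpoch> {
        // Raft-style ordering: higher epoch wins; ties are broken by the higher offset.
        @Override
        public int compareTo(OffsetAndEpoch other) {
            int byEpoch = Integer.compare(epoch, other.epoch);
            return byEpoch != 0 ? byEpoch : Long.compare(offset, other.offset);
        }
    }

    // Latest (offset, epoch) of a replica: the later of its latest snapshot and the end of its log.
    static Optional<OffsetAndEpoch> latest(Optional<OffsetAndEpoch> latestSnapshot, Optional<OffsetAndEpoch> logEnd) {
        if (latestSnapshot.isEmpty()) return logEnd;
        if (logEnd.isEmpty()) return latestSnapshot;
        return latestSnapshot.get().compareTo(logEnd.get()) >= 0 ? latestSnapshot : logEnd;
    }

    // A voter grants its vote only if the candidate is at least as up to date as the voter.
    static boolean candidateIsUpToDate(OffsetAndEpoch candidateLatest, Optional<OffsetAndEpoch> voterLatest) {
        return voterLatest.map(v -> candidateLatest.compareTo(v) >= 0).orElse(true);
    }
}
```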

Validation of Snapshot and Log

For each replica we have the following invariant:

  1. If the LBO is 0, there are zero or more snapshots where the offset of the snapshot is between LBO and the High-Watermark.
  2. If the LBO is greater than 0, there are one or more snapshots where the offset of the snapshot is between LBO - 1 and the High-Watermark.

If the latest snapshot has an epoch E and offset O and it is newer than the LEO of the replicated log, then the replica must set the LBO and LEO to O + 1.
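A sketch of the invariant check and of the O + 1 rule, under two stated assumptions: the invariant is read literally (at least one snapshot in [LBO - 1, High-Watermark] when the LBO is greater than 0, no requirement when it is 0), and LEO is treated as the offset after the last record, which is how the "set LBO and LEO to O + 1" rule uses it. All names are hypothetical.

```java
import java.util.List;

// Hypothetical validation helpers; not Kafka's implementation.
final class SnapshotLogValidation {
    record SnapshotId(long offset, int epoch) {}

    // Mutable offsets of a replica; logEndOffset is assumed to be exclusive.
    static final class ReplicaOffsets {
        long logBeginOffset;
        long logEndOffset;

        ReplicaOffsets(long logBeginOffset, long logEndOffset) {
            this.logBeginOffset = logBeginOffset;
            this.logEndOffset = logEndOffset;
        }
    }

    static boolean satisfiesInvariant(long logBeginOffset, long highWatermark, List<SnapshotId> snapshots) {
        if (logBeginOffset == 0) {
            return true; // "zero or more" snapshots: nothing is required
        }
        for (SnapshotId s : snapshots) {
            if (s.offset() >= logBeginOffset - 1 && s.offset() <= highWatermark) {
                return true;
            }
        }
        return false;
    }

    // If the latest snapshot (E, O) is newer than the log, move both LBO and LEO to O + 1.
    static void applyLatestSnapshot(ReplicaOffsets replica, SnapshotId latest) {
        if (latest.offset() + 1 > replica.logEndOffset) {
            replica.logBeginOffset = latest.offset() + 1;
            replica.logEndOffset = latest.offset() + 1;
        }
    }
}
```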

Public Interfaces

Topic configuration

  1. The __cluster_metadata topic will have a cleanup.policy value of snapshot. This configuration can only be read. Updates to this configuration will not be allowed.

  2. controller.snapshot.minimum.records is the minimum number of records committed after the latest snapshot before the Kafka Controller will perform another snapshot. See section "When to Snapshot".

  3. max.replication.lag.ms - The maximum amount of time that the leader will wait for an offset to get replicated to all of the live replicas before advancing the LBO. See section “When to Increase the Log Begin Offset”.

  4. replica.fetch.snapshot.max.bytes - The maximum number of bytes sent per partition in the FetchSnapshotResponse. The default is 10MB.
  5. replica.fetch.snapshot.response.max.bytes - The maximum number of bytes sent in the entire FetchSnapshotResponse. The default is 10MB.
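A sketch of the check implied by controller.snapshot.minimum.records. It assumes, following the legend above, that the high-watermark is the largest committed offset and that every offset holds one record, so the records committed after a snapshot at offset S number highWatermark - S; passing -1 for a replica without a snapshot is also an assumption. The class and method names are hypothetical.

```java
// Hypothetical check for controller.snapshot.minimum.records; not Kafka's implementation.
final class SnapshotTrigger {
    private SnapshotTrigger() {}

    // latestSnapshotOffset is -1 when there is no snapshot yet (assumption).
    static boolean shouldSnapshot(long highWatermark, long latestSnapshotOffset, long minimumRecords) {
        // Assumes one record per offset and an inclusive high-watermark.
        long committedSinceSnapshot = highWatermark - latestSnapshotOffset;
        return committedSinceSnapshot >= minimumRecords;
    }
}
```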

Network Protocol

Fetch

Request Schema

...

{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log start offset." },
        // ---------- Start new fields by KIP-595 ----------
        { "name": "DivergingEpoch", "type": "EpochEndOffset", "versions": "12+", "taggedVersions": "12+", "tag": 0,
          "about": "In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge",
          "fields": [
            { "name": "Epoch", "type": "int32", "versions": "12+", "default": "-1" },
            { "name": "EndOffset", "type": "int64", "versions": "12+", "default": "-1" }
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
        ]},
        // ---------- End new fields by KIP-595 ----------
        // ---------- Start new field ----------
        { "name": "SnapshotId", "type": "SnapshotId",
          "versions": "12+", "taggedVersions": "12+", "tag": 2, "fields": [
          { "name": "Offset", "type": "int64", "versions": "0+",
            "about": "If set, this is the offset that the follower should use for the FetchSnapshot request"},
          { "name": "Epoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"}
        ]},
        // ---------- End new field ----------
        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
          "about": "The aborted transactions.",  "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
          "about": "The preferred read replica for the consumer to use on its next fetch request"},
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." }
      ]}
    ]}
  ]
}

...

The leader’s handling of fetch requests will be extended such that if FetchOffset is less than LogBeginOffset, the leader will respond with the SnapshotId of the latest snapshot.
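The leader-side decision described above can be sketched as follows. The names are hypothetical, the normal KIP-595 handling is reduced to a flag, and the exception relies on the invariant that a replica with an LBO greater than 0 has at least one snapshot.

```java
import java.util.Optional;

// Hypothetical leader-side Fetch decision; not Kafka's implementation.
final class LeaderFetchHandling {
    record SnapshotId(long offset, int epoch) {}

    // Either serve records from the log, or point the follower at a snapshot.
    record FetchDecision(boolean readFromLog, Optional<SnapshotId> snapshotId) {}

    static FetchDecision handle(long fetchOffset, long logBeginOffset, Optional<SnapshotId> latestSnapshot) {
        if (fetchOffset < logBeginOffset) {
            // The requested offset is no longer in the log: reply with the latest snapshot's SnapshotId.
            SnapshotId id = latestSnapshot.orElseThrow(
                () -> new IllegalStateException("an LBO greater than 0 requires a snapshot"));
            return new FetchDecision(false, Optional.of(id));
        }
        // Otherwise the request is handled as described in KIP-595 (not shown).
        return new FetchDecision(true, Optional.empty());
    }
}
```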

Handling Fetch Response

The replica's handling of fetch responses will be extended such that if SnapshotId is set, the follower will pause fetching of the log and start fetching the snapshot from the leader. The value in SnapshotId will be used in the FetchSnapshot requests. Once the snapshot has been fetched completely, the local log will be truncated and fetching will resume.

Fetching Snapshots

Request Schema

{
  "apiKey": #,
  "type": "request",
  "name": "FetchSnapshotRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "0+",
      "about": "The broker ID of the follower." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+",
      "about": "The maximum bytes to fetch from all of the snapshots." },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "SnapshotId", "type": "SnapshotId",
          "versions": "0+", "fields": [
          { "name": "Offset", "type": "int64", "versions": "0+",
            "about": "If set, this is the offset that the follower should use for the FetchSnapshot request"},
          { "name": "Epoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"}
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot." },
        { "name": "MaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this snapshot." }
      ]}
    ]}
  ]
}

...

{
  "apiKey": #,
  "type": "response",
  "name": "FetchSnapshotResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "Size", "type": "int64", "versions": "0+",
          "about": "The total size of the snapshot." },
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot." },
        { "name": "Bytes", "type": "bytes", "versions": "0+",
          "about": "Snapshot data." }
      ]}
    ]}
  ]
}
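A sketch of how a follower could use Position and Size to fetch a snapshot in chunks: it keeps requesting from its current Position until Position reaches Size. The client interface is a hypothetical stand-in for sending FetchSnapshot requests, and the chunk type mirrors only the Size, Position and Bytes fields of one partition; neither is Kafka's API.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical follower loop; names are illustrative, not Kafka's API.
final class SnapshotChunkFetcher {
    // Assumed per-partition view of a FetchSnapshot response: Size, Position and Bytes.
    record Chunk(long size, long position, byte[] bytes) {}

    // Stand-in for sending one FetchSnapshot request with the given Position and MaxBytes.
    interface FetchSnapshotClient {
        Chunk fetch(long position, int maxBytes);
    }

    // Keeps requesting from the current Position until Position reaches the snapshot's Size.
    static byte[] fetchAll(FetchSnapshotClient client, int maxBytes) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long position = 0;
        long size;
        do {
            Chunk chunk = client.fetch(position, maxBytes);
            if (chunk.position() != position) {
                throw new IllegalStateException("expected position " + position + " but got " + chunk.position());
            }
            size = chunk.size();
            if (chunk.bytes().length == 0 && position < size) {
                throw new IllegalStateException("no progress at position " + position);
            }
            out.write(chunk.bytes(), 0, chunk.bytes().length);
            position += chunk.bytes().length;
        } while (position < size);
        return out.toByteArray();
    }
}
```

Because Size is only known after the first response, the loop sends at least one request; an empty snapshot (Size 0) finishes after that first request.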

...

When the leader receives a FetchSnapshot request, it will do the following:

  1. Find the snapshot for SnapshotId for the topic Name and partition Index.

  2. Send the bytes in the snapshot from Position up to at most MaxBytes.

  3. Set Size to the total size of the snapshot, so that the follower can determine whether bytes remain to be fetched.
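A minimal sketch of steps 2 and 3 for one partition. It assumes the snapshot is available as an in-memory byte array, which a real implementation would read from the checkpoint file instead, and it ignores the response-wide limit; the names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical leader-side slice of a snapshot; not Kafka's implementation.
final class SnapshotChunkReader {
    record Chunk(long size, long position, byte[] bytes) {}

    // Returns at most maxBytes starting at position, together with the snapshot's total size.
    static Chunk read(byte[] snapshot, long position, int maxBytes) {
        if (position < 0 || position > snapshot.length) {
            throw new IllegalArgumentException("position out of range: " + position);
        }
        int start = (int) position;
        int end = (int) Math.min(snapshot.length, (long) start + Math.max(0, maxBytes));
        return new Chunk(snapshot.length, position, Arrays.copyOfRange(snapshot, start, end));
    }
}
```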

...

  1. SNAPSHOT_NOT_FOUND - when the fetch snapshot request specifies a SnapshotId that doesn’t exist on the leader.

  2. OFFSET_OUT_OF_RANGE - when the fetch snapshot request’s Position is greater than the size of the snapshot.

  3. NOT_LEADER_FOR_PARTITION - when the fetch snapshot request is sent to a replica that is not the leader.
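The three error cases can be sketched as a validation function for one partition of a FetchSnapshot request. The order of the checks and all names are assumptions made for illustration; the snapshot sizes map stands in for whatever index of local snapshots the leader keeps.

```java
import java.util.Map;

// Hypothetical FetchSnapshot validation; not Kafka's implementation.
final class FetchSnapshotErrors {
    enum FetchSnapshotError { NONE, NOT_LEADER_FOR_PARTITION, SNAPSHOT_NOT_FOUND, OFFSET_OUT_OF_RANGE }

    record SnapshotId(long offset, int epoch) {}

    // snapshotSizes: the snapshots this replica has, keyed by SnapshotId, with their size in bytes.
    static FetchSnapshotError validate(boolean isLeader, Map<SnapshotId, Long> snapshotSizes,
                                       SnapshotId requested, long position) {
        if (!isLeader) {
            return FetchSnapshotError.NOT_LEADER_FOR_PARTITION;
        }
        Long size = snapshotSizes.get(requested);
        if (size == null) {
            return FetchSnapshotError.SNAPSHOT_NOT_FOUND;
        }
        if (position > size) {
            return FetchSnapshotError.OFFSET_OUT_OF_RANGE;
        }
        return FetchSnapshotError.NONE;
    }
}
```

A Position equal to the size is accepted and simply returns no bytes, which matches the rule that only a Position greater than the size is out of range.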

...

This KIP is only implemented for the internal topic __cluster_metadata. Internal and external clients for all other topics can ignore the SnapshotId, as that field will not be set for those topic partitions.

...