...

Generating and loading the snapshot will be delegated to the Kafka Controller. The Kafka Controller will notify the Kafka Raft layer when it has generated a snapshot and the end offset of the snapshot. The Kafka Raft layer will notify the Kafka Controller when a new snapshot has been fetched from the Active Controller. See the section "Interaction Between Kafka Raft and the Kafka Controller" and KIP-595 for more details.

...

The __cluster_metadata topic will have snapshot as its cleanup.policy. The Kafka Controller will maintain an in-memory representation of the replicated log up to at most the high-watermark. When performing a snapshot, the Kafka Controller will serialize this in-memory state to disk. This snapshot file on disk is identified by the largest end offset and epoch from the replicated log that it includes.

...

Code Block
Kafka Replicated Log:

   LBO --               high-watermark --              LEO --
           V                             V                   V
        -------------------------------------------------------
offset: | x + 1 | ... |   y   | y + 1 |  ...  |   |  ...  |   |
epoch:  | b     | ... |   c   | d     |  ...  |   |  ...  |   |
        -------------------------------------------------------

Kafka Snapshot Files:

<topic_name>-<partition_index>/checkpoints/x-a.checkpoint
<topic_name>-<partition_index>/checkpoints/y-c.checkpoint

...

In the example above, offset=x - 1, epoch=a does not appear in the replicated log because it is before the LBO (x + 1, b). If the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the LBO (x + 1, b), then the leader handles the request as described in KIP-595. Otherwise, the leader will respond with the end offset and epoch of the latest snapshot (y, c).
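The leader-side branching can be sketched as follows. This is a simplified illustration, not Kafka's actual implementation: the name `handle_fetch` and the lexicographic comparison of (epoch, offset) pairs are assumptions of this sketch.

```python
from dataclasses import dataclass


@dataclass
class SnapshotId:
    end_offset: int  # end offset of the latest snapshot (y in the example)
    epoch: int       # epoch of the latest snapshot (c in the example)


def handle_fetch(fetch_offset: int, fetch_epoch: int,
                 lbo_offset: int, lbo_epoch: int,
                 latest_snapshot: SnapshotId):
    """Decide how the leader answers a Fetch at (fetch_offset, fetch_epoch)."""
    if (fetch_epoch, fetch_offset) >= (lbo_epoch, lbo_offset):
        # The offset is still in the replicated log: serve records per KIP-595.
        return ("records", fetch_offset)
    # The offset was truncated below the LBO: point the follower at the snapshot.
    return ("snapshot_id", latest_snapshot.end_offset, latest_snapshot.epoch)
```

For example, a fetch at (offset=99, epoch=1) against an LBO of (101, 2) would be answered with the latest SnapshotId, while a fetch at (150, 2) would be served records as usual.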

...

Snapshot Format

The Kafka Controller and Metadata Cache are responsible for the content of the snapshot. The snapshot will be stored in the checkpoints directory in the topic partition directory and will have a name of <SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint. For example, for the topic __cluster_metadata, partition 0, snapshot end offset 5120793 and snapshot epoch 2, the full filename will be __cluster_metadata-0/checkpoints/00000000000005120793-00000000000000000002.checkpoint.
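The naming scheme can be illustrated with a short sketch; the helper name `snapshot_path` is hypothetical, and the 20-digit zero-padding is inferred from the example filename above.

```python
import os


def snapshot_path(topic: str, partition: int, end_offset: int, epoch: int) -> str:
    # Both components are zero-padded to 20 digits, as in the example filename.
    name = f"{end_offset:020d}-{epoch:020d}.checkpoint"
    return os.path.join(f"{topic}-{partition}", "checkpoints", name)
```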

...

Followers and observers will increase their log begin offset to the value sent on the fetch response as long as the local Kafka Controller and Metadata Cache has generated a snapshot with an end offset greater than or equal to the new log begin offset minus one.

Kafka allows clients to delete records that are less than a given offset by using the DeleteRecords RPC. Those requests will be rejected for the __cluster_metadata topic.

When to Delete Snapshots

The broker will delete any snapshot with an end offset and epoch (SnapshotId) less than the LBO - 1. That means that the question of when to delete a snapshot is the same as when to increase the LBO, which is covered in the section “When To Increase the Log Begin Offset”.
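The retention rule reduces to a one-line predicate, sketched below; `snapshots_to_delete` and the (end offset, epoch) tuples are hypothetical names for illustration.

```python
def snapshots_to_delete(snapshot_ids, lbo):
    """Snapshots whose end offset is below LBO - 1 are no longer needed:
    the replicated log itself still covers everything past LBO - 1."""
    return [(end, epoch) for (end, epoch) in snapshot_ids if end < lbo - 1]
```

For example, with snapshots at end offsets 100 and 200 and an LBO of 201, only the snapshot ending at 100 is deletable.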

...

  1. Notify when a snapshot has finished. This includes the end offset and epoch of the snapshot.

Loading Snapshots

...

After loading the snapshot, the Kafka Controller can apply operations in the replicated log with an offset greater than or equal to the snapshot’s end offset.

Changes to Leader Election

...

  1. Follower sends a fetch request.
  2. Leader replies with a snapshot epoch and end offset.
  3. Follower pauses fetch requests.
  4. Follower fetches a snapshot from the leader.
  5. Follower resumes fetch requests by:
    1. Setting the LBO to the snapshot's end offset plus one.
    2. Setting the LEO or FetchOffset in the fetch request to the snapshot's end offset plus one.
    3. Setting the LastFetchedEpoch in the fetch request to the snapshot's epoch.
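The resume step (5) above can be sketched as follows; `FollowerState` and `resume_after_snapshot` are hypothetical names, not Kafka's internal types.

```python
from dataclasses import dataclass


@dataclass
class FollowerState:
    lbo: int                # log begin offset
    leo: int                # log end offset; also the next FetchOffset
    last_fetched_epoch: int


def resume_after_snapshot(state: FollowerState,
                          snapshot_end_offset: int,
                          snapshot_epoch: int) -> FollowerState:
    """Position the follower just past the snapshot it finished fetching."""
    state.lbo = snapshot_end_offset + 1
    state.leo = snapshot_end_offset + 1
    state.last_fetched_epoch = snapshot_epoch
    return state
```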

...

  1. Contains a snapshot with epoch E and end offset O.
  2. Contains many snapshots older/less than epoch E and end offset O.
  3. Contains a replicated log with LEO older/less than epoch E and end offset O.

In this case the follower needs to use the snapshot with epoch E and end offset O, and not the replicated log, when granting votes and requesting votes. In the general case, the latest epoch and end offset for a replica is the larger of the latest snapshot and the replicated log.

...

  1. If the LBO is 0, there are zero or more snapshots where the end offset of the snapshot is between LBO and the High-Watermark.
  2. If the LBO is greater than 0, there are one or more snapshots where the end offset of the snapshot is between LBO - 1 and the High-Watermark.

If the latest snapshot has an epoch E and end offset O and it is newer than the LEO of the replicated log, then the replica must set the LBO and LEO to O + 1.
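This recovery rule can be sketched as below. It is an illustration only: `recover_log_positions` is a hypothetical helper, and comparing epochs before offsets is an assumption of the sketch.

```python
def recover_log_positions(snapshot_epoch, snapshot_end_offset,
                          log_epoch, log_end_offset):
    """On startup, if the latest snapshot is newer than the log's LEO,
    reset both LBO and LEO to just past the snapshot's end offset."""
    if (snapshot_epoch, snapshot_end_offset) > (log_epoch, log_end_offset):
        new_position = snapshot_end_offset + 1
        return new_position, new_position  # (LBO, LEO)
    return None  # the log already extends past the snapshot; nothing to do
```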

Public Interfaces

Topic configuration

  1. The __cluster_metadata topic will have a cleanup.policy value of snapshot. This configuration can only be read. Updates to this configuration will not be allowed.

  2. controller.snapshot.minimum.records is the minimum number of records committed after the latest snapshot before the Kafka Controller and Metadata Cache will perform another snapshot. See section "When to Snapshot". The default is 16384 records.

  3. max.replication.lag.ms - The maximum amount of time that the leader will wait for an offset to get replicated to all of the live replicas before advancing the LBO. See section “When to Increase the Log Begin Offset”. The default is 7 days.

  4. replica.fetch.snapshot.max.bytes - The maximum number of bytes sent per partition in the FetchSnapshotResponse. The default is 10MB.
  5. replica.fetch.snapshot.response.max.bytes - The maximum number of bytes sent in the entire FetchSnapshotResponse. The default is 10MB.
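Taken together, the defaults above would look like this in a broker properties file (a sketch for illustration; 604800000 ms is 7 days and 10485760 bytes is 10MB):

```properties
controller.snapshot.minimum.records=16384
max.replication.lag.ms=604800000
replica.fetch.snapshot.max.bytes=10485760
replica.fetch.snapshot.response.max.bytes=10485760
```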

...

  1. It includes new optional fields for snapshot end offset and snapshot epoch.

  2. The LogStartOffset field is renamed to LogBeginOffset.

...

Code Block
languagejs
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        { "name": "LogBeginOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log begin offset." },
        { "name": "DivergingEpoch", "type": "EpochEndOffset", "versions": "12+", "taggedVersions": "12+", "tag": 0,
          "about": "In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge",
          "fields": [
            { "name": "Epoch", "type": "int32", "versions": "12+", "default": "-1" },
            { "name": "EndOffset", "type": "int64", "versions": "12+", "default": "-1" }
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
        ]},
        // ---------- Start new field ----------
        { "name": "SnapshotId", "type": "SnapshotId",
          "versions": "12+", "taggedVersions": "12+", "tag": 2, "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the end offset that the follower should use for the FetchSnapshot request"},
          { "name": "Epoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"}
        ]},
        // ---------- End new field ----------
        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
          "about": "The aborted transactions.",  "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
          "about": "The preferred read replica for the consumer to use on its next fetch request"},
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." }
      ]}
    ]}
  ]
}
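A follower consuming this response might branch on the optional SnapshotId field as sketched below. The response is modeled as a plain dict for illustration, and `next_action` is a hypothetical name.

```python
def next_action(partition_response: dict):
    """Inspect a FetchablePartitionResponse (modeled as a dict) and decide
    whether to keep fetching records or switch to FetchSnapshot."""
    snapshot_id = partition_response.get("SnapshotId")
    if snapshot_id is not None:
        # The leader indicated our fetch offset is below its LBO: fetch the
        # snapshot identified by (EndOffset, Epoch) instead.
        return ("fetch_snapshot", snapshot_id["EndOffset"], snapshot_id["Epoch"])
    return ("fetch_records",)
```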

...

Code Block
languagejs
{
  "apiKey": #,
  "type": "request",
  "name": "FetchSnapshotRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "0+",
      "about": "The broker ID of the follower." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+",
      "about": "The maximum bytes to fetch from all of the snapshots." },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+", "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+",
            "about": "The end offset of the snapshot to fetch." },
          { "name": "Epoch", "type": "int32", "versions": "0+",
            "about": "The epoch of the snapshot to fetch." }
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot." },
        { "name": "MaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this snapshot." }
      ]}
    ]}
  ]
}

...

  1. SNAPSHOT_NOT_FOUND - when the fetch snapshot request specifies a SnapshotId that doesn’t exist on the leader.

  2. OFFSET_OUT_OF_RANGE - when the fetch snapshot request specifies a position that is greater than the size of the snapshot.

  3. NOT_LEADER_FOR_PARTITION - when the fetch snapshot request is sent to a replica that is not the leader.

...

  1. Copy the bytes in Bytes to the local snapshot file named <SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint.part starting at file position Position.
  2. If Size is greater than the size of the current snapshot file, then the follower is expected to send another FetchSnapshotRequest with a Position set to the response's Position plus the response's len(Bytes).

  3. If Size is equal to the size of the current snapshot file, then fetching the snapshot has finished.

    1. Atomically move the file named <SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint.part to a file named <SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint.

    2. Notify the Kafka Controller or Metadata Cache that a new snapshot is available.

    3. The Kafka Controller and Metadata Cache will load this new snapshot and read records from the log after the snapshot's end offset.

Errors:

  1. SNAPSHOT_NOT_FOUND, OFFSET_OUT_OF_RANGE, NOT_LEADER_FOR_PARTITION - The follower can rediscover the new snapshot by sending another fetch request with an offset and epoch of zero.

Metrics

Full Name                                                          Description
kafka.controller:type=KafkaController,name=GenSnapshotLatencyMs    A histogram of the amount of time it took to generate a snapshot.
kafka.controller:type=KafkaController,name=LoadSnapshotLatencyMs   A histogram of the amount of time it took to load the snapshot.
kafka.controller:type=KafkaController,name=SnapshotSizeBytes       Size of the latest snapshot in bytes.

...