...

In KIP-500 the Active Controller, which is the quorum leader from KIP-595, will materialize the entries in the metadata log into memory. Each broker in KIP-500, which will be a voter or an observer replica of the metadata log, will materialize the entries in the log into a metadata cache. We want to provide a stronger guarantee on the consistency of the metadata than is possible today. If all entries in the metadata log are indefinitely retained, then the design described in KIP-595 achieves a consistent log and, as a result, a consistent metadata cache. If Kafka doesn't limit the size of this replicated log, it can affect Kafka's availability by:

...

Code Block
Kafka Replicated Log:

LogStartOffset  --             high-watermark --           LEO --
                 V                             V                V
               -----------------------------------------------
       offset: | x | ... | y - 1 | y |  ...  |   |  ...  |   |
       epoch:  | b | ... | c     | d |  ...  |   |  ...  |   |
               -----------------------------------------------

Kafka Snapshot Files:

<topic_name>-<partition_index>/checkpoints/x-a.checkpoint
<topic_name>-<partition_index>/checkpoints/y-c.checkpoint

Note: The extension checkpoint will be used since Kafka already has a file with the snapshot extension.

...

The Kafka Controller and Metadata Cache are responsible for the content of the snapshot. Each snapshot is uniquely identified by a SnapshotId, the epoch and end offset of the records in the replicated log included in the snapshot. The snapshot will be stored in the checkpoints directory in the topic partition directory and will have a name of <SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint. For example, for the topic __cluster_metadata, partition 0, snapshot end offset 5120793 and snapshot epoch 2, the full filename will be __cluster_metadata-0/checkpoints/00000000000005120793-00000000000000000002.checkpoint.
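
For illustration only, a minimal Java sketch of how such a checkpoint filename could be derived from a SnapshotId; the zero padding to 20 digits matches the example above, and the SnapshotId helper class shown here is hypothetical rather than part of this KIP.

Code Block
languagejava
// Hypothetical helper; the field names mirror the SnapshotId described above.
final class SnapshotId {
    final long endOffset;
    final int epoch;

    SnapshotId(long endOffset, int epoch) {
        this.endOffset = endOffset;
        this.epoch = epoch;
    }

    // e.g. 00000000000005120793-00000000000000000002.checkpoint
    String checkpointFileName() {
        return String.format("%020d-%020d.checkpoint", endOffset, epoch);
    }
}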

...

  1. To use valid BaseOffset and OffsetDelta in the BatchHeader and Record respectively.
  2. For the records in the snapshot to match the records in the replicated log.

...

Snapshot

...

Control Records

To allow the KRaft implementation to include additional information on the snapshot without affecting the Kafka Controller and Metadata Cache, the snapshot will include two control record batches. The control record batch SnapshotHeaderRecord will always be the first record batch in a snapshot. The control record batch SnapshotFooterRecord will be the last record batch in a snapshot. These two records will have the following schemas.

Snapshot Header Schema

Code Block
{
  "type": "data",
  "name": "SnapshotHeaderRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    {"name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the snapshot header record"},
    { "name": "LastContainedLogTimestamp", "type": "int64", "versions": "0+",
      "about": "The append time of the last record from the log contained in this snapshot" }
  ]
}

Snapshot Footer Schema

Code Block
{
  "type": "data",
  "name": "SnapshotFooterRecord",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "Version", "type": "int16", "versions": "0+",
      "about": "The version of the snapshot footer record" }
  ]
}

When to Snapshot

If the Kafka Controller or Metadata Cache generates a snapshot too frequently, it can negatively affect the performance of the disk. If it doesn't generate a snapshot often enough, it can increase the amount of time it takes to load its state into memory and it can increase the amount of space taken up by the replicated log. The Kafka Controller and Metadata Cache will have new configuration options metadata.snapshot.min.changed_records.ratio and metadata.log.max.record.bytes.between.snapshots. Both of these configurations need to be satisfied before the Controller or Metadata Cache will generate a new snapshot; a sketch of this check follows the list below.

  1. metadata.snapshot.min.changed_records.ratio is the minimum ratio of snapshot records that have changed (deleted or modified) between the latest snapshot and the current in-memory state. Note that new snapshot records don't count against this ratio. If a new snapshot record was added since the last snapshot, it doesn't affect the dirty ratio. If a snapshot record was added and then modified or deleted, it counts against the ratio.
  2. metadata.log.max.record.bytes.between.snapshots is the minimum size of the records in the replicated log between the latest snapshot and the high-watermark.
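
As a rough illustration of how these two thresholds could gate snapshot generation, the sketch below checks both conditions; the class, field, and parameter names (dirtyRatio, newRecordBytes) are hypothetical and not part of this KIP.

Code Block
languagejava
// Sketch: both configured thresholds must be met before generating a snapshot.
final class SnapshotPolicy {
    final double minChangedRecordsRatio;        // metadata.snapshot.min.changed_records.ratio
    final long maxRecordBytesBetweenSnapshots;  // metadata.log.max.record.bytes.between.snapshots

    SnapshotPolicy(double minChangedRecordsRatio, long maxRecordBytesBetweenSnapshots) {
        this.minChangedRecordsRatio = minChangedRecordsRatio;
        this.maxRecordBytesBetweenSnapshots = maxRecordBytesBetweenSnapshots;
    }

    // dirtyRatio: fraction of snapshot records modified or deleted since the last snapshot.
    // newRecordBytes: bytes in the replicated log between the latest snapshot and the high-watermark.
    boolean shouldSnapshot(double dirtyRatio, long newRecordBytes) {
        return dirtyRatio >= minChangedRecordsRatio
            && newRecordBytes >= maxRecordBytesBetweenSnapshots;
    }
}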

When to Increase the LogStartOffset

To minimize the need to send snapshots to slow replicas, the leader can delay increasing the log start offset up to the minimum offset that it knows has been replicated to all of the live replicas. The leader's LogStartOffset can be increased to an offset X if the following are true:

  1. There is a snapshot for the topic partition with an end offset of X.

  2. One of the following is true:

    1. All of the live replicas (followers and observers) have replicated LogStartOffset or

    2. LogStartOffset is metadata.start.offset.lag.time.max.ms milliseconds old.

Followers and observers will increase their log start offset to the value sent in the fetch response as long as the local Kafka Controller and Metadata Cache has generated a snapshot with an end offset greater than or equal to the new log start offset. In other words, for followers and observers the local log start offset is the minimum of the leader's log start offset and the end offset of the largest local snapshot. Both the leader and follower rules are sketched below.
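
A minimal sketch of the two rules above, one for the leader and one for followers and observers; the class and method names are illustrative only.

Code Block
languagejava
// Sketch of the LogStartOffset rules described in this section.
final class LogStartOffsetRules {
    // Leader: LogStartOffset may advance to offset X only if a snapshot ends at X and
    // either every live replica has replicated X or the current LogStartOffset is
    // older than metadata.start.offset.lag.time.max.ms.
    static boolean leaderMayAdvance(boolean snapshotEndsAtX,
                                    boolean replicatedByAllLiveReplicas,
                                    long logStartOffsetAgeMs,
                                    long maxLagMs) {
        return snapshotEndsAtX && (replicatedByAllLiveReplicas || logStartOffsetAgeMs >= maxLagMs);
    }

    // Follower/observer: the local log start offset is the minimum of the leader's
    // log start offset and the end offset of the largest local snapshot.
    static long followerLogStartOffset(long leaderLogStartOffset, long largestLocalSnapshotEndOffset) {
        return Math.min(leaderLogStartOffset, largestLocalSnapshotEndOffset);
    }
}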

Kafka allows clients to delete records that are less than a given offset by using the DeleteRecords RPC. Those requests will be rejected for the __cluster_metadata topic with a POLICY_VIOLATION error.

When to Delete Snapshots

The broker will delete any snapshot with an end offset and epoch (SnapshotId) less than the LogStartOffset. That means that the question of when to delete a snapshot is the same as when to increase the LogStartOffset, which is covered in the section “When to Increase the LogStartOffset”.

Interaction with the Kafka Controller and Metadata Cache

This section describes the interaction between the state machine and Kafka Raft.

From Kafka Raft to the Kafka Controller and Metadata Cache

  1. Notify when a snapshot is available because of FetchSnapshot.

From the Kafka Controller and Metadata Cache to Kafka Raft

  1. Notify when snapshot finished. This includes the end offset and epoch.

Loading Snapshots

There are two cases when the Kafka Controller or Metadata Cache needs to load a snapshot:

  1. When it is booting.

  2. When a follower or observer replica finishes fetching a new snapshot from the leader.

After loading the snapshot, the Kafka Controller and Metadata Cache can apply operations in the replicated log with an offset greater than or equal to the snapshot’s end offset.
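
A rough sketch of this load-then-replay sequence; the StateMachine interface and the way log records are passed in are placeholders, not actual Kafka APIs.

Code Block
languagejava
import java.util.Map;

// Sketch: after loading a snapshot, only records with an offset greater than or
// equal to the snapshot's end offset are applied from the replicated log.
final class SnapshotLoader {
    interface StateMachine {                        // hypothetical state machine interface
        void loadSnapshot(String checkpointFileName);
        void apply(long offset, Object record);
    }

    static void restore(StateMachine stateMachine,
                        String checkpointFileName,
                        long snapshotEndOffset,
                        Iterable<Map.Entry<Long, Object>> logRecords) {
        stateMachine.loadSnapshot(checkpointFileName);
        for (Map.Entry<Long, Object> entry : logRecords) {
            if (entry.getKey() >= snapshotEndOffset) {
                stateMachine.apply(entry.getKey(), entry.getValue());
            }
        }
    }
}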

Changes to Leader Election

The followers of the __cluster_metadata topic partition need to take the snapshots into account when granting votes and asking for votes (becoming a candidate). Conceptually, a follower implements the following algorithm:

  1. Follower sends a fetch request
  2. Leader replies with a snapshot epoch and end offset.
  3. Follower pauses fetch requests
  4. Follower fetches snapshot chunks from the leader into a temporary snapshot file (<EndOffset>-<Epoch>.checkpoint.part)
  5. Validate fetched snapshot
    1. All snapshot chunks are fetched
    2. Verify the CRC of the records in the snapshot
    3. Atomically move the temporary snapshot file (<EndOffset>-<Epoch>.checkpoint.part) to the permanent location (<EndOffset>-<Epoch>.checkpoint)
  6. Follower resumes fetch requests by
    1. Setting the LogStartOffset to the snapshot's end offset.
    2. Setting the LEO or FetchOffset in the fetch request to the snapshot's end offset.
    3. Setting the LastFetchedEpoch in the fetch request to the snapshot's epoch.

Assume that the snapshot fetched in step 4 has an epoch E1 and an end offset O1. The state of the follower between steps 4 and 5 is as follows:

  1. Contains a snapshot with epoch E1 and end offset O1.
  2. Contains zero or more snapshots older/less than epoch E1 and end offset O1.
  3. Contains a replicated log with LEO older/less than epoch E1 and end offset O1.

In this case the follower needs to use the snapshot with epoch E1 and end offset O1, and not the replicated log, when granting votes and requesting votes. In the general case, the latest epoch and end offset for a replica comes from either the latest snapshot or the replicated log, whichever is newer.
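
For illustration, a sketch of how a replica could derive the epoch and end offset pair used when granting and requesting votes, assuming the latest snapshot's epoch and end offset and the log's last epoch and LEO are already known; all names are hypothetical.

Code Block
languagejava
// Sketch: the epoch/end offset used in vote decisions comes from whichever is
// newer, the latest snapshot or the replicated log.
final class LatestEpochEndOffset {
    static long[] latest(int snapshotEpoch, long snapshotEndOffset,
                         int logLastEpoch, long logEndOffset) {
        // Compare by epoch first, then by end offset.
        if (snapshotEpoch > logLastEpoch
                || (snapshotEpoch == logLastEpoch && snapshotEndOffset > logEndOffset)) {
            return new long[] { snapshotEpoch, snapshotEndOffset };
        }
        return new long[] { logLastEpoch, logEndOffset };
    }
}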

Validation of Snapshot and Log

For each replica we have the following invariant:

  1. If the LogStartOffset is 0, there are zero or more snapshots where the end offset of the snapshot is between LogStartOffset and the High-Watermark.
  2. If the LogStartOffset is greater than 0, there are one or more snapshots where the end offset of the snapshot is between LogStartOffset and the High-Watermark.

If the latest snapshot has an epoch E1 and end offset O1 and it is newer than the LEO of the replicated log, then the replica must set the LogStartOffset and LEO to O1.
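
A small sketch of this reconciliation rule; the field and method names are illustrative.

Code Block
languagejava
// Sketch: if the latest snapshot is newer than the log's LEO, reset both the
// LogStartOffset and the LEO to the snapshot's end offset.
final class LogSnapshotValidation {
    long logStartOffset;
    long logEndOffset; // LEO

    void reconcileWithSnapshot(int snapshotEpoch, long snapshotEndOffset, int logLastEpoch) {
        boolean snapshotIsNewer = snapshotEpoch > logLastEpoch
                || (snapshotEpoch == logLastEpoch && snapshotEndOffset > logEndOffset);
        if (snapshotIsNewer) {
            logStartOffset = snapshotEndOffset;
            logEndOffset = snapshotEndOffset;
        }
    }
}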

Public Interfaces

Topic configuration

  1. The __cluster_metadata topic will have a cleanup.policy value of snapshot. This configuration can only be read. Updates to this configuration will not be allowed.

  2. metadata.snapshot.min.changed_records.ratio - The minimum ratio of snapshot records that have to change before generating a snapshot. See section "When to Snapshot". The default is .5 (50%).

  3. metadata.log.max.record.bytes.between.snapshots - This is the minimum number of bytes in the replicated log between the latest snapshot and the high-watermark needed before generating a new snapshot. See section "When to Snapshot". The default is 20MB.
  4. metadata.start.offset.lag.time.max.ms - The maximum amount of time that the leader will wait for an offset to get replicated to all of the live replicas before advancing the LogStartOffset. See section “When to Increase the LogStartOffset”. The default is 7 days.

Network Protocol

Fetch

Request Schema

This KIP doesn’t require any changes to the fetch request outside of what is proposed in KIP-595.

Response Schema

The fetch response proposed in KIP-595 will be extended such that it includes new optional fields for snapshot end offset and snapshot epoch.


Code Block
languagejs
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log start offset." },
        { "name": "DivergingEpoch", "type": "EpochEndOffset", "versions": "12+", "taggedVersions": "12+", "tag": 0,
          "about": "In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge",
          "fields": [
            { "name": "Epoch", "type": "int32", "versions": "12+", "default": "-1" },
            { "name": "EndOffset", "type": "int64", "versions": "12+", "default": "-1" }
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
        ]},
        // ---------- Start of new field ----------
        { "name": "SnapshotId", "type": "SnapshotId",
          "versions": "12+", "taggedVersions": "12+", "tag": 2,
          "about": "In the case of fetching an offset less than the LogStartOffset, this is the end offset and epoch that should be used in the FetchSnapshot request.",
          "fields": [
            { "name": "EndOffset", "type": "int64", "versions": "0+" },
            { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        // ---------- End of new field ----------
        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
          "about": "The aborted transactions.", "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
          "about": "The preferred read replica for the consumer to use on its next fetch request"},
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." }
      ]}
    ]}
  ]
}

Handling Fetch Request

The leader's handling of the fetch request will be extended such that if the FetchOffset is less than the LogStartOffset then the leader will respond with a SnapshotId of the latest snapshot.
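
A minimal sketch of this leader-side check; the FetchPartitionResult type is a stand-in and not an actual Kafka class.

Code Block
languagejava
// Sketch: if the fetch offset is below the log start offset, reply with the
// SnapshotId of the latest snapshot instead of record data.
final class FetchRequestHandling {
    record FetchPartitionResult(boolean snapshotNeeded, long endOffset, int epoch) { }

    static FetchPartitionResult handleFetch(long fetchOffset,
                                            long logStartOffset,
                                            long latestSnapshotEndOffset,
                                            int latestSnapshotEpoch) {
        if (fetchOffset < logStartOffset) {
            // Tell the follower which snapshot to request via FetchSnapshot.
            return new FetchPartitionResult(true, latestSnapshotEndOffset, latestSnapshotEpoch);
        }
        // Otherwise serve records from the log as usual (omitted here).
        return new FetchPartitionResult(false, -1L, -1);
    }
}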

Handling Fetch Response

The replica's handling of the fetch response will be extended such that if SnapshotId is set then the follower will pause fetching of the log and start fetching the snapshot from the leader. The value in SnapshotId will be used in the FetchSnapshot requests. Once the snapshot has been fetched completely, the local log will be truncated and fetching will resume.
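
A rough sketch of the follower-side handling; the callbacks for pausing, truncating, and resuming the log fetch and for issuing FetchSnapshot requests are placeholders.

Code Block
languagejava
import java.util.function.BiConsumer;

// Sketch of the follower-side reaction to a fetch response carrying a SnapshotId.
final class FetchResponseHandling {
    static void onFetchResponse(boolean snapshotIdPresent,
                                long snapshotEndOffset,
                                int snapshotEpoch,
                                BiConsumer<Long, Integer> fetchSnapshot, // issues FetchSnapshot requests
                                Runnable pauseLogFetch,
                                Runnable truncateLog,
                                Runnable resumeLogFetch) {
        if (!snapshotIdPresent) {
            return; // normal fetch handling continues elsewhere
        }
        pauseLogFetch.run();
        fetchSnapshot.accept(snapshotEndOffset, snapshotEpoch); // fetch the full snapshot
        truncateLog.run();                                      // after the snapshot is complete
        resumeLogFetch.run();
    }
}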

Fetching Snapshots

Request Schema

Code Block
languagejs
{
  "apiKey": 59,
  "type": "request",
  "name": "FetchSnapshotRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ClusterId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", "taggedVersions": "0+", "tag": 0,
      "about": "The clusterId if known, this is used to validate metadata fetches prior to broker registration" },
    { "name": "ReplicaId", "type": "int32", "versions": "0+", "default": "-1", "entityType": "brokerId",
      "about": "The broker ID of the follower" },
    { "name": "MaxBytes", "type": "int32", "versions": "0+",
      "about": "The maximum bytes to fetch from all of the snapshots." },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch" },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Partition", "type": "int32", "versions": "0+",
          "about": "The partition index" },
        { "name": "CurrentLeaderEpoch", "type": "int32", "versions": "0+",
          "about": "The current leader epoch of the partition, -1 for unknown leader epoch" },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch to fetch",
          "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot to start fetching from" }
      ]}
    ]}
  ]
}

Response Schema

Code Block
languagejs
{
  "apiKey": 59,
  "type": "response",
  "name": "FetchSnapshotResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "about": "The snapshot endOffset and epoch fetched",
          "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+" },
          { "name": "Epoch", "type": "int32", "versions": "0+" }
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
          "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
        ]},
        { "name": "Size", "type": "int64", "versions": "0+",
          "about": "The total size of the snapshot." },
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The starting byte position within the snapshot included in the Bytes field." },
        { "name": "UnalignedRecords", "type": "records", "versions": "0+",
          "about": "Snapshot data in records format which may not be aligned on an offset boundary" }
      ]}
    ]}
  ]
}

Request Handling

...

  1. Find the snapshot for SnapshotId for the topic Name and partition Index.

  2. Set the Size of each snapshot.
  3. Send the bytes in the snapshot from Position. If there are multiple partitions in the FetchSnapshot request, then the leader will evenly distribute the number of bytes sent across all of the partitions. The leader will not send more bytes in the response than ResponseMaxBytes, which is the minimum of MaxBytes in the request and the value configured in replica.fetch.response.max.bytes. A sketch of this distribution follows the list.

    1. Each topic partition is guaranteed to receive at least the average of ResponseMaxBytes if that snapshot has enough bytes remaining.
    2. If there are topic partitions with snapshots that have remaining bytes less than the average ResponseMaxBytes, then those bytes may be used to send snapshot bytes for other topic partitions.
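
A rough sketch of this distribution as a simple two-pass allocation; the exact strategy used by the implementation may differ, and the names here are illustrative.

Code Block
languagejava
// Sketch: split ResponseMaxBytes across partitions; partitions whose remaining
// snapshot bytes are below the per-partition average free up budget for the rest.
final class SnapshotByteAllocation {
    static long[] allocate(long responseMaxBytes, long[] remainingBytesPerPartition) {
        int n = remainingBytesPerPartition.length;
        long[] allocation = new long[n];
        long average = n == 0 ? 0 : responseMaxBytes / n;
        long leftover = 0;

        // First pass: every partition gets up to the average, capped at its remaining bytes.
        for (int i = 0; i < n; i++) {
            allocation[i] = Math.min(average, remainingBytesPerPartition[i]);
            leftover += average - allocation[i];
        }
        // Second pass: hand unused budget to partitions that still have bytes remaining.
        for (int i = 0; i < n && leftover > 0; i++) {
            long extra = Math.min(leftover, remainingBytesPerPartition[i] - allocation[i]);
            allocation[i] += extra;
            leftover -= extra;
        }
        return allocation;
    }
}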

...

Response Handling

  1. Copy the bytes in UnalignedRecords to the local snapshot file named <SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint.part starting at file position Position.
  2. If Size is greater than the size of the current snapshot file, then the follower is expected to send another FetchSnapshotRequest with a Position set to the response's Position plus the response's len(UnalignedRecords). A sketch of this fetch loop follows the list.

  3. If Size is equal to the size of the current snapshot file, then fetching the snapshot has finished.

    1. Validate the snapshot file.
    2. Atomically move the file named <SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint.part to a file named <SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint.

    3. Notify the Kafka Controller or Metadata Cache that a new snapshot is available.

    4. The Kafka Controller and Metadata Cache will load this new snapshot and read records from the log after the snapshot's end offset.
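
The fetch loop described in this list could look roughly like the sketch below; the Transport interface that issues FetchSnapshot requests and the file handling details are stubs, and snapshot validation is only indicated by a comment.

Code Block
languagejava
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch of the follower's snapshot download loop.
final class SnapshotFetcher {
    interface Transport {            // hypothetical: issues one FetchSnapshot request
        byte[] fetchChunk(long endOffset, int epoch, long position) throws IOException;
    }

    static void download(Transport transport, long endOffset, int epoch, long totalSize,
                         Path partFile, Path finalFile) throws IOException {
        long position = 0;
        try (RandomAccessFile out = new RandomAccessFile(partFile.toFile(), "rw")) {
            while (position < totalSize) {
                byte[] chunk = transport.fetchChunk(endOffset, epoch, position);
                out.seek(position);              // write at the response's Position
                out.write(chunk);
                position += chunk.length;        // next Position = Position + len(UnalignedRecords)
            }
        }
        // Validation of the snapshot (e.g. CRC checks) would happen here.
        Files.move(partFile, finalFile, StandardCopyOption.ATOMIC_MOVE);
    }
}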

...