...

  1. The __cluster_metadata topic will have a cleanup.policy value of snapshot. This configuration can only be read. Updates to this configuration will not be allowed.

  2. controller.snapshot.minimum.records is the minimum number of records committed after the latest snapshot before the Kafka Controller and Metadata Cache will perform another snapshot. See section "When to Snapshot". The default is 16384 records.

  3. max.replication.lag.ms - The maximum amount of time that the leader will wait for an offset to get replicated to all of the live replicas before advancing the LBO. See section “When to Increase the Log Begin Offset”. The default is 7 days.

  4. replica.fetch.snapshot.max.bytes - The maximum number of bytes sent per partition in the FetchSnapshotResponse. The default is 10MB.
  5. replica.fetch.snapshot.response.max.bytes - The maximum number of bytes sent in the entire FetchSnapshotResponse. The default is 10MB.
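
As a rough illustration of how these settings fit together, the sketch below collects the stated defaults and applies the record-count threshold from item 2. The names SnapshotConfig and should_snapshot are hypothetical, "10MB" is assumed to mean 10 * 1024 * 1024 bytes, and treating the threshold as inclusive is also an assumption.

```python
from dataclasses import dataclass

MB = 1024 * 1024  # assumption: "10MB" means 10 * 1024 * 1024 bytes
DAY_MS = 24 * 60 * 60 * 1000


@dataclass(frozen=True)
class SnapshotConfig:
    """Hypothetical container for the defaults listed above."""
    controller_snapshot_minimum_records: int = 16384
    max_replication_lag_ms: int = 7 * DAY_MS
    replica_fetch_snapshot_max_bytes: int = 10 * MB
    replica_fetch_snapshot_response_max_bytes: int = 10 * MB


def should_snapshot(records_since_last_snapshot: int, config: SnapshotConfig) -> bool:
    # Assumption: a new snapshot becomes eligible once the number of records
    # committed after the latest snapshot reaches the configured minimum.
    return records_since_last_snapshot >= config.controller_snapshot_minimum_records
```

With the defaults, should_snapshot(16383, SnapshotConfig()) is False and should_snapshot(16384, SnapshotConfig()) is True.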

Network Protocol

Fetch

Request Schema

...

{
  "apiKey": #,
  "type": "request",
  "name": "FetchSnapshotRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "0+",
      "about": "The broker ID of the follower." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+",
      "about": "The maximum bytes to fetch from all of the snapshots." },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the end offset that the follower should use for the FetchSnapshot request"},
          { "name": "Epoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"}
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot." },
        { "name": "MaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this snapshot." }
      ]}
    ]}
  ]
}
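
To make the nesting of the request fields concrete, here is a minimal sketch that builds one request as a plain Python dictionary with the field names from the schema above. The helper build_fetch_snapshot_request and all example values are hypothetical, and the dictionary only mirrors the field layout, not the binary wire encoding.

```python
def build_fetch_snapshot_request(replica_id, max_bytes, topic, partition,
                                 end_offset, epoch, position, partition_max_bytes):
    """Hypothetical helper: one topic, one partition, fields named as in the schema."""
    return {
        "ReplicaId": replica_id,
        "MaxBytes": max_bytes,  # limit across all snapshots in the request
        "Topics": [{
            "Name": topic,
            "Partitions": [{
                "Index": partition,
                "SnapshotId": {"EndOffset": end_offset, "Epoch": epoch},
                "Position": position,              # byte position within the snapshot
                "MaxBytes": partition_max_bytes,   # limit for this snapshot only
            }],
        }],
    }


# Example values are made up for illustration.
example_request = build_fetch_snapshot_request(
    replica_id=1, max_bytes=1024, topic="__cluster_metadata", partition=0,
    end_offset=500, epoch=3, position=0, partition_max_bytes=512)
```

A follower that has already received the first 512 bytes would send the same SnapshotId again with Position set to 512.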

Response Schema

...

  1. Find the snapshot for SnapshotId for the topic Name and partition PartitionIndex.

  2. Set the Size of each snapshot.
  3. Send the bytes in the snapshot from Position up to at most MaxBytes. Set Continue to true if there are remaining bytes in the snapshot.

If there are multiple partitions in the FetchSnapshot request, then the leader will attempt to evenly distribute the number of bytes sent across all of the partitions. The leader will not send more bytes than the value configured in replica.fetch.response.max.bytes.
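
The steps above can be sketched as follows. This is only one possible reading: the even distribution is approximated by giving every partition an equal share of the response limit, and the types PartitionRequest and PartitionResponse, the data field, and the function serve_snapshots are hypothetical names, not part of the proposal.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PartitionRequest:
    snapshot: bytes  # full snapshot contents held by the leader
    position: int    # Position from the request
    max_bytes: int   # per-partition MaxBytes from the request


@dataclass
class PartitionResponse:
    size: int        # Size: total size of the snapshot
    position: int
    continue_: bool  # Continue: true if bytes remain after this chunk
    data: bytes      # hypothetical name for the returned bytes


def serve_snapshots(requests: List[PartitionRequest],
                    response_max_bytes: int) -> List[PartitionResponse]:
    if not requests:
        return []
    # Assumption: "evenly distribute" is modeled as an equal share per partition.
    per_partition_budget = response_max_bytes // len(requests)
    responses = []
    for req in requests:
        limit = min(req.max_bytes, per_partition_budget)
        chunk = req.snapshot[req.position:req.position + limit]
        end = req.position + len(chunk)
        responses.append(PartitionResponse(
            size=len(req.snapshot),
            position=req.position,
            continue_=end < len(req.snapshot),
            data=chunk))
    return responses
```

A real leader could hand unused budget from small snapshots to larger ones; the equal split is chosen here only because it keeps the total under the limit with the least logic.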

Errors:

  1. SNAPSHOT_NOT_FOUND - when the fetch snapshot request specifies a SnapshotId that does not exist on the leader.

  2. OFFSET_OUT_OF_RANGE - when the fetch snapshot request specifies a position that is greater than the size of the snapshot.

  3. NOT_LEADER_FOR_PARTITION - when the fetch snapshot request is sent to a replica that is not the leader.
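
The error conditions above could be checked along these lines. The function validate_fetch_snapshot is hypothetical, the order of the checks is an assumption, and a SnapshotId is modeled as an (EndOffset, Epoch) tuple purely for illustration.

```python
from typing import Dict, Optional, Tuple

SnapshotId = Tuple[int, int]  # (EndOffset, Epoch), modeled as a tuple for this sketch


def validate_fetch_snapshot(is_leader: bool,
                            snapshots: Dict[SnapshotId, bytes],
                            snapshot_id: SnapshotId,
                            position: int) -> Optional[str]:
    """Return the error name for a partition request, or None if it is valid."""
    # Assumption: leadership is checked before anything else.
    if not is_leader:
        return "NOT_LEADER_FOR_PARTITION"
    snapshot = snapshots.get(snapshot_id)
    if snapshot is None:
        return "SNAPSHOT_NOT_FOUND"
    # Per the text, only a position greater than the snapshot size is an error.
    if position > len(snapshot):
        return "OFFSET_OUT_OF_RANGE"
    return None
```

Under this reading, a position equal to the snapshot size is valid and yields an empty chunk.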

...