...

Kafka Replicated Log:

        LogStartOffset --             high-watermark --           LEO --
                 V                             V                V
               -----------------------------------------------
       offset: | x | ... | y - 1 | y |  ...  |   |  ...  |   |
       epoch:  | b | ... | c     | d |  ...  |   |  ...  |   |
               -----------------------------------------------

Kafka Snapshot Files:

<topic_name>-<partition_index>/checkpoints/x-a.checkpoint
<topic_name>-<partition_index>/checkpoints/y-c.checkpoint
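Each snapshot file name encodes the snapshot's end offset and epoch. In the diagram, y-c.checkpoint names the snapshot whose last included record is offset y - 1 with epoch c. The Java sketch below builds and parses such names. It assumes the offset and epoch are written as plain decimal numbers with no padding, which may not match the real on-disk encoding. SnapshotId and SnapshotFiles are hypothetical names.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical: the (end offset, epoch) pair that names a snapshot.
record SnapshotId(long endOffset, int epoch) {}

// Hypothetical helper for the "<endOffset>-<epoch>.checkpoint" naming scheme.
final class SnapshotFiles {
    // Matches a bare file name such as "250-3.checkpoint".
    private static final Pattern NAME = Pattern.compile("(\\d+)-(\\d+)\\.checkpoint");

    private SnapshotFiles() {}

    // Builds "<topic_name>-<partition_index>/checkpoints/<endOffset>-<epoch>.checkpoint".
    static String path(String topic, int partition, SnapshotId id) {
        return topic + "-" + partition + "/checkpoints/"
                + id.endOffset() + "-" + id.epoch() + ".checkpoint";
    }

    // Parses a bare file name; returns empty if the name does not follow the scheme.
    static Optional<SnapshotId> parse(String fileName) {
        Matcher m = NAME.matcher(fileName);
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new SnapshotId(Long.parseLong(m.group(1)), Integer.parseInt(m.group(2))));
    }
}
```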

...

  1. LEO - log end offset - the next offset to be written to disk.

  2. high-watermark - the largest offset and epoch that has been replicated to N/2 + 1 replicas.

  3. LogStartOffset - log start offset - the smallest offset in the replicated log.
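The high-watermark definition can be made concrete. The leader can derive it from the LEOs of all N replicas: after sorting, the (N/2 + 1)-th largest LEO is an offset that at least N/2 + 1 replicas have reached. The minimal Java sketch below makes three assumptions. N/2 uses integer division. The result follows the same exclusive convention as the LEO. Epochs and any additional leader-side conditions from KIP-595 are ignored. HighWatermark is a hypothetical name.

```java
import java.util.Arrays;

// Hypothetical helper: majority-replicated offset from per-replica LEOs.
final class HighWatermark {
    private HighWatermark() {}

    // Returns the largest offset that at least N/2 + 1 of the N replicas have reached.
    static long compute(long[] replicaEndOffsets) {
        if (replicaEndOffsets.length == 0) {
            throw new IllegalArgumentException("at least one replica is required");
        }
        long[] sorted = replicaEndOffsets.clone();
        Arrays.sort(sorted); // ascending
        int n = sorted.length;
        // In ascending order, the value at index n - (n/2 + 1) has been reached by the
        // n/2 + 1 replicas at indices n - (n/2 + 1) .. n - 1 (more if there are ties).
        return sorted[n - (n / 2 + 1)];
    }
}
```

For example, with LEOs {10, 7, 12} on three replicas, two replicas (LEOs 10 and 12) have reached offset 10, so the sketch returns 10.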

In the example above, offset=x - 1, epoch=a does not appear in the replicated log because it is before the LogStartOffset (x, b). If the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the LogStartOffset (x, b), the leader handles the request as described in KIP-595. Otherwise, the leader responds with the epoch and end offset of the latest snapshot (y, c).
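The leader's decision described above reduces to a single comparison. The minimal Java sketch below assumes only the fetch offset is compared against the LogStartOffset. Epoch validation and divergence detection stay on the KIP-595 path. OffsetAndEpoch and LeaderFetchHandler are hypothetical names, not Kafka classes.

```java
import java.util.Optional;

// Hypothetical (offset, epoch) pair, e.g. the snapshot id (y, c) in the example.
record OffsetAndEpoch(long offset, int epoch) {}

// Hypothetical leader-side check for a single partition.
final class LeaderFetchHandler {
    private final long logStartOffset;             // x in the example
    private final OffsetAndEpoch latestSnapshotId; // (y, c) in the example

    LeaderFetchHandler(long logStartOffset, OffsetAndEpoch latestSnapshotId) {
        this.logStartOffset = logStartOffset;
        this.latestSnapshotId = latestSnapshotId;
    }

    // Empty result: handle the fetch as described in KIP-595.
    // Present result: reply with this snapshot id instead of records.
    Optional<OffsetAndEpoch> snapshotIdFor(long fetchOffset) {
        if (fetchOffset >= logStartOffset) {
            return Optional.empty();
        }
        return Optional.of(latestSnapshotId);
    }
}
```

Take a LogStartOffset of 100 and a latest snapshot of (250, 3). A fetch at offset 99 gets back the snapshot id (250, 3), while a fetch at offset 100 is served from the log.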

...

To minimize the need to send snapshots to slow replicas, the leader can delay increasing the log start offset up to the minimum offset that it knows has been replicated to all of the live replicas. The leader's LogStartOffset can be increased to an offset X if the following are true:

  1. There is a snapshot for the topic partition that includes offset X - 1, and

  2. One of the following is true:

    1. All of the live replicas (followers and observers) have replicated the LogStartOffset, or

    2. The LogStartOffset is metadata.start.offset.lag.time.max.ms milliseconds old.
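The rules for increasing the LogStartOffset can be expressed as one predicate. The Java sketch below makes three assumptions that the text leaves open. First, a snapshot with end offset E contains offsets up to E - 1, as in the y-c.checkpoint example. Second, condition 2.1 means every live replica's LEO has reached X. Third, the age in condition 2.2 is measured from when the current LogStartOffset was set. LogStartOffsetPolicy and its parameters are hypothetical.

```java
import java.time.Duration;
import java.util.Collection;

// Hypothetical policy check for advancing the leader's LogStartOffset to X.
final class LogStartOffsetPolicy {
    // Default for metadata.start.offset.lag.time.max.ms: 7 days in milliseconds.
    static final long DEFAULT_LAG_TIME_MAX_MS = Duration.ofDays(7).toMillis();

    private LogStartOffsetPolicy() {}

    /**
     * @param x                       candidate new LogStartOffset
     * @param latestSnapshotEndOffset end offset of the newest snapshot (exclusive)
     * @param liveReplicaEndOffsets   LEO of every live follower and observer
     * @param logStartOffsetAgeMs     assumed: time since the current LogStartOffset was set
     * @param lagTimeMaxMs            value of metadata.start.offset.lag.time.max.ms
     */
    static boolean canIncreaseTo(long x,
                                 long latestSnapshotEndOffset,
                                 Collection<Long> liveReplicaEndOffsets,
                                 long logStartOffsetAgeMs,
                                 long lagTimeMaxMs) {
        // 1. A snapshot ending at E covers offsets up to E - 1, so it includes X - 1 when E >= X.
        boolean snapshotCoversX = latestSnapshotEndOffset >= x;
        // 2.1 Every live replica has replicated up to X (assumed reading).
        boolean allReplicated = liveReplicaEndOffsets.stream().allMatch(leo -> leo >= x);
        // 2.2 The current LogStartOffset has been in place for at least lagTimeMaxMs.
        boolean lagTimeExceeded = logStartOffsetAgeMs >= lagTimeMaxMs;
        return snapshotCoversX && (allReplicated || lagTimeExceeded);
    }
}
```

Condition 2.2 keeps a permanently slow replica from pinning the LogStartOffset. Once the lag time passes, that replica must catch up from a snapshot instead.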

...

The broker will delete any snapshot with an end offset and epoch (SnapshotId) less than the LogStartOffset. That means the question of when to delete a snapshot is the same as when to increase the LogStartOffset, which is covered in the section “When to Increase the LogStartOffset”.
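Snapshot deletion then follows directly from the LogStartOffset. The minimal Java sketch below assumes the comparison uses only the snapshot's end offset. A snapshot ending exactly at the LogStartOffset is kept, because it covers every offset before the first one still in the log. SnapshotRetention is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical (end offset, epoch) snapshot identifier.
record SnapshotId(long endOffset, int epoch) {}

// Hypothetical helper that selects snapshots made obsolete by the LogStartOffset.
final class SnapshotRetention {
    private SnapshotRetention() {}

    // Returns the snapshots whose end offset is strictly below logStartOffset,
    // ordered by end offset. A snapshot ending at logStartOffset is retained.
    static List<SnapshotId> deletable(List<SnapshotId> snapshots, long logStartOffset) {
        List<SnapshotId> result = new ArrayList<>();
        for (SnapshotId id : snapshots) {
            if (id.endOffset() < logStartOffset) {
                result.add(id);
            }
        }
        result.sort(Comparator.comparingLong(SnapshotId::endOffset));
        return result;
    }
}
```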

...

  1. Follower sends a fetch request.
  2. Leader replies with a snapshot epoch and end offset.
  3. Follower pauses fetch requests.
  4. Follower fetches a snapshot from the leader.
  5. Follower resumes fetch requests by:
    1. Setting the LogStartOffset to the snapshot's end offset.
    2. Setting the LEO or FetchOffset in the fetch request to the snapshot's end offset.
    3. Setting the LastFetchedEpoch in the fetch request to the snapshot's epoch.
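The follower side of these steps can be sketched as a small state holder. In this Java sketch, which is an assumption for illustration, a non-null pending snapshot id means fetch requests are paused. The actual transfer of snapshot data (step 4) is not modeled. FollowerFetchState and its methods are hypothetical.

```java
// Hypothetical (end offset, epoch) snapshot identifier from the fetch response.
record SnapshotId(long endOffset, int epoch) {}

// Hypothetical follower-side state for the snapshot catch-up steps.
final class FollowerFetchState {
    long logStartOffset;
    long fetchOffset;           // the LEO, sent as FetchOffset
    int lastFetchedEpoch;       // sent as LastFetchedEpoch
    SnapshotId pendingSnapshot; // non-null while fetch requests are paused

    boolean fetchPaused() {
        return pendingSnapshot != null;
    }

    // Steps 2-3: the leader replied with a snapshot id, so pause log fetching.
    void onSnapshotIdInFetchResponse(SnapshotId snapshotId) {
        pendingSnapshot = snapshotId;
    }

    // Step 5: the snapshot from step 4 is complete; resume fetching from its end.
    void onSnapshotFetched() {
        if (pendingSnapshot == null) {
            throw new IllegalStateException("no snapshot fetch in progress");
        }
        logStartOffset = pendingSnapshot.endOffset();   // 5.1
        fetchOffset = pendingSnapshot.endOffset();      // 5.2
        lastFetchedEpoch = pendingSnapshot.epoch();     // 5.3
        pendingSnapshot = null;
    }
}
```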

...

For each replica we have the following invariant:

  1. If the LogStartOffset is 0, there are zero or more snapshots where the end offset of the snapshot is between the LogStartOffset and the high-watermark.
  2. If the LogStartOffset is greater than 0, there are one or more snapshots where the end offset of the snapshot is between the LogStartOffset and the high-watermark.
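The invariant can be checked mechanically. The Java sketch below assumes "between" is inclusive at both ends. It treats the LogStartOffset = 0 case as always satisfied, since zero snapshots are allowed there. SnapshotInvariant is a hypothetical name.

```java
import java.util.Collection;

// Hypothetical checker for the per-replica snapshot invariant.
final class SnapshotInvariant {
    private SnapshotInvariant() {}

    // Counts snapshots whose end offset lies in [logStartOffset, highWatermark].
    static long snapshotsInRange(Collection<Long> snapshotEndOffsets, long logStartOffset, long highWatermark) {
        return snapshotEndOffsets.stream()
                .filter(end -> end >= logStartOffset && end <= highWatermark)
                .count();
    }

    static boolean holds(Collection<Long> snapshotEndOffsets, long logStartOffset, long highWatermark) {
        if (logStartOffset == 0) {
            return true; // zero or more snapshots: always satisfied
        }
        // LogStartOffset > 0: at least one snapshot must end in range.
        return snapshotsInRange(snapshotEndOffsets, logStartOffset, highWatermark) >= 1;
    }
}
```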

If the latest snapshot has an epoch E1 and end offset O1, and it is newer than the LEO of the replicated log, then the replica must set the LogStartOffset and LEO to O1.
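A minimal Java sketch of this rule follows. It assumes "newer than the LEO" means the snapshot's end offset O1 is greater than the LEO. It only updates the two offsets; discarding the obsolete log contents is not shown. ReplicaLog is a hypothetical name.

```java
// Hypothetical (end offset, epoch) snapshot identifier: (O1, E1) in the text.
record SnapshotId(long endOffset, int epoch) {}

// Hypothetical view of a replica's log boundaries.
final class ReplicaLog {
    long logStartOffset;
    long logEndOffset; // LEO

    ReplicaLog(long logStartOffset, long logEndOffset) {
        this.logStartOffset = logStartOffset;
        this.logEndOffset = logEndOffset;
    }

    // If the latest snapshot ends past the LEO, move both the LogStartOffset
    // and the LEO to the snapshot's end offset. Returns true if they moved.
    boolean applyLatestSnapshot(SnapshotId latest) {
        if (latest.endOffset() > logEndOffset) {
            logStartOffset = latest.endOffset();
            logEndOffset = latest.endOffset();
            return true;
        }
        return false;
    }
}
```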

Public Interfaces

...

  1. The __cluster_metadata topic will have a cleanup.policy value of snapshot. This configuration is read-only; updates to it will not be allowed.

  2. metadata.snapshot.min.cleanable.ratio - The minimum ratio of snapshot records that must change before a new snapshot is generated. See section "When to Snapshot". The default is 0.5 (50%).

  3. metadata.start.offset.lag.time.max.ms - The maximum amount of time that the leader will wait for an offset to get replicated to all of the live replicas before advancing the LogStartOffset. See section “When to Increase the LogStartOffset”. The default is 7 days.
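The exact snapshot trigger is defined in the section "When to Snapshot", so the Java sketch below is only one plausible reading of metadata.snapshot.min.cleanable.ratio. It generates a snapshot once the fraction of records in the previous snapshot that have since changed reaches the ratio. MetadataSnapshotConfig and shouldSnapshot are hypothetical names.

```java
// Hypothetical holder for the __cluster_metadata snapshot settings listed above.
final class MetadataSnapshotConfig {
    // cleanup.policy of __cluster_metadata (read-only).
    static final String CLEANUP_POLICY = "snapshot";
    // Default of metadata.snapshot.min.cleanable.ratio.
    static final double DEFAULT_MIN_CLEANABLE_RATIO = 0.5;

    private MetadataSnapshotConfig() {}

    // Assumed reading: snapshot when changedRecords / snapshotRecords >= minCleanableRatio.
    static boolean shouldSnapshot(long changedRecords, long snapshotRecords, double minCleanableRatio) {
        if (snapshotRecords <= 0) {
            // No previous snapshot to compare against: snapshot once anything has changed.
            return changedRecords > 0;
        }
        return (double) changedRecords / snapshotRecords >= minCleanableRatio;
    }
}
```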

Network Protocol

...

{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log start offset." },
        { "name": "DivergingEpoch", "type": "EpochEndOffset", "versions": "12+", "taggedVersions": "12+", "tag": 0,
          "about": "In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge",
          "fields": [
            { "name": "Epoch", "type": "int32", "versions": "12+", "default": "-1" },
            { "name": "EndOffset", "type": "int64", "versions": "12+", "default": "-1" }
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
        ]},
        // ---------- Start of new field ----------
        { "name": "SnapshotId", "type": "SnapshotId",
          "versions": "12+", "taggedVersions": "12+", "tag": 2, "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the end offset that the follower should use for the FetchSnapshot request"},
          { "name": "Epoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"}
        ]},
        // ---------- End new field ----------
        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
          "about": "The aborted transactions.",  "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
          "about": "The preferred read replica for the consumer to use on its next fetch request"},
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." }
      ]}
    ]}
  ]
}

...