...

This KIP assumes that KIP-595 has been approved and implemented. With this improvement, replicas will continue to replicate their log among each other. This means that voters (leader and followers) and observers will have the replicated log on disk. Replicas will snapshot their log independently of each other. Snapshots will be consistent because the logs are consistent across replicas. Follower and observer replicas fetch a snapshot from the leader when they attempt to fetch an offset from the leader and the leader doesn’t have that offset in the log.

...

Code Block
Kafka Replicated Log:

LogStartOffset  --             high-watermark --           LEO --
                 V                             V                V
               -----------------------------------------------
       offset: | x | ... | y - 1 | y |  ...  |   |  ...  |   |
       epoch:  | b | ... | c     | d |  ...  |   |  ...  |   |
               -----------------------------------------------

Kafka Snapshot Files:

<topic_name>-<partition_index>/checkpoints/x-a.checkpoint
<topic_name>-<partition_index>/checkpoints/y-c.checkpoint

...

In the example above, offset=x - 1, epoch=a does not appear in the replicated log because it is before the LogStartOffset (x, b). If the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the LogStartOffset (x, b), then the leader handles the request as described in KIP-595. Otherwise, the leader will respond with the epoch and end offset of the latest snapshot (y, c).
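The decision described above can be sketched in a few lines of Python. This is only an illustration, not the broker's implementation: the `SnapshotId` class and the `snapshot_to_fetch` function are hypothetical names, and comparing the offset and the epoch separately against (x, b) is one possible reading of "greater than or equal to".

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SnapshotId:
    """Hypothetical stand-in for the (end offset, epoch) pair of a snapshot."""
    end_offset: int
    epoch: int


def snapshot_to_fetch(fetch_offset: int,
                      last_fetched_epoch: int,
                      log_start_offset: int,
                      log_start_epoch: int,
                      latest_snapshot: SnapshotId) -> Optional[SnapshotId]:
    """Return None when the fetch can be served from the log (KIP-595 path),
    otherwise the id of the latest snapshot the replica should fetch.

    Assumption: the request is "at or after" the LogStartOffset when both its
    offset and its epoch are >= the leader's (x, b).
    """
    if fetch_offset >= log_start_offset and last_fetched_epoch >= log_start_epoch:
        return None
    return latest_snapshot


# Illustrative values: LogStartOffset (x, b) = (100, 2), latest snapshot (y, c) = (150, 3).
latest = SnapshotId(end_offset=150, epoch=3)
in_log = snapshot_to_fetch(120, 3, 100, 2, latest)       # served from the log
too_old = snapshot_to_fetch(99, 1, 100, 2, latest)       # must fetch the snapshot
```

A replica that receives a `SnapshotId` would then switch from fetching log records to fetching that snapshot.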

...

If the Kafka Controller or Metadata Cache generates a snapshot too frequently then it can negatively affect the performance of the disk. If it doesn't generate a snapshot often enough then it can increase the amount of time it takes to load its state into memory and it can increase the amount of space taken up by the replicated log. The Kafka Controller and Metadata Cache will have a new configuration option metadata.snapshot.min.cleanable.ratio. If the ratio of snapshot records that have changed (deleted or modified) between the latest snapshot and the current in-memory state is greater than metadata.snapshot.min.cleanable.ratio, then the Kafka Controller and Metadata Cache will generate a new snapshot.

Note that new snapshot records don't count against this ratio. If a new snapshot record was added since the last snapshot then it doesn't affect the dirty ratio. If a snapshot record was added and then modified or deleted then it counts against the dirty ratio.
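As a rough sketch of the rule above, the following Python computes a dirty ratio from a list of changes applied since the latest snapshot. The function names, the `(operation, key)` change representation, and the choice of the latest snapshot's record count as the denominator are all assumptions made for illustration; the KIP text does not fix these details.

```python
from typing import Iterable, Tuple


def dirty_ratio(snapshot_record_count: int,
                changes: Iterable[Tuple[str, str]]) -> float:
    """Fraction of records that were modified or deleted since the last snapshot.

    Assumptions (not from the KIP): each change is an (operation, key) pair
    with operation in {"add", "modify", "delete"}, and the denominator is the
    number of records in the latest snapshot.
    """
    dirty_keys = set()
    for operation, key in changes:
        # A plain "add" never counts; a later modify/delete of that key does.
        if operation in ("modify", "delete"):
            dirty_keys.add(key)
    if snapshot_record_count == 0:
        return 0.0  # assumption: an empty snapshot has nothing to clean
    return len(dirty_keys) / snapshot_record_count


def should_snapshot(snapshot_record_count: int,
                    changes: Iterable[Tuple[str, str]],
                    min_cleanable_ratio: float = 0.5) -> bool:
    """True when the dirty ratio exceeds metadata.snapshot.min.cleanable.ratio."""
    return dirty_ratio(snapshot_record_count, changes) > min_cleanable_ratio


# Latest snapshot holds 4 records: a, b, c, d.
changes = [("add", "e"), ("modify", "a"), ("delete", "b")]
ratio = dirty_ratio(4, changes)                           # 2 dirty keys out of 4
more_changes = changes + [("modify", "e")]                # "e" was added, then modified
```

With the default ratio of 0.5, the first change list does not trigger a snapshot (the ratio is not strictly greater than 0.5), while the second one does.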

When to Increase the LogStartOffset

To minimize the need to send snapshots to slow replicas, the leader can delay increasing the log start offset up to the minimum offset that it knows has been replicated to all of the live replicas. The leader's LogStartOffset can be increased to an offset X if the following is true:

  1. There is a snapshot for the topic partition which includes the offset X - 1, and

  2. One of the following is true:

    1. All of the live replicas (followers and observers) have replicated LogStartOffset or

    2. LogStartOffset is metadata.start.offset.lag.time.max.ms milliseconds old.
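The conditions in the list above can be expressed as a small predicate. This is a sketch under stated assumptions, not the leader's actual code: the function and parameter names are hypothetical, a snapshot is taken to include offset X - 1 when its end offset is at least X (treating the end offset as exclusive, as in the x-a.checkpoint example), and "have replicated LogStartOffset" is read as every live replica having fetched up to at least X.

```python
from typing import Iterable

# Default of metadata.start.offset.lag.time.max.ms: 7 days in milliseconds.
SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000


def can_advance_log_start_offset(x: int,
                                 snapshot_end_offsets: Iterable[int],
                                 live_replica_offsets: Iterable[int],
                                 log_start_offset_age_ms: int,
                                 max_lag_ms: int = SEVEN_DAYS_MS) -> bool:
    """Return True if the leader may increase its LogStartOffset to x.

    Assumptions (hypothetical model): snapshot end offsets are exclusive, and
    live_replica_offsets holds the next offset each live follower or observer
    will fetch.
    """
    # Condition 1: some snapshot covers offset x - 1.
    has_snapshot = any(end >= x for end in snapshot_end_offsets)
    # Condition 2.1: every live replica has replicated up to x.
    all_caught_up = all(offset >= x for offset in live_replica_offsets)
    # Condition 2.2: the current LogStartOffset is old enough.
    old_enough = log_start_offset_age_ms >= max_lag_ms
    return has_snapshot and (all_caught_up or old_enough)


caught_up = can_advance_log_start_offset(100, [100], [120, 100], 0)
slow_replica = can_advance_log_start_offset(100, [100], [120, 99], 1_000)
slow_but_expired = can_advance_log_start_offset(100, [100], [120, 99], SEVEN_DAYS_MS)
no_snapshot = can_advance_log_start_offset(100, [90], [120, 100], SEVEN_DAYS_MS)
```

The time-based branch is what lets the leader reclaim log space even when a live replica stays behind indefinitely.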

Followers and observers will increase their log start offset to the value sent on the fetch response as long as the local Kafka Controller and Metadata Cache have generated a snapshot with an end offset greater than or equal to the new log start offset.

Kafka allows clients to delete records that are less than a given offset by using the DeleteRecords RPC. Those requests will be rejected for the __cluster_metadata topic.

...

The broker will delete any snapshot with an end offset and epoch (SnapshotId) less than the LogStartOffset. That means that the question of when to delete a snapshot is the same as when to increase the LogStartOffset, which is covered in the section “When to Increase the LogStartOffset”.
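To make the deletion rule concrete, here is a minimal sketch that selects snapshot files to delete from their names. It assumes checkpoint files are named `<end offset>-<epoch>.checkpoint` with numeric components, following the x-a.checkpoint pattern shown earlier, and it compares only the end offset against the LogStartOffset. The helper name is hypothetical.

```python
import re
from typing import Iterable, List

# Assumed file name layout: "<end offset>-<epoch>.checkpoint".
CHECKPOINT_NAME = re.compile(r"^(\d+)-(\d+)\.checkpoint$")


def snapshots_to_delete(file_names: Iterable[str],
                        log_start_offset: int) -> List[str]:
    """Return the checkpoint files whose end offset is below log_start_offset.

    Files that do not match the assumed naming pattern are ignored.
    """
    doomed = []
    for name in file_names:
        match = CHECKPOINT_NAME.match(name)
        if match is None:
            continue
        end_offset = int(match.group(1))
        if end_offset < log_start_offset:
            doomed.append(name)
    return doomed


files = ["100-2.checkpoint", "150-3.checkpoint", "notes.txt"]
after_advance = snapshots_to_delete(files, 150)   # LogStartOffset moved to 150
before_advance = snapshots_to_delete(files, 100)  # LogStartOffset still at 100
```

Because the snapshot whose end offset equals the LogStartOffset is kept, a replica that falls behind the log can always be pointed at a snapshot that lines up with the start of the log.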

Interaction with the Kafka Controller and Metadata Cache

...

  1. The __cluster_metadata topic will have a cleanup.policy value of snapshot. This configuration can only be read. Updates to this configuration will not be allowed.

  2. metadata.snapshot.min.cleanable.ratio - The minimum ratio of snapshot records that have to change before generating a snapshot. See section "When to Snapshot". The default is .5 (50%).

  3. metadata.start.offset.lag.time.max.ms - The maximum amount of time that the leader will wait for an offset to get replicated to all of the live replicas before advancing the LogStartOffset. See section “When to Increase the LogStartOffset”. The default is 7 days.

...

  1. It includes new optional fields for snapshot end offset and snapshot epoch.


Code Block
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log start offset." },
        { "name": "DivergingEpoch", "type": "EpochEndOffset", "versions": "12+", "taggedVersions": "12+", "tag": 0,
          "about": "In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge",
          "fields": [
            { "name": "Epoch", "type": "int32", "versions": "12+", "default": "-1" },
            { "name": "EndOffset", "type": "int64", "versions": "12+", "default": "-1" }
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
        ]},
        // ---------- Start of new field ----------
        { "name": "SnapshotId", "type": "SnapshotId",
          "versions": "12+", "taggedVersions": "12+", "tag": 2, "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the end offset that the follower should use for the FetchSnapshot request"},
          { "name": "Epoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"}
        ]},
        // ---------- End new field ----------
        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
          "about": "The aborted transactions.",  "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
          "about": "The preferred read replica for the consumer to use on its next fetch request"},
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." }
      ]}
    ]}
  ]
}

...

The leader’s handling of fetch requests will be extended such that if FetchOffset is less than LogStartOffset then the leader will respond with the SnapshotId of the latest snapshot.

...

  1. SNAPSHOT_NOT_FOUND - when the fetch snapshot request specifies a SnapshotId that doesn’t exist on the leader.

  2. POSITION_OUT_OF_RANGE - when the fetch snapshot request specifies a position that is greater than the size of the snapshot.

  3. NOT_LEADER_FOR_PARTITION - when the fetch snapshot request is sent to a replica that is not the leader.
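The error cases listed above could be checked along the following lines. This is a hedged sketch only: the function name, the dictionary of snapshot sizes keyed by an (end offset, epoch) tuple, and the order in which the checks run are assumptions, and the error names are written as plain strings rather than real protocol error codes.

```python
from typing import Dict, Optional, Tuple

SnapshotKey = Tuple[int, int]  # hypothetical (end offset, epoch) key


def fetch_snapshot_error(is_leader: bool,
                         snapshot_sizes: Dict[SnapshotKey, int],
                         snapshot_id: SnapshotKey,
                         position: int) -> Optional[str]:
    """Return the name of the error for a fetch snapshot request, or None.

    Assumption: leadership is checked first, then snapshot existence, then
    the requested byte position.
    """
    if not is_leader:
        return "NOT_LEADER_FOR_PARTITION"
    size = snapshot_sizes.get(snapshot_id)
    if size is None:
        return "SNAPSHOT_NOT_FOUND"
    if position > size:
        return "POSITION_OUT_OF_RANGE"
    return None


# One snapshot (end offset 150, epoch 3) that is 4096 bytes long.
sizes = {(150, 3): 4096}
ok = fetch_snapshot_error(True, sizes, (150, 3), 1024)
missing = fetch_snapshot_error(True, sizes, (100, 2), 0)
past_end = fetch_snapshot_error(True, sizes, (150, 3), 5000)
not_leader = fetch_snapshot_error(False, sizes, (150, 3), 0)
```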

...

Full Name | Description
kafka.controller:type=KafkaController,name=GenSnapshotLatencyMs | A histogram of the amount of time it took to generate a snapshot.
kafka.controller:type=KafkaController,name=LoadSnapshotLatencyMs | A histogram of the amount of time it took to load the snapshot.
kafka.controller:type=KafkaController,name=SnapshotSizeBytes | Size of the latest snapshot in bytes.
kafka.controller:type=KafkaController,name=SnapshotLag | The number of offsets between the largest snapshot offset and the high-watermark.

Compatibility, Deprecation, and Migration Plan

This KIP is only implemented for the internal topic __cluster_metadata. An increase of the inter-broker protocol (IBP) is not required because this is only implemented for the __cluster_metadata topic partition; that partition is new and will be handled by the KafkaRaftClient. Internal and external clients for all other topics can ignore the SnapshotId, as that field will not be set for topic partitions other than __cluster_metadata.

Rejected Alternatives

Append the snapshot to the Kafka log: Instead of having a separate file for snapshots and a separate RPC for downloading the snapshot, the leader could append the snapshot to the log. This design has a few drawbacks. Because the snapshot is appended to the head of the log, it will be replicated to every replica in the partition. This will cause Kafka to use more network bandwidth than necessary.

...