Versions Compared

Comment: Formatting changes and minor corrections

...

Kafka’s replicated log can grow unbounded. If Kafka doesn’t limit the size of these logs, it can affect Kafka availability by either running out of disk space or increasing the amount of time needed to load the log into Kafka's in-memory state. For example, the Group Coordinator and the Kafka Controller need to load the __consumer_offsets and __cluster_metadata logs, respectively, into memory.

Kafka uses the cleanup policies compact and delete to limit the size of the log. The delete cleanup policy deletes data for a prefix of the log. This strategy is not viable when using the replicated log to reconstruct the state machine associated with the log, because the deleted prefix may contain records that the state machine still needs. The compact cleanup policy incrementally removes key/value records that are not needed to reconstruct the associated state machine. The compact policy has some issues that are addressed in this KIP.

...

Kafka implements an incremental log cleaner for topics configured for compaction. It keeps track of an OffsetMap, which is a mapping from key to the latest offset in the section of the log being cleaned. The log cleaner assumes that any record with an offset less than the value in the OffsetMap given by the record's key is safe to remove. This solution will eventually result in one record for every key in the log.
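The cleaning rule above can be illustrated with a minimal sketch. It is not Kafka's log cleaner; the `Record` type and the function names are hypothetical, and the sketch assumes records are listed in offset order and that the whole log is the section being cleaned.

```python
from typing import NamedTuple, Optional

class Record(NamedTuple):
    offset: int
    key: str
    value: Optional[str]

def build_offset_map(records):
    """Map each key to the latest offset at which it appears."""
    offset_map = {}
    for record in records:
        offset_map[record.key] = record.offset
    return offset_map

def compact(records):
    """Drop every record whose offset is less than the OffsetMap entry for its key."""
    offset_map = build_offset_map(records)
    return [r for r in records if r.offset >= offset_map[r.key]]

log = [
    Record(0, "a", "1"),
    Record(1, "b", "1"),
    Record(2, "a", "2"),
    Record(3, "b", "2"),
    Record(4, "a", "3"),
]
compacted = compact(log)
```

After cleaning, only the latest record for each key remains, which is why the cleaner needs the value of a key to be fully described by its latest record.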

...

For example, how do you compact operations like key ← previous value + 1 without knowing the value of the key at this offset?

Tombstone and Delete Operations

For compacted topics, Kafka allows for the deletion of keys in the state machine by appending a record with a key and with a value of null (e.g. key ← null). These tombstone records (X) must be kept in the log as long as there is another replica that contains a record (Y) for the same key and that replica doesn’t contain record X. In other words, Y.offset < Follower.logEndOffset < X.offset.
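The retention condition for a tombstone can be written as a small predicate. This is only a sketch of the inequality above; the function and parameter names are hypothetical.

```python
def must_keep_tombstone(tombstone_offset, shadowed_record_offset, follower_log_end_offsets):
    """True while some follower holds record Y but has not yet replicated tombstone X.

    Implements Y.offset < Follower.logEndOffset < X.offset for any follower.
    """
    return any(
        shadowed_record_offset < log_end_offset < tombstone_offset
        for log_end_offset in follower_log_end_offsets
    )
```

For example, with Y at offset 10 and X at offset 50, a follower whose log end offset is 30 forces the leader to keep X.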

Frequency of Compaction

By moving snapshots to the state machine already stored in-memory in the Group Coordinator and Kafka Controller, we can generate snapshots faster and more frequently.

...

This improvement will be used for internal topics (__consumer_offsets, __cluster_metadata and __transaction_state) but it should be general enough to eventually use on external topics with consumers that support this feature.

Proposed Changes

Topic partitions with a cleanup.policy of snapshot will have log compaction delegated to the state machine. The snapshot algorithm explained in this document assumes that the state machine is in memory. The state machines for both the Kafka Controller and Group Coordinator are already in memory. Supporting state machines that don’t fit in memory is outside the scope of this document. See the Future Work section for details.

...

In the example above, if the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the log begin offset (a, x) then the leader handles the request as described in KIP-595. Otherwise, the leader will respond with the offset and epoch of the latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d).

The followers and observers can concurrently fetch both the snapshot and the replicated log. During leader election, followers with an incomplete or missing snapshot will send a vote request and response as if they had an empty log.

Snapshot Format

Need to investigate this further but we should be able to use the existing Log functionality in Kafka. We want to make sure that Kafka can quickly find the Log based on the topic name, topic partition id, SnapshotOffset and SnapshotOffsetEpoch. The last record in this log should include snapshot information like SnapshotOffset, SnapshotOffsetEpoch, and Checksum.

The state machine is responsible for the format of the snapshot. The snapshot will be stored in the topic partition directory and will have a name of <SnapshotOffset>-<SnapshotOffsetEpoch>.checkpoint. For example, for the topic __cluster_metadata, partition 0, snapshot offset 5120793 and snapshot epoch 2, the full filename will be __cluster_metadata-0/00000000000005120793-2.checkpoint.
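The naming scheme can be sketched as a helper that reproduces the example filename. The function name is hypothetical, and the 20-digit zero padding of the offset is an assumption inferred from the example above rather than something the text states.

```python
def snapshot_path(topic: str, partition: int, snapshot_offset: int, snapshot_epoch: int) -> str:
    """Build <topic>-<partition>/<SnapshotOffset>-<SnapshotOffsetEpoch>.checkpoint.

    Assumption: the offset is zero-padded to 20 digits, as in the example filename.
    """
    return f"{topic}-{partition}/{snapshot_offset:020d}-{snapshot_epoch}.checkpoint"

example = snapshot_path("__cluster_metadata", 0, 5120793, 2)
```

Zero padding keeps snapshot files sorted by offset when listed lexicographically, which helps with finding the latest snapshot quickly.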

TODO: Add more details on the format. Here are some requirements

...

The topic partition will also include snapshot.minimum.records as a configuration.

If the update counter is greater than both half the cardinality and the snapshot.minimum.records configuration, then trigger a snapshot of the current state.
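As a sketch, the trigger condition could look like the following. The function and parameter names are hypothetical; `update_counter` and `cardinality` stand for the values tracked by the state machine.

```python
def should_snapshot(update_counter: int, cardinality: int, snapshot_minimum_records: int) -> bool:
    """Trigger a snapshot only when both thresholds are exceeded."""
    return update_counter > cardinality / 2 and update_counter > snapshot_minimum_records
```

With a cardinality of 1000 and snapshot.minimum.records set to 500, 600 updates would trigger a snapshot while 400 would not.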

...

To minimize the need to send snapshots to slow replicas, the leader can delay increasing the log begin offset up to the minimum offset that it knows has been replicated to all of the live replicas. Followers and observers will increase their log begin offset to the value sent in the fetch response as long as the local state machine has generated a snapshot that includes the log begin offset minus one.

LBO can be increased/set to an offset X if the following is true:

  1. There is a snapshot for the topic partition which includes the offset for LBO - 1, and

  2. One of the following is true:

    1. All of the replicas (followers and observers) have replicated LBO, or

    2. LBO is max.replication.lag.ms old.
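The conditions above can be combined into a single check. This is a sketch under stated assumptions: the names are hypothetical, a snapshot offset is taken to be the last offset the snapshot includes, and each entry in `replicated_offsets` is the highest offset that replica is known to have replicated.

```python
def can_advance_lbo(new_lbo, snapshot_offsets, replicated_offsets, lbo_age_ms, max_replication_lag_ms):
    """Return True if the log begin offset may be increased to new_lbo."""
    # Condition 1: some snapshot includes offset new_lbo - 1.
    has_snapshot = any(offset >= new_lbo - 1 for offset in snapshot_offsets)
    # Condition 2.1: every follower and observer has replicated new_lbo.
    fully_replicated = all(offset >= new_lbo for offset in replicated_offsets)
    # Condition 2.2: the offset is at least max.replication.lag.ms old.
    old_enough = lbo_age_ms >= max_replication_lag_ms
    return has_snapshot and (fully_replicated or old_enough)
```

The age-based escape hatch means one slow replica cannot block log truncation forever; that replica would instead catch up by fetching a snapshot.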

Kafka allows clients (DeleteRecords RPCs) to delete records that are less than a given offset. Those requests need to be validated using the same logic enumerated above.

...

The broker will delete any snapshot with a latest offset and epoch (SnapshotOffsetAndEpoch) less than LBO - 1, where LBO is the log begin offset. This means that the question of when to delete a snapshot is the same as when to increase the LBO, which is covered in the section “When To Increase the Log Begin Offset”.

...

This section describes the interaction between the state machine and Kafka Raft.

From Kafka Raft to the State Machine

...

  1. Notify when it expects a snapshot. The Raft log is interested in limiting the size of the log. This notification should include up to which offset it is safe to snapshot.

  2. Notify when a snapshot is available because of a FetchSnapshot.

From State Machine to Kafka Raft

...

The followers and observers of a topic partition will concurrently fetch the snapshot and replicated log. This means that candidates with incomplete snapshots will send a vote request with a LastOffsetEpoch of -1 and a LastOffset of -1.

Quorum Reassignment

TODO: Need to investigate if there are any implications for quorum reassignment, as some of the operations in the snapshot are for reassignment.

...

There is an invariant that every log must have at least one snapshot where the offset of the snapshot is between LogBeginOffset - 1 and High-Watermark. If this is not the case then the replica should assume that the log is empty. Once the replica discovers the leader for the partition it can bootstrap itself by using the Fetch and FetchSnapshot RPCs as described in this document.
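A replica could check this invariant at startup along the following lines. This is a sketch only; the function name is hypothetical and the bounds are assumed to be inclusive.

```python
def has_valid_snapshot(snapshot_offsets, log_begin_offset, high_watermark):
    """True if some snapshot offset lies between LogBeginOffset - 1 and the high watermark.

    Assumption: both bounds are inclusive.
    """
    return any(
        log_begin_offset - 1 <= offset <= high_watermark
        for offset in snapshot_offsets
    )
```

When the check fails, the replica would treat its log as empty and bootstrap from the leader.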

...

  1. Kafka topics will have an additional cleanup.policy value of snapshot. Once the configuration option is set to snapshot it cannot be changed. The __cluster_metadata topic will always have a cleanup.policy of snapshot. Setting the cleanup.policy to snapshot will enable the improvements documented here.

  2. snapshot.minimum.records - The minimum number of records committed after the latest snapshot needed before performing another snapshot.

  3. max.replication.lag.ms - The maximum amount of time that the leader will wait for an offset to get replicated to all of the replicas before advancing the LBO. See section “When to Increase the Log Begin Offset”.

...

  1. It includes new optional fields for snapshot offset and snapshot epoch.

  2. The LogStartOffset field is renamed to LogBeginOffset.

Code Block
languagejs
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        // Field renamed from LogStartOffset to LogBeginOffset 
        { "name": "LogBeginOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log begin offset." },
        // ---------- Start new fields by KIP-595 ----------
        { "name": "NextOffsetAndEpoch", "type": "OffsetAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 0, "fields": [
          { "name": "NextFetchOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the offset that the follower should truncate to"},
          { "name": "NextFetchOffsetEpoch", "type": "int32", "versions": "0+",
            "about": "The epoch of the next offset in case the follower needs to truncate"}
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
        ]},
        // ---------- End new fields by KIP-595 ----------
        // ---------- Start new fields ----------
        { "name": "SnapshotOffsetAndEpoch", "type": "OffsetAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 2, "fields": [
          { "name": "FetchOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the offset that the follower should use for the FetchSnapshot request"},
          { "name": "FetchOffsetEpoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"}
        ]},
        // ---------- End new fields ----------
        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
          "about": "The aborted transactions.",  "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
          "about": "The preferred read replica for the consumer to use on its next fetch request"},
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." }
      ]}
    ]}
  ]
}

...

The replica’s handling of fetch requests will be extended such that if FetchOffset is less than LogBeginOffset then the leader will respond with a SnapshotOffsetAndEpoch of the latest snapshot and a NextOffsetAndEpoch of the offset right after SnapshotOffsetAndEpoch.
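The leader-side decision can be sketched as follows. The types and the `handle_fetch` function are hypothetical stand-ins, not Kafka APIs, and `epoch_after_snapshot` is assumed to be the epoch of the record that follows the snapshot in the leader's log.

```python
from typing import NamedTuple, Optional

class OffsetAndEpoch(NamedTuple):
    offset: int
    epoch: int

class FetchResult(NamedTuple):
    snapshot_offset_and_epoch: Optional[OffsetAndEpoch]
    next_offset_and_epoch: Optional[OffsetAndEpoch]

def handle_fetch(fetch_offset, log_begin_offset, latest_snapshot, epoch_after_snapshot):
    """Decide whether the follower must fetch a snapshot before fetching the log."""
    if fetch_offset >= log_begin_offset:
        # Regular fetch: both snapshot-related fields stay unset.
        return FetchResult(None, None)
    # The requested offset is no longer in the log: point the follower at the
    # latest snapshot and at the offset right after it.
    next_fetch = OffsetAndEpoch(latest_snapshot.offset + 1, epoch_after_snapshot)
    return FetchResult(latest_snapshot, next_fetch)

result = handle_fetch(50, 100, OffsetAndEpoch(120, 3), 4)
```

Here a follower asking for offset 50 when the log begins at 100 is told to fetch the snapshot at (120, 3) and to resume the log at (121, 4).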

Handling Fetch Response

The replica's handling of fetch responses will be extended such that if SnapshotOffsetAndEpoch is set then the follower will truncate its entire log and start fetching from both the log and the snapshot. Where to fetch in the log is described by NextOffsetAndEpoch, while which snapshot to fetch is described by SnapshotOffsetAndEpoch.
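On the follower side, the same exchange might be handled roughly like this. The `Follower` class and its attributes are hypothetical, and offsets and epochs are modelled as plain `(offset, epoch)` tuples.

```python
class Follower:
    def __init__(self, log):
        self.log = list(log)
        self.next_fetch = None        # where to resume fetching the log
        self.pending_snapshot = None  # which snapshot to request with FetchSnapshot

    def handle_fetch_response(self, snapshot_offset_and_epoch, next_offset_and_epoch):
        """Apply the snapshot-related fields of a fetch response."""
        if snapshot_offset_and_epoch is None:
            return  # regular fetch response; nothing snapshot-related to do
        # The leader no longer has the requested offset: drop the whole local log,
        # then fetch the log and the snapshot concurrently.
        self.log.clear()
        self.next_fetch = next_offset_and_epoch
        self.pending_snapshot = snapshot_offset_and_epoch

follower = Follower(["record-0", "record-1"])
follower.handle_fetch_response((120, 3), (121, 4))
```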

Fetching Snapshots

Request Schema

...

Code Block
languagejs
{
  "apiKey": #,
  "type": "response",
  "name": "FetchSnapshotResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "Topics", "type": "[]TopicSnapshots", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshots", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "MediaType", "type": "string", "versions": "0+",
          "about": "The media type and version of the snapshot. This is only set when the position is 0." },
        { "name": "Continue", "type": "bool", "versions": "0+",
          "about": "True if there is data remaining for fetch." },
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot." },
        { "name": "Bytes", "type": "bytes", "versions": "0+",
          "about": "Snapshot data." }
      ]}
    ]}
  ]
}

Request Handling

When the leader receives a FetchSnapshot request it would do the following for each partition:

  1. Find the snapshot for SnapshotOffsetAndEpoch for the topic Name and partition PartitionIndex.

  2. Send the bytes in the snapshot from Position up to MaxBytes.

  3. Set Continue to true if there are remaining bytes in the snapshot.

Errors:

  1. SNAPSHOT_NOT_FOUND - when the fetch snapshot request specifies a SnapshotOffsetAndEpoch that doesn’t exist on the leader.

  2. OFFSET_OUT_OF_RANGE - when the fetch snapshot request’s Position is greater than the size of the snapshot.

  3. NOT_LEADER_FOR_PARTITION - when the fetch snapshot request is sent to a replica that is not the leader.
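The request handling steps and error cases above can be sketched for a single partition as follows. This is illustrative only: the function name is hypothetical, snapshots are held in an in-memory dict keyed by `(offset, epoch)`, the response is a plain dict, and error codes are shown as strings, with "NONE" as an assumed placeholder for success.

```python
def handle_fetch_snapshot(snapshots, is_leader, snapshot_id, position, max_bytes):
    """Serve one chunk of a snapshot, or return the matching error code."""
    if not is_leader:
        return {"ErrorCode": "NOT_LEADER_FOR_PARTITION"}
    data = snapshots.get(snapshot_id)
    if data is None:
        return {"ErrorCode": "SNAPSHOT_NOT_FOUND"}
    if position > len(data):
        return {"ErrorCode": "OFFSET_OUT_OF_RANGE"}
    chunk = data[position:position + max_bytes]
    return {
        "ErrorCode": "NONE",
        "Position": position,
        "Bytes": chunk,
        # More data remains if this chunk does not reach the end of the snapshot.
        "Continue": position + len(chunk) < len(data),
    }

snapshots = {(10, 2): b"abcdefghij"}
first_chunk = handle_fetch_snapshot(snapshots, True, (10, 2), 0, 4)
last_chunk = handle_fetch_snapshot(snapshots, True, (10, 2), 8, 4)
```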

...

For each topic partition:

  1. If Continue is true, the follower is expected to send another FetchSnapshotRequest with a Position of Position + len(Bytes) from the response.

  2. Copy the bytes in Bytes to the local snapshot file starting at file position Position.

  3. If Continue is set to false then fetching the snapshot has finished. Check that the checksum of the snapshot on disk matches the Checksum sent by the leader. If the checksums do not match then re-fetch the snapshot. Otherwise, notify the state machine (e.g. Kafka Controller) that a new snapshot is available. The state machine will load this new snapshot and read records from the log after the snapshot offset.

Errors:

  1. SNAPSHOT_NOT_FOUND, OFFSET_OUT_OF_RANGE, NOT_LEADER_FOR_PARTITION - The follower can rediscover the new snapshot by sending a fetch request with an offset and epoch of zero.
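The follower's fetch loop for one partition could look like the sketch below. Everything here is hypothetical scaffolding: `fake_leader` stands in for the FetchSnapshot RPC, responses are plain dicts using the field names from the response schema, "NONE" is an assumed placeholder for success, and checksum validation is left out.

```python
import io

SNAPSHOT = b"snapshot-bytes-from-the-leader"

def fake_leader(position, max_bytes):
    """Stand-in for the FetchSnapshot RPC; returns one chunk of SNAPSHOT."""
    chunk = SNAPSHOT[position:position + max_bytes]
    return {"ErrorCode": "NONE", "Position": position, "Bytes": chunk,
            "Continue": position + len(chunk) < len(SNAPSHOT)}

def fetch_whole_snapshot(send_request, snapshot_file, max_bytes):
    """Fetch chunks until Continue is false; return False on any error."""
    position = 0
    while True:
        response = send_request(position, max_bytes)
        if response["ErrorCode"] != "NONE":
            # The caller should rediscover the snapshot with a regular fetch request.
            return False
        # Copy the bytes to the local file at the position given in the response.
        snapshot_file.seek(response["Position"])
        snapshot_file.write(response["Bytes"])
        if not response["Continue"]:
            return True
        position = response["Position"] + len(response["Bytes"])

local_file = io.BytesIO()
finished = fetch_whole_snapshot(fake_leader, local_file, max_bytes=8)
```

Writing at the position reported by the response, rather than appending, keeps the local file correct even if a chunk is fetched more than once.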

Metrics

TODO: Figure out some useful metrics. I probably won't have any until we have a fairly complete implementation.

...