...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Terminology

  1. Kafka Controller: The component that generates snapshots, reads snapshots and reads logs for voter replicas of the topic partition __cluster_metadata.
  2. Active Controller: The Kafka Controller that is also a leader of the __cluster_metadata topic partition. This component is allowed to append to the log.
  3. Metadata Cache: The component that generates snapshots, reads snapshots and reads logs for observer replicas of the topic partition __cluster_metadata. This is needed to reply to Metadata RPCs, for connection information to all of the brokers, etc.
  4. SnapshotId: Each snapshot is uniquely identified by a SnapshotId, the epoch and end offset of the records in the replicated log included in the snapshot.

Motivation

In KIP-500 the Active Controller, which is the quorum leader from KIP-595, will materialize the entries in the metadata log into memory. Each broker in KIP-500, which will be a voter or observer replica of the metadata log, will materialize the entries in the log into a metadata cache. We want to provide stronger guarantees on the consistency of the metadata than is possible today. If all entries in the metadata log are indefinitely retained, then the design described in KIP-595 achieves a consistent log and as a result a consistent metadata cache. If Kafka doesn’t limit the size of this replicated log, it can affect Kafka availability by:

...

Kafka uses the cleanup policies compact and delete to limit the size of the log. The delete cleanup policy deletes data for a prefix of the log based on time. The delete cleanup policy is not viable when used on the metadata log because the Kafka Controller and Metadata Cache will not be able to generate a consistent view from such a log.

...

The type of in-memory state machines that we plan to implement for the Kafka Controller and Metadata Cache (see KIP-631) doesn't map very well to a key and offset based cleanup policy. The data model for the Kafka Controller and Metadata Cache requires us to support deltas/events. Currently the controller uses ZK for both storing the current state of the cluster and the deltas that it needs to apply. One example of this is topic deletion: when the user asks for a topic to be deleted, 1) the Kafka Controller (AdminManager in the code) persists and commits the delete event, 2) applies the event to its in-memory state and sends RPCs to all of the affected brokers, and 3) persists and commits the changes to ZK. We don't believe that a key-value and offset based compaction model can be used for events/deltas in a way that is intuitive to program against.

...

Because the Kafka Controller and Metadata Cache as described in KIP-631 will use events/deltas to represent some of their in-memory state changes, the amount of data that needs to be written to the log and replicated to all of the brokers can be reduced. As an example, assume that a broker goes offline in a cluster with 100 brokers and 100K topics, each with 10 partitions and a replication factor of 3. In a well balanced cluster we can expect that no broker will have 2 or more partitions for the same topic. In this case the best we can do for the IsrChange message and the FenceBroker message are the following message sizes:

...

When a broker is shut down and the Active Controller uses IsrChange to include this information in the replicated log for the __cluster_metadata topic partition, then the Kafka Controllers and each broker need to persist 40 bytes per topic partition hosted by the broker. Each broker will be a replica for 30,000 partitions, that is 1.14 MB that needs to get written in the replicated log by each broker. The Active Controller will need to replicate 1.14 MB to each broker in the cluster (99 brokers) or 113.29 MB.
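
The arithmetic behind these figures can be reproduced with a short sketch. The cluster shape and the 40 bytes per partition come from the text; the function name is hypothetical, and the sketch assumes the "MB" figures are computed with 1024 * 1024 bytes per megabyte, which is what matches the quoted numbers.

```python
# Cluster shape taken from the example in the text.
TOPICS = 100_000
PARTITIONS_PER_TOPIC = 10
REPLICATION_FACTOR = 3
BROKERS = 100
ISR_CHANGE_BYTES_PER_PARTITION = 40  # per-partition IsrChange cost stated in the text

# Assumption: the text's "MB" is 1024 * 1024 bytes.
MIB = 1024 * 1024


def isr_change_cost():
    """Return (replicas per broker, MB written per broker, MB replicated by the leader)."""
    replicas_per_broker = TOPICS * PARTITIONS_PER_TOPIC * REPLICATION_FACTOR // BROKERS
    bytes_per_broker = replicas_per_broker * ISR_CHANGE_BYTES_PER_PARTITION
    # The Active Controller replicates the same bytes to every other broker.
    bytes_replicated = bytes_per_broker * (BROKERS - 1)
    return replicas_per_broker, bytes_per_broker / MIB, bytes_replicated / MIB


if __name__ == "__main__":
    replicas, per_broker_mb, total_mb = isr_change_cost()
    print(f"{replicas} replicas per broker, {per_broker_mb:.2f} MB per broker, {total_mb:.2f} MB replicated")
```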

...

Generating and loading the snapshot will be delegated to the Kafka Controllers and Metadata Cache. The Kafka Controllers and Metadata Cache will notify the Kafka Raft layer when they have generated a snapshot, including the end offset of the snapshot. The Kafka Raft layer will notify the Kafka Controller or Metadata Cache when a new snapshot has been fetched from the Active Controller. See section "Interaction with the Kafka Controller and Metadata Cache" and KIP-595 for more details.

This KIP focuses on the changes required for use by the Kafka Controller's __cluster_metadata topic partition but we should be able to extend it to other internal topic partitions like __consumer_offsets and __transaction_state.

Proposed Changes

The __cluster_metadata topic will have snapshot as the cleanup.policy. The Kafka Controller and Metadata Cache will have an in-memory representation of the replicated log up to at most the high-watermark. When performing a snapshot the Kafka Controller and Metadata Cache will serialize this in-memory state to disk. This snapshot file on disk is described by the end offset and epoch from the replicated log that has been included.

The Kafka Controller and Metadata Cache will notify the Kafka Raft client when they have finished generating a new snapshot. It is safe to truncate a prefix of the log up to the latest snapshot. The __cluster_metadata topic partition will have the latest snapshot and zero or more older snapshots. These additional snapshots would have to be deleted; this is described in "When to Delete Snapshots".

Design

Visually a Kafka Raft log topic partition would look as follows:

Code Block
Kafka Replicated Log:

    LBO  --             high-watermark --           LEO --
          V                             V                V
        -----------------------------------------------
offset: | x | ... | y - 1 | y |  ...  |   |  ...  |   |
epoch:  | b | ... | c     | d |  ...  |   |  ...  |   |
        -----------------------------------------------

Kafka Snapshot Files:

<topic_name>-<partition_index>/checkpoints/x-a.checkpoint
<topic_name>-<partition_index>/checkpoints/y-c.checkpoint

...

  1. LEO - log end offset - the next offset to be written to disk.

  2. high-watermark - the largest offset and epoch that has been replicated to N/2 + 1 replicas.

  3. LBO - log begin offset - the smallest offset in the replicated log. This property used to be called log start offset. It is renamed such that the acronym doesn’t conflict with last stable offset (LSO).

In the example above, offset=x - 1, epoch=a does not appear in the replicated log because it is before the LBO (x, b). If the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the LBO (x, b) then the leader handles the request as described in KIP-595. Otherwise, the leader will respond with the epoch and end offset of the latest snapshot (y, c). The followers and observers can concurrently fetch both the snapshot and the replicated log. During leader election, followers that violate the invariant in the "Validation of Snapshot and Log" section will send a vote request and response as if they had an empty log.
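
The leader's choice between serving the log and pointing the replica at a snapshot could be sketched as follows. This is only an illustration: the names `OffsetAndEpoch` and `snapshot_to_fetch` are hypothetical, and the sketch assumes that "greater than or equal to the LBO" means both the offset and the epoch are compared component by component.

```python
from typing import NamedTuple, Optional


class OffsetAndEpoch(NamedTuple):
    offset: int
    epoch: int


def snapshot_to_fetch(
    fetch: OffsetAndEpoch,
    lbo: OffsetAndEpoch,
    latest_snapshot: OffsetAndEpoch,
) -> Optional[OffsetAndEpoch]:
    """Return None when the fetch can be served from the log (the KIP-595 path),
    otherwise the end offset and epoch of the snapshot the replica should fetch."""
    # Assumption: both components must be at or after the LBO.
    if fetch.offset >= lbo.offset and fetch.epoch >= lbo.epoch:
        return None
    return latest_snapshot


if __name__ == "__main__":
    lbo = OffsetAndEpoch(offset=100, epoch=3)       # (x, b) in the diagram
    snapshot = OffsetAndEpoch(offset=250, epoch=4)  # (y, c) in the diagram
    print(snapshot_to_fetch(OffsetAndEpoch(150, 3), lbo, snapshot))
    print(snapshot_to_fetch(OffsetAndEpoch(40, 2), lbo, snapshot))
```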

Adding snapshots to KIP-595 has an effect on how voters grant votes and how candidates request votes. See section "Changes to Leader Election" for details.

Snapshot Format

The Kafka Controller and Metadata Cache are responsible for the content of the snapshot. Each snapshot is uniquely identified by a SnapshotId, the epoch and end offset of the records in the replicated log included in the snapshot. The snapshot will be stored in the checkpoints directory in the topic partition directory and will have a name of <SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint. For example for the topic __cluster_metadata, partition 0, snapshot end offset 5120793 and snapshot epoch 2, the full filename will be __cluster_metadata-0/checkpoints/00000000000005120793-00000000000000000002.checkpoint.
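
As a small illustration of the naming scheme, the sketch below builds the path from a SnapshotId. The helper name `snapshot_path` is hypothetical; the 20-digit zero padding is inferred from the example filename above.

```python
def snapshot_path(topic: str, partition: int, end_offset: int, epoch: int) -> str:
    """Build <topic>-<partition>/checkpoints/<EndOffset>-<Epoch>.checkpoint."""
    # Both components are zero-padded to 20 digits, as in the example filename.
    return f"{topic}-{partition}/checkpoints/{end_offset:020d}-{epoch:020d}.checkpoint"


if __name__ == "__main__":
    print(snapshot_path("__cluster_metadata", 0, 5120793, 2))
```

A fixed-width, zero-padded name means that sorting the filenames lexicographically also sorts the snapshots by end offset.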

...

Using version 2 of the log format will allow the Kafka Controllers and Metadata Cache to compress records and identify corrupted records in the snapshot. Even though snapshots use the log format for storing this state, there is no requirement:

...

When to Snapshot

If the Kafka Controller or Metadata Cache generates a snapshot too frequently then it can negatively affect the performance of the disk. If it doesn't generate a snapshot often enough then it can increase the amount of time it takes to load its state into memory and it can increase the amount of space taken up by the replicated log. The Kafka Controller and Metadata Cache will have a new configuration option metadata.snapshot.min.cleanable.ratio. If the ratio of snapshot records that have changed (deleted or modified) between the latest snapshot and the current in-memory state is greater than metadata.snapshot.min.cleanable.ratio, then the Kafka Controller and Metadata Cache will perform a new snapshot.
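
A minimal sketch of this check is shown below. The function name and parameters are hypothetical, and two details are assumptions rather than part of the proposal: the ratio is taken as changed records divided by the number of records in the latest snapshot, and an empty latest snapshot triggers a new snapshot as soon as any record has changed.

```python
def should_snapshot(
    changed_records: int,
    snapshot_records: int,
    min_cleanable_ratio: float = 0.5,  # default of the cleanable-ratio option
) -> bool:
    """Decide whether enough snapshot records have changed to justify a new snapshot."""
    if snapshot_records == 0:
        # Assumption: with no records in the latest snapshot, any change qualifies.
        return changed_records > 0
    # The text says "greater than", so the comparison is strict.
    return changed_records / snapshot_records > min_cleanable_ratio


if __name__ == "__main__":
    print(should_snapshot(changed_records=600, snapshot_records=1000))
    print(should_snapshot(changed_records=500, snapshot_records=1000))
```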

...

  1. There is a snapshot for the topic partition which includes the offset X - 1, and

  2. One of the following is true:

    1. All of the live replicas (followers and observers) have replicated LBO or

    2. LBO is metadata.lbo.lag.time.max.ms milliseconds old.
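
The two conditions above can be combined into a single predicate, sketched below. All names are hypothetical. The sketch assumes that X is the candidate LBO, that a snapshot "includes the offset X - 1" when its end offset is at least X, that a replica "has replicated" the candidate when its own end offset is at least X, and that the age is measured in milliseconds by the caller.

```python
from typing import Iterable

SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000  # default lag time stated in "Public Interfaces"


def can_increase_lbo(
    new_lbo: int,
    snapshot_end_offset: int,
    live_replica_end_offsets: Iterable[int],
    offset_age_ms: int,
    lag_time_max_ms: int = SEVEN_DAYS_MS,
) -> bool:
    """Return True when the leader may advance the log begin offset to new_lbo."""
    # Condition 1: a snapshot covers every offset below the candidate LBO.
    has_snapshot = snapshot_end_offset >= new_lbo
    # Condition 2.1: every live follower and observer has replicated past it.
    all_replicated = all(end >= new_lbo for end in live_replica_end_offsets)
    # Condition 2.2: the offset is old enough that the leader stops waiting.
    too_old = offset_age_ms >= lag_time_max_ms
    return has_snapshot and (all_replicated or too_old)


if __name__ == "__main__":
    print(can_increase_lbo(100, 120, [150, 99], offset_age_ms=1_000))
    print(can_increase_lbo(100, 120, [150, 99], offset_age_ms=SEVEN_DAYS_MS))
```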

Followers and observers will increase their log begin offset to the value sent on the fetch response as long as the local Kafka Controller and Metadata Cache has generated a snapshot with an end offset greater than or equal to the new log begin offset.

...

The broker will delete any snapshot with an end offset and epoch (SnapshotId) less than the LBO. That means that the question of when to delete a snapshot is the same as when to increase the LBO which is covered in the section “When to Increase the Log Begin Offset”.
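
A sketch of this selection is shown below, assuming that a SnapshotId is ordered by end offset first and epoch second. The type and function names are hypothetical.

```python
from typing import List, NamedTuple


class SnapshotId(NamedTuple):
    # Assumption: snapshots are ordered by end offset first, then by epoch.
    end_offset: int
    epoch: int


def snapshots_to_delete(snapshots: List[SnapshotId], lbo: SnapshotId) -> List[SnapshotId]:
    """Return the snapshots whose SnapshotId is strictly less than the LBO."""
    return [snapshot for snapshot in snapshots if snapshot < lbo]


if __name__ == "__main__":
    on_disk = [SnapshotId(10, 1), SnapshotId(20, 2), SnapshotId(30, 2)]
    print(snapshots_to_delete(on_disk, lbo=SnapshotId(20, 2)))
```

Because the LBO only advances once a snapshot at or beyond it exists, the latest snapshot is never selected for deletion.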

Interaction

...

with the Kafka Controller and Metadata Cache

This section describes the interaction between the state machine and Kafka Raft.

From Kafka Raft to the Kafka Controller and Metadata Cache

  1. Notify when a snapshot is available because of a FetchSnapshot.

From the Kafka Controller and Metadata Cache to Kafka Raft

  1. Notify when snapshot finished. This includes the end offset and epoch.

...

There are two cases when the Kafka Controller or Metadata Cache needs to load the snapshot:

...

After loading the snapshot, the Kafka Controller and Metadata Cache can apply operations in the replicated log with an offset greater than or equal to the snapshot’s end offset.

...

Assume that the snapshot fetched in bullet 4 has an epoch E1 and an end offset O1. The state of the follower between bullets 4 and 5 could be as follows:

  1. Contains a snapshot with epoch E1 and end offset O1.
  2. Contains many snapshots older/less than epoch E1 and end offset O1.
  3. Contains a replicated log with LEO older/less than epoch E1 and end offset O1.

In this case the follower needs to use the snapshot with epoch E1 and end offset O1, and not the replicated log, when granting votes and requesting votes. In the general case, the latest epoch and end offset for a replica come from the latest snapshot or the replicated log, whichever is newer.
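
The general rule can be sketched as taking the larger of the two positions. The names below are hypothetical, and the sketch assumes the usual Raft ordering in which the epoch is compared first and the end offset second. A replica with neither a snapshot nor a log is treated as empty.

```python
from typing import NamedTuple, Optional


class EpochAndEndOffset(NamedTuple):
    # Field order matters: tuples compare the epoch first, then the end offset.
    epoch: int
    end_offset: int


EMPTY = EpochAndEndOffset(epoch=0, end_offset=0)


def latest_for_election(
    latest_snapshot: Optional[EpochAndEndOffset],
    log_end: Optional[EpochAndEndOffset],
) -> EpochAndEndOffset:
    """Pick the position a replica should advertise when granting or requesting votes."""
    candidates = [c for c in (latest_snapshot, log_end) if c is not None]
    return max(candidates, default=EMPTY)


if __name__ == "__main__":
    snapshot = EpochAndEndOffset(epoch=5, end_offset=1000)  # (E1, O1)
    stale_log = EpochAndEndOffset(epoch=3, end_offset=400)  # LEO older than the snapshot
    print(latest_for_election(snapshot, stale_log))
```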

...

If the latest snapshot has an epoch E1 and end offset O1 and it is newer than the LEO of the replicated log, then the replica must set the LBO and LEO to O1.

Public Interfaces

Topic configuration

  1. The __cluster_metadata topic will have a cleanup.policy value of snapshot. This configuration can only be read. Updates to this configuration will not be allowed.

  2. metadata.snapshot.min.cleanable.ratio - The minimum ratio of snapshot records that have to change before generating a snapshot. See section "When to Snapshot". The default is .5 (50%).

  3. metadata.lbo.lag.time.max.ms - The maximum amount of time that the leader will wait for an offset to get replicated to all of the live replicas before advancing the LBO. See section “When to Increase the Log Begin Offset”. The default is 7 days.

...

Code Block
languagejs
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        // ---------- Start of renamed field ----------
        { "name": "LogBeginOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log start offset." },
        // ---------- End of renamed field ----------
        { "name": "DivergingEpoch", "type": "EpochEndOffset", "versions": "12+", "taggedVersions": "12+", "tag": 0,
          "about": "In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge",
          "fields": [
            { "name": "Epoch", "type": "int32", "versions": "12+", "default": "-1" },
            { "name": "EndOffset", "type": "int64", "versions": "12+", "default": "-1" }
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
        ]},
        // ---------- Start of new field ----------
        { "name": "SnapshotId", "type": "SnapshotId",
          "versions": "12+", "taggedVersions": "12+", "tag": 2, "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the end offset that the follower should use for the FetchSnapshot request"},
          { "name": "Epoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"}
        ]},
        // ---------- End new field ----------
        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
          "about": "The aborted transactions.",  "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
          "about": "The preferred read replica for the consumer to use on its next fetch request"},
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." }
      ]}
    ]}
  ]
}

...

Code Block
languagejs
{
  "apiKey": #,
  "type": "request",
  "name": "FetchSnapshotRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "0+",
      "about": "The broker ID of the follower." },
    { "name": "MaxBytes", "type": "int32", "versions": "0+",
      "about": "The maximum bytes to fetch from all of the snapshots." },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "0+",
          "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the end offset that the follower should use for the FetchSnapshot request"},
          { "name": "Epoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"}
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot to start fetching from." }
      ]}
    ]}
  ]
}

Response Schema

Code Block
languagejs
{
  "apiKey": #,
  "type": "response",
  "name": "FetchSnapshotResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "Topics", "type": "[]TopicSnapshot", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "Index", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "Size", "type": "int64", "versions": "0+",
          "about": "The total size of the snapshot." },
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The starting byte position within the snapshot included in the Bytes field." },
        { "name": "Bytes", "type": "bytes", "versions": "0+",
          "about": "Snapshot data." }
      ]}
    ]}
  ]
}
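
The Position, Size and Bytes fields suggest that a replica downloads a snapshot in chunks until it has received Size bytes. A minimal sketch of such a loop follows. It is not the proposed implementation: `SnapshotChunk`, `fetch_snapshot` and the `send` callback are hypothetical stand-ins for one partition of the FetchSnapshot request and response, and error codes and throttling are left out.

```python
from typing import Callable, NamedTuple


class SnapshotChunk(NamedTuple):
    size: int      # Size: total size of the snapshot
    position: int  # Position: byte position where this chunk starts
    data: bytes    # Bytes: the snapshot data in this chunk


def fetch_snapshot(send: Callable[[int, int], SnapshotChunk], max_bytes: int) -> bytes:
    """Fetch a whole snapshot; send(position, max_bytes) performs one round trip."""
    buffer = bytearray()
    while True:
        chunk = send(len(buffer), max_bytes)
        if chunk.position != len(buffer):
            raise ValueError("leader returned a chunk at an unexpected position")
        buffer += chunk.data
        if len(buffer) >= chunk.size:
            # All bytes of the snapshot have been received.
            return bytes(buffer)
        if not chunk.data:
            raise ValueError("empty chunk before the snapshot was complete")


if __name__ == "__main__":
    snapshot = bytes(range(10))

    def fake_leader(position: int, max_bytes: int) -> SnapshotChunk:
        # In-memory stand-in for the leader's handling of a FetchSnapshot request.
        return SnapshotChunk(len(snapshot), position, snapshot[position:position + max_bytes])

    print(fetch_snapshot(fake_leader, max_bytes=4) == snapshot)
```

Unlike the Fetch RPC, the loop has a natural end: it stops as soon as the number of received bytes reaches Size.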

...

  1. The Fetch RPC is typically used to download a never ending stream of records. Snapshots have a known size and the fetching of a snapshot ends when all of those bytes are received by the replica. The Fetch RPC includes a few fields for dealing with this access pattern that are not needed for fetching snapshots.
  2. This solution ties the snapshot format to Kafka's log record format. To lower the overhead of writing and loading the snapshot by the Kafka Controller and Metadata Cache, it is important to use a format that is compatible with the in-memory representation of the Kafka Controller and Metadata Cache.


Change leader when generating snapshots: Instead of supporting concurrent snapshot generation and writing to the in-memory state, we could require that only non-leaders generate snapshots. If the leader wants to generate a snapshot then it would relinquish leadership. We decided to not implement this because:

...