Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This KIP focuses on the changes required for use by the Kafka Controller's __cluster_metadata topic partition but we should be able to extend it to use it in other internal topic partitions like __consumer_offset and __transaction.

Proposed Changes

Topic partitions with snapshot for The __cluster_metadata topic will have snapshot as the cleanup.policy will have the log compaction delegated to the state machine. The snapshot algorithm explain in this document assumes that the state machine is in memory. The state machine for both the Kafka Controller and Group Coordinator are already in memory. Supporting state machine that don’t fit in memory are outside the scope of this document. See the Future Work section for details.The state machine represents the in memory state Kafka Controller will have an in-memory representation of the replicated log up to at most the high-watermark. When performing a snapshot the state machine Kafka Controller will serialize it’s this in-memory state to disk and include disk. This snapshot file on disk is described by the largest offset and epoch included in the snapshotfrom the replicated log that has been included.

The state machine Kafka Controller will notify the Kafka log Raft client when it generates has finished generating a new snapshots. After a snapshot has been persistent it It is safe for the log to truncate a prefix of the log up to the latest snapshot. For one The __cluster_metadata topic partition , Kafka will have the latest snapshot and zero or more older snapshots. These additional snapshots would have to be garbage collects. Having multiple snapshots is useful for minimizing re-fetching of the snapshot and will be described later in the document. These additional snapshots would have to be deleted, a process describe later in the document. 

Design

Visually a Kafka Raft Log would look as follow:

Code Block
Kafka Replicated Log:

      LBO  --               high-watermark --      LEO --
            V                               V           V
        ---------------------------------------------------
offset: | x + 1 | ... | y | y + 1 |  ...  |   |  ...  |   |
epoch:  | b     | ... | c | d     |  ...  |   |  ...  |   |
        ---------------------------------------------------

Kafka Snapshot Files:

<topic_name>-<partition_index>/x-a.checkpoint
<topic_name>-<partition_index>/y-c.checkpoint

Legends

Note: The extension checkpoint will be used since Kafka already has file with the snapshot extension.

Legends

  1. LEO - LEO - log end offset - the largest offset and epoch that has been written to disk.

  2. high-watermark - the largest offset and epoch that has been replicated to N/2 + 1 replicas.

  3. LBO - log begin offset - the smallest offset in the replicated log. This property used to be called log start offset. It is renamed such that the acronym doesn’t conflict with last stable offset (LSO).

Each Kafka topic partition will have multiple snapshots. Each snapshot contains the last offset and epoch for which this snapshot is consistent, the checksum of the snapshot and the sequence of records/operations representing the state machine.

In the example above, if the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the log begin offset (x, a, x) then the leader handles the request as describe in KIP-595. Otherwise, the leader will respond with the offset and epoch of the latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d).

The followers and observers can concurrently fetch both the snapshot and the replicated log. During leader election, followers with incomplete or missing snapshot will send a vote request and response as if they had an empty log.

Snapshot Format

The state machine Kafka Controller is responsible for the format of the snapshot. The snapshot will be stored in the topic partition directory and will have a name of <SnapshotOffset>-<SnapshotOffsetEpoch>.checkpoint. For example for the topic __cluster_metadata, partition 0, snapshot offset 5120793 and snapshot epoch 2, the full filename will be __cluster_metadata-0/00000000000005120793-2.checkpoint.

TODO: Add more details on the format. Here are some requirements

  1. Persist multiple formats. Kafka Controller may have a different snapshot format from Group Coordinator.

  2. Persist information about the latest offset and epoch included. This could be in the file name such that it doesn’t interfere with the file format.

  3. Persist information about the format type and version such that the state machine can decode it.

When to Snapshot

The state machine will compute the following metrics:

  1. cardinality - the number of keys in the state machine.

  2. update counter - the number of updates since the last snapshot.

The topic partition will also include snapshot.minimum.records as a configuration.

When to Snapshot

If the Kafka Controller generates a snapshot too frequently then can negatively affect the performance of the disk. If it doesn't generate a snapshot often enough then it can increase the amount of time it takes to load its state into memory and it can increase the amount space taken up by the replicated log. The Kafka Controller will have a new configuration option controller.snapshot.minimum.records.  If the number of records between the latest snapshot and the high-watermark is greater than controller.snapshot.minimum.records, then the Kafka Controller will perform a new snapshotIf the update counter is greater than half the cardinality and the snapshot.minimum.records configuration then trigger a snapshot of the current state.

When to Increase the Log Begin Offset

To minimize the need to send snapshots to slow replicas, the leader can delay increasing the log begin offset up to the minimum offset that it knows it has been replicate to all of the live replicas. Followers and observers will increase their log begin offset to the value sent on the fetch response as long as the local state machine has generated a snapshot that includes to the log begin offset minus one.

...

  1. There is a snapshot for the topic partition which includes the offset for LBO X - 1 and.

  2. One of the following is true:

    1. All of the live replicas (followers and observers) have replicated LBO or

    2. LBO is max.replication.lag.ms old.

Kafka allows the clients (DeleteRecords RPCs) to delete records that are less than a given offset by using the DeleteRecords RPC . Those request need to get requests will be validated using the same logic enumerated above.

...

The broker will delete any snapshot with a latest offset and epoch (SnapshotOffsetAndEpoch) less than the LBO - 1 (log begin offset). The means that the question of when to delete and a snapshot is the same as when to increase the LBO which is covered the the section “When To Increase the Log Begin Offset”.

Interaction Between Kafka Raft and

...

the Kafka Controller

This section describe the interaction between the state machine and Kafka Raft.

From Kafka Raft to the

...

Kafka Controller

  1. Notify when a snapshot is available because of a FetchSnapshot.

From

...

the Kafka Controller to Kafka Raft

  1. Notify when snapshot finished. This includes the largest included offset and epoch.

...

There are two cases when the state machine Kafka Controller needs to load the snapshot:

  1. When the state machine is first starting it will load the latest snapshotbroker is booting.

  2. When the replica follower and observer replicas finishes fetches a new snapshot from the leader it will notify the state machine of the new snapshot. The state machine will clear its in memory state and load the snapshot.

After loading the snapshot, the state machine Kafka Controller will apply operations in the replicated log with an offset greater than the snapshot’s offset and less than the high-watermark.

To avoid having a replica perform 1. followed by 2. the state machine should delay applying

...

Changes to Leader Election

The followers and observers of a of the __cluster_metadata topic partition will concurrently fetch the snapshot and replicated log. This means that candidates with incomplete snapshots will send a vote request with a LastOffsetEpoch of -1 and a LastOffset of -1.

Quorum Reassignment

TODO: Need to investigate if there are any implications to quorum reassignment and as some of the operations in the snapshot are for reassignment and a LastOffset of -1 no matter the LEO of the replicated log.

Validation of Snapshot and Log

...

Public Interfaces

Topic configuration

  1. Kafka topics will have an additional cleanup.policy value of snapshot. Once the configuration option is set to snapshot it cannot be changed. The __cluster_metadata topic will always have a an cleanup.policy value of snapshot. Setting the cleanup.policy to snapshot will enable the improvements document here.This configuration can only be read. Updates to this configuration will not be allowed.

  2. controller.snapshot.minimum.records - The is the minimum number of records committed after the latest snapshot needed before performing the Kafka Controller will perform another snapshot. See section "When to Snapshot"

  3. max.replication.lag.ms - The maximum amount of time that leader will wait for an offset to get replicated to all of the live replicas before advancing the LBO. See section “When to Increase the Log Begin Offset”.

Network Protocol

Fetch

Request Schema

This improvement KIP doesn’t require any changes to the fetch request outside of what is proposed in KIP-595.

...