...
This KIP focuses on the changes required for use by the Kafka Controller's __cluster_metadata topic partition, but it should be possible to extend it to other internal topic partitions like __consumer_offset and __transaction.
Proposed Changes
The __cluster_metadata topic will have snapshot as the cleanup.policy, which delegates log compaction to the state machine. The snapshot algorithm explained in this document assumes that the state machine is in memory. The state machines for both the Kafka Controller and the Group Coordinator are already in memory. Supporting state machines that don’t fit in memory is outside the scope of this document. See the Future Work section for details. The Kafka Controller will have an in-memory representation of the replicated log up to at most the high-watermark. When performing a snapshot, the Kafka Controller will serialize this in-memory state to disk. This snapshot file on disk is described by the largest offset and epoch from the replicated log that has been included.
The Kafka Controller will notify the Kafka Raft client when it has finished generating a new snapshot. Once a snapshot has been persisted, it is safe for the log to truncate a prefix of the log up to the latest snapshot. The __cluster_metadata topic partition will have the latest snapshot and zero or more older snapshots. Having multiple snapshots is useful for minimizing re-fetching of the snapshot, as described later in the document. These additional snapshots have to be deleted, a process also described later in the document.
Design
Visually, a Kafka Raft log would look as follows:
```
Kafka Replicated Log:

       LBO --             high-watermark --    LEO --
            V                             V         V
        -----------------------------------------------
offset: | x + 1 | ... | y | y + 1 | ... |   | ... |   |
epoch:  | b     | ... | c | d     | ... |   | ... |   |
        -----------------------------------------------

Kafka Snapshot Files:

<topic_name>-<partition_index>/x-a.checkpoint
<topic_name>-<partition_index>/y-c.checkpoint
```
Note: The extension checkpoint will be used since Kafka already has files with the snapshot extension.
Legends
LEO - log end offset - the largest offset and epoch that has been written to disk.
high-watermark - the largest offset and epoch that has been replicated to N/2 + 1 replicas.
LBO - log begin offset - the smallest offset in the replicated log. This property used to be called the log start offset. It is renamed so that the acronym doesn’t conflict with last stable offset (LSO).
Each Kafka topic partition will have multiple snapshots. Each snapshot contains the last offset and epoch for which this snapshot is consistent, the checksum of the snapshot and the sequence of records/operations representing the state machine.
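As an illustration only, a snapshot carrying the three pieces listed above (last included offset and epoch, a checksum, and the records) might be modelled as below. The class name `Snapshot`, the JSON encoding of records, and the use of CRC-32 from `zlib` are all assumptions for this sketch. The actual snapshot format is still a TODO in this KIP.

```python
import json
import zlib
from dataclasses import dataclass, field


@dataclass
class Snapshot:
    """Hypothetical in-memory view of one snapshot (not the KIP's on-disk format)."""
    last_offset: int  # last offset for which the snapshot is consistent
    last_epoch: int   # epoch of that offset
    records: list = field(default_factory=list)  # records/operations rebuilding the state machine

    def payload(self) -> bytes:
        # Assumption: records are serialized as canonical JSON purely for illustration.
        return json.dumps(self.records, sort_keys=True).encode("utf-8")

    def checksum(self) -> int:
        # CRC-32 over the serialized records; the real checksum algorithm is unspecified.
        return zlib.crc32(self.payload())

    def verify(self, expected_checksum: int) -> bool:
        # A reader recomputes the checksum and compares it with the stored value.
        return self.checksum() == expected_checksum
```

A writer would store `checksum()` alongside the records. A reader that loads the snapshot would call `verify()` before trusting its contents.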
In the example above, if the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the log begin offset (x + 1, b), then the leader handles the request as described in KIP-595. Otherwise, the leader will respond with the offset and epoch of the latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d).
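The leader's decision can be sketched as a single comparison against the log begin offset. This is a minimal sketch, assuming offsets alone decide the branch. The epoch validation from KIP-595 is deliberately omitted, and `handle_fetch` and `FetchDecision` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

OffsetAndEpoch = Tuple[int, int]  # (offset, epoch)


@dataclass
class FetchDecision:
    serve_from_log: bool                       # True: handle as in KIP-595
    snapshot_id: Optional[OffsetAndEpoch] = None
    next_fetch: Optional[OffsetAndEpoch] = None


def handle_fetch(fetch_offset: int,
                 log_begin_offset: int,
                 latest_snapshot: OffsetAndEpoch,
                 next_after_snapshot: OffsetAndEpoch) -> FetchDecision:
    """Hypothetical leader-side check; epoch validation is not modelled."""
    if fetch_offset >= log_begin_offset:
        # The requested offset is still present in the replicated log.
        return FetchDecision(serve_from_log=True)
    # The requested offset was truncated: redirect the replica to the latest
    # snapshot and tell it where to resume fetching the log afterwards.
    return FetchDecision(serve_from_log=False,
                         snapshot_id=latest_snapshot,
                         next_fetch=next_after_snapshot)
```

Consider the diagram with x = 100, y = 200, c = 4 and d = 5. A fetch at offset 150 is served from the log. A fetch at offset 50 is redirected to snapshot (200, 4), with the next fetch at (201, 5).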
The followers and observers can concurrently fetch both the snapshot and the replicated log. During leader election, followers with an incomplete or missing snapshot will send vote requests and responses as if they had an empty log.
Snapshot Format
The Kafka Controller is responsible for the format of the snapshot. The snapshot will be stored in the topic partition directory and will have a name of <SnapshotOffset>-<SnapshotOffsetEpoch>.checkpoint. For example, for the topic __cluster_metadata, partition 0, snapshot offset 5120793 and snapshot epoch 2, the full filename will be __cluster_metadata-0/00000000000005120793-2.checkpoint.
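The naming scheme can be sketched as a pair of helpers that build and parse snapshot paths. Following the example filename, this sketch assumes the offset is zero-padded to 20 digits and the epoch is not padded. `snapshot_path` and `parse_snapshot_name` are hypothetical helpers, not Kafka APIs.

```python
import re
from typing import Optional, Tuple

# Assumption: 20-digit zero-padded offset, unpadded epoch, ".checkpoint" extension.
_SNAPSHOT_NAME = re.compile(r"^(\d{20})-(\d+)\.checkpoint$")


def snapshot_path(topic: str, partition: int, offset: int, epoch: int) -> str:
    # <topic>-<partition>/<SnapshotOffset>-<SnapshotOffsetEpoch>.checkpoint
    return f"{topic}-{partition}/{offset:020d}-{epoch}.checkpoint"


def parse_snapshot_name(filename: str) -> Optional[Tuple[int, int]]:
    """Return (offset, epoch) for a snapshot file name, or None if it is not one."""
    match = _SNAPSHOT_NAME.match(filename)
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))


print(snapshot_path("__cluster_metadata", 0, 5120793, 2))
# __cluster_metadata-0/00000000000005120793-2.checkpoint
```

Encoding the offset and epoch in the file name keeps them out of the file format itself, which matches one of the requirements listed below.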
TODO: Add more details on the format. Here are some requirements:
Persist multiple formats. The Kafka Controller may have a different snapshot format from the Group Coordinator.
Persist information about the latest offset and epoch included. This could be in the file name such that it doesn’t interfere with the file format.
Persist information about the format type and version such that the state machine can decode it.
When to Snapshot
The state machine will compute the following metrics:
cardinality - the number of keys in the state machine.
update counter - the number of updates since the last snapshot.
The topic partition will also include snapshot.minimum.records as a configuration.
If the Kafka Controller generates a snapshot too frequently, it can negatively affect the performance of the disk. If it doesn't generate a snapshot often enough, it can increase the amount of time it takes to load its state into memory, and it can increase the amount of space taken up by the replicated log. The Kafka Controller will have a new configuration option, controller.snapshot.minimum.records. If the number of records between the latest snapshot and the high-watermark is greater than controller.snapshot.minimum.records, then the Kafka Controller will perform a new snapshot. In terms of the metrics above, if the update counter is greater than both half the cardinality and the snapshot.minimum.records configuration, then a snapshot of the current state is triggered.
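The metric-based trigger can be sketched for a simple key-value state machine. This sketch assumes every applied record counts as one update, including overwrites of an existing key. `SnapshotTrigger` and its methods are hypothetical names.

```python
class SnapshotTrigger:
    """Hypothetical tracker of cardinality and update counter for an in-memory state machine."""

    def __init__(self, snapshot_minimum_records: int):
        self.snapshot_minimum_records = snapshot_minimum_records
        self.state = {}          # the in-memory state machine: key -> value
        self.update_counter = 0  # updates applied since the last snapshot

    @property
    def cardinality(self) -> int:
        # Number of keys currently in the state machine.
        return len(self.state)

    def apply(self, key, value) -> None:
        # Each applied record counts as one update, even if it overwrites a key.
        self.state[key] = value
        self.update_counter += 1

    def should_snapshot(self) -> bool:
        # Trigger once updates exceed both half the cardinality and the minimum.
        return (self.update_counter > self.cardinality / 2
                and self.update_counter > self.snapshot_minimum_records)

    def snapshot_taken(self) -> None:
        # Reset the counter after the snapshot has been written.
        self.update_counter = 0
```

Comparing against half the cardinality scales the snapshot frequency with the size of the state. A large state with few updates is not rewritten often. The minimum keeps a tiny state from being snapshotted on every change.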
When to Increase the Log Begin Offset
To minimize the need to send snapshots to slow replicas, the leader can delay increasing the log begin offset up to the minimum offset that it knows has been replicated to all of the live replicas. Followers and observers will increase their log begin offset to the value sent in the fetch response as long as the local state machine has generated a snapshot that includes the log begin offset minus one.
...
- There is a snapshot for the topic partition which includes the offset for LBO X - 1, and
- One of the following is true:
  - All of the live replicas (followers and observers) have replicated LBO, or
  - LBO is max.replication.lag.ms old.
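The conditions above can be expressed as a single predicate on the leader. This is a minimal sketch under several assumptions. The leader is assumed to know the highest offset each live replica has replicated. "LBO is max.replication.lag.ms old" is read as the age of that offset reaching the configured lag. The function and parameter names are hypothetical.

```python
from typing import Iterable


def can_advance_log_begin_offset(new_lbo: int,
                                 latest_snapshot_offset: int,
                                 live_replica_offsets: Iterable[int],
                                 lbo_age_ms: int,
                                 max_replication_lag_ms: int) -> bool:
    """Hypothetical check for advancing the LBO to new_lbo."""
    # A snapshot must include offset new_lbo - 1.
    if latest_snapshot_offset < new_lbo - 1:
        return False
    # Either every live follower/observer has replicated the new LBO ...
    all_replicated = all(offset >= new_lbo for offset in live_replica_offsets)
    # ... or the new LBO has been waiting at least max.replication.lag.ms.
    too_old = lbo_age_ms >= max_replication_lag_ms
    return all_replicated or too_old
```

The same predicate could be used to validate the offset requested by a DeleteRecords call before honouring it.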
Kafka allows clients to delete records that are less than a given offset by using the DeleteRecords RPC. Those requests will be validated using the same logic enumerated above.
...
The broker will delete any snapshot with a latest offset and epoch (SnapshotOffsetAndEpoch) less than LBO - 1, where LBO is the log begin offset. This means that the question of when to delete a snapshot is the same as when to increase the LBO, which is covered in the section “When to Increase the Log Begin Offset”.
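Selecting which snapshots to delete is then a filter on the snapshot offset. This sketch assumes snapshot ids are (offset, epoch) pairs and that only the offset is compared with LBO - 1. `snapshots_to_delete` is a hypothetical helper.

```python
from typing import Iterable, List, Tuple

SnapshotId = Tuple[int, int]  # (offset, epoch)


def snapshots_to_delete(snapshot_ids: Iterable[SnapshotId],
                        log_begin_offset: int) -> List[SnapshotId]:
    """Return, oldest first, the snapshots whose offset is below LBO - 1."""
    return sorted(sid for sid in snapshot_ids if sid[0] < log_begin_offset - 1)
```

Take the diagram with the LBO advanced to y + 1. Snapshot y-c (offset y = LBO - 1) is kept, and the older snapshot x-a is returned for deletion.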
Interaction Between Kafka Raft and
...
the Kafka Controller
This section describes the interaction between the state machine and Kafka Raft.
From Kafka Raft to the
...
Kafka Controller
- Notify when a snapshot is available because of a FetchSnapshot.
From
...
the Kafka Controller to Kafka Raft
- Notify when a snapshot has finished. This includes the largest included offset and epoch.
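The two notification directions can be sketched as a pair of callback interfaces. This is illustrative only. `SnapshotListener`, `SnapshotSink` and their method names are hypothetical and are not the interfaces defined by Kafka. The two recording fakes show how each side would be wired.

```python
from abc import ABC, abstractmethod
from typing import List, Tuple

OffsetAndEpoch = Tuple[int, int]


class SnapshotListener(ABC):
    """Kafka Raft -> Kafka Controller (hypothetical)."""

    @abstractmethod
    def on_snapshot_available(self, snapshot_id: OffsetAndEpoch) -> None:
        """Called after a snapshot fetched via FetchSnapshot is complete."""


class SnapshotSink(ABC):
    """Kafka Controller -> Kafka Raft (hypothetical)."""

    @abstractmethod
    def snapshot_finished(self, snapshot_id: OffsetAndEpoch) -> None:
        """Called with the largest offset and epoch included in a new snapshot."""


class RecordingController(SnapshotListener):
    # Fake controller: remembers which fetched snapshots it was told about.
    def __init__(self) -> None:
        self.available: List[OffsetAndEpoch] = []

    def on_snapshot_available(self, snapshot_id: OffsetAndEpoch) -> None:
        self.available.append(snapshot_id)


class RecordingRaftClient(SnapshotSink):
    # Fake Raft client: remembers which snapshots the controller finished.
    def __init__(self) -> None:
        self.finished: List[OffsetAndEpoch] = []

    def snapshot_finished(self, snapshot_id: OffsetAndEpoch) -> None:
        self.finished.append(snapshot_id)
```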
...
There are two cases when the Kafka Controller needs to load the snapshot:
1. When the broker is booting, the Kafka Controller will load the latest snapshot.
2. When the follower and observer replicas finish fetching a new snapshot from the leader, the replica will notify the Kafka Controller of the new snapshot. The Kafka Controller will clear its in-memory state and load the snapshot.
After loading the snapshot, the Kafka Controller will apply operations in the replicated log with an offset greater than the snapshot’s offset and less than the high-watermark.
To avoid having a replica perform 1. followed by 2. the state machine should delay applying
...
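Loading a snapshot and then replaying the log can be sketched with a toy key-value state machine. The record shape `(offset, key, value)` and the class `ControllerState` are hypothetical. The sketch assumes the log is iterated in offset order.

```python
from typing import Dict, Iterable, Tuple

Record = Tuple[int, str, str]  # hypothetical record shape: (offset, key, value)


class ControllerState:
    """Hypothetical in-memory state machine used only for illustration."""

    def __init__(self) -> None:
        self.state: Dict[str, str] = {}
        self.last_applied_offset = -1

    def load_snapshot(self, snapshot_offset: int, snapshot_state: Dict[str, str]) -> None:
        # Discard the current in-memory state and replace it with the snapshot.
        self.state = dict(snapshot_state)
        self.last_applied_offset = snapshot_offset

    def replay(self, log: Iterable[Record], high_watermark: int) -> None:
        # Apply only records after the snapshot's offset and below the high-watermark.
        for offset, key, value in log:
            if self.last_applied_offset < offset < high_watermark:
                self.state[key] = value
                self.last_applied_offset = offset
```

Records at or before the snapshot offset are skipped because their effect is already in the snapshot. Records at or beyond the high-watermark are skipped because they are not yet committed.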
Changes to Leader Election
The followers and observers of the __cluster_metadata topic partition will concurrently fetch the snapshot and the replicated log. This means that candidates with incomplete snapshots will send a vote request with a LastOffsetEpoch of -1 and a LastOffset of -1, no matter the LEO of the replicated log.
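The rule for a candidate's vote request can be sketched as follows. This is a minimal sketch, assuming the replica tracks a single flag for whether it holds a complete snapshot and passes `None` for an empty log. The helper name is hypothetical.

```python
from typing import Optional, Tuple


def vote_request_last_offset_and_epoch(
        snapshot_complete: bool,
        log_end: Optional[Tuple[int, int]]) -> Tuple[int, int]:
    """Return (LastOffset, LastOffsetEpoch) for a vote request (hypothetical helper)."""
    if not snapshot_complete or log_end is None:
        # Incomplete snapshot or empty log: report an empty log, whatever the LEO is.
        return (-1, -1)
    return log_end
```

Reporting an empty log means a replica that is still fetching a snapshot cannot win an election against replicas that hold the committed state.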
Quorum Reassignment
TODO: Need to investigate if there are any implications to quorum reassignment, as some of the operations in the snapshot are for reassignment.
Validation of Snapshot and Log
...
Public Interfaces
Topic configuration
Kafka topics will have an additional cleanup.policy value of snapshot. Once the configuration option is set to snapshot, it cannot be changed. The __cluster_metadata topic will always have a cleanup.policy value of snapshot. Setting the cleanup.policy to snapshot will enable the improvements documented here. This configuration can only be read. Updates to this configuration will not be allowed.
controller.snapshot.minimum.records - The minimum number of records committed after the latest snapshot before the Kafka Controller will perform another snapshot. See section “When to Snapshot”.
max.replication.lag.ms - The maximum amount of time that the leader will wait for an offset to get replicated to all of the live replicas before advancing the LBO. See section “When to Increase the Log Begin Offset”.
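The immutability rules for cleanup.policy can be sketched as a validation step on a configuration update. This is an illustrative sketch only. `validate_cleanup_policy_change` is a hypothetical function, not Kafka's actual config validation. It encodes just two rules from the text: __cluster_metadata always uses snapshot, and a policy of snapshot cannot be changed once set.

```python
SNAPSHOT_POLICY = "snapshot"
CLUSTER_METADATA_TOPIC = "__cluster_metadata"


def validate_cleanup_policy_change(topic: str, current: str, requested: str) -> None:
    """Raise ValueError if a cleanup.policy update breaks the rules above (hypothetical)."""
    if topic == CLUSTER_METADATA_TOPIC and requested != SNAPSHOT_POLICY:
        raise ValueError("__cluster_metadata must always use cleanup.policy=snapshot")
    if current == SNAPSHOT_POLICY and requested != SNAPSHOT_POLICY:
        raise ValueError("cleanup.policy=snapshot cannot be changed once set")
```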
Network Protocol
Fetch
Request Schema
This improvement KIP doesn’t require any changes to the fetch request outside of what is proposed in KIP-595.
...