Status
Current state: [One of "Under Discussion", "Accepted", "Rejected"]
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA: here [Change the link from KAFKA-1 to your own ticket]
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In KIP-500 the Kafka Controller, which is the quorum leader from KIP-595, will materialize the entries in the metadata log into memory. Each broker in KIP-500, which will be a replica of the metadata log, will materialize the entries in the log into a metadata cache. We want to provide a stronger guarantee on the consistency of the metadata than is possible today. If all entries in the metadata log are indefinitely retained, then the design described in KIP-595 achieves a consistent log and, as a result, a consistent metadata cache. If Kafka doesn't limit the size of these logs, it can affect Kafka's availability by:
- Running out of disk space
- Increasing the time needed to load the replicated log into memory
- Increasing the time it takes for new brokers to catch up to the leader's log
Kafka uses the cleanup policies compact and delete to limit the size of the log. The delete cleanup policy deletes data for a prefix of the log based on time. The delete cleanup policy is not viable when used on the metadata log because the Kafka Controller and metadata cache would not be able to generate a consistent view from the replicated log.
The compact cleanup policy incrementally removes key/value records that are not needed to reconstruct the associated state. During compaction the log cleaner keeps track of an OffsetMap, which is a mapping from key to the latest offset in the section of the log being cleaned. The log cleaner assumes that any record with an offset less than the value in the OffsetMap given by the record's key is safe to remove. This solution will eventually result in one record for every key in the log, including a tombstone record for each key that has been deleted. Kafka removes these tombstone records after a configurable time. This can result in different logs on each replica, which can result in different in-memory state after applying the log.
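To make the OffsetMap mechanism concrete, here is a minimal sketch of an OffsetMap-style compaction pass; it is illustrative only (the Record type and compact method are hypothetical, not Kafka's LogCleaner code).

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetMapCompactionSketch {
    // A simplified log record: value == null represents a tombstone.
    record Record(String key, String value, long offset) {}

    // Keep only the record with the latest offset for each key; older records are
    // assumed safe to remove, which is the assumption described above.
    static List<Record> compact(List<Record> log) {
        Map<String, Long> offsetMap = new HashMap<>();
        for (Record r : log) {
            offsetMap.merge(r.key(), r.offset(), Math::max);
        }
        List<Record> cleaned = new ArrayList<>();
        for (Record r : log) {
            if (r.offset() >= offsetMap.get(r.key())) {
                cleaned.add(r); // latest record for this key, including tombstones
            }
        }
        return cleaned;
    }
}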
Type of State Machines and Operations
This solution assumes that operations on the state machine can be compacted without having to inspect the value of the state machine at that offset. For example, how do you compact an operation like key <- previous value + 1 without knowing the value of key at that offset?
Tombstone and Delete Operations
For compacted topics, Kafka allows for the deletion of keys in the state machine by appending a record with a key and a value of null (e.g. key ← null). These tombstone records (X) must be kept in the log as long as there is another replica that contains a record (Y) for the same key and that replica doesn't contain record X. In other words, Y.offset < Follower.logEndOffset < X.offset.
Frequency of Compaction
By delegating snapshot generation to the state machines already stored in memory in the Group Coordinator and Kafka Controller, we can generate snapshots faster and more frequently.
Overview
This KIP assumes that KIP-595 has been approved and implemented. With this improvement replicas will continue to replicate their log among each other. Replicas will snapshot their log independently of each other. This is consistent because the logs themselves are consistent. A snapshot will be sent to replicas that attempt to fetch an offset from the leader when the leader no longer has that offset in the log because it is included in the snapshot.
Generating and loading the snapshot will be delegated to the Kafka Controller. The Kafka Controller will notify the Kafka Raft client when it has generated a snapshot and up to which offset is included in the snapshot. The Kafka Raft client will notify the Kafka Controller when a new snapshot has been fetched from the leader (see the section "Interaction Between Kafka Raft and State Machine").
This improvement will be used for internal topics (__consumer_offset, __cluster_metadata and __transaction) but it should be general enough to eventually use on external topics with consumers that support this feature.
Proposed Changes
Topic partitions with snapshot for the cleanup.policy will have log compaction delegated to the state machine. The snapshot algorithm explained in this document assumes that the state machine is in memory. The state machines for both the Kafka Controller and Group Coordinator are already in memory. Supporting state machines that don't fit in memory is outside the scope of this document. See the Future Work section for details.
The state machine represents the in-memory state of the replicated log up to at most the high-watermark. When performing a snapshot the state machine will serialize its state to disk and include the largest offset and epoch included in the snapshot.
The state machine will notify the Kafka log when it generates new snapshots. After a snapshot has been persisted it is safe for the log to truncate a prefix of the log up to the latest snapshot.
For one topic partition, Kafka will have the latest snapshot and zero or more older snapshots. These additional snapshots will have to be garbage collected. Having multiple snapshots is useful for minimizing re-fetching of the snapshot and will be described later in the document.
Design
Visually, a Kafka Raft log would look as follows:
Kafka Replicated Log:

            LBO          high-watermark              LEO
             V                  V                     V
        ---------------------------------------------------
offset: | x + 1 | ... |  y  | y + 1 | ... |   | ... |   |
epoch:  |   b   | ... |  c  |   d   | ... |   | ... |   |
        ---------------------------------------------------

Kafka Snapshot Files:
<topic_name>-<partition_index>/x-a.checkpoint
<topic_name>-<partition_index>/y-c.checkpoint
Legends
LEO - log end offset - the largest offset and epoch that has been written to disk.
high-watermark - the largest offset and epoch that has been replicated to N/2 + 1 replicas.
LBO - log begin offset - the smallest offset in the replicated log. This property used to be called log start offset. It is renamed such that the acronym doesn’t conflict with last stable offset (LSO).
Each Kafka topic partition will have multiple snapshots. Each snapshot contains the last offset and epoch for which this snapshot is consistent, the checksum of the snapshot and the sequence of records/operations representing the state machine.
In the example above, if the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the log begin offset (x + 1, b), then the leader handles the request as described in KIP-595. Otherwise, the leader will respond with the offset and epoch of the latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d).
The followers and observers can concurrently fetch both the snapshot and the replicated log. During leader election, followers with incomplete or missing snapshots will send a vote request and response as if they had an empty log.
Snapshot Format
The state machine is responsible for the format of the snapshot. The snapshot will be stored in the topic partition directory and will have a name of <SnapshotOffset>-<SnapshotOffsetEpoch>.checkpoint. For example, for the topic __cluster_metadata, partition 0, snapshot offset 5120793 and snapshot epoch 2, the full filename will be __cluster_metadata-0/00000000000005120793-2.checkpoint.
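For illustration, the naming scheme implied by the example above could be produced as follows; padding the offset to 20 digits matches the example filename but is an assumption about the final format.

public class SnapshotFileName {
    // e.g. snapshotFileName(5120793, 2) -> "00000000000005120793-2.checkpoint"
    static String snapshotFileName(long snapshotOffset, int snapshotEpoch) {
        return String.format("%020d-%d.checkpoint", snapshotOffset, snapshotEpoch);
    }

    public static void main(String[] args) {
        // Full path for partition 0 of __cluster_metadata:
        System.out.println("__cluster_metadata-0/" + snapshotFileName(5120793L, 2));
    }
}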
TODO: Add more details on the format. Here are some requirements:
- Persist multiple formats. The Kafka Controller may have a different snapshot format from the Group Coordinator.
- Persist information about the latest offset and epoch included. This could be in the file name such that it doesn't interfere with the file format.
- Persist information about the format type and version such that the state machine can decode it.
When to Snapshot
The state machine will compute the following metrics:
cardinality - the number of keys in the state machine.
update counter - the number of updates since the last snapshot.
The topic partition will also include snapshot.minimum.records as a configuration.
If the update counter is greater than both half the cardinality and the snapshot.minimum.records configuration, then trigger a snapshot of the current state.
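A minimal sketch of this trigger condition (the method and parameter names are hypothetical):

public class SnapshotTrigger {
    // Trigger a snapshot when the number of updates since the last snapshot exceeds
    // both half the number of keys and the snapshot.minimum.records configuration.
    static boolean shouldSnapshot(long cardinality, long updateCounter, long snapshotMinimumRecords) {
        return updateCounter > cardinality / 2 && updateCounter > snapshotMinimumRecords;
    }
}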
When to Increase the Log Begin Offset
To minimize the need to send snapshots to slow replicas, the leader can delay increasing the log begin offset up to the minimum offset that it knows has been replicated to all of the live replicas. Followers and observers will increase their log begin offset to the value sent in the fetch response as long as the local state machine has generated a snapshot that includes the log begin offset minus one.
LBO can be increased/set to an offset X if the following is true:
- There is a snapshot for the topic partition that includes the offset LBO - 1, and
- One of the following is true:
  - All of the replicas (followers and observers) have replicated LBO, or
  - LBO is max.replication.lag.ms old.
Kafka allows clients (via the DeleteRecords RPC) to delete records that are less than a given offset. Those requests need to be validated using the same logic enumerated above.
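A sketch of the check described above, usable both when advancing the LBO and when validating a DeleteRecords request; all names are hypothetical:

import java.util.Collection;

public class LogBeginOffsetAdvance {
    // The LBO can be advanced to newLogBeginOffset if a snapshot covers newLogBeginOffset - 1
    // and either every live replica has replicated past it or the offset is at least
    // max.replication.lag.ms old.
    static boolean canAdvanceLogBeginOffset(long newLogBeginOffset,
                                            long latestSnapshotOffset,
                                            Collection<Long> replicaLogEndOffsets,
                                            long offsetAgeMs,
                                            long maxReplicationLagMs) {
        boolean coveredBySnapshot = latestSnapshotOffset >= newLogBeginOffset - 1;
        boolean replicatedEverywhere =
            replicaLogEndOffsets.stream().allMatch(leo -> leo >= newLogBeginOffset);
        boolean oldEnough = offsetAgeMs >= maxReplicationLagMs;
        return coveredBySnapshot && (replicatedEverywhere || oldEnough);
    }
}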
When to Delete Snapshots
The broker will delete any snapshot with a latest offset and epoch (SnapshotOffsetAndEpoch) less than LBO - 1 (the log begin offset minus one). This means that the question of when to delete a snapshot is the same as when to increase the LBO, which is covered in the section "When to Increase the Log Begin Offset".
Interaction Between Kafka Raft and State Machine
This section describes the interaction between the state machine and Kafka Raft.
From Kafka Raft to the State Machine
- Notify when a snapshot is available because of a FetchSnapshot.
From State Machine to Kafka Raft
- Notify when a snapshot has finished being generated. This includes the largest offset and epoch included in the snapshot.
Loading Snapshots
There are two cases when the state machine needs to load the snapshot:
When the state machine is first starting it will load the latest snapshot.
When the replica finishes fetching a new snapshot from the leader it will notify the state machine of the new snapshot. The state machine will clear its in-memory state and load the snapshot.
After loading the snapshot, the state machine will apply operations in the replicated log with an offset greater than the snapshot’s offset and less than the high-watermark.
To avoid having a replica perform 1. followed by 2. the state machine should delay applying
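The load-then-catch-up sequence can be sketched as follows; the StateMachine and Log interfaces are hypothetical stand-ins for the Kafka Controller / Group Coordinator and the Kafka Raft log:

import java.nio.file.Path;
import java.util.List;

public class SnapshotLoadSketch {
    interface StateMachine {
        void clear();                     // drop the current in-memory state
        long loadSnapshot(Path snapshot); // returns the last offset included in the snapshot
        void apply(Object record);        // apply one committed log record
    }

    interface Log {
        List<Object> read(long fromOffset, long toOffsetExclusive);
        long highWatermark();
    }

    // Load the snapshot, then apply every committed record after the snapshot's offset.
    static void restore(StateMachine stateMachine, Log log, Path latestSnapshot) {
        stateMachine.clear();
        long snapshotOffset = stateMachine.loadSnapshot(latestSnapshot);
        for (Object record : log.read(snapshotOffset + 1, log.highWatermark())) {
            stateMachine.apply(record);
        }
    }
}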
Changes to leader election
The followers and observers of a topic partition will concurrently fetch the snapshot and replicated log. This means that candidates with incomplete snapshots will send a vote request with a LastOffsetEpoch of -1 and a LastOffset of -1.
Quorum Reassignment
TODO: Need to investigate if there are any implications for quorum reassignment, as some of the operations in the snapshot are for reassignment.
Validation of Snapshot and Log
There is an invariant that every log must have at least one snapshot where the offset of the snapshot is between LogBeginOffset - 1 and High-Watermark. If this is not the case then the replica should assume that the log is empty. Once the replica discovers the leader for the partition it can bootstrap itself by using the Fetch and FetchSnapshot RPCs as described in this document.
Public Interfaces
Topic configuration
Kafka topics will have an additional cleanup.policy value of snapshot. Once the configuration option is set to snapshot it cannot be changed. The __cluster_metadata topic will always have a cleanup.policy of snapshot. Setting the cleanup.policy to snapshot will enable the improvements documented here.
- snapshot.minimum.records - The minimum number of records committed after the latest snapshot needed before performing another snapshot.
- max.replication.lag.ms - The maximum amount of time that the leader will wait for an offset to get replicated to all of the replicas before advancing the LBO. See the section "When to Increase the Log Begin Offset".
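If these options were exposed through the existing Admin client, configuring a topic might look roughly like the sketch below; note that the snapshot policy value and the snapshot.minimum.records and max.replication.lag.ms configurations are proposals from this KIP and do not exist in Kafka today.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CreateSnapshotTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            NewTopic topic = new NewTopic("example-topic", 1, (short) 3)
                .configs(Map.of(
                    "cleanup.policy", "snapshot",         // proposed by this KIP
                    "snapshot.minimum.records", "10000",  // proposed by this KIP
                    "max.replication.lag.ms", "60000"));  // proposed by this KIP
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}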
Network Protocol
Fetch
Request Schema
This improvement doesn’t require any changes to the fetch request outside of what is proposed in KIP-595.
Response Schema
The fetch response proposed in KIP-595 will be extended such that:
- It includes new optional fields for the snapshot offset and snapshot epoch.
- The LogStartOffset field is renamed to LogBeginOffset.
{ "apiKey": 1, "type": "response", "name": "FetchResponse", "validVersions": "0-12", "flexibleVersions": "12+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false, "about": "The top level response error code." }, { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false, "about": "The fetch session ID, or 0 if this is not part of a fetch session." }, { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+", "about": "The response topics.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." }, { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+", "about": "The topic partitions.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partiiton index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no fetch error." }, { "name": "HighWatermark", "type": "int64", "versions": "0+", "about": "The current high water mark." }, { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true, "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" }, // Field renamed from LogStartOffset to LogBeginOffset { "name": "LogBeginOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, "about": "The current log begin offset." 
}, // ---------- Start new fields by KIP-595 ---------- { "name": "NextOffsetAndEpoch", "type": "OffsetAndEpoch", "versions": "12+", "taggedVersions": "12+", "tag": 0, "fields": [ { "name": "NextFetchOffset", "type": "int64", "versions": "0+", "about": "If set, this is the offset that the follower should truncate to"}, { "name": "NextFetchOffsetEpoch", "type": "int32", "versions": "0+", "about": "The epoch of the next offset in case the follower needs to truncate"} ]}, { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "12+", "taggedVersions": "12+", "tag": 1, fields": [ { "name": "LeaderId", "type": "int32", "versions": "0+", "about": "The ID of the current leader or -1 if the leader is unknown."}, { "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The latest known leader epoch"} ]}, // ---------- End new fields by KIP-595 ---------- // ---------- Start new fields ---------- { "name": "SnapshotOffsetAndEpoch", "type": "OffsetAndEpoch", "versions": "12+", "taggedVersions": "12+", "tag": 0, "fields": [ { "name": "FetchOffset", "type": "int64", "versions": "0+", "about": "If set, this is the offset that the follower should use for the FetchSnapshot request"}, { "name": "FetchOffsetEpoch", "type": "int32", "versions": "0+", "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"} ]}, // ---------- End new fields ---------- { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false, "about": "The aborted transactions.", "fields": [ { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId", "about": "The producer id associated with the aborted transaction." }, { "name": "FirstOffset", "type": "int64", "versions": "4+", "about": "The first offset in the aborted transaction." } ]}, { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true, "about": "The preferred read replica for the consumer to use on its next fetch request"}, { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "about": "The record data." } ]} ]} ] }
Handling Fetch Request
The replica's handling of fetch requests will be extended such that if FetchOffset is less than LogBeginOffset then the leader will respond with a SnapshotOffsetAndEpoch of the latest snapshot and a NextOffsetAndEpoch of the offset right after SnapshotOffsetAndEpoch.
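A rough sketch of this leader-side decision, with hypothetical types (ServeFromLog stands in for the normal KIP-595 handling):

public class FetchRequestHandlingSketch {
    record OffsetAndEpoch(long offset, int epoch) {}

    sealed interface FetchDecision permits ServeFromLog, PointToSnapshot {}
    record ServeFromLog(long fetchOffset) implements FetchDecision {}
    record PointToSnapshot(OffsetAndEpoch snapshotOffsetAndEpoch,
                           OffsetAndEpoch nextOffsetAndEpoch) implements FetchDecision {}

    // If the requested offset is below the log begin offset, point the follower at the
    // latest snapshot and at the first offset after it; otherwise serve the log as in KIP-595.
    static FetchDecision handleFetch(long fetchOffset,
                                     long logBeginOffset,
                                     OffsetAndEpoch latestSnapshot,
                                     int epochAfterSnapshot) {
        if (fetchOffset < logBeginOffset) {
            return new PointToSnapshot(
                latestSnapshot,
                new OffsetAndEpoch(latestSnapshot.offset() + 1, epochAfterSnapshot));
        }
        return new ServeFromLog(fetchOffset);
    }
}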
Handling Fetch Response
The replica's handling of fetch responses will be extended such that if SnapshotOffsetAndEpoch is set then the follower will truncate its entire log and start fetching from both the log and the snapshot. Where to fetch in the log is described by NextOffsetAndEpoch, while which snapshot to fetch is described by SnapshotOffsetAndEpoch.
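Sketched from the follower's point of view (the Follower interface and method names are hypothetical):

public class FetchResponseHandlingSketch {
    interface Follower {
        void truncateFullLog();                                     // drop the entire local log
        void fetchSnapshot(long snapshotOffset, int snapshotEpoch);
        void fetchLog(long nextOffset, int nextEpoch);
    }

    // If the response carries SnapshotOffsetAndEpoch, drop the local log and fetch the
    // snapshot and the log after it concurrently; otherwise handle the response as in KIP-595.
    static void onFetchResponse(Follower follower,
                                Long snapshotOffset, Integer snapshotEpoch,
                                long nextOffset, int nextEpoch) {
        if (snapshotOffset != null && snapshotEpoch != null) {
            follower.truncateFullLog();
            follower.fetchSnapshot(snapshotOffset, snapshotEpoch);
            follower.fetchLog(nextOffset, nextEpoch);
        }
    }
}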
Fetching Snapshots
Request Schema
{ "apiKey": #, "type": "request", "name": "FetchSnapshotRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ReplicaId", "type": "int32", "versions": "0+", "about": "The broker ID of the follower, of -1 if this request is from a consumer." }, { "name": "Topics", "type": "[]TopicSnapshots", "versions": "0+", "about": "The topics to fetch.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The name of the topic to fetch." }, { "name": "Partitions", "type": "[]PartitionSnapshots", "versions": "0+", "about": "The partitions to fetch.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "SnapshotOffsetAndEpoch", "type": "OffsetAndEpoch", "versions": "12+", "taggedVersions": "12+", "tag": 0, "fields": [ { "name": "Offset", "type": "int64", "versions": "0+", "about": "If set, this is the offset that the follower should use for the FetchSnapshot request"}, { "name": "OffsetEpoch", "type": "int32", "versions": "0+", "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"} ]}, { "name": "Position", "type": "int64", "versions": "0+", "about": "The byte position within the snapshot." }, { "name": "MaxBytes", "type": "int32", "versions": "0+", "about": "The maximum bytes to fetch from this snapshot." } ]} ]} ] }
Response Schema
{ "apiKey": #, "type": "request", "name": "FetchSnapshotResponse", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false, "about": "The top level response error code." }, { "name": "Topics", "type": "[]TopicSnapshots", "versions": "0+", "about": "The topics to fetch.", "fields": [ { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The name of the topic to fetch." }, { "name": "Partitions", "type": "[]PartitionSnapshots", "versions": "0+", "about": "The partitions to fetch.", "fields": [ { "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index." }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The error code, or 0 if there was no fetch error." }, { "name": "MediaType", "type": "string", "version": "0+", "about": "The media type and version of the snapshot. This is only set when the position is 0."} { "name": "Continue", "type": "bool": "versions": "0+", "about": "True if there is data remaining for fetch." }, { "name": "Position", "type": "int64", "versions": "0+", "about": "The byte position within the snapshot." }, { "name": "Bytes", "type": "bytes": "versions": "0+", "about": "Snapshot data." }, ]} ]} ] }
Request Handling
When the leader receives a FetchSnapshot request it would do the following for each partition:
1. Find the snapshot for SnapshotOffsetAndEpoch for the topic Name and partition PartitionIndex.
2. Send the bytes in the snapshot from Position up to MaxBytes.
3. Set Continue to true if there are remaining bytes in the snapshot.
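A sketch of the per-partition chunk read on the leader (a hypothetical helper, not the proposed implementation):

import java.io.IOException;
import java.io.RandomAccessFile;

public class FetchSnapshotRequestHandlingSketch {
    record SnapshotChunk(byte[] bytes, boolean continueFetching) {}

    // Read up to maxBytes from the snapshot file starting at position; continueFetching
    // is true if bytes remain in the snapshot after this chunk.
    static SnapshotChunk readChunk(String snapshotPath, long position, int maxBytes) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(snapshotPath, "r")) {
            if (position > file.length()) {
                throw new IllegalArgumentException("OFFSET_OUT_OF_RANGE");
            }
            int size = (int) Math.min(maxBytes, file.length() - position);
            byte[] bytes = new byte[size];
            file.seek(position);
            file.readFully(bytes);
            return new SnapshotChunk(bytes, position + size < file.length());
        }
    }
}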
Errors:
- SNAPSHOT_NOT_FOUND - when the fetch snapshot request specifies a SnapshotOffsetAndEpoch that doesn't exist on the leader.
- OFFSET_OUT_OF_RANGE - when the fetch snapshot request's offset is greater than the size of the snapshot.
- NOT_LEADER_FOR_PARTITION - when the fetch snapshot request is sent to a replica that is not the leader.
Response Handling
For each topic partition:
1. If Continue is true, the follower is expected to send another FetchSnapshotRequest with a Position of Position + len(Bytes) from the response.
2. Copy the bytes in Bytes to the local snapshot file starting at file position Position.
3. If Continue is set to false then fetching the snapshot has finished. Notify the Kafka Controller that a new snapshot is available. The Kafka Controller will load this new snapshot and read records from the log after the snapshot offset.
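The follower side of this loop might look like the sketch below; the writeChunk helper and its names are hypothetical:

import java.io.IOException;
import java.io.RandomAccessFile;

public class FetchSnapshotResponseHandlingSketch {
    // Append the received bytes at the given position, then return the position the
    // follower should use in the next FetchSnapshot request (Position + len(Bytes)).
    static long writeChunk(String snapshotPath, long position, byte[] bytes) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(snapshotPath, "rw")) {
            file.seek(position);
            file.write(bytes);
        }
        return position + bytes.length;
    }
}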
Errors:
- SNAPSHOT_NOT_FOUND, OFFSET_OUT_OF_RANGE, NOT_LEADER_FOR_PARTITION - The follower can rediscover the new snapshot by sending a fetch request with an offset and epoch of zero.
Metrics
TODO: figure out some useful metrics. I probably won't have these until we have a fairly complete implementation.
Compatibility, Deprecation, and Migration Plan
TODO
Rejected Alternatives
Append the snapshot to the Kafka log: Instead of having a separate file for snapshots and a separate RPC for downloading the snapshot, the leader could append the snapshot to the log. This design has a few drawbacks. Because the snapshot gets appended to the head of the log, it will be replicated to every replica in the partition. This will cause Kafka to use more bandwidth than necessary.
Use Kafka Log infrastructure to store snapshots: Kafka could use the Log functionality to persist the snapshot to disk and to facilitate the replication of the snapshot to the replicas. This solution ties the snapshot format to Kafka Log's record format. To lower the overhead of writing and loading the snapshot by the state machine, it is important to use a format that is compatible with the state machine.
Change leader when generating snapshots: Instead of supporting concurrent snapshots and writing to the client state, we require that only non-leaders generate snapshots. If the leader wants to generate a snapshot then it would relinquish leadership. We decided to not implement this for two reasons. 1. All replicas/brokers in the cluster will have a metadata cache which will apply the replicated log and allow read-only access by other components in the Kafka Broker. 2. Even though leader election within the replicated log may be efficient, communicating this new leader to all of the brokers and all of the external clients (admin clients, consumers, producers) may have a large latency.