...
```
Kafka Replicated Log:

LogStartOffset --           high-watermark --    LEO --
                 V                           V         V
               -------------------------------------------
       offset: | x | ... | y - 1 | y | ... |   | ... |   |
       epoch:  | b | ... | c     | d | ... |   | ... |   |
               -------------------------------------------

Kafka Snapshot Files:

<topic_name>-<partition_index>/checkpoints/x-a.checkpoint
<topic_name>-<partition_index>/checkpoints/y-c.checkpoint
```
...
LEO - log end offset - the next offset to be written to disk.
high-watermark - the largest offset and epoch that has been replicated to N/2 + 1 replicas.
LogStartOffset - log start offset - the smallest offset in the replicated log.
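To make the "N/2 + 1 replicas" definition concrete, here is a minimal sketch. It assumes each replica is summarized by a single end offset and it ignores epochs. `majority_size` and `high_watermark` are hypothetical helpers, not Kafka code.

```python
from typing import List


def majority_size(n_replicas: int) -> int:
    # N/2 + 1 with integer division: 2 of 3 replicas, 3 of 4 replicas.
    return n_replicas // 2 + 1


def high_watermark(end_offsets: List[int]) -> int:
    # Hypothetical helper: largest offset reached by at least N/2 + 1 replicas.
    # Sorted in descending order, the entry at index N/2 is matched or
    # exceeded by at least N/2 + 1 replicas (itself and every entry before it).
    if not end_offsets:
        raise ValueError("at least one replica is required")
    ordered = sorted(end_offsets, reverse=True)
    return ordered[majority_size(len(ordered)) - 1]


print(high_watermark([10, 8, 5]))      # 8: reached by 2 of 3 replicas
print(high_watermark([12, 9, 9, 4]))   # 9: reached by 3 of 4 replicas
```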
In the example above, offset=x - 1, epoch=a does not appear in the replicated log because it is before the LogStartOffset (x, b). If the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the LogStartOffset (x, b), then the leader handles the request as described in KIP-595. Otherwise, the leader will respond with the epoch and end offset of the latest snapshot (y, c).
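The leader's choice between a normal fetch and a snapshot reply can be sketched as follows. This is a simplification under stated assumptions. Only the fetch offset is compared against the LogStartOffset, and epoch validation is assumed to remain in the normal KIP-595 path. `SnapshotId` and `snapshot_for_fetch` are hypothetical names, and the concrete offsets stand in for x and (y, c) from the diagram.

```python
from typing import NamedTuple, Optional


class SnapshotId(NamedTuple):
    end_offset: int
    epoch: int


def snapshot_for_fetch(fetch_offset: int, log_start_offset: int,
                       latest_snapshot: SnapshotId) -> Optional[SnapshotId]:
    # Hypothetical helper. None means "handle the fetch as in KIP-595".
    if fetch_offset >= log_start_offset:
        # The requested records are still in the log; epoch validation and
        # divergence detection stay in the KIP-595 path (not modelled here).
        return None
    # The requested offset precedes the LogStartOffset, so those records are
    # gone: reply with the end offset and epoch of the latest snapshot.
    return latest_snapshot


latest = SnapshotId(end_offset=250, epoch=3)   # plays the role of (y, c)
print(snapshot_for_fetch(120, 100, latest))    # None
print(snapshot_for_fetch(40, 100, latest))     # SnapshotId(end_offset=250, epoch=3)
```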
...
To minimize the need to send snapshots to slow replicas, the leader can delay increasing the LogStartOffset up to the minimum offset that it knows has been replicated to all of the live replicas. The leader's LogStartOffset can be increased to an offset X if both of the following are true:
- There is a snapshot for the topic partition which includes the offset X - 1.
- One of the following is true:
  - All of the live replicas (followers and observers) have replicated the LogStartOffset, or
  - The LogStartOffset is metadata.start.offset.lag.time.max.ms milliseconds old.
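The two conditions can be expressed as a single predicate. This is a minimal sketch built on a few assumptions. Snapshot end offsets are treated as exclusive, following the diagram, where snapshot x-a covers offsets up to x - 1. The "all live replicas have replicated" condition is read as "every live replica has fetched up to X", which matches the goal of not advancing past what all live replicas have. `can_advance_log_start_offset` and its parameters are hypothetical.

```python
from typing import Iterable

SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000  # default of metadata.start.offset.lag.time.max.ms


def can_advance_log_start_offset(
    x: int,
    snapshot_end_offsets: Iterable[int],
    live_replica_offsets: Iterable[int],
    log_start_offset_age_ms: int,
    lag_time_max_ms: int = SEVEN_DAYS_MS,
) -> bool:
    # Condition 1: some snapshot includes offset x - 1, i.e. ends at or after x
    # (end offsets assumed exclusive, as in the diagram).
    if not any(end >= x for end in snapshot_end_offsets):
        return False
    # Condition 2a (interpreted): every live follower/observer has reached x.
    all_replicated = all(offset >= x for offset in live_replica_offsets)
    # Condition 2b: the current LogStartOffset is at least the lag limit old.
    too_old = log_start_offset_age_ms >= lag_time_max_ms
    return all_replicated or too_old
```

Either branch of condition 2 is sufficient, so a permanently slow observer can delay the advance for at most `lag_time_max_ms`.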
...
The broker will delete any snapshot with an end offset and epoch (SnapshotId) less than the LogStartOffset. That means that the question of when to delete a snapshot is the same as when to increase the LogStartOffset, which is covered in the section “When to Increase the LogStartOffset”.
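A sketch of the deletion rule, applied to the snapshot file names from the diagram. It assumes file names follow the `<end offset>-<epoch>.checkpoint` pattern shown there, and it compares only the end offset against the LogStartOffset. The helper names are hypothetical.

```python
import re
from typing import List, NamedTuple

# Assumed file-name layout from the diagram: "<end offset>-<epoch>.checkpoint".
_SNAPSHOT_NAME = re.compile(r"^(\d+)-(\d+)\.checkpoint$")


class SnapshotId(NamedTuple):
    end_offset: int
    epoch: int


def parse_snapshot_id(file_name: str) -> SnapshotId:
    match = _SNAPSHOT_NAME.match(file_name)
    if match is None:
        raise ValueError(f"not a snapshot file: {file_name}")
    return SnapshotId(int(match.group(1)), int(match.group(2)))


def snapshots_to_delete(file_names: List[str], log_start_offset: int) -> List[str]:
    # Keep a snapshot that ends exactly at the LogStartOffset: it is the one
    # that covers the records removed from the log.
    return [name for name in file_names
            if parse_snapshot_id(name).end_offset < log_start_offset]
```

For example, once the LogStartOffset moves to 250, `snapshots_to_delete(["100-2.checkpoint", "250-3.checkpoint"], 250)` selects only `100-2.checkpoint`.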
...
- Follower sends a fetch request.
- Leader replies with a snapshot epoch and end offset.
- Follower pauses fetch requests.
- Follower fetches a snapshot from the leader.
- Follower resumes fetch requests by:
  - Setting the LogStartOffset to the snapshot's end offset.
  - Setting the FetchOffset in the fetch request to the snapshot's end offset.
  - Setting the LastFetchedEpoch in the fetch request to the snapshot's epoch.
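The follower-side steps above amount to a small state update. This is a minimal sketch that models only the fields named in the steps. `FollowerState`, `on_snapshot_id`, and `on_snapshot_loaded` are hypothetical, and the snapshot transfer itself is not shown.

```python
from dataclasses import dataclass


@dataclass
class FollowerState:
    log_start_offset: int
    fetch_offset: int          # FetchOffset for the next fetch request
    last_fetched_epoch: int    # LastFetchedEpoch for the next fetch request
    fetching_paused: bool = False


def on_snapshot_id(state: FollowerState) -> None:
    # Leader replied with a snapshot epoch and end offset: pause fetch requests.
    state.fetching_paused = True


def on_snapshot_loaded(state: FollowerState, end_offset: int, epoch: int) -> None:
    # Snapshot fully fetched: resume fetching right after the snapshot.
    state.log_start_offset = end_offset
    state.fetch_offset = end_offset
    state.last_fetched_epoch = epoch
    state.fetching_paused = False
```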
...
For each replica we have the following invariant:
- If the LogStartOffset is 0, there are zero or more snapshots where the end offset of the snapshot is between the LogStartOffset and the High-Watermark.
- If the LogStartOffset is greater than 0, there are one or more snapshots where the end offset of the snapshot is between the LogStartOffset and the High-Watermark.
If the latest snapshot has an epoch E1 and end offset O1, and it is newer than the LEO of the replicated log, then the replica must set the LogStartOffset and LEO to O1.
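The invariant and the catch-up rule can be checked mechanically. The sketch below makes two assumptions. "Between" is taken as inclusive on both ends, and "newer than the LEO" is taken as a snapshot end offset greater than the LEO. Both function names are hypothetical.

```python
from typing import List, Tuple


def invariant_holds(log_start_offset: int, high_watermark: int,
                    snapshot_end_offsets: List[int]) -> bool:
    covering = [end for end in snapshot_end_offsets
                if log_start_offset <= end <= high_watermark]
    # LogStartOffset == 0: zero or more covering snapshots are fine.
    # LogStartOffset > 0: at least one snapshot must cover the removed prefix.
    return log_start_offset == 0 or len(covering) >= 1


def reconcile_with_snapshot(log_start_offset: int, log_end_offset: int,
                            latest_snapshot_end_offset: int) -> Tuple[int, int]:
    # Returns (LogStartOffset, LEO) after applying the rule for a snapshot
    # that is newer than the replicated log.
    if latest_snapshot_end_offset > log_end_offset:
        return latest_snapshot_end_offset, latest_snapshot_end_offset
    return log_start_offset, log_end_offset
```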
Public Interfaces
...
The __cluster_metadata topic will have a cleanup.policy value of snapshot. This configuration can only be read. Updates to this configuration will not be allowed.
- metadata.snapshot.min.cleanable.ratio - The minimum ratio of snapshot records that have to change before generating a snapshot. See section “When to Snapshot”. The default is .5 (50%).
- metadata.start.offset.lag.time.max.ms - The maximum amount of time that the leader will wait for an offset to get replicated to all of the live replicas before advancing the LogStartOffset. See section “When to Increase the LogStartOffset”. The default is 7 days.
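The two defaults translate into concrete values as shown in the sketch below. The 7-day default is 604,800,000 ms. The ratio check is an assumption about how the setting is applied: changed records divided by the records in the latest snapshot, compared with `>=`. The "When to Snapshot" section is authoritative on this, and the constant and function names are hypothetical.

```python
# Defaults taken from the configuration descriptions; names are hypothetical.
METADATA_SNAPSHOT_MIN_CLEANABLE_RATIO = 0.5
METADATA_START_OFFSET_LAG_TIME_MAX_MS = 7 * 24 * 60 * 60 * 1000  # 7 days in ms


def should_generate_snapshot(changed_records: int, snapshot_records: int,
                             min_cleanable_ratio: float = METADATA_SNAPSHOT_MIN_CLEANABLE_RATIO) -> bool:
    # Assumed semantics: fraction of the latest snapshot's records that changed.
    if snapshot_records == 0:
        # Assumption: with no prior snapshot, any change warrants one.
        return changed_records > 0
    return changed_records / snapshot_records >= min_cleanable_ratio
```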
Network Protocol
...
```json
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        // ---------- Start of renamed field ----------
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log start offset." },
        // ---------- End of renamed field ----------
        { "name": "DivergingEpoch", "type": "EpochEndOffset", "versions": "12+", "taggedVersions": "12+", "tag": 0,
          "about": "In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge",
          "fields": [
          { "name": "Epoch", "type": "int32", "versions": "12+", "default": "-1" },
          { "name": "EndOffset", "type": "int64", "versions": "12+", "default": "-1" }
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "12+", "taggedVersions": "12+", "tag": 1,
          "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch" }
        ]},
        // ---------- Start of new field ----------
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "12+", "taggedVersions": "12+", "tag": 2,
          "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the end offset that the follower should use for the FetchSnapshot request" },
          { "name": "Epoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request" }
        ]},
        // ---------- End new field ----------
        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
          "about": "The aborted transactions.", "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
          "about": "The preferred read replica for the consumer to use on its next fetch request" },
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." }
      ]}
    ]}
  ]
}
```
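A follower reading a partition response might dispatch on the tagged fields as sketched here. The sketch makes several assumptions. It models only a subset of `FetchablePartitionResponse`. An absent tagged field is represented as `None` rather than the schema's `-1` defaults. `SnapshotId` is checked before `DivergingEpoch`. The class and function names are hypothetical and are not Kafka's generated message classes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class EpochEndOffset:
    epoch: int
    end_offset: int


@dataclass(frozen=True)
class SnapshotId:
    end_offset: int
    epoch: int


@dataclass(frozen=True)
class PartitionResponse:
    # Subset of FetchablePartitionResponse; absent tagged fields are None here.
    high_watermark: int
    log_start_offset: int
    diverging_epoch: Optional[EpochEndOffset] = None
    snapshot_id: Optional[SnapshotId] = None
    records: bytes = b""


def next_action(resp: PartitionResponse) -> str:
    if resp.snapshot_id is not None:
        # New field: pause fetching and issue FetchSnapshot for this id.
        return f"fetch snapshot {resp.snapshot_id.end_offset}-{resp.snapshot_id.epoch}"
    if resp.diverging_epoch is not None:
        # KIP-595 divergence: truncate before fetching again.
        return f"truncate to {resp.diverging_epoch.end_offset}"
    return f"append {len(resp.records)} bytes"
```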
...