...
The type of in-memory state machines that we plan to implement for the Kafka Controller and metadata cache (see KIP-631) doesn't map very well to a key and offset based clean up policy. The data model for the Kafka Controller and metadata cache requires us to support deltas/events. Currently the controller uses ZK for both storing the current state of the cluster and the deltas that it needs to apply. One example of this is topic deletion: when the user asks for a topic to be deleted, 1) the Kafka Controller (AdminManager in the code) persists and commits the delete event, 2) applies the event to its in-memory state and sends RPCs to all of the affected brokers, 3) persists and commits the changes to ZK. We don't believe that a key-value and offset based compaction model can be used for events/deltas in a way that is intuitive to program against.
Another operation that the Kafka Controller models as an event is the FenceBroker message from KIP-631. Every time a broker is fenced because of controlled shutdown or failure to send a heartbeat, it affects every topic partition assigned to the fenced broker. For details on the replication impact of event operations (FenceBroker) vs key-value operations (IsrChange), see the section "Amount of Data Replicated".
...
Because the Kafka Controller and Metadata Cache as described in KIP-631 will use events/deltas to represent some of their in-memory state changes, the amount of data that needs to be written to the log and replicated to all of the brokers can be reduced. As an example, assume that a broker goes offline in a cluster with 100K topics, each with 10 partitions, a replication factor of 3, and 100 brokers. In a well balanced cluster we can expect that no broker will have 2 or more partitions for the same topic. In this case the best we can do for the IsrChange message and the FenceBroker message are the following message sizes:
...
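To make the difference concrete, here is a back-of-the-envelope sketch in Java of the number of records each model would need to write when a broker is fenced. It only counts records (not byte sizes), uses the example cluster above, and the one-IsrChange-per-hosted-replica assumption is an illustration, not taken from the KIP's own table.

```java
// Back-of-the-envelope sketch: records written when one broker is fenced,
// using the example cluster above (100K topics, 10 partitions each, RF=3, 100 brokers).
public class FencingRecordCount {
    public static void main(String[] args) {
        long topics = 100_000;
        long partitionsPerTopic = 10;
        long replicationFactor = 3;
        long brokers = 100;

        // Total partition replicas in the cluster, spread evenly across brokers.
        long totalReplicas = topics * partitionsPerTopic * replicationFactor;
        long replicasPerBroker = totalReplicas / brokers; // 30,000

        // Event model: fencing is a single FenceBroker delta record.
        long fenceBrokerRecords = 1;
        // Key-value model (illustrative assumption): one IsrChange record per
        // partition replica hosted by the fenced broker, since each affected
        // ISR must be rewritten.
        long isrChangeRecords = replicasPerBroker;

        System.out.println("Replicas per broker: " + replicasPerBroker);
        System.out.println("FenceBroker records: " + fenceBrokerRecords);
        System.out.println("IsrChange records:   " + isrChangeRecords);
    }
}
```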
Generating and loading the snapshot will be delegated to the Kafka Controller and Metadata Cache. The Kafka Controller and Metadata Cache will notify the Kafka Raft layer when they have generated a snapshot and the end offset of the snapshot. The Kafka Raft layer will notify the Kafka Controller or Metadata Cache when a new snapshot has been fetched from the Active Controller. See the section "Interaction with the Kafka Controller or Metadata Cache" and KIP-595 for more details.
...
In the example above, offset=x - 1, epoch=a does not appear in the replicated log because it is before the LogStartOffset (x, b). If the Kafka topic partition leader receives a fetch request with an offset and last fetched epoch greater than or equal to the LogStartOffset (x, b), then the leader handles the request as described in KIP-595. Otherwise, the leader will respond with the SnapshotId, the epoch and end offset of the latest snapshot (y, c).
Adding snapshots to KIP-595 changes how voters grant votes and how candidates request votes. See the section "Changes to Leader Election" for details on those changes.
Snapshot Format
The Kafka Controller and Metadata Cache are responsible for the content of the snapshot. Each snapshot is uniquely identified by a SnapshotId, the epoch and end offset of the records in the replicated log included in the snapshot. The snapshot will be stored in the checkpoints directory in the topic partition directory and will have a name of <SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint. For example, for the topic __cluster_metadata, partition 0, snapshot end offset 5120793 and snapshot epoch 2, the full filename will be __cluster_metadata-0/checkpoints/00000000000005120793-00000000000000000002.checkpoint.
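The zero-padded, fixed-width name can be produced with a format string. A minimal sketch (the helper name is hypothetical, not an actual Kafka API):

```java
// Sketch: build a snapshot checkpoint filename from its SnapshotId.
// Offset and epoch are zero-padded to 20 digits, matching log segment naming.
public class SnapshotFileName {
    // Hypothetical helper for illustration only.
    static String checkpointName(long endOffset, int epoch) {
        return String.format("%020d-%020d.checkpoint", endOffset, epoch);
    }

    public static void main(String[] args) {
        // Example from the text: end offset 5120793, epoch 2.
        System.out.println(checkpointName(5120793L, 2));
        // 00000000000005120793-00000000000000000002.checkpoint
    }
}
```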
The snapshot epoch will be used when ordering snapshots and, more importantly, when setting the LastFetchedEpoch field in the Fetch request. It is possible for a follower to have a snapshot and an empty log. In this case the follower will use the epoch of the snapshot when setting the LastFetchedEpoch in the Fetch request.
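A minimal sketch of that epoch selection (names are illustrative, not the actual Kafka internals):

```java
// Sketch: choosing LastFetchedEpoch for the Fetch request.
// If the replicated log is empty, fall back to the latest snapshot's epoch.
public class LastFetchedEpochSelector {
    static int lastFetchedEpoch(boolean logIsEmpty, int logLastEpoch, int snapshotEpoch) {
        return logIsEmpty ? snapshotEpoch : logLastEpoch;
    }

    public static void main(String[] args) {
        // Follower with a snapshot at epoch 2 and an empty log uses epoch 2.
        System.out.println(lastFetchedEpoch(true, -1, 2));  // 2
        // Follower with records in the log uses the epoch of its last record.
        System.out.println(lastFetchedEpoch(false, 5, 2));  // 5
    }
}
```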
...
If the Kafka Controller or Metadata Cache generates a snapshot too frequently, it can negatively affect the performance of the disk. If it doesn't generate a snapshot often enough, it can increase the amount of time it takes to load its state into memory and increase the amount of space taken up by the replicated log. The Kafka Controller and Metadata Cache will have two new configuration options: metadata.snapshot.min.cleanable.ratio and metadata.snapshot.min.records.size. Both of these configurations need to be satisfied before the Controller or Metadata Cache will generate a new snapshot.
metadata.snapshot.min.cleanable.ratio is the minimum ratio of snapshot records that have changed (deleted or modified) between the latest snapshot and the current in-memory state
...
...
- Note that new snapshot records don't count against this ratio. If a new snapshot record was added since the last snapshot, then it doesn't affect the dirty ratio.
...
- If a snapshot record was added and then modified or deleted, then it counts against the cleanable ratio.
metadata.snapshot.min.records.size is the minimum size of the records in the replicated log between the latest snapshot and the high-watermark.
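Under those definitions, the snapshot trigger can be sketched as a simple predicate. Field and method names here are illustrative, not the actual implementation:

```java
// Sketch: decide whether to generate a new snapshot. Both configured
// thresholds must be satisfied (per the text above).
public class SnapshotPolicy {
    final double minCleanableRatio; // metadata.snapshot.min.cleanable.ratio, e.g. 0.5
    final long minRecordsSizeBytes; // metadata.snapshot.min.records.size, e.g. 20 MB

    SnapshotPolicy(double minCleanableRatio, long minRecordsSizeBytes) {
        this.minCleanableRatio = minCleanableRatio;
        this.minRecordsSizeBytes = minRecordsSizeBytes;
    }

    boolean shouldSnapshot(long changedSnapshotRecords, long totalSnapshotRecords,
                           long bytesSinceLatestSnapshot) {
        double dirtyRatio = totalSnapshotRecords == 0
            ? 0.0
            : (double) changedSnapshotRecords / totalSnapshotRecords;
        return dirtyRatio >= minCleanableRatio
            && bytesSinceLatestSnapshot >= minRecordsSizeBytes;
    }

    public static void main(String[] args) {
        SnapshotPolicy policy = new SnapshotPolicy(0.5, 20L * 1024 * 1024);
        // 60% of snapshot records changed and 32 MB of new log data: snapshot.
        System.out.println(policy.shouldSnapshot(600, 1000, 32L * 1024 * 1024)); // true
        // Only 10% changed: do not snapshot yet.
        System.out.println(policy.shouldSnapshot(100, 1000, 32L * 1024 * 1024)); // false
    }
}
```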
When to Increase the LogStartOffset
To minimize the need to send snapshots to slow replicas, the leader can delay increasing the log start offset up to the minimum offset that it knows has been replicated to all of the live replicas. The leader's LogStartOffset can be increased to an offset X if the following is true:
- There is a snapshot for the topic partition which includes the offset X - 1, i.e. with an end offset of X.
- One of the following is true:
  - All of the live replicas (followers and observers) have replicated LogStartOffset, or
  - LogStartOffset is metadata.start.offset.lag.time.max.ms milliseconds old.
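The two conditions above can be sketched as a leader-side predicate. The names, and in particular the age bookkeeping for X, are hypothetical and only illustrate the rule:

```java
// Sketch: leader-side check for advancing LogStartOffset to X.
public class LogStartOffsetAdvance {
    static boolean canAdvanceTo(long x,
                                long latestSnapshotEndOffset,
                                long minReplicatedOffsetOfLiveReplicas,
                                long millisSinceXBecameCandidate, // hypothetical bookkeeping
                                long maxLagTimeMs) {              // metadata.start.offset.lag.time.max.ms
        // A snapshot must cover everything below X.
        boolean snapshotCoversX = latestSnapshotEndOffset >= x;
        boolean allLiveReplicasCaughtUp = minReplicatedOffsetOfLiveReplicas >= x;
        boolean waitedLongEnough = millisSinceXBecameCandidate >= maxLagTimeMs;
        return snapshotCoversX && (allLiveReplicasCaughtUp || waitedLongEnough);
    }

    public static void main(String[] args) {
        long sevenDaysMs = 7L * 24 * 60 * 60 * 1000;
        // Snapshot ends at 5000 and every live replica has replicated past 4000.
        System.out.println(canAdvanceTo(4000, 5000, 4500, 0, sevenDaysMs)); // true
        // No snapshot covering X yet: must not advance.
        System.out.println(canAdvanceTo(6000, 5000, 6500, 0, sevenDaysMs)); // false
    }
}
```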
Followers and observers will increase their log start offset to the value sent in the fetch response as long as the local Kafka Controller and Metadata Cache has generated a snapshot with an end offset greater than or equal to the new log start offset. In other words, for followers and observers the local log start offset is the minimum of the leader's log start offset and the end offset of the largest local snapshot.
Kafka allows clients to delete records that are less than a given offset by using the DeleteRecords RPC. Those requests will be rejected for the __cluster_metadata topic with a POLICY_VIOLATION error.
...
- Notify when a snapshot is available because of a FetchSnapshot.
From the Kafka Controller and Metadata Cache to Kafka Raft
...
There are two cases when the Kafka Controller or Metadata Cache needs to load a snapshot:
When it is booting.
When the follower and observer replicas finish fetching a new snapshot from the leader.
...
- Follower sends a fetch request.
- Leader replies with a snapshot epoch and end offset.
- Follower pauses fetch requests.
- Follower fetches snapshot chunks from the leader into a temporary snapshot file (<EndOffset>-<Epoch>.checkpoint.part).
- Follower validates the fetched snapshot:
  - All snapshot chunks are fetched.
  - Verify the CRC of the records in the snapshot.
  - Atomically move the temporary snapshot file (<EndOffset>-<Epoch>.checkpoint.part) to the permanent location (<EndOffset>-<Epoch>.checkpoint).
- Follower resumes fetch requests by:
  - Setting the LogStartOffset to the snapshot's end offset.
  - Setting the LEO or FetchOffset in the fetch request to the snapshot's end offset.
  - Setting the LastFetchedEpoch in the fetch request to the snapshot's epoch.
Assuming that the snapshot fetched in bullet 4 has an epoch E1 and an end offset O1, the state of the follower between bullets 4 and 5 is as follows:
- Contains a snapshot with epoch E1 and end offset O1.
- Contains zero or more snapshots older than epoch E1 and end offset O1.
- Contains a replicated log with LEO less than epoch E1 and end offset O1.
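The values used when resuming, all derived from the fetched snapshot id, can be summarized in a tiny sketch (the offsets here are example values, not taken from a real deployment):

```java
// Sketch: follower state after completing a snapshot fetch with id (O1, E1),
// and the values it uses when resuming Fetch requests.
public class ResumeAfterSnapshot {
    public static void main(String[] args) {
        long snapshotEndOffset = 5120793L; // O1 (example value)
        int snapshotEpoch = 2;             // E1 (example value)

        // Per the steps above, all three values come from the snapshot id:
        long logStartOffset = snapshotEndOffset;
        long fetchOffset = snapshotEndOffset; // LEO / FetchOffset in the next Fetch
        int lastFetchedEpoch = snapshotEpoch;

        System.out.println(logStartOffset + " " + fetchOffset + " " + lastFetchedEpoch);
    }
}
```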
...
- cleanup.policy - The __cluster_metadata topic will have a cleanup.policy value of snapshot. This configuration can only be read; updates to this configuration will not be allowed.
- metadata.snapshot.min.cleanable.ratio - The minimum ratio of snapshot records that have to change before generating a snapshot. See the section "When to Snapshot". The default is .5 (50%).
- metadata.snapshot.min.records.size - The minimum number of bytes in the replicated log between the latest snapshot and the high-watermark needed before generating a new snapshot. See the section "When to Snapshot". The default is 20MB.
- metadata.start.offset.lag.time.max.ms - The maximum amount of time that the leader will wait for an offset to be replicated to all of the live replicas before advancing the LogStartOffset. See the section "When to Increase the LogStartOffset". The default is 7 days.
...
The fetch response proposed in KIP-595 will be extended such that:
...
It includes new optional fields for snapshot end offset and snapshot epoch.

```json
{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log start offset." },
        { "name": "DivergingEpoch", "type": "EpochEndOffset", "versions": "12+", "taggedVersions": "12+", "tag": 0,
          "about": "In case divergence is detected based on the `LastFetchedEpoch` and `FetchOffset` in the request, this field indicates the largest epoch and its end offset such that subsequent records are known to diverge",
          "fields": [
          { "name": "Epoch", "type": "int32", "versions": "12+", "default": "-1" },
          { "name": "EndOffset", "type": "int64", "versions": "12+", "default": "-1" }
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch", "versions": "12+", "taggedVersions": "12+", "tag": 1,
          "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown." },
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch" }
        ]},
        // ---------- Start of new field ----------
        { "name": "SnapshotId", "type": "SnapshotId", "versions": "12+", "taggedVersions": "12+", "tag": 2,
          "fields": [
          { "name": "EndOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the end offset that the follower should use for the FetchSnapshot request" },
          { "name": "Epoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request" }
        ]},
        // ---------- End new field ----------
        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
          "about": "The aborted transactions.", "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
          "about": "The preferred read replica for the consumer to use on its next fetch request" },
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." }
      ]}
    ]}
  ]
}
```
...
- Find the snapshot for SnapshotId for the topic Name and partition PartitionIndex.
- Set the Size of each snapshot.
- Send the bytes in the snapshot from Position. If there are multiple partitions in the FetchSnapshot request, then the leader will evenly distribute the number of bytes sent across all of the partitions. The leader will not send more bytes in the response than ResponseMaxBytes, the minimum of MaxBytes in the request and the value configured in replica.fetch.response.max.bytes.
  - Each topic partition is guaranteed to receive at least the average of ResponseMaxBytes if that snapshot has enough bytes remaining.
  - If there are topic partitions with snapshots that have remaining bytes less than the average ResponseMaxBytes, then those bytes may be used to send snapshot bytes for other topic partitions.
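One way to picture that distribution is a single greedy pass over the partitions, where a partition with fewer remaining bytes than its fair share frees up budget for later partitions. This is a sketch of the rule, not the actual leader implementation, and a real allocator may iterate rather than make one pass:

```java
// Sketch: distribute ResponseMaxBytes across the partitions in a FetchSnapshot request.
import java.util.LinkedHashMap;
import java.util.Map;

public class SnapshotByteBudget {
    static Map<String, Long> distribute(Map<String, Long> remainingBytes, long responseMaxBytes) {
        Map<String, Long> sent = new LinkedHashMap<>();
        long budget = responseMaxBytes;
        int left = remainingBytes.size();
        // Give each partition min(remaining, fair share of the unspent budget).
        for (Map.Entry<String, Long> e : remainingBytes.entrySet()) {
            long fairShare = budget / left;
            long take = Math.min(e.getValue(), fairShare);
            sent.put(e.getKey(), take);
            budget -= take;
            left--;
        }
        return sent;
    }

    public static void main(String[] args) {
        Map<String, Long> remaining = new LinkedHashMap<>();
        remaining.put("__cluster_metadata-0", 10L); // small snapshot remainder
        remaining.put("other-0", 1000L);            // large snapshot remainder
        // With a 100-byte budget the small partition takes 10, leaving 90 for the other.
        System.out.println(distribute(remaining, 100)); // {__cluster_metadata-0=10, other-0=90}
    }
}
```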
...
- Copy the bytes in Bytes to the local snapshot file named <SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint.part starting at file position Position.
- If Size is greater than the size of the current snapshot file, then the follower is expected to send another FetchSnapshotRequest with a Position set to the response's Position plus the response's len(Bytes).
- If Size is equal to the size of the current snapshot file, then fetching the snapshot has finished:
  - Validate the snapshot file.
  - Atomically move the file named <SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint.part to a file named <SnapshotId.EndOffset>-<SnapshotId.Epoch>.checkpoint.
  - Notify the Kafka Controller or Metadata Cache that a new snapshot is available.
  - The Kafka Controller and Metadata Cache will load this new snapshot and read records from the log after the snapshot's end offset.
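The chunked fetch loop above can be sketched as follows. fetchChunk is a stand-in for sending a FetchSnapshotRequest and the in-memory buffer stands in for the .part file; neither is an actual Kafka API:

```java
// Sketch: follower-side loop for fetching a snapshot in chunks.
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class SnapshotChunkFetcher {
    // Pretend snapshot held by the leader (25 bytes).
    static final byte[] LEADER_SNAPSHOT = new byte[25];

    // Stand-in for FetchSnapshot: returns up to maxBytes starting at position.
    static byte[] fetchChunk(long position, int maxBytes) {
        int from = (int) position;
        int to = Math.min(LEADER_SNAPSHOT.length, from + maxBytes);
        return Arrays.copyOfRange(LEADER_SNAPSHOT, from, to);
    }

    public static void main(String[] args) throws Exception {
        long size = LEADER_SNAPSHOT.length;                          // Size from the first response
        ByteArrayOutputStream partFile = new ByteArrayOutputStream(); // stands in for the .part file
        long position = 0;
        while (position < size) {
            byte[] bytes = fetchChunk(position, 10);
            partFile.write(bytes);        // copy Bytes at file position Position
            position += bytes.length;     // next Position = Position + len(Bytes)
        }
        // position == size: fetching has finished; validate and atomically
        // rename the .part file, then notify the Controller / Metadata Cache.
        System.out.println("fetched " + partFile.size() + " of " + size + " bytes");
    }
}
```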
...
- SNAPSHOT_NOT_FOUND, OFFSET_OUT_OF_RANGE, NOT_LEADER_FOR_PARTITION - The follower can rediscover the new snapshot by sending another Fetch request.
Metrics
Full Name | Description |
---|---|
kafka.controller:type=KafkaController,name=GenSnapshotLatencyMs | A histogram of the amount of time it took to generate a snapshot. |
kafka.controller:type=KafkaController,name=LoadSnapshotLatencyMs | A histogram of the amount of time it took to load the snapshot. |
kafka.controller:type=KafkaController,name=SnapshotSizeBytes | Size of the latest snapshot in bytes. |
kafka.controller:type=KafkaController,name=SnapshotLag | The number of offsets between the largest snapshot offset and the high-watermark. |
...