...

Code Block
languagejs
{
  "apiKey": #,
  "type": "request",
  "name": "FetchSnapshotResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "Topics", "type": "[]TopicSnapshots", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshots", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
   	     { "name": "MediaTypeContinue", "type": "stringbool",: "versionversions": "0+",
    	      "about": "TheTrue mediaif typethere andis versiondata ofremaining theto snapshotfetch." This},
 is only set when the position is 0."}
	    { "name": "ContinuePosition", "type": "boolint64":, "versions": "0+",
	          "about": "TrueThe ifbyte thereposition iswithin data remaining for fetchthe snapshot." },
  	      { "name": "PositionBytes", "type": "int64bytes",: "versions": "0+",
          "about": "The byte position within the snapshot." },
	    { "name": "Bytes", "type": "bytes": "versions": "0+",
	      "about": "Snapshot data." },
      ]}
    ]}
  ]
}

...

When the leader receives a FetchSnapshot request, it would do the following for each partition (see the sketch after this list):

  1. Find the snapshot for SnapshotOffsetAndEpoch for the topic Name and partition PartitionIndex.

  2. Send the bytes in the snapshot from Position up to at most MaxBytes.

  3. Set Continue to true if there are remaining bytes in the snapshot.
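
A minimal sketch of this leader-side handling in Java. It assumes the snapshot is a single local file, that the snapshot for SnapshotOffsetAndEpoch has already been located (step 1), and that the requested position has already been validated; the class and record names are illustrative and not part of this KIP.

Code Block
languagejava
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical leader-side handling of one partition in a FetchSnapshot request.
public final class SnapshotFetchLeader {

    /** Illustrative result: the chunk read from the snapshot plus the Continue flag. */
    public record PartitionData(long position, byte[] bytes, boolean continueFetch) {}

    /** Steps 2 and 3: read at most maxBytes starting at position and set Continue. */
    public PartitionData handlePartition(Path snapshotFile, long position, int maxBytes)
            throws IOException {
        try (FileChannel channel = FileChannel.open(snapshotFile, StandardOpenOption.READ)) {
            long size = channel.size();  // position is assumed to satisfy 0 <= position <= size

            // Step 2: send the bytes in the snapshot from Position up to at most MaxBytes.
            int length = (int) Math.min(maxBytes, size - position);
            ByteBuffer buffer = ByteBuffer.allocate(length);
            channel.read(buffer, position);
            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);

            // Step 3: Continue is true if there are remaining bytes in the snapshot.
            boolean continueFetch = position + bytes.length < size;
            return new PartitionData(position, bytes, continueFetch);
        }
    }
}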

...

  1. SNAPSHOT_NOT_FOUND - when the fetch snapshot request specifies a SnapshotOffsetAndEpoch that doesn’t exist on the leader.

  2. OFFSET_OUT_OF_RANGE - when the fetch snapshot request’s offset is greater than the size of the snapshot.

  3. NOT_LEADER_FOR_PARTITION - when the fetch snapshot request is sent to a replica that is not the leader.
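
A small sketch of how these conditions could map onto the partition-level ErrorCode, in Java. The method, its inputs, and the numeric values of the constants are placeholders for illustration; only the three conditions themselves come from the list above.

Code Block
languagejava
// Illustrative mapping of the failure conditions above to error codes.
// The numeric values and the method inputs are placeholders, not the real protocol constants.
public final class FetchSnapshotErrors {

    public static final short NONE = 0;
    public static final short OFFSET_OUT_OF_RANGE = 1;
    public static final short SNAPSHOT_NOT_FOUND = 2;
    public static final short NOT_LEADER_FOR_PARTITION = 3;

    /** Validates one partition of a FetchSnapshot request before any bytes are read. */
    public static short validate(boolean isLeader, boolean snapshotExists,
                                 long position, long snapshotSizeInBytes) {
        if (!isLeader) {
            return NOT_LEADER_FOR_PARTITION;  // request reached a replica that is not the leader
        }
        if (!snapshotExists) {
            return SNAPSHOT_NOT_FOUND;        // no snapshot for the requested SnapshotOffsetAndEpoch
        }
        if (position < 0 || position > snapshotSizeInBytes) {
            return OFFSET_OUT_OF_RANGE;       // requested position is past the end of the snapshot
        }
        return NONE;
    }
}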

Response Handling

For each topic partition:

  1. Copy the bytes in Bytes to the local snapshot file starting at file position Position.

  2. If Continue is true, the follower is expected to send another FetchSnapshotRequest with a Position set to the response's Position plus the response's len(Bytes).

  3. If Continue is false, then fetching the snapshot has finished. Notify the Kafka Controller that a new snapshot is available. The Kafka Controller will load this new snapshot and read records from the log after the snapshot offset.
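
A minimal sketch of this follower-side handling in Java. The class name, the partial-snapshot file handling, and the two placeholder methods (sendFetchSnapshotRequest, notifyControllerOfNewSnapshot) are illustrative assumptions rather than APIs defined by this KIP; only the copy-at-Position and re-fetch-on-Continue logic follows the steps above.

Code Block
languagejava
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical follower-side handling of one partition in a FetchSnapshotResponse.
public final class SnapshotFetchFollower {

    private final Path partialSnapshotFile;

    public SnapshotFetchFollower(Path partialSnapshotFile) {
        this.partialSnapshotFile = partialSnapshotFile;
    }

    /** Applies one response chunk and decides whether another FetchSnapshotRequest is needed. */
    public void onPartitionResponse(long position, byte[] bytes, boolean continueFetch) throws IOException {
        // Step 1: copy the bytes in Bytes to the local snapshot file starting at file position Position.
        try (FileChannel channel = FileChannel.open(
                partialSnapshotFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap(bytes), position);
        }

        if (continueFetch) {
            // Step 2: fetch the next chunk starting where this one ended.
            sendFetchSnapshotRequest(position + bytes.length);
        } else {
            // Step 3: the snapshot is complete; let the Kafka Controller load it.
            notifyControllerOfNewSnapshot();
        }
    }

    private void sendFetchSnapshotRequest(long nextPosition) {
        // Placeholder: build and send the next FetchSnapshotRequest with Position = nextPosition.
    }

    private void notifyControllerOfNewSnapshot() {
        // Placeholder: hand the completed snapshot over to the Kafka Controller.
    }
}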

...

  1. SNAPSHOT_NOT_FOUND, OFFSET_OUT_OF_RANGE, NOT_LEADER_FOR_PARTITION - The follower can rediscover the new snapshot by sending a fetch request with an offset and epoch of zero.
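
A brief sketch of this recovery path, again with hypothetical names and placeholder error-code values: on any of the three errors, the follower falls back to a Fetch request with an offset and epoch of zero.

Code Block
languagejava
// Hypothetical follower-side recovery from FetchSnapshot errors; the constants'
// numeric values and sendFetchRequest are placeholders.
public final class SnapshotFetchRecovery {

    static final short OFFSET_OUT_OF_RANGE = 1;
    static final short SNAPSHOT_NOT_FOUND = 2;
    static final short NOT_LEADER_FOR_PARTITION = 3;

    /** Rediscovers the latest snapshot by fetching with an offset and epoch of zero. */
    public void onPartitionError(short errorCode) {
        if (errorCode == SNAPSHOT_NOT_FOUND
                || errorCode == OFFSET_OUT_OF_RANGE
                || errorCode == NOT_LEADER_FOR_PARTITION) {
            sendFetchRequest(0L, 0);
        }
    }

    private void sendFetchRequest(long fetchOffset, int lastFetchedEpoch) {
        // Placeholder: build and send a regular Fetch request with the given offset and epoch.
    }
}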

Metrics

Full Name                                                          Description
kafka.controller:type=KafkaController,name=GenSnapshotLatencyMs   A histogram of the amount of time it took to generate a snapshot.
kafka.controller:type=KafkaController,name=LoadSnapshotLatencyMs  A histogram of the amount of time it took to load the snapshot.
kafka.controller:type=KafkaController,name=SnapshotSizeBytes      Size of the latest snapshot in bytes.

Compatibility, Deprecation, and Migration Plan

This KIP is only implemented for the internal topic __cluster_metadata. Internal and external clients for all other topics can ignore the SnapshotOffsetAndEpoch as that field will not be set for those topic partitions.

Rejected Alternatives

Append the snapshot to the Kafka log: Instead of having a separate file for snapshots and a separate RPC for downloading the snapshot, the leader could append the snapshot to the log. This design has a few drawbacks. Because the snapshot gets appended to the head of the log, it will be replicated to every replica in the partition. This will cause Kafka to use more network bandwidth than necessary.

...

Use Kafka Log infrastructure to store snapshots: Kafka could use the Log functionality to persist the snapshot to disk and to facilitate the replication of the snapshot to the replicas. This solution ties the snapshot format to Kafka Log’s record format. To lower the overhead of writing and loading the snapshot by the Kafka Controller, it is important to use a format that is compatible with the in-memory representation of the Kafka Controller.


Change leader when generating snapshots: Instead of supporting concurrent snapshot generation and writing to the client's in-memory state, we could require that only non-leaders generate snapshots. If the leader wants to generate a snapshot, it would relinquish leadership. We decided not to implement this because:

  1. All replicas/brokers in the cluster will have a metadata cache which will apply the replicated log and allow read-only access by other components in the Kafka Broker.

...

  1. Even though leader election within the replicas in the replicated log may be efficient, communicating this new leader to all of the brokers and all of the external clients (admin clients

...

  1. ) may have a large latency.
  2. Kafka wants to support quorums where there is only one voter replica for the Kafka Controller.


Just fix the tombstone GC issue with log compaction: we have also talked about just fixing the tombstone garbage collection issue listed in the motivation instead of introducing the snapshot mechanism. The key idea is that fetchers, whether from a follower or from a consumer, need to be notified if they "may" be missing some tombstones that have deprecated some old records. Since the offset span between the deprecated record and the tombstone for the same key can be arbitrarily long (for example, a record with key A at offset 0 and a tombstone record for key A appended a week later, at offset 1 billion), we'd have to take a conservative approach here (see the sketch after the list below). More specifically:

  • The leader would maintain an additional offset marker up to which we have deleted tombstones, i.e. all tombstones larger than this offset are still in the log. Let's call it a tombstone horizon marker. This horizon marker would be returned in each fetch response.
  • Follower replicas would not delete their tombstones beyond the received leader's horizon marker, and would maintain their own marker as well, which should always be smaller than the leader's horizon marker offset.
  • All fetchers (follower and consumer) would keep track of the horizon marker from the replica they are fetching from. The expectation is that leaders should only delete tombstones in a very lazy manner, controlled by the existing config, so that the fetching position should be larger than the horizon marker most of the time. However, if the fetcher does find its fetching position below the horizon marker, then it needs to handle it accordingly:
    • For consumer fetchers, we need to throw an extended error of InvalidOffsetException to notify the users that we may have missed some tombstones, and let the users reset their consumed record state; for example, if the caller of the consumer is a Streams client that is building a local state from the fetched records, that state needs to be wiped.
    • For replica fetchers, we need to wipe the local log altogether and then restart fetching from the starting offset.
    • For either type of fetcher, we would set an internal flag, let's call it unsafe-bootstrapping mode, to true, while remembering the horizon marker offset at that moment from the replica it is fetching from.
      • When this flag is set to true, we would not re-trigger this handling logic even when the fetching position is smaller than the horizon marker, since otherwise we would fall into a loop of rewinding.
      • However, during this period of time, if the horizon marker offset gets updated from the fetch response, meaning that some tombstones were deleted again, then we'd need to retry this unsafe bootstrap from step one.
      • After the fetching position has advanced beyond the remembered horizon marker offset, we would reset this flag to false to go back to the normal mode.
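
A rough sketch, in Java, of the fetcher-side bookkeeping described by the bullets above. All names are illustrative and wipeLocalStateAndRestart() is a placeholder for the consumer- or replica-specific reset actions; only the flag and horizon-marker logic comes from the list.

Code Block
languagejava
// Illustrative fetcher-side bookkeeping for the horizon marker alternative; all names are hypothetical.
public final class HorizonMarkerTracker {

    private boolean unsafeBootstrapping = false;
    private long rememberedHorizon = -1L;

    /** Called for every fetch response carrying the leader's current horizon marker. */
    public void onFetchResponse(long fetchPosition, long leaderHorizonMarker) {
        if (!unsafeBootstrapping) {
            if (fetchPosition < leaderHorizonMarker) {
                // We may have missed tombstones: remember the marker, enter unsafe-bootstrapping
                // mode, and wipe local state before re-fetching.
                unsafeBootstrapping = true;
                rememberedHorizon = leaderHorizonMarker;
                wipeLocalStateAndRestart();
            }
        } else {
            if (leaderHorizonMarker > rememberedHorizon) {
                // More tombstones were deleted while bootstrapping: retry from step one.
                rememberedHorizon = leaderHorizonMarker;
                wipeLocalStateAndRestart();
            } else if (fetchPosition >= rememberedHorizon) {
                // We advanced past the remembered horizon: go back to normal mode.
                unsafeBootstrapping = false;
            }
        }
    }

    private void wipeLocalStateAndRestart() {
        // Placeholder: for a replica fetcher, truncate the local log and restart from the start offset;
        // for a consumer, surface an InvalidOffsetException-style error so the caller can reset its state.
    }
}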

We feel this conservative approach is at least as complicated as adding a snapshot mechanism, since it requires rather tricky handling logic on the fetcher side; plus, it only resolves one of the motivations, the tombstone garbage collection issue.

Another thought against this idea is that, for now, most of the internal topics that would need the snapshot mechanism described in this KIP (controller, transaction coordinator, group coordinator) can hold their materialized cache completely in memory, and hence generating snapshots should be comparably more efficient to execute.

...