Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

...

  1. There is a snapshot for the topic partition which includes the offset for LBO - 1, and

  2. One of the following is true:

    1. All of the replicas (followers and observers) have replicated LBO or

    2. LBO is max.replication.lag.ms old.

Kafka allows clients to delete records below a given offset through the DeleteRecords RPC. Those requests need to be validated using the same logic enumerated above.
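
The conditions enumerated above can be read as a single predicate. The following is a minimal sketch in Python, not Kafka's implementation. The names `Snapshot`, `can_advance_lbo`, `replica_end_offsets` and `lbo_age_ms` are hypothetical. It assumes that a snapshot "includes the offset for LBO - 1" when its exclusive end offset is at least LBO, and that a replica "has replicated LBO" when its log end offset is greater than LBO.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class Snapshot:
    # Hypothetical model: the snapshot covers every offset strictly below end_offset.
    end_offset: int


def can_advance_lbo(
    new_lbo: int,
    snapshots: Iterable[Snapshot],
    replica_end_offsets: Iterable[int],
    lbo_age_ms: int,
    max_replication_lag_ms: int,
) -> bool:
    """Return True if the log begin offset may be advanced to new_lbo."""
    # Condition 1: some snapshot includes offset new_lbo - 1.
    has_snapshot = any(s.end_offset >= new_lbo for s in snapshots)
    # Condition 2.1: every replica (followers and observers) has replicated new_lbo.
    # Assumption: a log end offset greater than new_lbo means the replica holds that offset.
    all_replicated = all(end > new_lbo for end in replica_end_offsets)
    # Condition 2.2: the offset is at least max.replication.lag.ms old.
    old_enough = lbo_age_ms >= max_replication_lag_ms
    return has_snapshot and (all_replicated or old_enough)
```

Under the same assumptions, a DeleteRecords request for a given offset could be validated by calling the same predicate with that offset as `new_lbo`.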

...

  1. Kafka topics will have an additional cleanup.policy value of snapshot. Once the configuration option is set to snapshot it cannot be changed. The __cluster_metadata topic will always have a cleanup.policy of snapshot. Setting the cleanup.policy to snapshot will enable the improvements documented here.

  2. snapshot.minimum.records - The minimum number of records committed after the latest snapshot needed before performing another snapshot.

  3. max.replication.lag.ms - The maximum amount of time that the leader will wait for an offset to get replicated to all of the replicas before advancing the LBO. See section “When to Increase the Log Begin Offset”.
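
As an illustration of how snapshot.minimum.records might gate snapshot generation, here is a small Python sketch. It is not taken from the proposal: the function name and its parameters are hypothetical, and it assumes both offsets are exclusive end offsets so that their difference is the number of records committed since the latest snapshot.

```python
def should_snapshot(
    committed_end_offset: int,
    latest_snapshot_end_offset: int,
    snapshot_minimum_records: int,
) -> bool:
    """Return True once enough records were committed after the latest snapshot."""
    # Assumption: both offsets are exclusive, so the difference counts committed records.
    records_since_snapshot = committed_end_offset - latest_snapshot_end_offset
    return records_since_snapshot >= snapshot_minimum_records
```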

...

{
  "apiKey": #,
  "type": "request",
  "name": "FetchSnapshotRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "0+",
      "about": "The broker ID of the follower, or -1 if this request is from a consumer." },
    { "name": "Topics", "type": "[]TopicSnapshots", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshots", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "SnapshotOffsetAndEpoch", "type": "OffsetAndEpoch",
          "versions": "0+", "taggedVersions": "0+", "tag": 0, "fields": [
          { "name": "Offset", "type": "int64", "versions": "0+",
            "about": "If set, this is the offset that the follower should use for the FetchSnapshot request"},
          { "name": "OffsetEpoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"}
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot." },
        { "name": "MaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this snapshot." }
      ]}
    ]}
  ]
}
  

Response Schema

{
  "apiKey": #,
  "type": "response",
  "name": "FetchSnapshotResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "Topics", "type": "[]TopicSnapshots", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshots", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "MediaType", "type": "string", "versions": "0+",
          "about": "The media type and version of the snapshot. This is only set when the position is 0." },
        { "name": "Continue", "type": "bool", "versions": "0+",
          "about": "True if there is data remaining for fetch." },
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot." },
        { "name": "Bytes", "type": "bytes", "versions": "0+",
          "about": "Snapshot data." }
      ]}
    ]}
  ]
}
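
The Position, MaxBytes, Continue and Bytes fields suggest that a replica downloads a snapshot in chunks. The following Python sketch shows one way such a loop might look. It is an illustration under stated assumptions, not the proposal's implementation: `PartitionSnapshotResponse`, `fetch_whole_snapshot`, `make_fake_leader` and the `send` callback are hypothetical, and it assumes the response Position echoes the byte position of the first byte returned in Bytes.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class PartitionSnapshotResponse:
    # Hypothetical in-memory mirror of the per-partition response fields.
    error_code: int
    continue_: bool  # "Continue" in the schema; renamed because it is a Python keyword
    position: int
    data: bytes      # "Bytes" in the schema


def fetch_whole_snapshot(
    send: Callable[[int, int], PartitionSnapshotResponse],
    max_bytes: int,
) -> bytes:
    """Fetch a snapshot chunk by chunk until Continue is false."""
    buffer = bytearray()
    position = 0
    while True:
        resp = send(position, max_bytes)
        if resp.error_code != 0:
            raise RuntimeError(f"fetch failed with error code {resp.error_code}")
        # Assumption: the leader echoes the position of the first returned byte.
        if resp.position != position:
            raise RuntimeError("response position does not match request position")
        if resp.continue_ and not resp.data:
            raise RuntimeError("no progress: empty chunk with Continue set")
        buffer += resp.data
        position += len(resp.data)
        if not resp.continue_:
            return bytes(buffer)


def make_fake_leader(snapshot: bytes) -> Callable[[int, int], PartitionSnapshotResponse]:
    """Build a stand-in for the leader that serves slices of an in-memory snapshot."""
    def send(position: int, max_bytes: int) -> PartitionSnapshotResponse:
        chunk = snapshot[position:position + max_bytes]
        more = position + len(chunk) < len(snapshot)
        return PartitionSnapshotResponse(0, more, position, chunk)
    return send
```

For example, `fetch_whole_snapshot(make_fake_leader(b"0123456789"), 4)` issues three requests at positions 0, 4 and 8 and returns the original ten bytes.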

...