


Status

Current state: [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In KIP-500 the Kafka Controller, which is the quorum leader from KIP-595, will materialize the entries in the metadata log into memory. Each broker in KIP-500, which will be a replica of the metadata log, will materialize the entries in the log into a metadata cache. We want to provide stronger guarantees on the consistency of the metadata than is possible today. If all entries in the metadata log are retained indefinitely, then the design described in KIP-595 achieves a consistent log and, as a result, a consistent metadata cache. However, if Kafka doesn’t limit the size of this replicated log, it can affect Kafka's availability by:

  1. Running out of disk space
  2. Increasing the time needed to load the replicated log into memory
  3. Increasing the time it takes for new brokers to catch up to the leader's log and hence join the cluster.

Kafka uses the cleanup policies compact and delete to limit the size of the log. The delete cleanup policy deletes data for a prefix of the log based on time. The delete cleanup policy is not viable for the metadata log because the Kafka Controller and the metadata cache would not be able to generate a consistent view from such a log.

The compact cleanup policy incrementally removes key/value records that are not needed to reconstruct the associated key/value state. During compaction the log cleaner keeps track of an OffsetMap, which is a mapping from key to the latest offset in the section of the log being cleaned. The log cleaner assumes that any record with an offset less than the value in the OffsetMap for the record's key is safe to remove. This solution eventually results in one record for every key in the log, including a tombstone record for every key that has been deleted. Kafka removes these tombstone records after a configurable time. Because a replica that has not fetched a tombstone before it is removed will never see it, replicas can end up with different logs, and applying those logs can produce different in-memory state. We explore changes to the compact cleanup policy in the "Rejected Alternatives" section.
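The divergence described above can be reproduced with a small model. This is a minimal sketch, not Kafka's LogCleaner: the record tuples and the `compact` and `materialize` helpers are hypothetical, and the whole log is treated as a single cleaned section. A follower that replicated only offset 0 before the leader removed the tombstone at offset 1 ends up with a different state than the leader.

```python
from typing import Dict, List, Optional, Tuple

# (offset, key, value); a value of None is a tombstone that deletes the key.
Record = Tuple[int, str, Optional[str]]


def compact(log: List[Record], drop_tombstones: bool) -> List[Record]:
    """Simplified key/offset compaction using an OffsetMap."""
    # OffsetMap: key -> latest offset for that key in the section being cleaned.
    offset_map: Dict[str, int] = {}
    for offset, key, _ in log:
        offset_map[key] = offset
    cleaned: List[Record] = []
    for offset, key, value in log:
        if offset < offset_map[key]:
            continue  # superseded by a later record with the same key
        if value is None and drop_tombstones:
            continue  # tombstone removed after its configurable retention time
        cleaned.append((offset, key, value))
    return cleaned


def materialize(log: List[Record]) -> Dict[str, str]:
    """Apply the log in offset order to build the key/value state."""
    state: Dict[str, str] = {}
    for _, key, value in log:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state


leader_log: List[Record] = [(0, "A", "1"), (1, "A", None), (2, "B", "2")]
# A lagging follower replicated offset 0 before the leader cleaned its log.
follower_log = leader_log[:1]
leader_log = compact(leader_log, drop_tombstones=True)
# The follower resumes fetching at offset 1, but the tombstone is gone.
follower_log += [record for record in leader_log if record[0] >= 1]

print(materialize(leader_log))    # {'B': '2'}
print(materialize(follower_log))  # {'A': '1', 'B': '2'}
```

The follower still holds key A even though the leader deleted it, which is the inconsistency that snapshots are meant to avoid.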

Type of State Machines and Operations

The type of in-memory state machines that we plan to implement for the Kafka Controller (see KIP-631) doesn't map very well to a key- and offset-based cleanup policy. The data model for the Kafka Controller requires us to support deltas/events. Currently the controller uses ZK both for storing the current state of the cluster and for the deltas that it needs to apply. One example of this is topic deletion: when the user asks for a topic to be deleted, 1) the Kafka Controller (AdminManager in the code) persists and commits the delete event, 2) applies the event to its in-memory state and sends RPCs to all of the affected brokers, and 3) persists and commits the changes to ZK. We don't believe that a key-value and offset-based compaction model can be used for events/deltas in a way that is intuitive to program against.

Loading State and Frequency of Compaction

When a broker starts, whether because it is a new broker, because it was upgraded, or because it is restarting after a failure, it must load the state represented by the __cluster_metadata topic partition before it is available. With snapshots of the in-memory state, Kafka can be much more aggressive about how often it generates a snapshot, and the snapshot can include everything up to the high watermark. In the future this mechanism will also be useful for high-throughput topic partitions like those of the Group Coordinator and Transaction Coordinator.

Amount of Data Replicated

Because the Kafka Controller as described in KIP-631 will use events/deltas to represent some of its in-memory state changes, the amount of data that needs to be persisted and replicated to all of the brokers can be reduced. As an example, consider a broker going offline in a cluster with 1 million topic partitions and 10 brokers, where an IsrChange message is 40 bytes (4 bytes for partition id, 16 bytes for topic id, 8 bytes for ISR ids, 4 bytes for leader id, 4 bytes for leader epoch, 2 bytes for api key and 2 bytes for api version) and a DeleteBroker message is 8 bytes (4 bytes for broker id, 2 bytes for api key and 2 bytes for api version).

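If events/deltas are not allowed in the replicated log for the __cluster_metadata topic partition, then the Kafka Controller and each broker need to persist a 40-byte IsrChange message for every topic partition hosted by the offline broker. Assuming the partitions are spread evenly, the offline broker hosts 100,000 topic partitions, which comes out to 4,000,000 bytes (about 3.81 MiB). The Kafka Controller will need to replicate this to each of the 10 brokers in the cluster, or 40,000,000 bytes (about 38.15 MiB) in total. If events/deltas are allowed, the Kafka Controller only needs to persist a single 8-byte DeleteBroker message and replicate it to the 10 brokers, or 80 bytes in total.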
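The arithmetic of this example can be checked with a few lines. This sketch assumes, as the example implies, that the 1 million partitions are spread evenly over the 10 brokers and that sizes are reported in MiB (1024 × 1024 bytes); the constant names are illustrative only.

```python
PARTITIONS = 1_000_000
BROKERS = 10
MIB = 1024 * 1024

# IsrChange: partition id + topic id + ISR ids + leader id + leader epoch + api key + api version
ISR_CHANGE_BYTES = 4 + 16 + 8 + 4 + 4 + 2 + 2
# DeleteBroker: broker id + api key + api version
DELETE_BROKER_BYTES = 4 + 2 + 2

# Assumption: partitions are spread evenly, so the offline broker hosts 1/10 of them.
partitions_on_offline_broker = PARTITIONS // BROKERS

# Without deltas: one IsrChange per affected partition, replicated to every broker.
without_deltas_per_broker = partitions_on_offline_broker * ISR_CHANGE_BYTES
without_deltas_total = without_deltas_per_broker * BROKERS

# With deltas: a single DeleteBroker event, replicated to every broker.
with_deltas_total = DELETE_BROKER_BYTES * BROKERS

print(f"{without_deltas_per_broker / MIB:.2f} MiB per broker")  # 3.81 MiB per broker
print(f"{without_deltas_total / MIB:.2f} MiB in total")         # 38.15 MiB in total
print(f"{with_deltas_total} bytes in total with deltas")        # 80 bytes in total with deltas
```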

Overview

This KIP assumes that KIP-595 has been approved and implemented. With this improvement, replicas will continue to replicate their log among each other. Replicas will snapshot their log independently of each other. Snapshots will be consistent because the logs are consistent across replicas. Follower and observer replicas fetch a snapshot from the leader when they attempt to fetch an offset that the leader no longer has in its log because that offset is included in a snapshot.

Generating and loading the snapshot will be delegated to the Kafka Controller. The Kafka Controller will notify the Kafka Raft client when it has generated a snapshot and up to which offset is included in the snapshot. The Kafka Raft client will notify the Kafka Controller when a new snapshot has been fetched from the leader.

This KIP focuses on the changes required for the Kafka Controller's __cluster_metadata topic partition, but we should be able to extend it to other internal topic partitions like __consumer_offsets and __transaction_state.

Proposed Changes

The __cluster_metadata topic will have snapshot as its cleanup.policy. The Kafka Controller will have an in-memory representation of the replicated log up to at most the high-watermark. When performing a snapshot, the Kafka Controller will serialize this in-memory state to disk. The snapshot file on disk is identified by the largest offset and epoch from the replicated log that it includes.

The Kafka Controller will notify the Kafka Raft client when it has finished generating a new snapshot. It is then safe for the log to truncate a prefix of the log up to the latest snapshot. The __cluster_metadata topic partition will have the latest snapshot and zero or more older snapshots. Having multiple snapshots is useful for minimizing re-fetching of the snapshot, as described later in the document. These additional snapshots eventually have to be deleted, a process also described later in the document.

Design

Visually, a Kafka Raft log would look as follows:

Kafka Replicated Log:

      LBO  --               high-watermark --      LEO --
            V                               V           V
        ---------------------------------------------------
offset: | x + 1 | ... | y | y + 1 |  ...  |   |  ...  |   |
epoch:  | b     | ... | c | d     |  ...  |   |  ...  |   |
        ---------------------------------------------------

Kafka Snapshot Files:

<topic_name>-<partition_index>/x-a.checkpoint
<topic_name>-<partition_index>/y-c.checkpoint

Note: The extension checkpoint is used because Kafka already uses the snapshot extension for other files.

Legend

  1. LEO - log end offset - the largest offset and epoch that has been written to disk.

  2. high-watermark - the largest offset and epoch that has been replicated to N/2 + 1 replicas.

  3. LBO - log begin offset - the smallest offset in the replicated log. This property used to be called log start offset. It is renamed such that the acronym doesn’t conflict with last stable offset (LSO).

In the example above, if the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the log begin offset (x + 1, b), then the leader handles the request as described in KIP-595. Otherwise, the leader will respond with the offset and epoch of the latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d).

The followers and observers can concurrently fetch both the snapshot and the replicated log. During leader election, followers with an incomplete or missing snapshot will send vote requests and responses as if they had an empty log.

Snapshot Format

The Kafka Controller is responsible for the format of the snapshot. The snapshot will be stored in the topic partition directory and will have a name of <SnapshotOffset>-<SnapshotOffsetEpoch>.checkpoint. For example for the topic __cluster_metadata, partition 0, snapshot offset 5120793 and snapshot epoch 2, the full filename will be __cluster_metadata-0/00000000000005120793-2.checkpoint.

When to Snapshot

If the Kafka Controller generates a snapshot too frequently, it can negatively affect the performance of the disk. If it doesn't generate a snapshot often enough, it can increase the amount of time it takes to load its state into memory and the amount of space taken up by the replicated log. The Kafka Controller will have a new configuration option, controller.snapshot.minimum.records. If the number of records between the latest snapshot and the high-watermark is greater than controller.snapshot.minimum.records, then the Kafka Controller will perform a new snapshot.
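The trigger can be expressed as a small predicate. This is a minimal sketch with hypothetical names; it assumes contiguous offsets and treats the high-watermark as exclusive (the offset of the next uncommitted record), matching how "Loading Snapshots" applies records with an offset "less than the high-watermark". If the high-watermark is instead taken as the largest committed offset, the count grows by one.

```python
NO_SNAPSHOT = -1  # hypothetical sentinel: no snapshot has been generated yet


def records_since_snapshot(latest_snapshot_offset: int, high_watermark: int) -> int:
    """Committed records that the latest snapshot does not cover yet."""
    # Offsets latest_snapshot_offset + 1 .. high_watermark - 1 are committed but not in the snapshot.
    return max(0, high_watermark - (latest_snapshot_offset + 1))


def should_snapshot(latest_snapshot_offset: int, high_watermark: int, minimum_records: int) -> bool:
    """minimum_records stands in for controller.snapshot.minimum.records."""
    return records_since_snapshot(latest_snapshot_offset, high_watermark) > minimum_records
```

For example, with the latest snapshot at offset 99 and a minimum of 50 records, a high-watermark of 150 does not trigger a snapshot (exactly 50 new records), while 151 does.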

When to Increase the Log Begin Offset

To minimize the need to send snapshots to slow replicas, the leader can delay increasing the log begin offset up to the minimum offset that it knows has been replicated to all of the live replicas. Followers and observers will increase their log begin offset to the value sent in the fetch response as long as the local state machine has generated a snapshot that includes the log begin offset minus one.

The LBO can be increased to an offset X if both of the following are true:

  1. There is a snapshot for the topic partition that includes offset X - 1.

  2. One of the following is true:

    1. All of the live replicas (followers and observers) have replicated up to the new LBO, X, or

    2. The new LBO, X, is at least max.replication.lag.ms old.

Kafka allows clients to delete records that are less than a given offset by using the DeleteRecords RPC. Those requests will be validated using the same logic enumerated above.
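The leader and follower rules for moving the LBO can be sketched as two predicates. This is a sketch under assumptions: the function and parameter names are hypothetical, "replicated up to X" is modeled as each live replica's next fetch offset being at least X (so none of them still needs the offsets being removed), and a snapshot at offset S is taken to include every offset up to and including S.

```python
from typing import Iterable


def can_advance_lbo(
    new_lbo: int,
    snapshot_offsets: Iterable[int],
    live_replica_fetch_offsets: Iterable[int],
    new_lbo_age_ms: int,
    max_replication_lag_ms: int,
) -> bool:
    """Leader-side check for moving the LBO to new_lbo (X)."""
    # 1. Some snapshot must include offset X - 1.
    if not any(offset >= new_lbo - 1 for offset in snapshot_offsets):
        return False
    # 2a. Every live replica has already fetched everything below X ...
    all_replicated = all(fetch_offset >= new_lbo for fetch_offset in live_replica_fetch_offsets)
    # 2b. ... or X is at least max.replication.lag.ms old.
    return all_replicated or new_lbo_age_ms >= max_replication_lag_ms


def follower_can_adopt_lbo(leader_lbo: int, local_snapshot_offsets: Iterable[int]) -> bool:
    """Followers and observers adopt the leader's LBO once a local snapshot covers LBO - 1."""
    return any(offset >= leader_lbo - 1 for offset in local_snapshot_offsets)
```

A DeleteRecords request for offset X would be accepted only if `can_advance_lbo` holds for X.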

When to Delete Snapshots

The broker will delete any snapshot whose latest offset and epoch (SnapshotOffsetAndEpoch) is less than LBO - 1 (the log begin offset minus one). This means that the question of when to delete a snapshot is the same as when to increase the LBO, which is covered in the section “When to Increase the Log Begin Offset”.
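The deletion rule amounts to a filter over the snapshots on disk. In this minimal sketch (hypothetical names), snapshots are compared by offset only, and the sanity check relies on the rule above that the LBO never moves past X unless a snapshot includes X - 1, so at least one snapshot should always survive.

```python
from typing import List, Tuple

SnapshotId = Tuple[int, int]  # (SnapshotOffset, SnapshotOffsetEpoch)


def snapshots_to_delete(snapshots: List[SnapshotId], log_begin_offset: int) -> List[SnapshotId]:
    """Snapshots whose offset is less than LBO - 1 are no longer needed."""
    doomed = [snapshot for snapshot in snapshots if snapshot[0] < log_begin_offset - 1]
    # If the LBO was advanced by the rules above, some snapshot includes LBO - 1.
    if snapshots and len(doomed) == len(snapshots):
        raise ValueError("LBO advanced past every snapshot")
    return doomed
```

With snapshots at offsets 10, 49 and 80 and an LBO of 50, only the snapshot at offset 10 is deleted; the one at 49 is kept because it includes LBO - 1.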

Interaction Between Kafka Raft and the Kafka Controller

This section describes the interaction between the state machine and Kafka Raft.

From Kafka Raft to the Kafka Controller

  1. Notify when a snapshot is available because it was fetched from the leader using FetchSnapshot.

From the Kafka Controller to Kafka Raft

  1. Notify when a snapshot has been generated. This includes the largest offset and epoch included in the snapshot.
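The two notifications can be pictured as a pair of callbacks. This is an illustrative sketch only: `SnapshotListener`, `ToyRaftClient`, `ToyController` and their method names are hypothetical and do not describe the actual Kafka Raft client API.

```python
from dataclasses import dataclass
from typing import List, Optional, Protocol


@dataclass(frozen=True)
class OffsetAndEpoch:
    offset: int
    epoch: int


class SnapshotListener(Protocol):
    """Kafka Raft -> Kafka Controller (hypothetical interface)."""

    def handle_snapshot_fetched(self, snapshot_id: OffsetAndEpoch) -> None: ...


class ToyRaftClient:
    """Stand-in for the Kafka Raft client; method names are hypothetical."""

    def __init__(self, listener: SnapshotListener) -> None:
        self.listener = listener
        self.latest_snapshot: Optional[OffsetAndEpoch] = None

    # Kafka Controller -> Kafka Raft: a snapshot finished, up to this offset and epoch.
    def snapshot_generated(self, snapshot_id: OffsetAndEpoch) -> None:
        if self.latest_snapshot is None or snapshot_id.offset > self.latest_snapshot.offset:
            self.latest_snapshot = snapshot_id  # the log prefix up to here may be truncated

    # Called once a FetchSnapshot download completes (Continue is false).
    def snapshot_fetch_completed(self, snapshot_id: OffsetAndEpoch) -> None:
        self.latest_snapshot = snapshot_id
        self.listener.handle_snapshot_fetched(snapshot_id)


class ToyController:
    def __init__(self) -> None:
        self.loaded: List[OffsetAndEpoch] = []

    def handle_snapshot_fetched(self, snapshot_id: OffsetAndEpoch) -> None:
        self.loaded.append(snapshot_id)  # a real controller would reload its state here
```

Keeping the snapshot format entirely on the controller side means the Raft client only needs the snapshot's offset and epoch.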

Loading Snapshots

There are two cases when the Kafka Controller needs to load the snapshot:

  1. When the broker is booting.

  2. When a follower or observer replica finishes fetching a new snapshot from the leader.

After loading the snapshot, the Kafka Controller will apply operations in the replicated log with an offset greater than the snapshot’s offset and less than the high-watermark.
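Loading can be sketched as "start from the snapshot, then replay the committed suffix of the log". This minimal sketch models the state as a key/value dictionary and the log as offset-ordered `(offset, key, value)` records where `None` deletes a key; the real controller state and record types are defined by KIP-631, not here.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[int, str, Optional[str]]  # (offset, key, value); None deletes the key


def load_state(
    snapshot: Dict[str, str],
    snapshot_offset: int,
    log: List[Record],
    high_watermark: int,
) -> Dict[str, str]:
    """Rebuild in-memory state from a snapshot plus the committed log after it."""
    state = dict(snapshot)  # start from the snapshot contents
    for offset, key, value in log:  # log is assumed to be sorted by offset
        if offset <= snapshot_offset:
            continue  # already reflected in the snapshot
        if offset >= high_watermark:
            break  # not committed yet
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state


state = load_state({"a": "1"}, 9, [(9, "a", "0"), (10, "b", "2"), (11, "a", None), (12, "c", "3")], 12)
print(state)  # {'b': '2'}
```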

Changes to Leader Election

The followers of the __cluster_metadata topic partition will concurrently fetch the snapshot and the replicated log. This means that candidates with incomplete snapshots will send a vote request with a LastOffsetEpoch of -1 and a LastOffset of -1, regardless of the LEO of the replicated log.
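The effect on elections can be illustrated with the usual Raft "at least as up-to-date" comparison (epoch first, then offset). Assuming KIP-595 voters apply that comparison, a candidate reporting (-1, -1) can only win votes from voters whose logs are also empty. The helper names below are hypothetical.

```python
from typing import Tuple

OffsetEpoch = Tuple[int, int]  # (LastOffset, LastOffsetEpoch)


def vote_request_fields(snapshot_complete: bool, last_offset: int, last_offset_epoch: int) -> OffsetEpoch:
    """LastOffset and LastOffsetEpoch a candidate sends in its vote request."""
    if not snapshot_complete:
        return (-1, -1)  # incomplete snapshot: behave as if the log were empty
    return (last_offset, last_offset_epoch)


def candidate_log_is_up_to_date(candidate: OffsetEpoch, voter: OffsetEpoch) -> bool:
    """Raft's up-to-date check: compare epochs first, then offsets."""
    (candidate_offset, candidate_epoch), (voter_offset, voter_epoch) = candidate, voter
    return (candidate_epoch, candidate_offset) >= (voter_epoch, voter_offset)
```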

Validation of Snapshot and Log

There is an invariant that every log must have at least one snapshot where the offset of the snapshot is between LogBeginOffset - 1 and High-Watermark. If this is not the case then the replica should assume that the log is empty. Once the replica discovers the leader for the partition it can bootstrap itself by using the Fetch and FetchSnapshot RPCs as described in this document.
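A startup check for this invariant could look like the sketch below. The names are hypothetical, the range [LogBeginOffset - 1, High-Watermark] is taken as inclusive, and it additionally assumes that a log still beginning at offset 0 needs no snapshot because the empty state acts as a snapshot at offset -1.

```python
from typing import Iterable


def has_valid_snapshot(snapshot_offsets: Iterable[int], log_begin_offset: int, high_watermark: int) -> bool:
    """Invariant: some snapshot offset lies in [LogBeginOffset - 1, High-Watermark]."""
    # Assumption: a log that still begins at offset 0 needs no snapshot.
    if log_begin_offset == 0:
        return True
    return any(log_begin_offset - 1 <= offset <= high_watermark for offset in snapshot_offsets)


def startup_action(snapshot_offsets: Iterable[int], log_begin_offset: int, high_watermark: int) -> str:
    if has_valid_snapshot(snapshot_offsets, log_begin_offset, high_watermark):
        return "load local snapshot and log"
    return "treat log as empty; bootstrap with Fetch and FetchSnapshot"
```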

Public Interfaces

Topic configuration

  1. The __cluster_metadata topic will have a cleanup.policy value of snapshot. This configuration is read-only; updates to it will not be allowed.

  2. controller.snapshot.minimum.records - The minimum number of records committed after the latest snapshot before the Kafka Controller will perform another snapshot. See section "When to Snapshot".

  3. max.replication.lag.ms - The maximum amount of time that the leader will wait for an offset to get replicated to all of the live replicas before advancing the LBO. See section “When to Increase the Log Begin Offset”.

Network Protocol

Fetch

Request Schema

This KIP doesn’t require any changes to the fetch request outside of what is proposed in KIP-595.

Response Schema

The fetch response proposed in KIP-595 will be extended such that:

  1. It includes new optional fields for snapshot offset and snapshot epoch.

  2. The LogStartOffset field is renamed to LogBeginOffset.

{
  "apiKey": 1,
  "type": "response",
  "name": "FetchResponse",
  "validVersions": "0-12",
  "flexibleVersions": "12+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
      "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
      "about": "The response topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
        "about": "The topic partitions.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "HighWatermark", "type": "int64", "versions": "0+",
          "about": "The current high water mark." },
        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
        // Field renamed from LogStartOffset to LogBeginOffset 
        { "name": "LogBeginOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
          "about": "The current log begin offset." },
        // ---------- Start new fields by KIP-595 ----------
        { "name": "NextOffsetAndEpoch", "type": "OffsetAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 0, "fields": [
          { "name": "NextFetchOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the offset that the follower should truncate to"},
          { "name": "NextFetchOffsetEpoch", "type": "int32", "versions": "0+",
            "about": "The epoch of the next offset in case the follower needs to truncate"}
        ]},
        { "name": "CurrentLeader", "type": "LeaderIdAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 1, "fields": [
          { "name": "LeaderId", "type": "int32", "versions": "0+",
            "about": "The ID of the current leader or -1 if the leader is unknown."},
          { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
            "about": "The latest known leader epoch"}
        ]},
        // ---------- End new fields by KIP-595 ----------
        // ---------- Start new fields ----------
        { "name": "SnapshotOffsetAndEpoch", "type": "OffsetAndEpoch",
          "versions": "12+", "taggedVersions": "12+", "tag": 2, "fields": [
          { "name": "FetchOffset", "type": "int64", "versions": "0+",
            "about": "If set, this is the offset that the follower should use for the FetchSnapshot request"},
          { "name": "FetchOffsetEpoch", "type": "int32", "versions": "0+",
            "about": "If set, this is the epoch that the follower should use for the FetchSnapshot request"}
        ]},
        // ---------- End new fields ----------
        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
          "about": "The aborted transactions.",  "fields": [
          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
            "about": "The producer id associated with the aborted transaction." },
          { "name": "FirstOffset", "type": "int64", "versions": "4+",
            "about": "The first offset in the aborted transaction." }
        ]},
        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
          "about": "The preferred read replica for the consumer to use on its next fetch request"},
        { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
          "about": "The record data." }
      ]}
    ]}
  ]
}

Handling Fetch Request

The leader’s handling of Fetch requests will be extended such that if FetchOffset is less than LogBeginOffset, the leader will respond with a SnapshotOffsetAndEpoch of the latest snapshot and a NextOffsetAndEpoch of the offset right after SnapshotOffsetAndEpoch.
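The leader-side branch can be sketched as follows. This is a simplified model with hypothetical names: it only computes the two new response fields, leaves the normal KIP-595 path (records, divergence checks) out, and takes a hypothetical `epoch_for_offset` callback to look up the epoch of the first log offset after the snapshot.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class OffsetAndEpoch:
    offset: int
    epoch: int


@dataclass
class PartitionFetchResult:
    snapshot_offset_and_epoch: Optional[OffsetAndEpoch] = None
    next_offset_and_epoch: Optional[OffsetAndEpoch] = None


def handle_fetch(
    fetch_offset: int,
    log_begin_offset: int,
    latest_snapshot: OffsetAndEpoch,
    epoch_for_offset: Callable[[int], int],
) -> PartitionFetchResult:
    if fetch_offset >= log_begin_offset:
        # Offset is still in the log: handle as in KIP-595 (records omitted here).
        return PartitionFetchResult()
    # The offset is no longer in the log: point the replica at the latest snapshot
    # and at the first log offset after it.
    next_offset = latest_snapshot.offset + 1
    return PartitionFetchResult(
        snapshot_offset_and_epoch=latest_snapshot,
        next_offset_and_epoch=OffsetAndEpoch(next_offset, epoch_for_offset(next_offset)),
    )
```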

Handling Fetch Response

The replica's handling of Fetch responses will be extended such that if SnapshotOffsetAndEpoch is set, the follower will truncate its entire log and start fetching from both the log and the snapshot. Where to fetch in the log is described by NextOffsetAndEpoch, while which snapshot to fetch is described by SnapshotOffsetAndEpoch.
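On the follower, the new fields translate into three state changes. This minimal sketch (hypothetical names) models the local log as a list of `(offset, epoch)` pairs and does not model the ordinary KIP-595 response path.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

OffsetAndEpoch = Tuple[int, int]  # (offset, epoch)


@dataclass
class FollowerPartition:
    log: List[OffsetAndEpoch] = field(default_factory=list)  # (offset, epoch) of local records
    next_fetch: OffsetAndEpoch = (0, 0)
    snapshot_to_fetch: Optional[OffsetAndEpoch] = None


def handle_fetch_response(
    partition: FollowerPartition,
    snapshot_offset_and_epoch: Optional[OffsetAndEpoch],
    next_offset_and_epoch: Optional[OffsetAndEpoch],
) -> None:
    if snapshot_offset_and_epoch is None:
        return  # ordinary KIP-595 response; not modeled here
    partition.log.clear()  # truncate the entire local log
    partition.snapshot_to_fetch = snapshot_offset_and_epoch  # download with FetchSnapshot
    if next_offset_and_epoch is not None:
        partition.next_fetch = next_offset_and_epoch  # keep fetching the log from here
```

Because the snapshot download and the log fetch proceed concurrently, the follower can catch up on new records while the snapshot is still being copied.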

Fetching Snapshots

Request Schema

{
  "apiKey": #,
  "type": "request",
  "name": "FetchSnapshotRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ReplicaId", "type": "int32", "versions": "0+",
      "about": "The broker ID of the follower, or -1 if this request is from a consumer." },
    { "name": "Topics", "type": "[]TopicSnapshots", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshots", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "SnapshotOffsetAndEpoch", "type": "OffsetAndEpoch", "versions": "0+", "fields": [
          { "name": "Offset", "type": "int64", "versions": "0+",
            "about": "The offset of the snapshot to fetch, as returned in the SnapshotOffsetAndEpoch of the Fetch response"},
          { "name": "OffsetEpoch", "type": "int32", "versions": "0+",
            "about": "The epoch of the snapshot to fetch, as returned in the SnapshotOffsetAndEpoch of the Fetch response"}
        ]},
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot." },
        { "name": "MaxBytes", "type": "int32", "versions": "0+",
          "about": "The maximum bytes to fetch from this snapshot." }
      ]}
    ]}
  ]
}

Response Schema

{
  "apiKey": #,
  "type": "response",
  "name": "FetchSnapshotResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+", "ignorable": false,
      "about": "The top level response error code." },
    { "name": "Topics", "type": "[]TopicSnapshots", "versions": "0+",
      "about": "The topics to fetch.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
        "about": "The name of the topic to fetch." },
      { "name": "Partitions", "type": "[]PartitionSnapshots", "versions": "0+",
        "about": "The partitions to fetch.", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
          "about": "The partition index." },
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The error code, or 0 if there was no fetch error." },
        { "name": "MediaType", "type": "string", "versions": "0+",
          "about": "The media type and version of the snapshot. This is only set when the position is 0."},
        { "name": "Continue", "type": "bool", "versions": "0+",
          "about": "True if there is data remaining for fetch." },
        { "name": "Position", "type": "int64", "versions": "0+",
          "about": "The byte position within the snapshot." },
        { "name": "Bytes", "type": "bytes", "versions": "0+",
          "about": "Snapshot data." }
      ]}
    ]}
  ]
}

Request Handling

When the leader receives a FetchSnapshot request it would do the following for each partition:

  1. Find the snapshot for SnapshotOffsetAndEpoch for the topic Name and partition PartitionIndex.

  2. Send the bytes in the snapshot from Position up to MaxBytes.

  3. Set Continue to true if there are remaining bytes in the snapshot.

Errors:

  1. SNAPSHOT_NOT_FOUND - when the fetch snapshot request specifies a SnapshotOffsetAndEpoch that doesn’t exist on the leader.

  2. OFFSET_OUT_OF_RANGE - when the fetch snapshot request’s Position is greater than the size of the snapshot.

  3. NOT_LEADER_FOR_PARTITION - when the fetch snapshot request is sent to a replica that is not the leader.
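The per-partition request handling and its errors can be sketched with in-memory snapshot bytes. This is a sketch under assumptions: snapshots are held in a dictionary keyed by (offset, epoch), error codes are plain strings, the names are hypothetical, and rejecting a negative Position is an extra defensive check not stated above.

```python
from dataclasses import dataclass
from typing import Dict, Tuple

SnapshotId = Tuple[int, int]  # (SnapshotOffset, SnapshotOffsetEpoch)


@dataclass
class PartitionSnapshotResponse:
    error: str = "NONE"
    position: int = 0
    continue_: bool = False  # "Continue" in the schema; `continue` is a Python keyword
    data: bytes = b""


def handle_fetch_snapshot(
    is_leader: bool,
    snapshots: Dict[SnapshotId, bytes],
    snapshot_id: SnapshotId,
    position: int,
    max_bytes: int,
) -> PartitionSnapshotResponse:
    if not is_leader:
        return PartitionSnapshotResponse(error="NOT_LEADER_FOR_PARTITION")
    snapshot = snapshots.get(snapshot_id)
    if snapshot is None:
        return PartitionSnapshotResponse(error="SNAPSHOT_NOT_FOUND")
    # Negative positions are also rejected here (an assumption beyond the text).
    if position < 0 or position > len(snapshot):
        return PartitionSnapshotResponse(error="OFFSET_OUT_OF_RANGE")
    chunk = snapshot[position:position + max_bytes]  # at most MaxBytes from Position
    return PartitionSnapshotResponse(
        position=position,
        continue_=position + len(chunk) < len(snapshot),  # bytes remain after this chunk
        data=chunk,
    )
```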

Response Handling

For each topic partition:

  1. If Continue is true, the follower is expected to send another FetchSnapshotRequest with a Position of Position + len(Bytes) from the response.

  2. Copy the bytes in Bytes to the local snapshot file starting at file position Position.

  3. If Continue is set to false then fetching the snapshot finished. Notify the Kafka Controller that a new snapshot is available. The Kafka Controller will load this new snapshot and read records from the log after the snapshot offset.

Errors:

  1. SNAPSHOT_NOT_FOUND, OFFSET_OUT_OF_RANGE, NOT_LEADER_FOR_PARTITION - The follower can rediscover the new snapshot by sending a Fetch request with an offset and epoch of zero.
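The follower's side of the exchange is a loop over chunks. In this sketch `fetch_chunk` is a hypothetical stand-in for sending one FetchSnapshot request at a given Position and returning (error code, Bytes, Continue); an error is surfaced so that the caller can fall back to a Fetch with offset and epoch zero. The empty-chunk guard is an added assumption to avoid looping forever.

```python
from typing import Callable, Tuple

# One partition of a FetchSnapshot response: (error code, Bytes, Continue)
Chunk = Tuple[str, bytes, bool]


class SnapshotFetchError(Exception):
    """The caller should rediscover the snapshot with a Fetch at offset and epoch zero."""


def fetch_whole_snapshot(fetch_chunk: Callable[[int], Chunk]) -> bytes:
    """Repeatedly issue FetchSnapshot requests until Continue is false."""
    snapshot = bytearray()
    position = 0
    while True:
        error, data, more = fetch_chunk(position)  # one FetchSnapshot request at Position
        if error != "NONE":
            raise SnapshotFetchError(error)
        # Copy Bytes into the local snapshot starting at file position Position.
        snapshot[position:position + len(data)] = data
        position += len(data)  # next Position = Position + len(Bytes)
        if not more:
            return bytes(snapshot)  # done: notify the Kafka Controller
        if not data:
            raise SnapshotFetchError("no progress")  # guard against an endless loop
```

For example, a leader serving three bytes per request delivers a 10-byte snapshot in four round trips.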

Metrics

TODO: figure out some useful metrics. I probably won't have them until we have a fairly complete implementation.

Compatibility, Deprecation, and Migration Plan

TODO

Rejected Alternatives

Append the snapshot to the Kafka log: Instead of having a separate file for snapshots and a separate RPC for downloading the snapshot, the leader could append the snapshot to the log. This design has a few drawbacks. Because the snapshot gets appended to the head of the log, it will be replicated to every replica in the partition. This will cause Kafka to use more bandwidth than necessary.

Use Kafka Log infrastructure to store snapshots: Kafka could use the Log functionality to persist the snapshot to disk and to facilitate the replication of the snapshot to the replicas. This solution ties the snapshot format to the Kafka Log’s record format. To lower the overhead of writing and loading the snapshot by the state machine, it is important to use a format that is compatible with the state machine.

Change leader when generating snapshots: Instead of supporting concurrent snapshots and writes to the client state, we could require that only non-leaders generate snapshots. If the leader wanted to generate a snapshot, it would relinquish leadership. We decided not to implement this for two reasons. 1. All replicas/brokers in the cluster will have a metadata cache which will apply the replicated log and allow read-only access by other components in the Kafka Broker. 2. Even though leader election within the replicated log may be efficient, communicating this new leader to all of the brokers and all of the external clients (admin clients, consumers, producers) may have a large latency.


Just fix the tombstone GC issue with log compaction: we have also discussed fixing only the tombstone garbage collection issue described in the motivation instead of introducing the snapshot mechanism. The key idea is that fetchers, whether followers or consumers, need to be notified if they "may" be missing some tombstones that have deprecated some old records. Since the offset span between a deprecated record and the tombstone for the same key can be arbitrarily long (for example, a record with key A at offset 0 and a tombstone for key A appended a week later, at offset 1 billion), we would have to take a conservative approach here. More specifically:

  • The leader would maintain an additional offset marker up to which we have deleted tombstones, i.e. all tombstones larger than this offset are still in the log. Let's call it a tombstone horizon marker. This horizon marker would be returned in each fetch response.
  • Follower replicas would not delete their tombstones beyond the received leader's horizon marker, and would maintain their own marker as well, which should always be smaller than the leader's horizon marker offset.
  • All fetchers (follower and consumer) would keep track of the horizon marker from the replica they are fetching from. The expectation is that leaders should only delete tombstones in a very lazy manner, controlled by the existing config, so that the fetching position should be larger than the horizon marker most of the time. However, if a fetcher does find its fetching position below the horizon marker, then it needs to handle it accordingly:
    • For consumer fetchers, we need to throw an extended error of InvalidOffsetException to notify users that they may have missed some tombstones, and let users reset their consumed record state. For example, if the caller of the consumer is a Streams client that builds a local state from the fetched records, that state needs to be wiped.
    • For replica fetchers, we need to wipe the local log all together and then restart fetching from the starting offset.
    • For either type of fetcher, we will set an internal flag, let's call it unsafe-bootstrapping mode, to true, while remembering the horizon marker offset at that moment from the replica it is fetching from.
      • When this flag is set to true, we would not re-trigger this handling logic even when the fetching position is smaller than the horizon marker, since otherwise we would fall into a loop of rewinding.
      • However, during this period of time, if the horizon marker offsets get updated from the fetch response, meaning that some tombstones are deleted again, then we'd need to retry this unsafe-bootstrap from step one.
      • After the fetching position has advanced beyond the remembered horizon marker offset, we would reset this flag to false to go back to the normal mode.


We feel this conservative approach is equally, if not more, complicated than adding a snapshot mechanism, since it requires rather tricky handling logic on the fetcher side; plus, it only resolves one of the motivations: the tombstone GC issue.

Another argument against this idea is that, for now, most of the internal topics that would need the snapshotting mechanism (controller, transaction coordinator, group coordinator) can hold their materialized cache completely in memory, and hence generating snapshots should be comparatively efficient.

