Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In KIP-500 the Kafka Active Controller, which is the quorum leader from KIP-595, will materialize the entries in the metadata log into memory. Each broker in KIP-500, which will be a voter or observer replica of the metadata log, will materialize the entries in the log into a metadata cache. We want to provide stronger guarantee on the consistency of the metadata than is possible today. If all entries in the metadata log are indefinitely retained, then the design described in KIP-595 achieves a consistent log and as a result a consistent metadata cache. If Kafka doesn’t limit the size of this replicated log it can affect Kafka availability by:

...

The type of in-memory state machines what that we plan to implement for the Kafka Controller (see KIP-631) doesn't map very well to an a key and offset based clean up policy. The data model for the Kafka Controller requires us to supports deltas/events. Currently the controller uses ZK for both storing the current state of the cluster and the deltas that it needs to apply. One example of this is topic deletion, when the user asks for a topic to be deleted 1) the Kafka Controller (AdminManager in the code) persists and commits the delete event, 2) applies the event to its in-memory state and sends RPC to all of the affected brokers, 3) persists and commits the changes to ZK. We don't believe that a key-value and offset based compaction model can be for events/deltas in a way that it is intuitive to program against.

...

When starting a broker either because it is a new broker, a broker was upgraded or a failed broker it is restarting. Loading , loading the state represented by the __cluster_metadata topic partition is required before the broker is available. With By taking snapshot based of the in-memory state Kafka as described below Kafka can be much more aggressive on how often it generates the snapshot. By having snapshots in a separate file and not updating the replicated log the snapshot can include up to the high watermark. In the future this mechanism will may also be useful for high-throughput topic partitions like the Group Coordinator and Transaction Coordinator.

...

Because the Kafka Controller as described in KIP-631 will use events/deltas to represent some of it in-memory state changes the amount of data that needs to be persistent and replicated to all of the brokers can be reduced. As an example if a broker goes offline and we assume that we have 1 million topic partitions , 10 with a replica factor of 3, 100 brokers, an IsrChange message of 40 bytes (4 bytes for partition id, 16 bytes for topic id, 8 bytes for ISR ids, 4 bytes for leader id, 4 bytes for leader epoch, 2 bytes for api key and 2 bytes for api version) and a DeleteBroker message of 8 bytes (4 bytes for broker id, 2 bytes for api key and 2 bytes for api version).

If When a broker is shutdown and if events/deltas (DeleteBroker) are not allowed in the replicated log for the __cluster_metadata topic partition then the Kafka Controller and each broker needs to persist 40 bytes per topic partitions hosted by the broker. In the example above that comes out to 3.81 MB. The Kafka Controller will need to replicate 3.81 MB to Each broker will be a replica for 30,000 partitions, that is  1.14 MB that needs to get persisted in the replicated log by each broker. The Active Controller will need to replicate 1.14 MB to each broker in the cluster (99 brokers) or 113.29 MB.

When using events or deltas (DeleteBroker) each broker needs to persist 8 bytes in the replicated log and 1.14 MB in the snapshot. The Active Controller will need to replica 8 bytes to each broker in the cluster (10) or 3899 brokers) or .77 KB. Note that each broker still needs to persist 1.14 MB - the difference is that those 1.14 MB are not sent through the network.

Overview

This KIP assumes that KIP-595 has been approved and implemented. With this improvement replicas will continue to replicate their log among each others. Replicas will snapshot their log independent of each other. Snapshots will be consistent because the logs are consistent across replicas. Follower and observer replicas fetch the snapshots from the leader when they attempt to fetch an offset from the leader and the leader doesn’t have that offset in the log because it is included in the snapshot.

Generating and loading the snapshot will be delegated to the Kafka Controller. The Kafka Controller will notify the Kafka Raft client layer when it has generated a snapshot and up to which offset is included in the snapshot. The Kafka Raft client layer will notify the Kafka Controller when a new snapshot has been fetched from the leaderActive Controller. See section "Interaction Between Kafka Raft and the Kafka Controller" and KIP 595 for more details.

This KIP focuses on the changes required for use by the Kafka Controller's __cluster_metadata topic partition but we should be able to extend it to use it in other internal topic partitions like __consumer_offset and __transaction.

...

The Kafka Controller will notify the Kafka Raft client when it has finished generating a new snapshots. It is safe for the log to truncate a prefix of the log up to the latest snapshot. The __cluster_metadata topic partition will have the latest snapshot and zero or more older snapshots. Having multiple snapshots is useful for minimizing re-fetching of the snapshot and will be described later in the documentsnapshot when a new snapshot is generated. These additional snapshots would have to be deleted, a process describe later in the documentthis is described in "When to Delete Snapshots"

Design

Visually a Kafka Raft Log would look as follow:

...

  1. LEO - log end offset - the largest offset and epoch that has been written to disk.

  2. high-watermark - the largest offset and epoch that has been replicated to N/2 + 1 replicas.

  3. LBO - log begin offset - the smallest offset in the replicated log. This property used to be called log start offset. It is renamed such that the acronym doesn’t conflict with last stable offset (LSO).

In the example above, if , offset=x, epoch=a does not appear in the replicated log because it is before the LBO (x + 1, b). If the Kafka topic partition leader receives a fetch request with an offset and epoch greater than or equal to the log begin offset LBO (x + 1, a) then the leader handles the request as describe in KIP-595. Otherwise, the leader will respond with the offset and epoch of the latest snapshot (y, c) and with the next fetch offset and epoch (y + 1, d).

...

To minimize the need to send snapshots to slow replicas, the leader can delay increasing the log begin offset up to the minimum offset that it knows it has been replicate to all of the live replicas. Followers and observers will increase their log begin offset to the value sent on the fetch response as long as the local state machine has generated a snapshot that includes to the log begin offset minus one.LBO can be increase/set delay increasing the log begin offset up to the minimum offset that it knows it has been replicate to all of the live replicas. The leader's LBO can be increased to an offset X if the following is true:

  1. There is a snapshot for the topic partition which includes the offset for X - 1 and.

  2. One of the following is true:

    1. All of the live replicas (followers and observers) have replicated LBO or

    2. LBO is max.replication.lag.ms old.

Followers and observers will increase their log begin offset to the value sent on the fetch response as long as the local state machine has generated a snapshot that includes to the log begin offset minus one.

Kafka allows the clients to delete records that are less than a given offset by using the DeleteRecords RPC . Those requests will be validated using the same logic enumerated above.

...

The broker will delete any snapshot with a latest offset and epoch (SnapshotOffsetAndEpoch) less than the LBO - 1 (log begin offset). The means that the question of when to delete a snapshot is the same as when to increase the LBO which is covered the the section “When To Increase the Log Begin Offset”.

...

There are two cases when the Kafka Controller needs to load the snapshot:

  1. When the broker it is booting.

  2. When the follower and observer replicas finishes fetches fetching a new snapshot from the leader.

...

Handling Fetch Request

The replica’s leader’s handling of fetch request will be extended such that if FetchOffset is less than LogBeginOffset then the leader will respond with a SnapshotOffsetAndEpoch of the latest snapshot and a NextOffsetAndEpoch of the offset right after SnapshotOffsetAndEpoch.

...

The replica's handling of fetch response will be extended such that if SnapshotOffsetAndEpoch is set then the follower will truncate delete its entire log and snapshots, and start fetching from the leader both the log and the snapshot. Where to fetch in the Log is describe by NextOffsetAndEpoch while which snapshot to fetch is described by SnapshotOffsetAndEpoch.

...

  1. SNAPSHOT_NOT_FOUND, OFFSET_OUT_OF_RANGE, NOT_LEADER_FOR_PARTITION - The follower can rediscover the new snapshot by sending a fetch request with an offset and epoch of zero.

Metrics

Full NameDescription
kafka.controller:type=KafkaController,name=GenSnapshotLatencyMsA histogram of the amount of time it took to generate a snapshot.
kafka.controller:type=KafkaController,name=LoadSnapshotLatencyMsA histogram of the amount of time it took to load the snapshot.
kafka.controller:type=KafkaController,name=SnapshotSizeBytesSize of the latest snapshot in bytes.

Compatibility, Deprecation, and Migration Plan

...