...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka's replicated log can grow without bound. If Kafka doesn't limit the size of these logs, it can affect Kafka availability by either running out of disk space or increasing the amount of time needed to load the log into Kafka's in-memory state. For example, the Group Coordinator and the Kafka Controller need to load the __consumer_offsets and __metadata logs, respectively, into memory.

In KIP-500 the Kafka Controller, which is the quorum leader from KIP-595, will materialize the entries in the metadata log into memory. Each broker in KIP-500, which will be a replica of the metadata log, will materialize the entries in the log into a metadata cache. We want to provide a stronger guarantee on the consistency of the metadata than is possible today. If all entries in the metadata log are retained indefinitely, then the design described in KIP-595 achieves a consistent log and, as a result, a consistent metadata cache. If Kafka doesn't limit the size of this log, it can affect Kafka availability by:

  1. Running out of disk space
  2. Increasing the time needed to load the replicated log into memory
  3. Increasing the time it takes for new brokers to catch up to the leader's log

Kafka uses the cleanup policies compact and delete to limit the size of the log. The delete cleanup policy deletes a prefix of the log based on time. This policy is not viable for the metadata log because the Kafka Controller and the metadata cache would not be able to generate a consistent view from the remaining suffix of the replicated log. The compact cleanup policy incrementally removes key/value records that are not needed to reconstruct the associated state machine. The compact policy has some issues that are addressed in this KIP.
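As background (and not something this KIP changes), the following is a minimal sketch of how a topic's cleanup policy is normally configured with the Java AdminClient; the broker address and topic name below are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.config.TopicConfig;

    public class CleanupPolicyExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address

            try (Admin admin = Admin.create(props)) {
                // Switch the placeholder topic "example-topic" to the compact cleanup policy.
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "example-topic");
                AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT),
                    AlterConfigOp.OpType.SET);
                admin.incrementalAlterConfigs(Collections.singletonMap(topic, Collections.singletonList(op)))
                     .all()
                     .get();
            }
        }
    }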

Incremental Log Cleaner

Kafka implements an incremental log cleaner for topics configured for compaction. During compaction the log cleaner keeps track of an OffsetMap, which is a mapping from each key to the latest offset in the section of the log being cleaned. The log cleaner assumes that any record with an offset less than the value in the OffsetMap for the record's key is safe to remove. This solution eventually results in one record for every key in the log, including a tombstone record for each key that has been deleted. Kafka removes these tombstone records after a configurable time. Because each replica runs its log cleaner independently, this can result in different logs on each replica, which can result in different in-memory state after applying the log.
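To make the OffsetMap idea concrete, the following is a simplified, self-contained sketch of that deduplication pass (it is not Kafka's actual LogCleaner code): the first pass records the latest offset seen for each key, and the second pass keeps only records at or after that offset.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class CompactionSketch {
        // A minimal record: key, value (null means tombstone) and its offset in the log.
        record LogRecord(String key, String value, long offset) {}

        static List<LogRecord> compact(List<LogRecord> log) {
            // Pass 1: build the OffsetMap, mapping each key to its latest offset
            // in the section of the log being cleaned.
            Map<String, Long> offsetMap = new HashMap<>();
            for (LogRecord r : log) {
                offsetMap.put(r.key(), r.offset());
            }
            // Pass 2: any record with an offset less than the OffsetMap entry
            // for its key is assumed safe to remove.
            List<LogRecord> cleaned = new ArrayList<>();
            for (LogRecord r : log) {
                if (r.offset() >= offsetMap.get(r.key())) {
                    cleaned.add(r);
                }
            }
            return cleaned;
        }
    }

Note that a tombstone (null value) survives this pass like any other record; it is only removed later by the time-based tombstone deletion, which is what can leave replicas with diverging logs.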

Type of State Machines and Operations

...

By generating snapshots from the state machine already stored in memory by the Group Coordinator and Kafka Controller, we can generate snapshots faster and more frequently.

Overview


This KIP assumes that KIP-595 has been approved and implemented. With this improvement, replicas will continue to replicate their log among each other. Replicas will snapshot their log independently of each other. This is consistent because the logs themselves are consistent. A snapshot will be sent to a replica that attempts to fetch an offset from the leader when the leader no longer has that offset in the log because it is included in the snapshot.
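As a rough sketch of the follower-side behaviour described above, consider the following; the types and method names here are hypothetical placeholders, not the RPCs or APIs proposed by this KIP:

    // Illustrative follower-side logic only; these types and method names are
    // hypothetical and are not the RPCs or APIs proposed by this KIP.
    public class FollowerSketch {
        interface Leader {
            FetchResult fetch(long offset);   // hypothetical log fetch
            Snapshot fetchSnapshot();         // hypothetical snapshot fetch
        }
        record FetchResult(boolean offsetInSnapshot, byte[] records, long nextOffset) {}
        record Snapshot(byte[] data, long endOffset) {}
        interface StateMachine {
            void loadSnapshot(Snapshot snapshot);
            void apply(byte[] records);
        }

        static long catchUp(Leader leader, StateMachine stateMachine, long nextOffset) {
            FetchResult result = leader.fetch(nextOffset);
            if (result.offsetInSnapshot()) {
                // The leader no longer has this offset in its log because it is
                // covered by a snapshot, so fetch the snapshot and rebuild state from it.
                Snapshot snapshot = leader.fetchSnapshot();
                stateMachine.loadSnapshot(snapshot);
                return snapshot.endOffset() + 1;
            } else {
                // Normal replication path: apply the fetched records and advance.
                stateMachine.apply(result.records());
                return result.nextOffset();
            }
        }
    }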

Generating and loading the snapshot will be delegated to the state machine, which includes the Kafka Controller and the Group Coordinator. The state machine (e.g. the Kafka Controller) will notify the Kafka Raft client when it has generated a snapshot and up to which offset is included in the snapshot. The Kafka Raft client will notify the state machine when a new snapshot has been fetched from the leader.
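To illustrate this division of responsibility, here is a hedged sketch of what such notification hooks might look like; the interface and method names are assumptions made for illustration, not the API defined by this KIP:

    // Hypothetical interface sketching the interaction described above; it is not
    // the actual API proposed by this KIP or implemented by the Kafka Raft client.
    public interface RaftSnapshotSupport {

        // Called by the state machine (e.g. the Kafka Controller) to tell the
        // Raft client that a snapshot covering the log up to endOffset exists,
        // so the log prefix up to that offset may eventually be truncated.
        void onSnapshotGenerated(long endOffset);

        // Callback the Raft client invokes on the state machine when a newer
        // snapshot has been fetched from the leader and should be loaded.
        interface Listener {
            void onSnapshotFetched(long endOffset);
        }
    }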

This improvement will be used for internal topics (__consumer_offsets, __cluster_metadata and __transaction), but it should be general enough to eventually be used for external topics whose consumers support this feature.

...

Append the snapshot to the Kafka log: Instead of having a separate file for snapshots and a separate RPC for downloading the snapshot, the leader could append the snapshot to the log. This design has a few drawbacks. Because the snapshot gets appended to the head of the log, it will be replicated to every replica in the partition. This will cause Kafka to use more bandwidth than necessary.

Use Kafka Log infrastructure to store snapshots: Kafka could use the Log functionality to persist the snapshot to disk and to facilitate the replication of the snapshot to the replicas. This solution ties the snapshot format to Kafka Log's record format. To lower the overhead of writing and loading the snapshot by the state machine, it is important to use a format that is compatible with the state machine.

Change leader when generating snapshots: Instead of supporting concurrent snapshot generation and writes to the client state, we could require that only non-leaders generate snapshots. If the leader wants to generate a snapshot, it would relinquish leadership. We decided not to implement this for two reasons. 1. All replicas/brokers in the cluster will have a metadata cache, which applies the replicated log and allows read-only access by other components in the Kafka Broker. 2. Even though leader election within the replicated log may be efficient, communicating the new leader to all of the brokers and all of the external clients (admin clients, consumers, producers) may incur a large latency.