...

In KIP-500 the Kafka Controller, which is the quorum leader from KIP-595, will materialize the entries in the metadata log into memory. Each broker in KIP-500, which will be a replica of the metadata log, will materialize the entries in the log into a metadata cache. We want to provide a stronger guarantee on the consistency of the metadata than is possible today. If all entries in the metadata log are indefinitely retained, then the design described in KIP-595 achieves a consistent log and as a result a consistent metadata cache. If Kafka doesn't limit the size of this replicated log, it can affect Kafka's availability by:

  1. Running out of disk space
  2. Increasing the time needed to load the replicated log into memory
  3. Increasing the time it takes for new brokers to catch up to the leader's log and hence join the cluster.

Kafka uses the cleanup policies compact and delete to limit the size of the log. The delete cleanup policy deletes data for a prefix of the log based on time. The delete cleanup policy is not viable for the metadata log because the Kafka Controller and metadata cache would not be able to generate a consistent view from such a log.

The compact cleanup policy incrementally removes key/value records that are not needed to reconstruct the associated key/value state. During compaction the log cleaner builds an OffsetMap, a mapping from each key to the latest offset in the section of the log being cleaned. The log cleaner assumes that any record with an offset less than the value in the OffsetMap for the record's key is safe to remove. This solution eventually results in one record for every key in the log, including a tombstone record for each key that has been deleted. Kafka removes these tombstone records after a configurable time. This can result in different logs on each replica, which can result in different in-memory state after applying the log. We explore changes to the compact cleanup policy in the "Rejected Alternatives" section.
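The sketch below illustrates the offset-map idea described above; the class and method names are hypothetical and do not correspond to Kafka's actual LogCleaner implementation.

    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Illustrative sketch of offset-map based compaction; names are hypothetical
    // and do not match Kafka's internal LogCleaner classes.
    final class CompactionSketch {
        record Record(long offset, String key, String value) {}

        // Keep a record only if it holds the latest offset seen for its key.
        static Map<String, Record> compact(Iterable<Record> dirtySegment) {
            Map<String, Long> offsetMap = new HashMap<>();   // key -> latest offset
            for (Record r : dirtySegment) {
                offsetMap.merge(r.key(), r.offset(), Math::max);
            }
            Map<String, Record> retained = new LinkedHashMap<>();
            for (Record r : dirtySegment) {
                if (r.offset() == offsetMap.get(r.key())) {  // older records for the key are dropped
                    retained.put(r.key(), r);
                }
            }
            return retained;
        }
    }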

Type of State Machines and Operations

This solution assumes that operations for the state machine can be compacted without having to inspect the value of the state machine at that offset.

For example, how do you compact operations like key ← previous value + 1 without knowing the value of the key at that offset?
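As a concrete illustration, the hypothetical example below contrasts an absolute assignment (compactable, because only the latest record per key matters) with a relative increment (not compactable, because every record contributes to the final value). It is not based on any actual Kafka state machine.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical state machine used only to illustrate the point above.
    final class CompactionCounterExample {
        public static void main(String[] args) {
            Map<String, Long> state = new HashMap<>();

            // Absolute assignment (key <- value): compaction can safely drop the earlier put.
            state.put("x", 5L);
            state.put("x", 7L);

            // Relative operation (key <- previous value + 1): dropping all but the
            // latest record changes the final value from 3 to 1.
            state.merge("y", 1L, Long::sum);
            state.merge("y", 1L, Long::sum);
            state.merge("y", 1L, Long::sum);

            System.out.println(state); // prints x=7 and y=3 (map order may vary)
        }
    }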

Tombstone and Delete Operations

For compacted topics, Kafka allows for the deletion of keys in the state machine by appending a record with a key and with a value of null (e.g. key ← null). These tombstone records (X) must be kept in the log as long as there is another replica that contains a record (Y) for the same key and that replica doesn't contain record X. In other words, Y.offset < Follower.logEndOffset < X.offset.
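A minimal sketch of this retention condition, using hypothetical names rather than Kafka's cleaner code:

    // Illustrative check only: a tombstone X for a key may only be removed once no
    // follower is still in the window between Y and X, where Y is an earlier record
    // for the same key.
    final class TombstoneRetentionSketch {
        static boolean tombstoneStillNeeded(long yOffset, long xOffset, long[] followerLogEndOffsets) {
            for (long logEndOffset : followerLogEndOffsets) {
                // Follower has replicated Y but not yet the tombstone X:
                // Y.offset < Follower.logEndOffset < X.offset
                if (yOffset < logEndOffset && logEndOffset < xOffset) {
                    return true;
                }
            }
            return false;
        }
    }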

Frequency of Compaction

The type of in-memory state machines that we plan to implement for the Kafka Controller (see KIP-631) doesn't map very well to a key and offset based cleanup policy. The data model for the Kafka Controller requires us to support deltas/events. Currently the controller uses ZK for both storing the current state of the cluster and the deltas that it needs to apply. One example of this is topic deletion: when the user asks for a topic to be deleted, 1) the Kafka Controller (AdminManager in the code) persists and commits the delete event, 2) applies the event to its in-memory state and sends RPCs to all of the affected brokers, and 3) persists and commits the changes to ZK. We don't believe that a key-value and offset based compaction model can be used for events/deltas in a way that is intuitive to program against.
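To make the delta/event idea concrete, the following is a hypothetical sketch of applying committed events against in-memory controller state; the record types shown are illustrative and are not the actual KIP-631 metadata records.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical delta/event model, only to illustrate why the controller state is
    // easier to express as events than as key/value upserts.
    final class ControllerStateSketch {
        record DeleteTopic(String topicId) {}
        record IsrChange(String topicId, int partition, Set<Integer> isr) {}

        private final Map<String, Map<Integer, Set<Integer>>> isrByTopic = new HashMap<>();

        // Each committed event is applied as a delta against the in-memory state.
        void apply(Object event) {
            if (event instanceof DeleteTopic dt) {
                isrByTopic.remove(dt.topicId());
            } else if (event instanceof IsrChange ic) {
                isrByTopic
                    .computeIfAbsent(ic.topicId(), t -> new HashMap<>())
                    .put(ic.partition(), new HashSet<>(ic.isr()));
            }
        }
    }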

Loading State and Frequency of Compaction

A broker may start because it is a new broker, because it was upgraded, or because a failed broker is restarting. In all of these cases, loading the state represented by the __metadata topic partition is required before the broker is available. With snapshots of the in-memory state, Kafka can be much more aggressive about how often it generates a snapshot, and the snapshot can include everything up to the high watermark. In the future this mechanism will also be useful for high-throughput topic partitions like the Group Coordinator and Transaction Coordinator.
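A minimal sketch of this load path, assuming hypothetical Snapshot, MetadataLog and MetadataCache abstractions (not the actual Kafka interfaces):

    // On startup: restore the latest snapshot, then replay only the log suffix
    // up to the high watermark instead of the full log from offset zero.
    final class MetadataLoaderSketch {
        interface Snapshot {
            long lastIncludedOffset();            // state covers offsets up to and including this one
            void restoreInto(MetadataCache cache);
        }
        interface MetadataLog {
            Iterable<byte[]> recordsBetween(long fromOffsetExclusive, long toOffsetInclusive);
        }
        interface MetadataCache {
            void apply(byte[] record);
        }

        static void load(Snapshot latest, MetadataLog log, MetadataCache cache, long highWatermark) {
            latest.restoreInto(cache);
            for (byte[] record : log.recordsBetween(latest.lastIncludedOffset(), highWatermark)) {
                cache.apply(record);
            }
        }
    }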

Amount of Data Replicated

Because the Kafka Controller as described in KIP-631 will use events/deltas to represent some of its in-memory state changes, the amount of data that needs to be persisted and replicated to all of the brokers can be reduced. As an example, suppose a broker goes offline and assume that we have 1 million topic partitions, 10 brokers, an IsrChange message of 40 bytes (4 bytes for partition id, 16 bytes for topic id, 8 bytes for ISR ids, 4 bytes for leader id, 4 bytes for leader epoch, 2 bytes for api key and 2 bytes for api version) and a DeleteBroker message of 8 bytes (4 bytes for broker id, 2 bytes for api key and 2 bytes for api version).

If events/deltas are not allowed in the replicated log for the __metadata topic partition, then the Kafka Controller and each broker needs to persist 40 bytes per topic partition hosted by the offline broker. In the example above that comes out to 3.81 MB. The Kafka Controller will need to replicate that 3.81 MB to each broker in the cluster (10), or 38.14 MB in total.

By moving snapshot generation to the state machine already stored in memory in the Group Coordinator and Kafka Controller, we can generate snapshots faster and more frequently.
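The arithmetic behind these numbers, reproduced as a small, self-contained example:

    // Worked arithmetic for the example above: 1 million partitions spread over 10 brokers,
    // a 40-byte IsrChange per affected partition versus a single 8-byte DeleteBroker event.
    public final class ReplicationCostExample {
        public static void main(String[] args) {
            long partitions = 1_000_000;
            int brokers = 10;
            int isrChangeBytes = 40;
            int deleteBrokerBytes = 8;

            long partitionsOnOfflineBroker = partitions / brokers;                 // 100,000
            long perBrokerBytes = partitionsOnOfflineBroker * isrChangeBytes;      // 4,000,000 bytes ~= 3.81 MB
            long totalReplicatedBytes = perBrokerBytes * brokers;                  // 40,000,000 bytes ~= 38.14 MB

            System.out.printf("per broker: %.2f MB, total: %.2f MB, with a DeleteBroker delta: %d bytes%n",
                    perBrokerBytes / (1024.0 * 1024.0),
                    totalReplicatedBytes / (1024.0 * 1024.0),
                    deleteBrokerBytes);
        }
    }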

Overview


This KIP assumes that KIP-595 has been approved and implemented. With this improvement replicas will continue to replicate their log among each other. Replicas will snapshot their log independently of each other. This is consistent because the logs themselves are consistent. A snapshot will be sent to a replica that attempts to fetch an offset from the leader when the leader no longer has that offset in its log because it is included in the snapshot.
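A minimal sketch of that fetch-path decision, with hypothetical types; the actual behavior is specified by KIP-595 and this KIP, not by this code:

    // If the follower asks for an offset the leader no longer has in its log
    // (because it was included in a snapshot), send the snapshot instead.
    final class FetchHandlerSketch {
        interface Leader {
            long logStartOffset();                 // offsets below this were truncated into the snapshot
            byte[] readLog(long fetchOffset);
            byte[] latestSnapshot();
        }

        static byte[] handleFetch(Leader leader, long fetchOffset) {
            if (fetchOffset < leader.logStartOffset()) {
                return leader.latestSnapshot();
            }
            return leader.readLog(fetchOffset);
        }
    }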

...