...

The type of in-memory state machine that we plan to implement for the Kafka Controller (see KIP-631) doesn't map very well to a key and offset based cleanup policy. The data model for the Kafka Controller requires us to support deltas/events. Currently the controller uses ZK for storing both the current state of the cluster and the deltas that it needs to apply. One example of this is topic deletion: when the user asks for a topic to be deleted, 1) the Kafka Controller (AdminManager in the code) persists and commits the delete event, 2) applies the event to its in-memory state and sends RPCs to all of the affected brokers, 3) persists and commits the changes to ZK. We don't believe that a key-value and offset based compaction model can be used for events/deltas in a way that is intuitive to program against.

Another operation that the Kafka Controller models as an event is the FenceBroker message from KIP-631. Every time a broker is fenced because of controlled shutdown or failure to send a heartbeat, it affects every topic partition assigned to the fenced broker. For details on the replication impact of event operations (FenceBroker) vs key-value operations (IsrChange), see the section "Amount of Data Replicated".
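
To make the contrast concrete, the following is a minimal sketch with hypothetical class and record names (they are not the KIP-631 schema) of how a single FenceBroker delta fans out to updates on every partition hosted by the fenced broker, while a key-value IsrChange record only overwrites the state of one partition:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the KIP-631 schema: a FenceBroker delta touches many
// in-memory entries, while a key-value IsrChange record overwrites exactly one.
class MetadataState {
    // topic partition -> in-sync replica ids
    private final Map<String, List<Integer>> isrByPartition = new HashMap<>();

    void apply(FenceBroker event) {
        // One small delta record fans out to every partition hosted by the fenced broker.
        isrByPartition.replaceAll((partition, isr) ->
            isr.stream().filter(id -> id != event.brokerId()).toList());
    }

    void apply(IsrChange record) {
        // A key-value record only replaces the state for a single partition.
        isrByPartition.put(record.topicPartition(), record.isr());
    }
}

record FenceBroker(int brokerId) { }
record IsrChange(String topicPartition, List<Integer> isr) { }
```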

Loading State and Frequency of Compaction

When starting a broker, either because it is a new broker or because it is restarting, loading the state represented by the __cluster_metadata and __consumer_offsets topic partitions is required before the broker is available. By taking snapshots of the in-memory state as described below, Kafka can be much more efficient and aggressive about how often it generates them. Because snapshots are kept in a separate file instead of being written into the replicated log, a snapshot can include state up to the high watermark. In the future this mechanism may also be useful for high-throughput topic partitions like those of the Group Coordinator and Transaction Coordinator.
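
As a rough illustration, here is a minimal sketch of the startup path this enables, assuming hypothetical stand-in types (StateMachine, Snapshot and ReplicatedLog are not Kafka classes): restore the in-memory state from the latest snapshot, then replay only the log records between the snapshot's end offset and the high watermark.

```java
import java.util.List;

// Hypothetical sketch, not the Kafka API: the broker restores its in-memory state
// from the latest snapshot and replays only the records the snapshot does not cover.
interface StateMachine { void apply(Object record); }

record Snapshot(long endOffset, StateMachine restoredState) { }

interface ReplicatedLog {
    long highWatermark();
    List<Object> read(long fromOffset, long toOffset);
}

final class StateLoader {
    StateMachine load(Snapshot snapshot, ReplicatedLog log) {
        StateMachine state = snapshot.restoredState();
        // Only the suffix of the log after the snapshot's end offset is read and applied.
        for (Object record : log.read(snapshot.endOffset(), log.highWatermark())) {
            state.apply(record);
        }
        return state;
    }
}
```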

Amount of Data Written and Read

For topics like __cluster_metadata and __consumer_offsets we can assume that the set of keys rarely changes but that values change frequently. With this assumption, to generate a snapshot from the in-memory state as proposed in this document the broker needs to write O(the size of the data). For log compaction the broker has to write O(the size of the data) + O(the size of the tombstones).

The difference is in the read pattern for the two solutions. For in-memory snapshots every log record will be read once to update the in-memory state. The Kafka Controller and the Group Coordinator will read from the head of the log. No additional reads are required to generate a snapshot.

For log compaction there are three reads:

  1. One read to update the in-memory state.
  2. One read to generate the map of keys to offsets.
  3. One read to copy the live records from the current log segments to the new log segments. These reads are of older offsets and may not be in the OS's file cache.

If we assume an in-memory state of 100MB and a rate of change of 1MB/s, then

  1. With in-memory snapshots the broker needs to read 1MB/s from the head of the log.
  2. With log compaction the broker needs to
    1. read 1MB/s from the head of the log to update the in-memory state
    2. read 1MB/s to update the map of keys to offsets
    3. read 3MB/s from the older segments (100MB from the already compacted log plus 50MB of new key-value records, every 50 seconds). The log accumulates 50 seconds' worth of changes (50MB) before compacting because the default configuration has a minimum cleanable dirty ratio of 50%. A worked calculation follows this list.
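
The back-of-the-envelope calculation below reproduces these rates from the stated assumptions (100MB of state, 1MB/s of changes, 50MB accumulated per compaction cycle):

```java
// Illustrative arithmetic only; the inputs are the assumptions from this section
// and the output matches the 1 MB/s vs 1+1+3 MB/s figures in the lists above.
public final class ReadAmplification {
    public static void main(String[] args) {
        double stateMb = 100.0;          // in-memory state
        double changeRateMbPerSec = 1.0; // rate of change
        double dirtyMbPerCycle = 50.0;   // changes accumulated before a compaction cycle
        double secondsPerCycle = dirtyMbPerCycle / changeRateMbPerSec; // 50 s

        // With in-memory snapshots only the head of the log is read, once.
        double snapshotReadMbPerSec = changeRateMbPerSec;

        // With log compaction the head is read twice (state update + offset map)
        // and each cycle re-reads the compacted log plus the accumulated changes.
        double compactionReadMbPerSec =
            2 * changeRateMbPerSec + (stateMb + dirtyMbPerCycle) / secondsPerCycle;

        System.out.printf("snapshots:  %.1f MB/s read%n", snapshotReadMbPerSec);   // 1.0
        System.out.printf("compaction: %.1f MB/s read%n", compactionReadMbPerSec); // 5.0
    }
}
```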

We mentioned __consumer_offsets in this section because in the future we are interested in using this mechanism for high-throughput topic partitions like the Group Coordinator and Transaction Coordinator.

Amount of Data Replicated

Because the Kafka Controller as described in KIP-631 will use events/deltas to represent some of its in-memory state changes, the amount of data that needs to be written to the log and replicated to all of the brokers can be reduced. As an example, assume that a broker goes offline in a cluster with 100K topics, each with 10 partitions, a replication factor of 3 and 100 brokers. In a well balanced cluster we can expect that no broker will have 2 or more partitions of the same topic. In this case the best we can do for the IsrChange message and the FenceBroker message are the following message sizes:

  1. IsrChange has a size of 40 bytes: 4 bytes for the partition id, 16 bytes for the topic id, 8 bytes for the ISR ids, 4 bytes for the leader id, 4 bytes for the leader epoch, 2 bytes for the api key and 2 bytes for the api version.
  2. FenceBroker has a size of 8 bytes: 4 bytes for the broker id, 2 bytes for the api key and 2 bytes for the api version.

...

When using events or deltas (FenceBroker), each broker needs to persist 8 bytes in the replicated log and 1.14 MB in the snapshot. The Active Controller will need to replicate 8 bytes to each broker in the cluster (99 brokers), or .77 KB in total. Note that each broker still needs to persist 1.14 MB; the difference is that those 1.14 MB are not sent through the network.
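
The following worked calculation, consistent with the figures above (roughly 30K partition replicas hosted per broker, 40 bytes per IsrChange record, 8 bytes per FenceBroker record), sketches the difference:

```java
// Illustrative arithmetic only, derived from the example cluster in this section:
// 100K topics * 10 partitions * replication factor 3 / 100 brokers = 30K replicas per broker.
public final class ReplicationCost {
    public static void main(String[] args) {
        long topics = 100_000;
        long partitionsPerTopic = 10;
        long replicationFactor = 3;
        long brokers = 100;
        long replicasPerBroker = topics * partitionsPerTopic * replicationFactor / brokers; // 30,000

        long isrChangeBytes = 40;  // key-value style: one record per affected partition
        long fenceBrokerBytes = 8; // delta style: one record per fenced broker

        double keyValueMb = replicasPerBroker * isrChangeBytes / (1024.0 * 1024.0); // ~1.14 MB
        double deltaTotalKb = fenceBrokerBytes * (brokers - 1) / 1024.0;            // ~0.77 KB

        System.out.printf("IsrChange:   %.2f MB written to the log and sent to each of the %d other brokers%n",
            keyValueMb, brokers - 1);
        System.out.printf("FenceBroker: %d bytes written to the log, %.2f KB replicated to the %d other brokers in total%n",
            fenceBrokerBytes, deltaTotalKb, brokers - 1);
    }
}
```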

Consistent Log and Tombstones

It is very important that the log remain consistent across all of the replicas after log compaction. The current implementation of log compaction in Kafka can lead to divergent logs because it is possible for slow replicas to miss the tombstone records: tombstones are deleted after a timeout and are not guaranteed to have been replicated to all of the replicas before they are deleted. It should be possible to extend log compaction and the fetch protocol as explained in the Rejected Alternatives section "Just fix the tombstone GC issue with log compaction". We believe that this conservative approach is equally complicated, if not more so, than the snapshots explained in this proposal and, more importantly, it doesn't address the other items in the motivation section above.

Overview

This KIP assumes that KIP-595 has been approved and implemented. With this improvement replicas will continue to replicate their log among each other. Replicas will snapshot their log independently of each other. Snapshots will be consistent because the logs are consistent across replicas. Follower and observer replicas fetch a snapshot from the leader when they attempt to fetch an offset from the leader and the leader doesn't have that offset in the log.
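
The hypothetical sketch below illustrates that follower-side flow; the names are illustrative and are not the KIP-595/KIP-630 interfaces:

```java
import java.util.List;

// Hypothetical follower-side flow, not the actual implementation: if the leader no
// longer has the requested fetch offset in its log (the prefix was replaced by a
// snapshot), the follower fetches the leader's latest snapshot, loads it, and
// resumes fetching records from the snapshot's end offset.
interface SnapshotLeader {
    FetchResult fetch(long offset);
    byte[] fetchSnapshot();
}

record FetchResult(boolean offsetInLog, long snapshotEndOffset, List<Object> records) { }

final class Follower {
    private long nextFetchOffset = 0;

    void poll(SnapshotLeader leader) {
        FetchResult result = leader.fetch(nextFetchOffset);
        if (!result.offsetInLog()) {
            // The leader's log starts after nextFetchOffset; load its snapshot instead.
            loadSnapshot(leader.fetchSnapshot());
            nextFetchOffset = result.snapshotEndOffset();
        } else {
            for (Object record : result.records()) {
                appendToLocalLog(record);
                nextFetchOffset++;
            }
        }
    }

    private void loadSnapshot(byte[] snapshot) { /* rebuild local state and log start from the snapshot */ }
    private void appendToLocalLog(Object record) { /* append the fetched record to the local replicated log */ }
}
```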

...

This KIP focuses on the changes required for the Kafka Controller's __cluster_metadata topic partition, but we should be able to extend it for use in other internal topic partitions like __consumer_offsets and __transaction.

Proposed Changes

...

We feel this conservative approach is equally complicated, if not more so, than adding a snapshot mechanism, since it requires rather tricky handling logic on the fetcher side; in addition, it only resolves one of the motivations, the tombstone garbage collection issue.

Another thought against this idea is that, for now, most of the internal topics (controller, transaction coordinator, group coordinator) that would need the snapshot mechanism described in this KIP can hold their materialized cache entirely in memory, and hence generating snapshots should be comparably more efficient to execute.

...