...

  • Brokers can currently update ZooKeeper directly to shrink or expand the ISR; this will be replaced with an AlterISR request sent from leaders to the controller (KIP-497: Add inter-broker API to alter ISR). The controller would then update the metadata by appending a batch of entries to its metadata log, where each topic-partition represents one entry.
  • Admin requests for replica reassignment will be replaced with an AlterPartitionReassignments request to the controller (KIP-455: Create an Administrative API for Replica Reassignment). The controller would update the metadata by appending a batch of entries to its metadata log, where each topic-partition represents one entry.
  • Existing admin requests for config changes, etc., will be translated into a batch of updates to the metadata log (a sketch of this append pattern follows this list).
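
As a rough sketch of the pattern these bullets share (the types and method names below are hypothetical, not the actual controller API), the controller translates an incoming request into one metadata log entry per topic-partition and appends the whole set as a single batch:

    // Hypothetical sketch only; these are not the real Kafka controller types.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    record TopicPartition(String topic, int partition) {}
    record IsrChangeEntry(TopicPartition tp, List<Integer> newIsr) {}

    interface MetadataLog {
        // Appends all entries atomically; returns the offset of the last entry.
        long appendAsBatch(List<IsrChangeEntry> entries);
    }

    class ControllerSketch {
        private final MetadataLog metadataLog;

        ControllerSketch(MetadataLog metadataLog) {
            this.metadataLog = metadataLog;
        }

        // One metadata log entry per topic-partition, committed as one batch.
        long handleAlterIsr(Map<TopicPartition, List<Integer>> requestedIsrs) {
            List<IsrChangeEntry> batch = new ArrayList<>(requestedIsrs.size());
            requestedIsrs.forEach((tp, isr) -> batch.add(new IsrChangeEntry(tp, isr)));
            return metadataLog.appendAsBatch(batch);
        }
    }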

...

Log Compaction and Snapshots

The metadata log described in this KIP can grow unbounded. Managing the size of the replicated log is described in KIP-630: Kafka Raft Snapshot. Each broker in KIP-500 will be an observer of the metadata log and will materialize the entries into a cache of some kind. New and slow brokers will catch up to the leader by 1) fetching the snapshot and 2) fetching the replicated log after the snapshot. This will provide a stronger guarantee on the consistency of the metadata than is possible today. Let's refer to the materialized table of the log entries on some observer O up to a given offset N as Snapshot(O, N). The offset in this case can be considered the version of the metadata. This version could in the future be used by clients to detect stale metadata, so we want to be sure that it can be maintained consistently. In particular, if two observers K and J expose metadata up to offset N, then we want to guarantee that Snapshot(K, N) == Snapshot(J, N).
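
To make the invariant concrete, here is a minimal sketch (with hypothetical types; the real materialized state is richer than a key-value map) of how an observer might build Snapshot(O, N) by deterministically replaying the log. Because replay is deterministic, any two observers that have applied the same prefix up to N must materialize identical state:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    record LogEntry(long offset, String key, String value) {}

    class SnapshotSketch {
        // Snapshot(O, N): replay every entry with offset <= N into a table.
        // Assumes the log is sorted by offset, as a replicated log is.
        static Map<String, String> snapshot(List<LogEntry> log, long n) {
            Map<String, String> state = new HashMap<>();
            for (LogEntry entry : log) {
                if (entry.offset() > n) break;
                state.put(entry.key(), entry.value());
            }
            return state;
        }
    }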

If all entries in the metadata log are indefinitely retained, then this is trivially achieved. However, in practice, we have to compact the log in order to keep it from growing too large. This proposal relies on Kafka's existing log cleaner for log compaction. Other approaches involve maintaining snapshots, but the benefits of using the log cleaner are 1) the logic already exists and is well-understood, and 2) it simplifies the fetch protocol since we do not need a different mechanism to fetch snapshots.

The diagram below presents a brief overview of how compaction works in Kafka today. The cleaner maintains an offset known as the "first dirty offset." On a given round of cleaning, the cleaner scans forward from the dirty offset and builds a table of the latest entry for each key until either the first uncleanable offset is reached or the table grows too large. The end offset of this scan becomes the next dirty offset once the round of cleaning completes. After building this table, the cleaner scans from the beginning of the log and builds a new log consisting of all the entries that are not superseded by a later entry for the same key in the table. Once the new log is ready, it is atomically swapped with the current log.
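
The simplified sketch below (hypothetical types; it ignores the segments, tombstones, and size limits the real cleaner handles) captures the two passes just described: first build a table of the latest offset per key starting from the dirty offset, then rebuild the log keeping only the entries that are not superseded:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    record LogEntry(long offset, String key, String value) {}

    class CleanerSketch {
        // One round of cleaning; returns the compacted log.
        static List<LogEntry> clean(List<LogEntry> log, long firstDirtyOffset) {
            // Pass 1: scan forward from the dirty offset, recording the
            // latest offset seen for each key.
            Map<String, Long> latestOffset = new HashMap<>();
            for (LogEntry e : log) {
                if (e.offset() >= firstDirtyOffset) {
                    latestOffset.put(e.key(), e.offset());
                }
            }
            // Pass 2: rebuild from the start of the log, dropping any entry
            // superseded by a later entry for the same key.
            List<LogEntry> cleaned = new ArrayList<>();
            for (LogEntry e : log) {
                Long latest = latestOffset.get(e.key());
                if (latest == null || latest == e.offset()) {
                    cleaned.add(e);
                }
            }
            return cleaned;
        }
    }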

As an example, consider the following log. The first dirty offset is 3. Suppose that the cleaner is able to scan to the end of the log when building the table of retained entries.

[Diagram: the example log before cleaning]

Following cleaning, the first dirty offset is advanced to offset 9. We are able to remove the entries at offsets 0, 1, 4, and 5.

[Diagram: the example log after cleaning]
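
Since the original diagrams are not reproduced here, the log below is an invented one whose keys are chosen to match the offsets stated in the text; running the CleanerSketch above on it with a first dirty offset of 3 removes exactly the entries at offsets 0, 1, 4, and 5:

    import java.util.List;

    class CleanerExample {
        public static void main(String[] args) {
            List<LogEntry> log = List.of(
                new LogEntry(0, "k1", "a"), // removed: k1 rewritten at offset 7
                new LogEntry(1, "k2", "b"), // removed: k2 rewritten at offset 8
                new LogEntry(2, "k3", "c"), // retained: latest entry for k3
                new LogEntry(3, "k4", "d"), // retained: latest entry for k4
                new LogEntry(4, "k5", "e"), // removed: k5 rewritten at offset 6
                new LogEntry(5, "k1", "f"), // removed: k1 rewritten at offset 7
                new LogEntry(6, "k5", "g"),
                new LogEntry(7, "k1", "h"),
                new LogEntry(8, "k2", "i")
            );
            // Prints the retained entries: offsets 2, 3, 6, 7, and 8.
            CleanerSketch.clean(log, 3).forEach(System.out::println);
        }
    }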

In order to address the "consistent versioning" problem mentioned above, an observer needs to be able to tell when it has reached an offset such that the materialized snapshot at that offset is guaranteed to be consistent among all replicas of the log. The challenge is that an observer which is fetching the log from the leader does not know which portion of the log has already been cleaned. For example, using the diagram above, if we attempt to materialize the state after only reading up to offset 6, then our snapshot will not contain keys k1 and k2, even though they would have been present in the log at offset 6 if the entries at offsets 0 and 1 had not been cleaned.

Our solution to address this problem is simple. We require the leader to indicate its current dirty offset in each Fetch response. A follower/observer will know if its current snapshot represents a consistent version if and only if its local log end offset after appending the records from the response is greater than or equal to the dirty offset received from the leader.
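
A minimal sketch of this check (the names are illustrative; the actual Fetch response schema differs) is below:

    // Illustrative only: deciding whether the materialized snapshot is a
    // consistent version after appending the records from a Fetch response.
    class ConsistencyCheckSketch {
        static boolean snapshotIsConsistent(long localLogEndOffsetAfterAppend,
                                            long leaderDirtyOffset) {
            // Consistent iff we have replayed past every offset that the
            // leader may already have cleaned.
            return localLogEndOffsetAfterAppend >= leaderDirtyOffset;
        }
    }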

This is outside the scope of this proposal, but we suggest that this approach to versioning may be useful for managing metadata consistency on the clients. Each metadata response can indicate the corresponding version of the metadata log so that clients have an easy way to detect stale metadata. 
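
For example (purely illustrative, not a proposed client API), a client could track the highest metadata version it has seen and ignore any response carrying an older one:

    // Illustrative only: using the metadata log offset as a version number
    // to detect and drop stale metadata responses on the client.
    class ClientMetadataSketch {
        private long currentVersion = -1L;

        synchronized boolean maybeUpdate(long responseVersion) {
            if (responseVersion < currentVersion) {
                return false; // stale metadata, ignore the response
            }
            currentVersion = responseVersion;
            return true;
        }
    }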

Quorum Performance

The goal of the Raft quorum is to replace the ZooKeeper dependency and achieve higher performance for metadata operations. In the first version, we will build the necessary metrics to monitor the end-to-end latency from the moment an admin request (e.g. AlterPartitionReassignments) or client request is accepted to the moment it is committed. We will monitor the time spent locally, primarily the time to fsync the new records and the time to apply changes to the state machine, which may not be a trivial operation. We will also monitor the time spent propagating the change remotely, i.e. the latency to advance the high watermark. Benchmarks will also be built to compare the efficiency of a 3-node broker cluster using ZooKeeper vs. Raft under a heavy load of metadata changes. We shall also explore existing load-testing frameworks for distributed consensus systems, though this may be beyond the scope of KIP-595.
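
As a sketch of the kind of instrumentation intended (the metric names below are hypothetical; the actual metrics will be specified during implementation), each stage's latency can be measured and reported separately:

    import java.util.concurrent.TimeUnit;

    // Hypothetical instrumentation sketch for the latencies described above.
    class QuorumMetricsSketch {
        // Local cost: time to fsync newly appended records.
        void recordFsyncLatency(long nanos) { report("fsync-latency-ms", nanos); }

        // Local cost: time to apply committed entries to the state machine.
        void recordApplyLatency(long nanos) { report("apply-latency-ms", nanos); }

        // Remote cost: time from append until the high watermark advances.
        void recordHighWatermarkLatency(long nanos) { report("hw-advance-latency-ms", nanos); }

        private void report(String name, long nanos) {
            // Hand off to the metrics registry (elided in this sketch).
            System.out.printf("%s: %d ms%n", name, TimeUnit.NANOSECONDS.toMillis(nanos));
        }
    }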

...