...

Our store distributes data by partitioning the keyspace into multiple shards, such that every key belongs to a single shard. The database also maintains a 'leader' for each shard, which is responsible for maintaining all of that shard's data. (This behaves like the Kafka consumer's rebalancer: normally we expect a single leader for each shard, but we might transiently have multiple 'leaders' during a partition or leadership transition.) The database uses a Kafka topic as a write-ahead log for all operations, with one partition per shard; the logged operations are used for both crash recovery and replication. We'll assume that the result of an operation (or even whether or not that operation is permitted at all) might depend on all the previous operations in that shard. As we append to the log, then, it's important that the operations are not duplicated, skipped, or reordered; if two 'leaders' try to append at once (say, during a leader transition), it's also important that we don't interleave their operations in the log.
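
For concreteness, here is a minimal sketch of the key-to-shard mapping described above. The class name, the hash function, and the assumption that a shard's id is used directly as its WAL partition number are illustrative choices, not part of the design:

```java
import java.util.Arrays;

// Illustrative only: every key maps to exactly one shard, and each shard's
// write-ahead log is one partition of the Kafka topic.
public final class ShardRouter {
    private final int numShards; // assumed equal to the WAL topic's partition count

    public ShardRouter(int numShards) {
        this.numShards = numShards;
    }

    // Every key belongs to a single shard.
    public int shardFor(byte[] key) {
        return Math.floorMod(Arrays.hashCode(key), numShards);
    }

    // The shard id doubles as the WAL partition for that shard's operations.
    public int walPartitionFor(byte[] key) {
        return shardFor(key);
    }
}
```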

Followers and 'bootstrapping' leaders simply consume the partition, applying each operation to their local state and tracking the upcoming offset. Once a leader believes it has reached the end of the partition, it will start accepting new operations from clients and appending them to the partition. It batches multiple operations to Kafka in a single produce request, with a maximum of one in-flight request at a time. Each record is assigned an expected offset, starting with (what the leader believes is) the partition's upcoming offset. If the produce request is successful: the leader can apply the operations to its local persistent state, return successful responses to all the clients, set the new upcoming offset, and send the next batch of records to Kafka. If the produce request fails, perhaps after a few retries: the leader should fail the corresponding requests and resume bootstrapping from what it had thought was the upcoming offset, perhaps checking to see if it is still the leader.
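
Below is a rough sketch of that append loop. Kafka's producer does not expose expected offsets today, so the WalClient interface, the exception type, and the helper methods here are hypothetical placeholders for whatever API the proposal introduces; only the structure (one batch in flight, apply-then-acknowledge on success, fall back to bootstrapping on failure) follows the paragraph above:

```java
import java.util.List;

// Hypothetical client standing in for a producer that supports per-record
// expected offsets (the feature this proposal adds).
interface WalClient {
    // Appends the batch starting at expectedOffset; throws if the partition's
    // actual upcoming offset no longer matches.
    void appendWithExpectedOffset(int partition, long expectedOffset, List<byte[]> ops)
            throws OffsetMismatchException;
}

class OffsetMismatchException extends Exception {}

final class ShardLeader {
    private final WalClient wal;
    private final int partition;  // this shard's WAL partition
    private long upcomingOffset;  // next offset we expect to write

    ShardLeader(WalClient wal, int partition, long bootstrappedUpTo) {
        this.wal = wal;
        this.partition = partition;
        this.upcomingOffset = bootstrappedUpTo;
    }

    // One batch in flight at a time: append, then apply and acknowledge only
    // once the batch is durably in the log.
    void appendBatch(List<byte[]> ops) {
        try {
            wal.appendWithExpectedOffset(partition, upcomingOffset, ops);
            applyToLocalState(ops);
            acknowledgeClients(ops);
            upcomingOffset += ops.size(); // advance to the next expected offset
        } catch (OffsetMismatchException e) {
            failClients(ops);             // another leader may have appended first
            resumeBootstrappingFrom(upcomingOffset);
        }
    }

    private void applyToLocalState(List<byte[]> ops) { /* elided */ }
    private void acknowledgeClients(List<byte[]> ops) { /* elided */ }
    private void failClients(List<byte[]> ops) { /* elided */ }
    private void resumeBootstrappingFrom(long offset) { /* elided */ }
}
```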

This append logic preserves all the correctness properties we required above. Suppose our leader is bootstrapped up to an upcoming offset of k, and publishes a new batch of operations to the log. If k doesn't match the actual upcoming offset in the partition (perhaps a new leader has been elected and appended first, or the broker returned stale metadata as in KAFKA-2334), the publish will have no effect. If the offsets do match, the server will try to append the messages to the log. Let's imagine the log contains messages [A,B,C], and the leader published messages [D,E,F]. The append might succeed, resulting in a log of [A,B,C,D,E,F]; it might fail, leaving the original log of [A,B,C]; or it might fail while replicating and keep only a prefix of the operations, leaving a log of [A,B,C,D]. In no case will we ever skip over messages in the batch, or change the order, so the result will always be a legal series of operations. Produce requests with expected offsets are also idempotent: if a retried request had actually succeeded the first time, its expected offset will no longer match, so the retry has no effect and can never introduce duplicates. Finally: since we wait to acknowledge updates until after they've been appended to the log, acknowledged operations should not be lost.
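
To make that example concrete: the only logs reachable from [A,B,C] after publishing [D,E,F] with expected offsets are the original log followed by some prefix of the batch. A small, purely illustrative enumeration of those outcomes:

```java
import java.util.ArrayList;
import java.util.List;

public final class AppendOutcomes {
    public static void main(String[] args) {
        List<String> log = List.of("A", "B", "C");
        List<String> batch = List.of("D", "E", "F");

        // Reachable results: [A,B,C], [A,B,C,D], [A,B,C,D,E], [A,B,C,D,E,F].
        // Every one is the old log plus a prefix of the batch; nothing is
        // skipped or reordered, so each is a legal series of operations.
        for (int kept = 0; kept <= batch.size(); kept++) {
            List<String> result = new ArrayList<>(log);
            result.addAll(batch.subList(0, kept));
            System.out.println(result);
        }
    }
}
```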

Compatibility, Deprecation, and Migration Plan

...