...

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when multiple producers can write to the same partition using the current Producer APIs, the resulting state of the log is hard to predict or control: it's possible for a producer to end up writing duplicate messages to Kafka, or to have multiple competing tasks write to the same commit log and create an inconsistent state.

...

Again, like check-and-set, this only makes good progress when contention for a single partition is low. Fortunately, when Kafka is used as a commit log or as a stream-processing transport, it's common to have just one producer (or a small number) for a given partition, and in many of these cases the 'conditional publish' turns out to be quite useful.

...

  • Clients need an API to specify the expected offset of a message. This requires adding a new offset field to the existing produce record, and using the value of that field as the offset for that message in the outgoing message set (or the sentinel value -1, if unspecified).
  • The server needs to be modified to compare the expected offset to the upcoming offset for the matching partition, and refuse the entire produce request if any of the offsets don't match. This involves a couple of additional checks at offset-assignment time, and a new error code and exception type to signal the error to the client.
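
The server-side check described above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions, not the patch attached to the JIRA ticket: PartitionLog, Record, ANY_OFFSET, and OffsetMismatchError are hypothetical names, and a plain Python list stands in for the partition.

```python
from dataclasses import dataclass, field
from typing import List

ANY_OFFSET = -1  # sentinel: the client did not specify an expected offset


class OffsetMismatchError(Exception):
    """Hypothetical error type signalling an expected-offset mismatch."""


@dataclass
class Record:
    value: str
    expected_offset: int = ANY_OFFSET


@dataclass
class PartitionLog:
    """Toy stand-in for a single partition; not a real broker."""
    messages: List[str] = field(default_factory=list)

    @property
    def upcoming_offset(self) -> int:
        # Offset that the next appended message would receive.
        return len(self.messages)

    def append(self, records: List[Record]) -> int:
        # Validate every record before appending anything, so that a
        # single mismatch refuses the entire produce request.
        for i, record in enumerate(records):
            assigned = self.upcoming_offset + i
            if record.expected_offset not in (ANY_OFFSET, assigned):
                raise OffsetMismatchError(
                    f"expected offset {record.expected_offset}, "
                    f"but the upcoming offset is {assigned}"
                )
        self.messages.extend(record.value for record in records)
        return self.upcoming_offset
```

Records that leave expected_offset at -1 are appended unconditionally in this model, which mirrors how unmodified clients would keep behaving.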

...

The JIRA ticket includes a rough-draft patch, which should give a sense of the scope of the change.

Worked Example

To see how this low-level coordination mechanism can be used by high-level code, we'll look at the design of a simple distributed key-value store.

Our store distributes data by partitioning the keyspace into multiple shards, such that every key belongs to a single shard. The database also maintains a 'leader' for each shard which is responsible for maintaining all of that shard's data. (This behaves like the Kafka consumer's rebalancer — normally we expect a single leader for each shard, but we might transiently have multiple 'leaders' during a partition or leadership transition.) The database uses a Kafka topic as a write-ahead log for all operations, with one partition per shard; the logged operations are used for both crash recovery and replication. We'll assume that the result of an operation (or even whether or not that operation is permitted at all) might depend on all the previous operations in that shard. As we append to the log, then, it's important that the operations are not duplicated, skipped, or reordered; if two 'leaders' try to append at once, it's also important that we don't interleave their operations in the log.

Followers and 'bootstrapping' leaders simply consume the partition, applying each operation to their local state and tracking the upcoming offset. Once a leader believes it has reached the end of the partition, it will start accepting new operations from clients and appending them to the partition. It batches multiple operations to Kafka in a single produce request, with a maximum of one in-flight request at a time. Each record is assigned an expected offset, starting with the leader's upcoming offset. If the produce request is successful, the leader can apply the operations to its persistent state, return successful responses to all the clients, set the new upcoming offset, and send the next batch of records to Kafka. If the produce request fails, even after a few retries, the leader should fail the corresponding requests and resume bootstrapping from what it had thought was the upcoming offset, possibly checking to see if it is still the leader.
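
The leader's append step can be sketched roughly as follows. All names here (Leader, ProduceFailed, make_in_memory_produce) are hypothetical and exist only for illustration; the produce callable stands in for a conditional produce request, and the in-memory helper is a toy substitute for a broker, not a Kafka client API.

```python
from typing import Callable, List, Tuple

Record = Tuple[int, str]  # (expected offset, operation)


class ProduceFailed(Exception):
    """Hypothetical error for any failed produce request, including an offset mismatch."""


class Leader:
    def __init__(self, produce: Callable[[List[Record]], None],
                 upcoming_offset: int, max_retries: int = 2) -> None:
        self.produce = produce
        self.upcoming_offset = upcoming_offset
        self.max_retries = max_retries
        self.applied: List[str] = []   # stands in for persistent state
        self.bootstrapping = False

    def append_batch(self, operations: List[str]) -> bool:
        # Assign consecutive expected offsets, starting at the upcoming offset.
        records = [(self.upcoming_offset + i, op)
                   for i, op in enumerate(operations)]
        for _ in range(1 + self.max_retries):
            try:
                self.produce(records)  # only one request in flight at a time
            except ProduceFailed:
                continue
            # Success: apply the operations and advance the upcoming offset.
            self.applied.extend(operations)
            self.upcoming_offset += len(operations)
            return True
        # Failure: the caller fails the client requests; the leader goes
        # back to consuming from what it thought was the upcoming offset.
        self.bootstrapping = True
        return False


def make_in_memory_produce(log: List[str]) -> Callable[[List[Record]], None]:
    """Toy conditional produce against a Python list."""
    def produce(records: List[Record]) -> None:
        for i, (expected, _) in enumerate(records):
            if expected != len(log) + i:
                raise ProduceFailed(f"expected {expected}, upcoming {len(log) + i}")
        log.extend(op for _, op in records)
    return produce
```

In this sketch a stale leader (one whose upcoming offset lags the log) never modifies the log; its append_batch call returns False and flips it back into bootstrapping.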

This append logic preserves all the correctness properties we required above. Suppose our leader is bootstrapped up to an upcoming offset of k, and publishes a new batch of operations to the log. If k doesn't match the actual upcoming offset in the partition (perhaps a new leader has been elected and appended first, or the broker returned stale metadata as in KAFKA-2334), the publish will have no effect. If the offsets do match, the server will try to append the messages to the log. Let's imagine the log contains messages [A,B,C], and the leader published messages [D,E,F]. The append might succeed, resulting in a log of [A,B,C,D,E,F]; it might fail, leaving the original log of [A,B,C]; or it might fail while replicating and keep only a prefix of the operations, leaving a log of [A,B,C,D]. In no case will we ever skip over messages in the batch, or change the order, so the result will always be a legal series of operations. Finally, since we wait to acknowledge updates until after they've been appended to the log, acknowledged operations should not be lost.
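
The set of legal outcomes in this example can be stated as a small check: the resulting log must equal the original log followed by some prefix (possibly empty, possibly complete) of the batch. The function name is_legal_outcome is a hypothetical helper introduced only to make that property concrete.

```python
from typing import List


def is_legal_outcome(before: List[str], batch: List[str], after: List[str]) -> bool:
    """True if `after` is `before` followed by a prefix of `batch`."""
    # The existing log must be untouched.
    if after[:len(before)] != before:
        return False
    # Whatever was appended must be the first n messages of the batch,
    # in order, with nothing skipped.
    appended = after[len(before):]
    return appended == batch[:len(appended)]


log_before = ["A", "B", "C"]
batch = ["D", "E", "F"]

# The three outcomes described in the text are all legal.
outcomes = [
    ["A", "B", "C", "D", "E", "F"],  # append succeeded
    ["A", "B", "C"],                 # append failed
    ["A", "B", "C", "D"],            # only a prefix survived
]
all_legal = all(is_legal_outcome(log_before, batch, o) for o in outcomes)
```

A log such as [A,B,C,E] or [A,B,C,E,D] would fail this check, since it skips or reorders messages from the batch.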

Compatibility, Deprecation, and Migration Plan

...