...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When using the current Producer APIs, the resulting state of the log is hard to predict or control: it's possible for a producer to end up writing duplicate messages to Kafka, or for multiple competing tasks to write to the same commit log and create an inconsistent state.

...

In all of these cases, the 'conditional publish' operation can support much stronger guarantees than the existing publish semantics.

Usage Examples

This section includes a few sketches of how this feature might be used.

Output Deduplication

Suppose we have some ordered series of messages, and we want to append them to some partition. In this case, the primary goal is to avoid duplicate messages in the output.

Imagined usage: 

  • Publish the sequence of messages in order, incrementing the expected offset each time
  • If the publisher receives a mismatch error:
    • Get the high-water mark for the output partition
    • Restart publishing from the message to which we originally assigned that expected offset

Any two publishers running the same code will agree on the offset for a particular message, so pipelining for the publisher is safe. If the publish succeeds for a message at offset n, this implies that the previous messages in the series have all completed successfully as well.
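
A minimal sketch of this loop is below. The ConditionalProducer interface, OffsetMismatchException, and the high-water-mark lookup are hypothetical stand-ins for whatever shape the final client API ends up taking; they are not part of the current producer and are only meant to illustrate the retry logic.

import java.util.List;

// Hypothetical stand-in for the proposed client API: publish a message at an expected
// offset, or fail if the partition's upcoming offset is different.
interface ConditionalProducer {
    void publish(String topic, int partition, long expectedOffset, byte[] value)
        throws OffsetMismatchException;

    // Upcoming offset (high-water mark) of the partition.
    long highWaterMark(String topic, int partition);
}

// Hypothetical error returned to the client when the expected offset does not match.
class OffsetMismatchException extends Exception {}

class DedupPublisher {
    // Publish an ordered series of messages without duplicates: message i is always
    // assigned expected offset startOffset + i, so after a mismatch we resume from
    // exactly the first message that is not yet in the log.
    static void publishAll(ConditionalProducer producer, String topic, int partition,
                           long startOffset, List<byte[]> messages) {
        int next = 0;
        while (next < messages.size()) {
            try {
                producer.publish(topic, partition, startOffset + next, messages.get(next));
                next++;
            } catch (OffsetMismatchException e) {
                // Another publisher (or an earlier retry) already wrote at this offset:
                // re-read the high-water mark and restart from the message we originally
                // assigned to that offset.
                next = (int) (producer.highWaterMark(topic, partition) - startOffset);
            }
        }
    }
}

Because each message's expected offset is fixed up front, the sends could also be pipelined; a single mismatch response tells the publisher exactly where to resume.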

Commit Log

In other cases, we may want to use a Kafka partition as a 'commit log' for a key-value store, where every operation is committed to the log so it can be recovered on failure. This is a bit different from the first case, since the partition is not just an output stream but also a point of coordination. Duplicates are still a problem here, but we also need to make sure we don't interleave the writes from two concurrent producers. (Suppose we have two servers A and B that are both accepting writes, and two concurrent increment operations for key0 and key1. If A tries to increment key0 before key1, and B tries to increment key1 before key0, we don't want to end up with a commit log that holds the increment of key0 from A and then the increment of key0 from B.) In particular, this means pipelining does not seem to be safe if operations are not idempotent.

Imagined usage:

  1. Bootstrap the state until the very end of the partition. Let's say the upcoming offset is N.
  2. While we're accepting writes:
    • Accumulate a set of messages and send a publish request with expected offsets set.
    • If the publish succeeds, continue accepting writes (step 2). Otherwise, go back to step 1 and re-bootstrap.

This ensures that a process will only successfully publish a message at offset N if it has seen all the data in the partition up to offset N.
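
As a sketch, reusing the hypothetical ConditionalProducer and OffsetMismatchException from the previous example and adding an assumed LogReader for consuming the partition (applyToState and nextOperation are placeholders for the store's own logic), the loop might look roughly like this:

// Hypothetical reader for consuming committed entries from the partition.
interface LogReader {
    Iterable<LogEntry> readFrom(String topic, int partition, long fromOffset);
}

class LogEntry {
    final long offset;
    final byte[] value;
    LogEntry(long offset, byte[] value) { this.offset = offset; this.value = value; }
}

abstract class CommitLogWriter {
    abstract void applyToState(byte[] operation);   // apply a committed operation locally
    abstract byte[] nextOperation();                // next write accepted from a client

    void run(ConditionalProducer producer, LogReader reader, String topic, int partition) {
        long upcoming = 0;
        while (true) {
            // Step 1: bootstrap by consuming to the end of the partition; 'upcoming' ends at N.
            for (LogEntry entry : reader.readFrom(topic, partition, upcoming)) {
                applyToState(entry.value);
                upcoming = entry.offset + 1;
            }
            // Step 2: accept writes until a publish is rejected.
            try {
                while (true) {
                    byte[] operation = nextOperation();
                    producer.publish(topic, partition, upcoming, operation);
                    applyToState(operation);
                    upcoming++;
                }
            } catch (OffsetMismatchException e) {
                // Some other writer appended at 'upcoming': drop back to step 1 and
                // re-bootstrap before accepting more writes.
            }
        }
    }
}

A writer that has fallen behind can never commit: its expected offset is lower than the partition's upcoming offset, so its publishes are rejected until it re-reads the log.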

Public Interfaces

The proposal requires a few additions to the public interface.

...

  • Clients need an API to specify the expected offset of a message. This requires adding a new offset field to the existing produce record, and using the value of that field as the offset for that message in the outgoing message set (or the sentinel value -1, if unspecified).
  • The server needs to be modified to compare the expected offset to the upcoming offset for the matching partition, and to refuse the entire produce request if any of the offsets don't match. This involves a couple of additional checks at offset-assignment time, and a new error code and exception type to signal the mismatch to the client.
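
As a rough illustration of that check (the real change lives in the broker's offset-assignment path and is part of the draft patch; the names below are invented for the sketch), the comparison amounts to:

class ExpectedOffsetCheck {
    static final long UNSPECIFIED = -1L;   // the "no expected offset" value from the record

    // Returns true if the produce request should be accepted: each record's expected
    // offset must either be unspecified or equal the offset it is about to be assigned.
    static boolean matches(long upcomingOffset, long[] expectedOffsets) {
        long next = upcomingOffset;
        for (long expected : expectedOffsets) {
            if (expected != UNSPECIFIED && expected != next) {
                return false;   // reject the entire request with the new error code
            }
            next++;
        }
        return true;
    }
}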

...

The JIRA ticket includes a rough-draft patch, which should give a sense of the scope of the change.

Worked Example

To see how this low-level coordination mechanism can be used by high-level code, we'll look at the design of a simple distributed key-value store.

Our store distributes data by partitioning the keyspace into multiple shards, such that every key belongs to a single shard. The database also maintains a 'leader' for each shard which is responsible for maintaining all of that shard's data. (This behaves like the Kafka consumer's rebalancer — normally we expect a single leader for each shard, but we might transiently have multiple 'leaders' during a partition or leadership transition.) The database uses a Kafka topic as a write-ahead log for all operations, with one partition per shard; the logged operations are used for both crash recovery and replication. We'll assume that the result of an operation (or even whether or not that operation is permitted at all) might depend on all the previous operations in that shard. As we append to the log, then, it's important that the operations are not duplicated, skipped, or reordered; if two 'leaders' try to append at once, it's also important that we don't interleave their operations in the log.

Followers and 'bootstrapping' leaders simply consume the partition, applying each operation to their local state and tracking the upcoming offset. Once a leader believes it has reached the end of the partition, it will start accepting new operations from clients and appending them to the partition. It batches multiple operations into a single produce request to Kafka, with at most one request in flight at a time. Each record is assigned an expected offset, starting with the leader's upcoming offset. If the produce request is successful, the leader can apply the operations to its persistent state, return successful responses to all the clients, set the new upcoming offset, and send the next batch of records to Kafka. If the produce request fails, perhaps after a few retries, the leader should fail the corresponding requests and resume bootstrapping from what it had thought was the upcoming offset, perhaps checking to see if it is still the leader.
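
A sketch of this append path, reusing the hypothetical OffsetMismatchException from the usage examples and assuming a batch-capable variant of the conditional publish (publishBatch below is not a real API), might look like:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Assumed batch variant of the conditional publish: records are assigned expected
// offsets starting at firstExpectedOffset, and the whole request is refused on a mismatch.
interface ConditionalBatchProducer {
    void publishBatch(String topic, int partition, long firstExpectedOffset, List<byte[]> values)
        throws OffsetMismatchException;
}

// Stand-in for a client write waiting to be acknowledged.
class PendingWrite {
    final byte[] operation;
    final CompletableFuture<Void> ack = new CompletableFuture<>();
    PendingWrite(byte[] operation) { this.operation = operation; }
}

class ShardLeader {
    // Send one batch at a time; expected offsets run upward from 'upcoming'. Returns the
    // new upcoming offset on success, or -1 to tell the caller to re-bootstrap (and
    // re-check leadership) after a failure.
    long appendBatch(ConditionalBatchProducer producer, String topic, int partition,
                     long upcoming, List<PendingWrite> batch) {
        List<byte[]> values = new ArrayList<>();
        for (PendingWrite w : batch) values.add(w.operation);
        try {
            producer.publishBatch(topic, partition, upcoming, values);
            // The whole batch is in the log: apply to local state (elided) and ack clients.
            for (PendingWrite w : batch) w.ack.complete(null);
            return upcoming + batch.size();
        } catch (OffsetMismatchException e) {
            // Another leader appended first, or our view of the partition was stale:
            // fail the waiting clients and resume bootstrapping from 'upcoming'.
            for (PendingWrite w : batch) w.ack.completeExceptionally(e);
            return -1;
        }
    }
}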

This append logic preserves all the correctness properties we required above. Suppose our leader is bootstrapped up to an upcoming offset of k, and publishes a new batch of operations to the log. If k doesn't match the actual upcoming offset in the partition (perhaps a new leader has been elected and appended first, or the broker returned stale metadata as in KAFKA-2334), the publish will have no effect. If the offsets do match, the server will try to append the messages to the log. Let's imagine the log contains messages [A,B,C], and the leader publishes the batch [D,E,F]. The append might succeed, resulting in a log of [A,B,C,D,E,F]; it might fail, leaving the original log of [A,B,C]; or it might fail while replicating and keep only a prefix of the operations, leaving a log of [A,B,C,D]. In no case will we ever skip over messages in the batch or change their order, so the result will always be a legal series of operations. Finally, since we wait to acknowledge updates until after they've been appended to the log, acknowledged operations should not be lost.

Compatibility, Deprecation, and Migration Plan

...