
...

In all of these cases, the 'conditional publish' operation can support much stronger guarantees than the existing publish semantics.

Usage Examples

This section includes a few sketches of how this feature might be used.

Output Deduplication

Suppose we have some ordered series of messages, and we want to append them to some partition. In this case, the primary goal is to avoid duplicate messages in the output.

Imagined usage: 

  • Publish the sequence of messages in order, incrementing the expected offset each time
  • If the publisher receives a mismatch error:
    • Get the high-water mark for the output partition
    • Restart publishing from the message to which that expected offset was originally assigned

Any two publishers running the same code will agree on the offset for a particular message, so it is safe for the publisher to pipeline these requests. If the publish succeeds for a message at offset n, this implies that the previous messages in the series have all completed successfully too.
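
To make this concrete, here is a minimal sketch of that loop in Java. Nothing below is an existing Kafka API: the ConditionalProducer interface, its publish/publishBatch/highWaterMark methods, and OffsetMismatchException are hypothetical stand-ins for whatever the client would expose once expected offsets are supported.

    import java.util.List;

    // Hypothetical client API for this proposal; none of these types exist today.
    interface ConditionalProducer {
        // Appends value at expectedOffset, failing if that is not the upcoming offset.
        void publish(String topic, int partition, long expectedOffset, byte[] value)
                throws OffsetMismatchException;

        // Appends values at consecutive offsets starting at expectedOffset;
        // the whole batch is refused on a mismatch.
        void publishBatch(String topic, int partition, long expectedOffset, List<byte[]> values)
                throws OffsetMismatchException;

        // The partition's current high-water mark (its upcoming offset).
        long highWaterMark(String topic, int partition);
    }

    class OffsetMismatchException extends Exception {}

    class DeduplicatingPublisher {
        // Publishes messages so that messages.get(i) lands at firstOffset + i, with no duplicates.
        static void publishAll(ConditionalProducer producer, String topic, int partition,
                               List<byte[]> messages, long firstOffset) {
            int i = 0;
            while (i < messages.size()) {
                try {
                    producer.publish(topic, partition, firstOffset + i, messages.get(i));
                    i++;
                } catch (OffsetMismatchException e) {
                    // A previous attempt (or another publisher running the same code)
                    // already wrote part of this series: skip to the first message whose
                    // assigned offset has not been written yet.
                    long highWaterMark = producer.highWaterMark(topic, partition);
                    if (highWaterMark - firstOffset >= messages.size()) {
                        return; // the whole series is already in the log
                    }
                    // Assumes highWaterMark >= firstOffset, i.e. the series starts at or
                    // before the current end of the partition.
                    i = (int) (highWaterMark - firstOffset);
                }
            }
        }
    }

Because any retry converges on the same offset assignments, a real client could also pipeline these sends rather than waiting for each acknowledgement.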

Commit Log

Suppose we want to use a Kafka partition as a 'commit log' for some task's state — where every operation that changes the state is committed to the log, so it can be replayed on failure. Here, the partition is not just an output stream, but also a point of coordination. In particular: if we (temporarily) have duplicate instances of the task writing to the same partition, it's important to avoid interleaving messages from the two producers. (If not, we might end up with an invalid series of operations in the log and be unable to recover our task's state.)

Imagined usage:

  1. Bootstrap the state until the very end of the partition. Let's say the upcoming offset is N.
  2. While we're accepting writes:
    • Accumulate a set of messages and send a publish request with expected offsets set.
    • If the publish fails, go back to step 1 and re-bootstrap. Otherwise, continue from step 2.

This ensures that a process will only successfully publish a message at offset N if it has seen all of the data in the partition up to offset N, so a writer with a stale view of the partition can never interleave its messages with another producer's.
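
Here is a compact sketch of that usage, reusing the hypothetical ConditionalProducer and OffsetMismatchException from the previous example; bootstrapToEnd(), acceptingWrites(), and drainPendingMessages() are likewise illustrative placeholders for replaying the partition and collecting incoming writes, not real APIs.

    // Sketch only; see the caveats above.
    void runCommitLogWriter(ConditionalProducer producer, String topic, int partition) {
        long upcoming = bootstrapToEnd();              // step 1: replay to the end; N = upcoming
        while (acceptingWrites()) {                    // step 2: accept and append writes
            List<byte[]> batch = drainPendingMessages();
            try {
                producer.publishBatch(topic, partition, upcoming, batch);
                upcoming += batch.size();              // success: keep accepting writes
            } catch (OffsetMismatchException e) {
                upcoming = bootstrapToEnd();           // another writer appended: re-bootstrap first
            }
        }
    }

Because the whole batch is accepted or refused as a unit, another writer's messages can never land in the middle of one of our batches.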

Public Interfaces

The proposal requires a few additions to the public interface.

...

  • Clients need an API to specify the expected offset of a message. This requires adding a new offset field to the existing produce record and using the value of that field as the offset for that message in the outgoing message set (or the sentinel value -1 if no expected offset was specified).
  • The server needs to be modified to compare the expected offset to the upcoming offset for the matching partition, and to refuse the entire produce request if any of the offsets don't match. This involves a couple of additional checks at offset-assignment time, plus a new error code and exception type to signal the mismatch to the client.
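
To illustrate the shape of that server-side check, here is a rough sketch in Java. It is not the actual broker code: RecordWithExpectedOffset stands in for the decoded records of one partition in a produce request, expectedOffset() for the proposed new field (-1 when the client left it unset), and OffsetMismatchException for the new error, as in the earlier sketches.

    // Illustrative only; the real change would live in the broker's append path.
    interface RecordWithExpectedOffset {
        long expectedOffset(); // -1 means "no expected offset specified"
    }

    class ExpectedOffsetCheck {
        static void check(Iterable<RecordWithExpectedOffset> records, long upcomingOffset)
                throws OffsetMismatchException {
            long next = upcomingOffset;
            for (RecordWithExpectedOffset record : records) {
                long expected = record.expectedOffset();
                if (expected != -1 && expected != next) {
                    // Refuse the entire produce request; the client sees the new error code.
                    throw new OffsetMismatchException();
                }
                next++;
            }
            // All expected offsets line up: assign offsets and append as usual.
        }
    }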

...

The JIRA ticket includes a rough-draft patch, which should give a sense of the scope of the change.

Worked Example

To see how this low-level coordination mechanism can be used by high-level code, we'll look at the design of a simple distributed key-value store.

Our store distributes data by partitioning the keyspace into multiple shards, such that every key belongs to a single shard. The database also maintains a 'leader' for each shard, which is responsible for all of that shard's data. (Normally, there should be one leader for a shard at a time.) The database uses a Kafka topic as a write-ahead log for all operations, with one partition per shard; the logged operations are used for both crash recovery and replication. We'll assume that, to preserve an accurate history, it's important that operations are not duplicated, dropped, or reordered; if two nodes try to append at once — say, during a leader transition — it's also important that we don't interleave their operations in the log.

Followers and 'bootstrapping' leaders simply consume the partition, applying each operation to their local state and tracking the upcoming offset. Once a leader believes it's reached the end of the partition, it will start accepting new operations from clients and appending them to the partition. It batches multiple operations into a single produce request, with a maximum of one in-flight request at a time. Each record is assigned an expected offset, starting with (what the leader believes is) the current upcoming offset. If the produce request is successful: the leader can apply the operations to its local state, return successful responses to all the clients, set the new upcoming offset, and send the next batch of records to Kafka. If the produce request fails (perhaps after a few retries): the leader should fail the corresponding requests and resume bootstrapping from what it had thought was the upcoming offset, perhaps checking to see if it is still the leader.
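
Under those assumptions, the leader's steady-state loop might look roughly like the sketch below. ConditionalProducer and OffsetMismatchException are the hypothetical stand-ins from the earlier sketches; Operation, LocalState, and the bootstrap/encode/ack/fail helpers are illustrative pieces of the key-value store, not anything in Kafka.

    // Hypothetical leader loop for one shard; sketch only.
    void runLeader(ConditionalProducer producer, String logTopic, int shardPartition,
                   LocalState state) {
        long upcoming = bootstrapFrom(state, 0L);           // consume the shard's log from the start
        while (believesItIsLeader()) {
            List<Operation> batch = takeBatch();            // client operations waiting to commit
            try {
                // At most one in-flight request; the records carry expected offsets
                // upcoming, upcoming + 1, ..., upcoming + batch.size() - 1.
                producer.publishBatch(logTopic, shardPartition, upcoming, encode(batch));
                for (Operation op : batch) {
                    state.apply(op);                        // the log accepted them, so apply locally
                }
                ackClients(batch);                          // respond to the waiting clients
                upcoming += batch.size();
            } catch (OffsetMismatchException e) {
                // The partition has moved past our view (a newer leader, or an issue
                // like KAFKA-2334): fail these requests and resume bootstrapping from
                // what we thought was the upcoming offset.
                failClients(batch);
                upcoming = bootstrapFrom(state, upcoming);
            }
        }
    }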

This logic ensures that the log always contains a valid series of operations. Inductively: a leader will only try to send a batch starting with expected offset n if it's seen all the operations leading up to n, and sending a valid batch of messages will never make the log invalid. (If the message offsets don't match the actual offsets, the produce request has no effect; if they do match, Kafka's existing semantics guarantee that a (potentially-empty) prefix of those messages will be appended to the partition... and a prefix of a valid series of operations is always valid.) Individual produce requests are idempotent when expected offsets are set, so retrying will never cause duplicates. We also know that if the upcoming offset is larger than the leader expected (either because a new leader has been elected, or because of an issue like KAFKA-2334) our produce will be refused, so we'll never corrupt the log by writing commands based on outdated state.

Compatibility, Deprecation, and Migration Plan

...