You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 4 Next »

 

Status

Current state"Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when multiple producers can write to the same partition, the resulting state of the log is hard to predict or control: it's possible for a producer to end up writing duplicate messages to Kafka, or have multiple tasks write to the same commit log and create an inconsistent state.

This document proposes adding a simple concurrency primitive to the Kafka producer. The producer is allowed to attach an 'expected offset' to each published message — and if that offset does not match the upcoming offset at the time the message is appended to the log, the broker refuses the publish request. Like the common check-and-set operation, this 'conditional publish' lets the client confirm the current state before going ahead with the write. 

Again like check-and-set, this only makes good progress when contention for a single partition is low. Fortunately, when Kafka is used as a commit log or as a stream-processing transport, it's common to want a single producer for a given partition — and in many of these cases, the 'conditional publish' turns out to be quite useful.

  • A producer can re-send a message indefinitely and be sure that at most one of those attempts will succeed; and if two producers accidentally write to the end of the partition at once, we can be certain that at least one of them will fail.
  • It's possible to 'bulk load' Kafka this way, by appending messages consecutively to a partition. Even if the list is much larger than the buffer size or the producer has to be restarted, each message will be appended to the log just once.
  • If a process is using Kafka as a commit log -- reading from a partition to bootstrap, then writing any updates to that same partition -- it can be sure that it's seen all of the messages in that partition at the moment it does its first (successful) write.

In all of these cases, the 'conditional publish' operation can support much stronger guarantees than the existing publish semantics.

Usage Examples

This section includes a few sketches on how this feature might be used.

Output Deduplication

Suppose we have some ordered series of messages, and we want to append them to some partition. In this case, the primary goal is to avoid duplicate messages in the output.

Imagined usage: 

  • Publish the sequence of messages in order, incrementing the expected offset each time
  • If the publisher receives a mismatch error:
    • Get the high-water mark for the output partition
    • Restart publishing from the message that we originally assigned that expected offset

Any two publishers running the same code will agree on the offset for a particular message, so pipelining for the publisher is safe. If the publish succeeds for a message at offset n, this implies that the previous messages in the series have all completed successfully as well.

Commit Log

In other cases, we may want to use a Kafka partition as a 'commit log' for a key-value store — where every operation is committed to the log so it can be recovered on failure. This is a bit different than the first case, since the partition is not just an output stream, but also a point of coordination. Duplicates are also a problem here, but we also need to make sure we don't interleave the writes from two concurrent producers. (Suppose we two servers A and B that are both accepting writes, and two concurrent increment operations for key0 and key1. If A tries to increment key0 before key1, and B tries to increment key1 before key0, we don't want to end up with a commit log that holds the increment of key0 from A and then the increment of key0 from B.) In particular, this means pipelining does not seem to be safe if operations are not idempotent.

Imagined usage:

  1. Bootstrap the state until the very end of the partition. Let's say the upcoming offset is N.
  2. While we're accepting writes:
    • Accumulate a set of messages and send a publish request with expected offsets set.
    • If the publish fails, continue from step 2. Otherwise, go back to step 1.

This ensures that a) a process will only successfully publish a message at offset N if it has seen the data in the partition up to offset N.

Public Interfaces

The proposal requires a few additions to the public interface.

  • Add a new offset field to the ProduceRecord class, with a default value of -1.
  • Add a new check.expected.offsets property to the log config.
  • Add a new error code and exception type for an offset mismatch.

Proposed Changes

The current produce request already includes an offset for every message, which the server currently ignores. This proposal repurposes that field as the expected offset of the message. This requires two main changes:

  • Clients need an API to specify the expected offset of a message. This requires adding a new offset field to the existing produce record, and using the value of that field as the offset for that message in the outgoing message set. (Or the sigil value -1, if unspecified.)
  • The server needs to be modified to compare the expected offset to the upcoming offset for the matching partition, and refuse the produce request if the offsets don't match. This involves a couple additional checks at offset-assignment time, and a new error code and exception type to signal the error to the client.

This feature also requires a new config parameter, which can be enabled globally or per-topic.

The JIRA ticket includes a rough-draft patch, which should give a sense of the scope of the change.

Compatibility, Deprecation, and Migration Plan

This proposal 'hides' the offset check behind a per-log configuration flag. Without any config changes, new clients will work with the existing version of the server, and existing clients will work with the new server.

If the new configuration parameter is enabled, a user will also need to use the updated client — existing clients put arbitrary values in the offset field, so those produce requests might fail. Since the only reason to enable the config is to manually specify the offsets, which requires an up-to-date client anyways, this does not seem to be a serious limitation.

In a future major version of Kafka, it may make sense to have all clients default the offset to -1 and leave the offset check on for all partitions. However, this would be a breaking change.

Rejected Alternatives

Idempotent Producer or Transactional Messaging

A couple of designs or proposals exist for adding transactional or idempotency features to Kafka: Idempotent Producer and Transactional Messaging in Kafka. These proposals, if implemented, would likely overlap with the use-cases for this feature — for example, one can imagine using transactional messaging to implement a more reliable mirror maker.

However, this proposal is a much more conservative extension to the existing Kafka API; this makes the implementation very simple, and it imposes little to no runtime cost in memory or time. Even if these more elaborate coordination proposals are implemented, this feature would still be useful in cases where simplicity and performance are important.

Comparing Offsets by Key

A similar feature was suggested on the mailing list — instead of checking that the given offset was equal to the upcoming offset for the partition, it would check that the given offset was greater than the last offset for the message's key. This feature would have similar advantages to the current proposal, and would reduce contention in some situations: many producers could write to the same partition concurrently, as long as none of them sent messages with the same key.

Compared to the current proposal, this has a couple of downsides:

  • It requires an additional datastructure, of size proportional to the number of unique keys present in the log, which would need to be maintained and checkpointed.
  • There are other use cases where checking offsets at the partition level is important: for example, a KV store in Samza might use a partition as a commit log for a KV store... and it would want to ensure that no other task is writing to that partition, not just some particular key. (Of course, per-key or per-partition checking could be selected by a config flag.)

While this may make a good future extension, it seems simpler to exclude it from the first iteration.

  • No labels