...

Kafka provides "at least once" delivery semantics: a message that is sent may be delivered one or more times. What people really want is "exactly once" semantics, whereby duplicate messages are never delivered.

There are two common reasons duplicate messages may occur:

  1. If a client attempts to send a message to the cluster and gets a network error, then retrying will potentially lead to duplicates. If the network error occurred before the message was delivered, no duplication will occur. However, if the network error occurs after the message is appended to the log but before the response can be delivered to the sender, the sender is left not knowing what happened. Its only choices are to retry and risk duplication or to give up and declare the message lost (a sketch of this ambiguity follows the list).
  2. If a consumer reads a message from a topic and then crashes, then when the consumer restarts or another instance takes over consumption, the new consumer will start from the last known position of the original consumer, re-reading any messages consumed after that position.
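
To make case 1 concrete, here is a minimal sketch using the standard Java producer client (the topic name and configs are illustrative); the comments mark the exact point at which the sender cannot distinguish the two outcomes:

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class RetryAmbiguity {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                ProducerRecord<String, String> record =
                    new ProducerRecord<>("events", "key", "value");
                try {
                    producer.send(record).get(); // block until the broker responds
                } catch (ExecutionException e) {
                    // The send failed with (for example) a network error. The message
                    // may or may not have been appended to the log before the failure,
                    // so retrying here risks a duplicate, while giving up risks loss.
                    producer.send(record); // choice 1: retry and risk duplication
                }
            }
        }
    }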

...

The first case, however, currently has no good solution. The client doesn't know the offset of the message, so it has no way to uniquely identify the message and check whether the send succeeded.

For consumers that correctly handle duplicates, this proposal would strengthen the guarantees provided by Kafka to what is often called "atomic broadcast".

This proposal introduces an optional set of ids that uniquely identify the messages a producer sends, allowing the broker to detect and discard duplicates.

Some Nuances

...

Consider a more elaborate use case which involves copying data from a source into a Kafka topic. This would be the case with Mirror Maker, for example, or any "stream processing" use case. We want the process doing the copying to periodically save its position in the upstream topic/database and always resume from this saved position. In the event of a crash, the copy process should be able to resume from the last known position without producing duplicates in the destination topic. To accomplish this, the copy process can save BOTH its input offset/position AND the ids we will introduce, associated with its downstream topic. When it restarts after a crash, it will initialize with the saved ids. This effectively makes the duplicate produce requests the same as the network-error retry case described above.
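
As a rough sketch of that pattern (every type and method below is hypothetical, since the id assignment and initialization API is exactly what this proposal would define):

    // Hypothetical sketch of the checkpoint-and-resume pattern described above.
    // None of these types exist in Kafka; they stand in for the API this
    // proposal would introduce.
    interface Source {
        void seek(long offset);   // rewind to a saved position
        long position();
        byte[] next();
    }

    interface IdempotentProducer {
        void initialize(long pid, long lastSequence); // restore the saved ids
        void send(byte[] record);
        long pid();
        long lastSequence();
    }

    class Checkpoint {
        final long upstreamOffset; // position in the source topic/database
        final long pid;            // producer id for the destination topic
        final long lastSequence;   // last sequence number sent

        Checkpoint(long upstreamOffset, long pid, long lastSequence) {
            this.upstreamOffset = upstreamOffset;
            this.pid = pid;
            this.lastSequence = lastSequence;
        }
    }

    interface CheckpointStore {
        Checkpoint load();
        void save(Checkpoint cp);
    }

    class CopyProcess {
        void run(Source source, IdempotentProducer producer, CheckpointStore store) {
            Checkpoint cp = store.load();
            source.seek(cp.upstreamOffset);               // resume the input
            producer.initialize(cp.pid, cp.lastSequence); // reuse the saved ids, so
            // any records replayed after a crash carry sequence numbers the broker
            // has already seen and are deduplicated, just like network-error retries
            while (true) {
                producer.send(source.next());
                store.save(new Checkpoint(
                    source.position(), producer.pid(), producer.lastSequence()));
            }
        }
    }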

...

A simple but impractical implementation of deduplication would be to have the client create a unique id for each message it sends (a UUID, say) and have the server save all such ids for all messages it retains. New messages would be checked against this database, and messages that already existed would be rejected. This would satisfy the basic requirements, but would be hopelessly inefficient, as the database would contain O(num_messages) entries. A practical implementation will have to provide a similar way of detecting duplicates but with lower space requirements and negligible lookup performance impact.
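
For illustration only, a sketch of this naive scheme (the class is made up; a real broker would also have to persist the set):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.UUID;

    // Naive broker-side deduplication: remember every message id ever seen.
    // The set grows as O(num_messages), which is exactly what makes it impractical.
    class NaiveDeduplicator {
        private final Set<UUID> seen = new HashSet<>();

        /** Returns true if the message is new and should be appended to the log. */
        synchronized boolean accept(UUID messageId) {
            return seen.add(messageId); // Set.add returns false for duplicates
        }
    }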

An obvious mechanism for reducing the data we need to store would be to assign each producer a unique id (PID) and keep a sequence number that increments with each message sent. This pair effectively acts as a unique id, but the broker no longer needs to store all the ids to reason about what it has received from a given producer. This leverages the in-order property of Kafka (and TCP) to ensure that the broker need only keep a single "highwater mark" sequence number for each producer and reject any message with a lower sequence number. Specifically, if H(P) is the highwater mark for producer P and the broker receives a message with PID P and sequence number S, then it will accept the message iff H(P) < S.
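
A minimal sketch of that check (names are made up; a real broker would persist the highwater marks alongside the log):

    import java.util.HashMap;
    import java.util.Map;

    // Per-producer deduplication: store one "highwater mark" sequence per PID,
    // so storage is O(num_producers) rather than O(num_messages).
    class PidDeduplicator {
        private final Map<Long, Long> highwater = new HashMap<>(); // PID -> H(P)

        /** Accepts the message iff H(P) < S, then advances the highwater mark. */
        synchronized boolean accept(long pid, long sequence) {
            long h = highwater.getOrDefault(pid, -1L);
            if (sequence <= h)
                return false; // duplicate: already covered by H(P)
            highwater.put(pid, sequence);
            return true;
        }
    }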

The next question is whether the producer will maintain a global sequence number across all messages it sends or one per topic-partition. A global number would be simpler for the client to implement. However, if the sequence number were per-partition, the broker could enforce a tighter constraint, namely that H(P) + 1 = S. This would let us handle the pipelined request case: if any request fails, we automatically fail all other in-flight requests, which allows us to retry the full set in order.
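
Under the per-partition variant the check above tightens to an exact-successor test; a sketch (again with hypothetical names):

    import java.util.HashMap;
    import java.util.Map;

    // Per-partition variant: each (PID, partition) pair keeps its own sequence,
    // and the broker insists the next message carry exactly H(P) + 1.
    class StrictDeduplicator {
        private final Map<String, Long> highwater = new HashMap<>(); // "pid:partition" -> H(P)

        synchronized boolean accept(long pid, int partition, long sequence) {
            String key = pid + ":" + partition;
            long h = highwater.getOrDefault(key, -1L);
            if (sequence != h + 1)
                return false; // a duplicate (<= H) or a gap left by a failed
                              // in-flight request; the producer retries in order
            highwater.put(key, sequence);
            return true;
        }
    }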

...