...

Kafka provides "at least once" delivery semantics. This means that a message that is sent may be delivered one or more times. What people really want is "exactly once" semantics whereby duplicate messages are not delivered.

There are two common reasons duplicate messages may occur:

  1. If a client attempts to send a message to the cluster and gets a network error, then retrying will potentially lead to duplicates. If the network error occurred before the message was delivered, no duplication will occur. However, if the network error occurs after the message is appended to the log but before the response can be delivered to the sender, the sender is left not knowing what has happened. The only choices are to retry and risk duplication or to give up and declare the message lost.
  2. If a consumer reads a message from a topic and then crashes, then when the consumer restarts or another instance takes over consumption, the new consumer will start from the last known position of the original consumer.

...

The first case, however, currently has no good solution: the client doesn't know the offset of the message, so it has no unique way to identify the message and check whether the send succeeded.

For consumers that correctly handle duplicates, this proposal would strengthen the guarantees provided by Kafka to what is often called "atomic broadcast".

This proposal will introduce an optional set of ids that provide a unique identifier for the messages a producer sends, allowing duplicates to be detected and avoided.

Some Nuances

...

Consider a more elaborate use case which involves copying data from a source to a Kafka topic. This would be the case with Mirror Maker, for example, or any "stream processing" use case. We want it to be the case that the process doing the population can periodically save its position in the upstream topic/database and always resume from this saved position. In the event of a crash we want the copy process to be able to resume from the last known position without producing duplicates in the destination topic. To accomplish this the copy process can save BOTH its input offset/position AND the ids we will introduce associated with its downstream topic. When it restarts after a crash it will initialize with the saved ids. This will effectively make the duplicate produce requests the same as the network error retry case described above.
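As a rough illustration of that checkpointing pattern, here is a sketch of a checkpoint structure that couples the upstream position with the downstream pid and sequence number (the class and field names are illustrative, not part of the proposal):

// Illustrative checkpoint for a copy process: the upstream position and the
// downstream pid/sequence are saved and restored together.
final class Checkpoint {
    final long upstreamOffset;  // position in the source topic/database to resume from
    final long pid;             // producer id used for the destination topic
    final long sequence;        // last sequence number sent with that pid

    Checkpoint(long upstreamOffset, long pid, long sequence) {
        this.upstreamOffset = upstreamOffset;
        this.pid = pid;
        this.sequence = sequence;
    }
}

On restart the copy process loads its latest checkpoint, initializes its producer with the saved pid and sequence, and resumes reading from the saved upstream offset, so any re-sent messages are rejected by the broker's sequence check.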

...

A simple, impractical implementation for deduplication would be to have the client create a unique id for each message it sends (a UUID, say) and have the server save all such ids for all messages it retains. New messages would be checked against this database and messages that existed already would be rejected. This would satisfy at least the basic requirements, but would be hopelessly inefficient as the database would contain O(num_messages) entries. A practical implementation will have to provide a similar way of detecting duplicates but with lower space requirements and negligible lookup performance impact.
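For concreteness, a minimal in-memory sketch of that impractical baseline (illustrative only) looks like this; the problem is that the set of seen ids grows without bound:

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Naive dedup: remember every message id ever accepted. The set grows as
// O(num_messages), which is why this approach is rejected.
final class NaiveDedup {
    private final Set<UUID> seen = new HashSet<>();

    synchronized boolean tryAccept(UUID messageId) {
        return seen.add(messageId);  // false means the id was already present
    }
}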

An obvious mechanism for reducing the data we need to store would be to assign each producer a unique id (PID) and keep a sequence number that increments with each message sent. This pair effectively acts as a unique id, but the broker no longer needs to store all the ids to reason about what it has received from a given producer. This leverages the in-order property of Kafka (and TCP) to ensure that the broker need only keep a single "highwater mark" sequence number for each producer, which I'll call H(P), and reject any message with a lower sequence number. Specifically, if H(P) is the highwater mark and the broker receives a message with PID P and sequence number S, then it will accept the message iff H(P) < S.

The next question is whether the producer will maintain a global sequence number across all messages it sends or whether it will be per topic-partition. A global number would be simpler for the client to implement. However, if the sequence number were per-partition then the broker could enforce a tighter constraint, namely that H(P) + 1 = S. This would allow us to handle the pipelined request case: if any request fails we will automatically fail all other in-flight requests, which allows us to retry the full set in order.
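The following in-memory sketch (illustrative only, not broker code) contrasts the two accept rules, H(P) < S and H(P) + 1 = S; either way the broker keeps only a single highwater mark per producer:

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the two accept rules discussed above.
final class SequenceDedup {
    private final Map<Long, Long> highwaterMark = new HashMap<>(); // pid -> H(P)

    // Global-sequence rule: accept iff H(P) < S, then advance H(P) to S.
    synchronized boolean acceptIfHigher(long pid, long sequence) {
        long h = highwaterMark.getOrDefault(pid, -1L);
        if (sequence <= h) return false;     // duplicate or stale message
        highwaterMark.put(pid, sequence);
        return true;
    }

    // Per-partition rule (one instance per partition): accept iff H(P) + 1 = S,
    // so a gap left by failed in-flight requests is rejected and the client must
    // retry the whole pipelined set in order.
    synchronized boolean acceptIfNext(long pid, long sequence) {
        long h = highwaterMark.getOrDefault(pid, -1L);
        if (sequence != h + 1) return false; // duplicate or out-of-order gap
        highwaterMark.put(pid, sequence);
        return true;
    }
}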

...

lease_pid_request => topic partition pid

Response:

lease_pid_response => error pid generation sequence_number expire_ms

This request could also have a batch version for handling multiple partitions at once, but I have described the single-partition case for simplicity.
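For illustration, the request and response above could be represented in Java as follows; the field types here are assumptions, since the proposal only names the fields:

// Java mirror of lease_pid_request; types are assumed for illustration.
final class LeasePidRequest {
    final String topic;
    final int partition;
    final long pid;             // pid field from the schema above

    LeasePidRequest(String topic, int partition, long pid) {
        this.topic = topic;
        this.partition = partition;
        this.pid = pid;
    }
}

// Java mirror of lease_pid_response; types are assumed for illustration.
final class LeasePidResponse {
    final short error;          // error code
    final long pid;             // leased producer id
    final int generation;       // generation of the lease
    final long sequenceNumber;  // sequence number to resume from
    final long expireMs;        // lease expiration in milliseconds

    LeasePidResponse(short error, long pid, int generation, long sequenceNumber, long expireMs) {
        this.error = error;
        this.pid = pid;
        this.generation = generation;
        this.sequenceNumber = sequenceNumber;
        this.expireMs = expireMs;
    }
}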

There are several intended uses for this API:

...

Instead this proposal assumes the cluster will have some configured fixed lifetime for pids from the point of issuance, after which a pid is available for reuse. It would also be possible to allow clients to define custom expirations in their lease_pid request, but that would require a more complex implementation as all replicas would have to know about each expiration. The server will issue pids approximately in order, so reuse will only actually occur after 4 billion pids have been issued.

...

Note that the expiration is only approximate as it is based on the time a server sees the first message for a partition. However it is only required that the server guarantee at least that much time, so retaining pids longer is okay. This means the followers can use arrival time (though arrival on followers will be slightly later than on the leader). In the event of a full data restore the circular buffer of pid entries will be full and all will have their full expiration time restored.
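A sketch of that bookkeeping (illustrative, in-memory only): the broker records the first arrival time per pid and treats the configured lifetime as a minimum, so retaining an entry longer is always safe.

import java.util.HashMap;
import java.util.Map;

// Approximate pid expiration: record the first time a message is seen for a pid
// and allow the entry to be reclaimed only after the configured lifetime has
// elapsed. The lifetime is a lower bound, so keeping entries longer is fine.
final class PidExpiry {
    private final long lifetimeMs;                               // configured pid lifetime
    private final Map<Long, Long> firstSeenMs = new HashMap<>(); // pid -> first arrival time

    PidExpiry(long lifetimeMs) {
        this.lifetimeMs = lifetimeMs;
    }

    void recordArrival(long pid, long nowMs) {
        firstSeenMs.putIfAbsent(pid, nowMs);  // arrival time, on leader or follower
    }

    boolean expirable(long pid, long nowMs) {
        Long first = firstSeenMs.get(pid);
        return first != null && nowMs - first >= lifetimeMs;
    }
}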

Client Implications

The general deduplication will happen automatically in the producer. It should be cheap and easy enough to enable by default.

To integrate this into tools like Mirror Maker and Samza that chain producers and consumers, we will need to be able to save the PID and sequence number of a producer. We can do this by including these fields in the response returned by the producer.

The producer will need a config to set its initial PID, sequence number, and generation at initialization time.
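For example, resuming a producer might look something like the following; the property keys are purely hypothetical, since the proposal does not specify concrete config names.

import java.util.Properties;

// Hypothetical property keys, for illustration only; the proposal does not
// define concrete config names for the initial pid, sequence, and generation.
public class ResumedProducerConfig {
    public static Properties forResumedProducer(long pid, long sequence, int generation) {
        Properties props = new Properties();
        props.put("producer.initial.pid", Long.toString(pid));
        props.put("producer.initial.sequence", Long.toString(sequence));
        props.put("producer.initial.generation", Integer.toString(generation));
        return props;
    }
}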

We may want to consider extending the OffsetCommit request to also store these fields.