You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Introduction

Kafka provides "at least once" delivery semantics. This means that a message that is sent may delivered one or more times. What people really want is "exactly once" semantics whereby duplicate messages are not delivered.

There are two reasons duplicate message may occur:

  1. If a client attempts to send a message to the cluster and gets a network error then retrying will potentially lead to duplicates. If network error occurred before the message was delivered no duplication will occur. However if the network error occurs after the message is appended to the log but before the response can be delivered to the sender the sender is left not knowing what has happened. The only choices are to retry and risk duplication or to give up and declare the message lost.
  2. If a consumer reads a message from a topic and then crashes then when the consumer restarts or another instances takes over consumption the new consumer will start from the last known position of the original consumer.

The second case can be handled by consumers by making use of the offset Kafka provides. They can store the offset with their output and then ensure that the new consumer always picks up from the last stored offset. Or, they can use the offset as a kind of key and use it to deduplicate their output in whatever final destination system they populate.

The first case currently has no good solution, however. The client doesn't know the offset of the message so it has no unique way to identify the message and check if the send succeeded.

This proposal will introduce an optional set of ids that will provide a unique identifier for messages a producer sends to avoid duplicates.

Some Nuances

 Opt-in

Producer implementations that don't care about idempotency should not need to do anything special.

Transitivity: Consumers that also produce

Consider a more elaborate use case which involves copying data from a source to a Kafka topic. This would be the case with Mirror Maker, for example. We want it to be the case that the process doing the population can periodically save its position in the upstream topic/database and always resume from this saved position. In the event of a crash we want the copy process to be able to resume from the last known position without producing duplicates in the destination topic. To accomplish this the copy process can save BOTH its input offset/position AND the ids we will introduce associated with its downstream topic. When it restarts after a crash it will initialize with the saved ids. This will effectively make the duplicate produce requests the same as the network error retry case described above.

Fencing

Another twist to this is that it in the mirror maker or other cases where consumer failure is automatically detected it is possible to have false positives leading to a situation where at least transiently we have two consumers reading the same input and producing the same output. It is important that we handle this "split brain" problem correctly and gracefully.

Pipelining

A related need is the ability to pipeline produce requests safely in the presence of retries. When combined with retries this can lead to messages being stored out of order. If the sender sends messages M1, M2, M3 asynchronously without waiting for responses it may then receive a success for M1 and M3 but an error for M2. If it retries M2 successfully this will lead to the topic containing the messages in the order M1, M3, M2.

Fault tolerance

A common cause of errors is actual broker failure. If a broker fails with a request outstanding and unacknowledged you don't know if the newly elected master contains the message or not and will want to retry your request. Thus the idempotency mechanism needs to work even in the presence of broker failures.

Proposed Implementation

A simple, impractical implementation for deduplication would be to have the client create a unique id for each message it sends (a UUID, say) and have the server save all such ids for all messages it retains. New messages would be checked against this database and messages that existed already would be rejected. This would satisfy at least the basic requirements, but is impractical as the database would contain O(num_messages) entries. A practical implementation will have to provide a similar way of detecting duplicates but with lower space requirements.

An obvious mechanism for reducing the data we need to store would be to assign each producer a unique id (PID) and keep a sequence number that increments with each message sent. This leverages the in-order property of Kafka (and TCP) to ensure that the broker need only keep a single "highwater mark" sequence number for each producer, which I'll call H(P). If the broker receives a message with PID P and sequence number S then it will accept the message iff H(P) < S.

The next question is whether the client will maintain a global sequence number across all messages it sends or whether it will be per topic-partition. A global number would be simpler for the client. However if the sequence number was per-partition then the broker could enforce a tighter constraint, namely that H(P) + 1 = S.

 

 

 

  • No labels