Introduction

Kafka provides "at least once" delivery semantics. This means that a message that is sent may be delivered one or more times. What people really want is "exactly once" semantics, whereby duplicate messages are not delivered.

There are two reasons duplicate messages may occur:

  1. If a client attempts to send a message to the cluster and gets a network error, retrying may lead to duplicates. If the network error occurs before the message is delivered, no duplication will occur. However, if the network error occurs after the message is appended to the log but before the response can be delivered to the sender, the sender is left not knowing what has happened. The only choices are to retry and risk duplication or to give up and declare the message lost.
  2. If a consumer reads a message from a topic and then crashes, then when the consumer restarts or another instance takes over consumption, the new consumer will start from the last known position of the original consumer, so any messages processed after that position was recorded will be processed again.

The second case can be handled by consumers by making use of the offset Kafka provides. They can store the offset with their output and then ensure that the new consumer always picks up from the last stored offset. Or, they can use the offset as a kind of key and use it to deduplicate their output in whatever final destination system they populate.
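As a rough illustration of the second option, the sketch below keys each output row by its source position so that a re-delivered message is ignored. The `DedupSink` class, its method names, and the in-memory dictionary are all hypothetical stand-ins for whatever destination system is actually used.

```python
from typing import Dict, Tuple


class DedupSink:
    """Hypothetical in-memory stand-in for the destination system."""

    def __init__(self) -> None:
        # (topic, partition, offset) -> stored value
        self.rows: Dict[Tuple[str, int, int], str] = {}

    def write(self, topic: str, partition: int, offset: int, value: str) -> bool:
        """Store value keyed by its source position; return False if already present."""
        key = (topic, partition, offset)
        if key in self.rows:
            return False
        self.rows[key] = value
        return True

    def resume_offset(self, topic: str, partition: int) -> int:
        """Next offset to consume: one past the highest offset stored so far."""
        offsets = [o for (t, p, o) in self.rows if t == topic and p == partition]
        return max(offsets) + 1 if offsets else 0


sink = DedupSink()
sink.write("events", 0, 0, "a")
sink.write("events", 0, 1, "b")
# A restarted consumer re-delivers offset 1; the sink ignores it.
duplicate_accepted = sink.write("events", 0, 1, "b")
```

Because the offset travels with the output, the same stored data also answers the question of where a new consumer should resume.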

The first case currently has no good solution, however. The client doesn't know the offset of the message so it has no unique way to identify the message and check if the send succeeded.

This proposal will introduce an optional set of ids that will provide a unique identifier for messages a producer sends to avoid duplicates.

Some Nuances

Opt-in

Producer implementations that don't care about idempotency should not need to do anything special.

Transitivity: Consumers that also produce

Consider a more elaborate use case which involves copying data from a source to a Kafka topic. This would be the case with Mirror Maker, for example. We want it to be the case that the process doing the population can periodically save its position in the upstream topic/database and always resume from this saved position. In the event of a crash we want the copy process to be able to resume from the last known position without producing duplicates in the destination topic. To accomplish this the copy process can save BOTH its input offset/position AND the ids we will introduce associated with its downstream topic. When it restarts after a crash it will initialize with the saved ids. This will effectively make the duplicate produce requests the same as the network error retry case described above.

Fencing

Another twist is that in the Mirror Maker case, or in other cases where consumer failure is automatically detected, it is possible to have false positives, leading to a situation where, at least transiently, we have two consumers reading the same input and producing the same output. It is important that we handle this "split brain" problem correctly and gracefully.

Pipelining

A related need is the ability to pipeline produce requests safely in the presence of retries. Pipelining combined with retries can lead to messages being stored out of order. If the sender sends messages M1, M2, M3 asynchronously without waiting for responses, it may then receive a success for M1 and M3 but an error for M2. If it retries M2 successfully, this will lead to the topic containing the messages in the order M1, M3, M2.
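The reordering can be reproduced with a toy simulation. The function below is a hypothetical model, not client code: it appends every message whose first attempt succeeds and then appends the retried ones afterwards.

```python
from typing import List, Set


def send_pipelined(messages: List[str], fail_first_attempt: Set[str]) -> List[str]:
    """Simulate a log receiving pipelined sends; failed sends are retried afterwards."""
    log: List[str] = []
    to_retry: List[str] = []
    for message in messages:
        if message in fail_first_attempt:
            to_retry.append(message)  # error returned to sender, nothing appended
        else:
            log.append(message)
    log.extend(to_retry)  # the retries succeed, but land after later messages
    return log


log = send_pipelined(["M1", "M2", "M3"], {"M2"})
```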

Fault tolerance

A common cause of errors is actual broker failure. If a broker fails with a request outstanding and unacknowledged, you don't know whether the newly elected leader contains the message, and you will want to retry your request. Thus the idempotency mechanism needs to work even in the presence of broker failures.

Proposed Implementation

A simple, impractical implementation for deduplication would be to have the client create a unique id for each message it sends (a UUID, say) and have the server save all such ids for all messages it retains. New messages would be checked against this database and messages that existed already would be rejected. This would satisfy at least the basic requirements, but is impractical as the database would contain O(num_messages) entries. A practical implementation will have to provide a similar way of detecting duplicates but with lower space requirements.
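A minimal sketch of this naive scheme, assuming an in-memory set in place of the server-side database (the `NaiveDedupLog` class and its names are illustrative only):

```python
import uuid
from typing import List, Set


class NaiveDedupLog:
    """Toy log that remembers the id of every message it retains."""

    def __init__(self) -> None:
        self.seen: Set[uuid.UUID] = set()  # grows with the number of messages
        self.log: List[bytes] = []

    def append(self, message_id: uuid.UUID, payload: bytes) -> bool:
        """Append payload unless message_id was already stored."""
        if message_id in self.seen:
            return False
        self.seen.add(message_id)
        self.log.append(payload)
        return True


naive = NaiveDedupLog()
message_id = uuid.uuid4()
first_attempt = naive.append(message_id, b"payload")
retry_attempt = naive.append(message_id, b"payload")  # same id, so rejected
```

The `seen` set holds one entry per retained message, which is exactly the O(num_messages) cost that makes this approach impractical.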

An obvious mechanism for reducing the data we need to store would be to assign each producer a unique id (PID) and keep a sequence number that increments with each message sent. This leverages the in-order property of Kafka (and TCP) to ensure that the broker need only keep a single "highwater mark" sequence number for each producer, which I'll call H(P). If the broker receives a message with PID P and sequence number S then it will accept the message iff H(P) < S.
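The acceptance rule can be sketched as follows. The class name is hypothetical, and starting H(P) at -1 (so that the first sequence number is 0) is an assumption the text does not specify.

```python
from typing import Dict, List


class HighwaterBroker:
    """Toy broker keeping a single highwater mark H(P) per producer id."""

    def __init__(self) -> None:
        self.highwater: Dict[int, int] = {}
        self.log: List[str] = []

    def append(self, pid: int, seq: int, payload: str) -> bool:
        """Accept the message iff H(P) < S, then advance H(P) to S."""
        h = self.highwater.get(pid, -1)  # assumption: -1 before any message
        if not h < seq:
            return False
        self.highwater[pid] = seq
        self.log.append(payload)
        return True


hw_broker = HighwaterBroker()
accepted_first = hw_broker.append(pid=1, seq=0, payload="a")
accepted_second = hw_broker.append(pid=1, seq=1, payload="b")
accepted_retry = hw_broker.append(pid=1, seq=1, payload="b")  # retry of "b"
```

The broker stores one integer per producer rather than one id per message.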

The next question is whether the client will maintain a global sequence number across all messages it sends or whether it will be per topic-partition. A global number would be simpler for the client. However, if the sequence number were per-partition, the broker could enforce a tighter constraint, namely that H(P) + 1 = S. This would allow us to handle the pipelined request case: if any request fails, all other in-flight requests will automatically fail as well, which allows us to retry the full set in order.
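A sketch of the per-partition rule, under some assumptions not stated in the text: sequence numbers start at 0, and the broker distinguishes a "duplicate" (S at or below the highwater mark) from an "out_of_sequence" gap. The class and the result strings are hypothetical.

```python
from typing import Dict, List, Tuple

Key = Tuple[int, str, int]  # (pid, topic, partition)


class StrictSequenceBroker:
    """Toy broker enforcing H(P) + 1 = S per producer and topic-partition."""

    def __init__(self) -> None:
        self.highwater: Dict[Key, int] = {}
        self.logs: Dict[Tuple[str, int], List[str]] = {}

    def append(self, pid: int, topic: str, partition: int, seq: int, payload: str) -> str:
        key = (pid, topic, partition)
        h = self.highwater.get(key, -1)  # assumption: sequences start at 0
        if seq <= h:
            return "duplicate"  # already appended earlier
        if seq != h + 1:
            return "out_of_sequence"  # an earlier in-flight request is missing
        self.highwater[key] = seq
        self.logs.setdefault((topic, partition), []).append(payload)
        return "ok"


strict = StrictSequenceBroker()
r1 = strict.append(1, "t", 0, 0, "M1")
# M2 (seq 1) is lost in the network and never reaches the broker.
r3 = strict.append(1, "t", 0, 2, "M3")  # rejected because seq 1 is missing
# The sender retries the failed requests in their original order.
r2_retry = strict.append(1, "t", 0, 1, "M2")
r3_retry = strict.append(1, "t", 0, 2, "M3")
r1_dup = strict.append(1, "t", 0, 0, "M1")  # a late retry of M1 is rejected
```

Because M3 is rejected while M2 is missing, the retry restores the order M1, M2, M3 instead of M1, M3, M2.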

Note that what is described so far handles the transitive consumer/producer case described above. The process can periodically store both its offset in its upstream sources as well as its PID and sequence number. When it restarts it will reinitialize with the offset, PID, and sequence number. Several of its initial requests may be rejected as they have already been sent and are below the server's highwater mark.
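The restart behaviour can be sketched with a toy copy process. Everything here is illustrative: the `Checkpoint` fields, the `Broker` class, and the `copy_messages` helper are hypothetical, and the broker uses the simple rule of rejecting any sequence number at or below its highwater mark.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Checkpoint:
    input_offset: int  # next upstream position to read
    pid: int
    next_seq: int  # sequence number for the next produced message


class Broker:
    """Toy destination broker with one highwater mark per pid."""

    def __init__(self) -> None:
        self.highwater: Dict[int, int] = {}
        self.log: List[str] = []

    def append(self, pid: int, seq: int, payload: str) -> bool:
        if seq <= self.highwater.get(pid, -1):
            return False  # already sent before the crash
        self.highwater[pid] = seq
        self.log.append(payload)
        return True


def copy_messages(source: List[str], broker: Broker, start: Checkpoint, count: int) -> int:
    """Copy up to count messages from the checkpoint; return how many were rejected."""
    rejected = 0
    for i in range(count):
        offset = start.input_offset + i
        if offset >= len(source):
            break
        if not broker.append(start.pid, start.next_seq + i, source[offset]):
            rejected += 1
    return rejected


source = ["a", "b", "c", "d", "e"]
destination = Broker()
saved = Checkpoint(input_offset=0, pid=7, next_seq=0)
copy_messages(source, destination, saved, 3)  # copies a, b, c, then "crashes"
# Restart from the stale checkpoint: the first three sends are rejected.
rejected = copy_messages(source, destination, saved, 5)
```

The key point is that the offset and the sequence number are saved together, so replaying from a stale checkpoint regenerates the same sequence numbers for the same upstream messages.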

To complete this proposal we just need to figure out how to provide unique PIDs to producers, how to provide fault tolerance for the highwater marks, and how to provide the "fencing" described above to prevent two producers with the same PID from interfering with one another.

Implementation Details

Now I will make the proposal a bit more explicit.

We will add a new API lease_pid which will be used to allocate a unique producer id. The API will have the following format:

Request:

lease_pid_request => topic partition pid expire_ms

Response:

lease_pid_response => error pid generation sequence_number expire_ms

These requests could also have a batch version for handling multiple partitions at once.

The intended usage is as follows. The producer would issue a lease_pid request for each partition it wants to send messages to. When the producer first initializes, it would send pid=-1, which would cause the server to issue it a new pid in the response.

The generation value is used to avoid having two producers with the same PID. 
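To make the request and response shapes concrete, here is a toy model of the exchange. The field names come from the formats above; everything else is an assumption made for illustration, since the proposal does not spell it out: that error 0 means success, that re-leasing an existing pid increments its generation, that sequence_number carries the last sequence number seen (-1 if none), and that the server simply echoes expire_ms. The `LeaseTable` class is hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass
class LeasePidRequest:
    topic: str
    partition: int
    pid: int  # -1 asks the server to allocate a new pid
    expire_ms: int


@dataclass
class LeasePidResponse:
    error: int  # assumption: 0 means success
    pid: int
    generation: int
    sequence_number: int
    expire_ms: int


class LeaseTable:
    """Hypothetical server-side lease state; not the proposal's actual design."""

    def __init__(self) -> None:
        self._next_pid = 0
        # (pid, topic, partition) -> (generation, last sequence number)
        self._leases: Dict[Tuple[int, str, int], Tuple[int, int]] = {}

    def lease_pid(self, request: LeasePidRequest) -> LeasePidResponse:
        pid = request.pid
        if pid == -1:
            pid = self._next_pid  # allocate a fresh, unique pid
            self._next_pid += 1
        key = (pid, request.topic, request.partition)
        generation, sequence_number = self._leases.get(key, (-1, -1))
        generation += 1  # assumption: every lease of a pid bumps its generation
        self._leases[key] = (generation, sequence_number)
        return LeasePidResponse(0, pid, generation, sequence_number, request.expire_ms)


table = LeaseTable()
first = table.lease_pid(LeasePidRequest("events", 0, -1, 60000))
again = table.lease_pid(LeasePidRequest("events", 0, first.pid, 60000))
other = table.lease_pid(LeasePidRequest("events", 0, -1, 60000))
```

Under these assumptions, a second producer that leases the same pid receives a higher generation, which gives the server a way to tell the two apart.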

The request can also be used to check that  

 
