...

To complete this proposal we just need to figure out how to provide unique PIDs to producers, how to provide fault tolerance for the highwater marks, and how to provide the "fencing" described above to prevent two producers with the same PID from interfering with one another.

Implementation Details

Now I will make the proposal a bit more explicit.

The first thing to realize is that we must ensure that our deduplication works after a server failure, which means that whichever server takes over as leader for the partition must have all the same producer id information as the former leader. The easiest way to accomplish this is to add the pid fields to the messages themselves so that they are replicated in the log to the followers.

Each message will have three new integer fields: pid, sequence_number, and generation. If PID is set to 0 the server will ignore the sequence_number and generation for compatibility with clients that do not implement idempotency. The server will maintain a mapping of (pid, topic, partition) => (generation, sequence_number_highwater). The server will inspect these fields on each new incoming message and will only append messages to the log if their sequence number is exactly one greater than its highwater mark. In addition the generation must equal the generation stored by the server or be one greater. Incrementing the generation will fence off any messages from "zombie" producers as described above.
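
To make the check concrete, here is a minimal sketch in Java of the per-partition dedup decision. The names (PidEntry, DedupMap, shouldAppend) are illustrative only; this is not meant as the actual broker implementation.

import java.util.HashMap;
import java.util.Map;

class PidEntry {
    int generation;
    int sequenceHighwater;

    PidEntry(int generation, int sequenceHighwater) {
        this.generation = generation;
        this.sequenceHighwater = sequenceHighwater;
    }
}

class DedupMap {
    // Keyed by (pid, topic, partition); a string key keeps the sketch simple.
    private final Map<String, PidEntry> entries = new HashMap<>();

    // Returns true if the message should be appended to the log.
    boolean shouldAppend(long pid, String topic, int partition, int generation, int sequenceNumber) {
        if (pid == 0)
            return true; // pid 0 means the client is not using idempotency

        String key = pid + "/" + topic + "/" + partition;
        PidEntry entry = entries.get(key);
        if (entry == null) {
            // First message seen for this pid on this partition
            entries.put(key, new PidEntry(generation, sequenceNumber));
            return true;
        }
        // Accept only the next sequence number, from the current generation
        // or the generation that fences it off.
        boolean generationOk = generation == entry.generation || generation == entry.generation + 1;
        boolean sequenceOk = sequenceNumber == entry.sequenceHighwater + 1;
        if (generationOk && sequenceOk) {
            entry.generation = generation;
            entry.sequenceHighwater = sequenceNumber;
            return true;
        }
        return false; // duplicate, out-of-order, or zombie producer
    }
}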

Next we need to describe how a producer discovers its PID, sequence_number and generation. To enable this we will add a new API, lease_pid, which will be used to allocate a unique producer id. The API will have the following format:

...

lease_pid_request => topic partition pid expire_ms

Response:

lease_pid_response => error pid generation sequence_number expire_ms

These requests could also have a batch version for handling multiple partitions at once.

There are several intended uses for this API:

  • When the client first starts and has no PID it will issue a lease_pid_request with the pid field set to -1 for each partition it wishes to produce to. The server will respond with a unique pid, a random starting generation, and a sequence number of 0.
  • The client will be responsible for leasing a new pid before the expire time is reached (a sketch of this flow follows the list).
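
The following sketch illustrates the intended client-side flow, assuming a hypothetical LeasePidClient that sends the lease_pid request described above; the class and method names are made up for illustration.

class ProducerIdState {
    long pid;
    int generation;
    int sequenceNumber;
    long leaseExpiryMs; // absolute time at which the lease lapses
}

interface LeasePidClient {
    // pid = -1 asks the broker to allocate a fresh pid for this partition
    ProducerIdState leasePid(String topic, int partition, long pid, long expireMs);
}

class IdempotentPartitionProducer {
    private final LeasePidClient client;
    private ProducerIdState state;

    IdempotentPartitionProducer(LeasePidClient client) {
        this.client = client;
    }

    void start(String topic, int partition, long expireMs) {
        // Initial lease: no pid yet, so pass -1 and let the broker assign one
        state = client.leasePid(topic, partition, -1L, expireMs);
    }

    void renewIfNeeded(String topic, int partition, long nowMs, long expireMs) {
        // Re-lease the same pid well before it expires so the broker cannot
        // hand it out to another producer.
        if (nowMs >= state.leaseExpiryMs - expireMs / 2)
            state = client.leasePid(topic, partition, state.pid, expireMs);
    }

    int nextSequenceNumber() {
        // Sequence number attached to the next message for this partition
        return ++state.sequenceNumber;
    }
}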

Server Implementation

One detail that must be carefully thought through is the expiration of pids. The simplest solution one might think of is tying pids to connections so we could automatically deallocate them when the connection is broken. This doesn't really work, though, as pids must survive disconnection (indeed that is their primary point).

Instead, this proposal assumes the cluster will have some configured period of disuse after which a pid is available for reuse. It would also be possible to allow clients to define custom expirations in their lease_pid request, but that would require a more complex implementation. The server will issue pids approximately in order, so reuse will only actually occur after 4 billion pids have been issued.

Each server will allocate pids monotonically, so if PID N is expired then so is any PID M with M < N. This means we can just keep a simple array/list of pid entries: new entries are added to one end and expired from the other, and lookup is just a binary search.
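
Here is a small sketch of that structure (illustrative names, not broker code): appends keep the list sorted by pid, expiration pops entries off the head, and lookup is a plain binary search.

import java.util.ArrayList;
import java.util.List;

class PidTable {
    static class Entry {
        final long pid;
        final long firstSeenMs; // expiration is based on first-use time (see below)
        Entry(long pid, long firstSeenMs) { this.pid = pid; this.firstSeenMs = firstSeenMs; }
    }

    private final List<Entry> entries = new ArrayList<>();

    void add(long pid, long nowMs) {
        // pids arrive in increasing order, so appending keeps the list sorted
        entries.add(new Entry(pid, nowMs));
    }

    void expire(long nowMs, long expireMs) {
        // Expire from the head: entries were added in pid order, so once the
        // oldest surviving entry is recent enough, all later ones are too.
        while (!entries.isEmpty() && nowMs - entries.get(0).firstSeenMs > expireMs)
            entries.remove(0);
    }

    Entry lookup(long pid) {
        int lo = 0, hi = entries.size() - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            long midPid = entries.get(mid).pid;
            if (midPid == pid) return entries.get(mid);
            if (midPid < pid) lo = mid + 1;
            else hi = mid - 1;
        }
        return null; // unknown or already expired pid
    }
}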

Both leader and followers will maintain this structure. They will periodically snapshot it to disk along with the current offset vector for all partitions they maintain. In the event of a crash they will use this snapshot and offsets to restore from the logs.
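
As a rough sketch of what such a snapshot might contain (hypothetical types; how the log is re-read on recovery is left abstract):

import java.util.Map;

class PidSnapshot {
    // Offsets (keyed by "topic-partition") that the dedup state reflected at
    // the moment the snapshot was written. On recovery the broker reloads
    // the snapshot and replays its logs from these offsets through the same
    // update path used for live produce requests.
    final Map<String, Long> offsetsAtSnapshot;
    final PidTable table; // the structure sketched above

    PidSnapshot(Map<String, Long> offsetsAtSnapshot, PidTable table) {
        this.offsetsAtSnapshot = offsetsAtSnapshot;
        this.table = table;
    }
}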

Note that the map is updated only by produce requests; the lease_pid request does not change it! The reason for this is to ensure that all data is in the replicated logs and we don't need a second replicated structure. Nonetheless we need to ensure that if a server issues a pid and then fails before any message is produced, that pid can't be issued again for that topic/partition by whichever follower takes over, even though those followers won't have an entry for it in their maps. To ensure this we will use a global zookeeper sequence to issue pids. For efficiency servers can increment by 100 pids at a time and then not allocate again until they have used these up (this may waste some pids when servers crash, but that is fine).
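
The proposal only requires some global zookeeper counter; as one possible realization, here is a sketch of block allocation using a conditional update on a counter znode. The znode path and class names are made up for illustration, and the znode is assumed to already exist holding a numeric value.

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

class PidBlockAllocator {
    private static final String PATH = "/pid_allocator"; // hypothetical znode, assumed to exist with value "0"
    private static final long BLOCK_SIZE = 100;

    private final ZooKeeper zk;
    private long nextPid = -1;
    private long blockEnd = -1;

    PidBlockAllocator(ZooKeeper zk) {
        this.zk = zk;
    }

    synchronized long allocatePid() throws KeeperException, InterruptedException {
        if (nextPid < 0 || nextPid >= blockEnd)
            reserveBlock();
        return nextPid++;
    }

    private void reserveBlock() throws KeeperException, InterruptedException {
        while (true) {
            Stat stat = new Stat();
            byte[] data = zk.getData(PATH, false, stat);
            long current = Long.parseLong(new String(data));
            long next = current + BLOCK_SIZE;
            try {
                // Conditional update: fails if another broker moved the
                // counter since we read it, in which case we retry.
                zk.setData(PATH, Long.toString(next).getBytes(), stat.getVersion());
                nextPid = current;
                blockEnd = next;
                return;
            } catch (KeeperException.BadVersionException e) {
                // Lost the race; re-read the counter and try again.
            }
        }
    }
}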

Note that the expiration is only approximate, as it is based on the time a server sees the first message for a partition. However, it is only required that the server guarantee at least that much time, so retaining pids longer is okay.

The generation value is used to avoid having two producers with the same PID. 

The request can also be used to check that