
The next question is whether the client will maintain a single global sequence number across all messages it sends or a separate sequence number per topic-partition. A global number would be simpler for the client. However, if the sequence number were per partition, the broker could enforce a tighter constraint, namely that H(P) + 1 = S. This would let us handle the pipelined-request case: if any request fails, all other in-flight requests will automatically fail as well, so the producer can simply retry the full set in order.
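A minimal sketch of the per-partition check, in Python for illustration. It reads H(P) as the highest sequence number this partition has accepted from producer P, and S as the sequence number on the incoming request; the names `PartitionLog`, `append`, and `Result` are hypothetical and not part of the proposal. The usage at the bottom replays the pipelined case: request 1 is lost, so the requests pipelined behind it fail the H(P) + 1 = S check, and the producer resends the whole in-flight set in order.

```python
from dataclasses import dataclass, field
from enum import Enum


class Result(Enum):
    OK = "ok"
    DUPLICATE = "duplicate"        # S <= H(P): already written, safe to drop
    OUT_OF_ORDER = "out_of_order"  # S > H(P) + 1: an earlier request failed


@dataclass
class PartitionLog:
    """One partition's log plus the highwater mark H(P) for each PID."""
    highwater: dict[int, int] = field(default_factory=dict)
    messages: list[tuple[int, int, str]] = field(default_factory=list)

    def append(self, pid: int, seq: int, payload: str) -> Result:
        h = self.highwater.get(pid, -1)  # -1: nothing written yet for this PID
        if seq <= h:
            return Result.DUPLICATE
        if seq != h + 1:
            return Result.OUT_OF_ORDER
        self.messages.append((pid, seq, payload))
        self.highwater[pid] = seq
        return Result.OK


# Pipelined requests 0..3 from PID 7, where request 1 is lost in transit.
log = PartitionLog()
results = [log.append(7, 0, "a"), log.append(7, 2, "c"), log.append(7, 3, "d")]
# Requests 2 and 3 were rejected, so the producer retries 1..3 in order.
retried = [log.append(7, s, p) for s, p in [(1, "b"), (2, "c"), (3, "d")]]
```

Because a gap is rejected rather than buffered, the broker never has to hold out-of-order messages; the cost is that one lost request forces the producer to resend everything pipelined behind it.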

Note that what has been described so far handles the transitive consumer/producer case described above. The process can periodically store its offset in its upstream sources together with its PID and sequence number. When it restarts, it reinitializes with the stored offset, PID, and sequence number. Several of its initial requests may then be rejected, since they were already sent and are below the server's highwater mark.
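The restart case can be simulated as follows. This sketch assumes each upstream message produces exactly one output message and that the offset, PID, and sequence number are saved together as a single checkpoint; `Checkpoint`, `Partition`, `process`, the checkpoint interval of 2, and PID 42 are all hypothetical. The process crashes after sending a message that no checkpoint covers yet, so after the restart that message is sent again and the broker rejects it.

```python
from dataclasses import dataclass


@dataclass
class Checkpoint:
    offset: int    # next offset to read from the upstream source
    pid: int
    next_seq: int  # next sequence number to assign to an output message


class Partition:
    """Minimal broker partition that only accepts S == H(P) + 1."""

    def __init__(self) -> None:
        self.highwater: dict[int, int] = {}
        self.data: list[str] = []

    def send(self, pid: int, seq: int, value: str) -> bool:
        if seq != self.highwater.get(pid, -1) + 1:
            return False  # already written (or a gap): rejected
        self.highwater[pid] = seq
        self.data.append(value)
        return True


def process(upstream, out, ckpt, crash_at=None, every=2):
    """Copy upstream[ckpt.offset:] to `out`, checkpointing every `every` inputs.

    Returns (last saved checkpoint, accept flag per send). `crash_at` stops the
    loop just before that upstream offset to simulate a crash.
    """
    offset, seq, saved, accepted = ckpt.offset, ckpt.next_seq, ckpt, []
    while offset < len(upstream) and offset != crash_at:
        accepted.append(out.send(ckpt.pid, seq, upstream[offset]))
        offset, seq = offset + 1, seq + 1
        if offset % every == 0:
            saved = Checkpoint(offset, ckpt.pid, seq)  # stored together
    return saved, accepted


upstream = ["a", "b", "c", "d", "e"]
out = Partition()
saved, before = process(upstream, out, Checkpoint(0, pid=42, next_seq=0), crash_at=3)
# "c" was sent after the last checkpoint, so it is re-sent on restart.
restored, after = process(upstream, out, saved)
```

The key design point is that offset and sequence number are stored as one unit: restoring an offset without the matching sequence number would let replayed messages through as new ones.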

To complete this proposal, we just need to figure out how to provide unique PIDs to producers, how to make the highwater marks fault tolerant, and how to provide the "fencing" described above to prevent two producers with the same PID from interfering with one another.

Implementation Details

Now I will make the proposal a bit more explicit.

We will add a new API, lease_pid, which will be used to allocate a unique producer ID (PID). The API will have the following format:

Request:

lease_pid_request => topic partition pid expire_ms

Response:

lease_pid_response => error pid generation sequence_number expire_ms
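The proposal does not fix field types, so the byte encoding below is only an assumption: big-endian integers in the style of Kafka's existing protocol, a topic string prefixed with its int16 length, int16 for error, int32 for partition and generation, and int64 for pid, sequence_number, and expire_ms. It simply shows the two messages round-tripping through bytes; the class and function names are hypothetical.

```python
import struct
from dataclasses import dataclass


@dataclass
class LeasePidRequest:
    topic: str
    partition: int
    pid: int        # -1 asks the server to allocate a new PID
    expire_ms: int


@dataclass
class LeasePidResponse:
    error: int
    pid: int
    generation: int
    sequence_number: int
    expire_ms: int


# Assumed widths: partition int32, pid int64, expire_ms int64 (after the topic).
_REQ_TAIL = struct.Struct(">iqq")
# Assumed widths: error int16, pid int64, generation int32, seq int64, expire int64.
_RESP = struct.Struct(">hqiqq")


def encode_request(r: LeasePidRequest) -> bytes:
    topic = r.topic.encode("utf-8")
    return struct.pack(">h", len(topic)) + topic + _REQ_TAIL.pack(
        r.partition, r.pid, r.expire_ms)


def decode_request(buf: bytes) -> LeasePidRequest:
    (n,) = struct.unpack_from(">h", buf, 0)
    topic = buf[2:2 + n].decode("utf-8")
    partition, pid, expire_ms = _REQ_TAIL.unpack_from(buf, 2 + n)
    return LeasePidRequest(topic, partition, pid, expire_ms)


def encode_response(r: LeasePidResponse) -> bytes:
    return _RESP.pack(r.error, r.pid, r.generation, r.sequence_number, r.expire_ms)


def decode_response(buf: bytes) -> LeasePidResponse:
    return LeasePidResponse(*_RESP.unpack(buf))
```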

These requests could also have a batch version for handling multiple partitions at once.

The intended usage is as follows. Every time the producer establishes a new connection, it would issue a lease_pid request for each partition it wants to send to. When the producer first initializes, it would send pid=-1, which would cause the server to issue it a new PID in the response.
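A sketch of the producer side of this flow, with an in-memory stand-in for the server. `FakeBroker`, `Producer`, `on_connect`, the starting PID of 1000, and the 30-second expiry are hypothetical. The sketch also assumes that the server allocates a separate PID for each partition lease sent with pid=-1 and increments the generation on every lease; the text above does not yet say how either is assigned.

```python
from dataclasses import dataclass
from itertools import count


@dataclass
class LeaseResult:
    pid: int
    generation: int
    sequence_number: int  # server's highwater for this PID; -1 if nothing written


class FakeBroker:
    """Hypothetical in-memory stand-in for the server side of lease_pid."""

    def __init__(self) -> None:
        self._pids = count(1000)
        # Keyed by (topic, partition, pid). No messages are produced in this
        # sketch, so the highwater marks are never advanced.
        self.highwater: dict[tuple[str, int, int], int] = {}
        self.generation: dict[tuple[str, int, int], int] = {}

    def lease_pid(self, topic: str, partition: int, pid: int,
                  expire_ms: int) -> LeaseResult:
        # expire_ms is accepted but ignored in this sketch.
        if pid == -1:
            pid = next(self._pids)  # first initialization: allocate a new PID
        key = (topic, partition, pid)
        self.generation[key] = self.generation.get(key, 0) + 1
        return LeaseResult(pid, self.generation[key], self.highwater.get(key, -1))


class Producer:
    def __init__(self, broker: FakeBroker, partitions: list[tuple[str, int]],
                 expire_ms: int = 30_000) -> None:
        self.broker = broker
        self.partitions = partitions
        self.expire_ms = expire_ms
        self.leases: dict[tuple[str, int], LeaseResult] = {}

    def on_connect(self) -> None:
        """Re-lease every partition each time a new connection is established."""
        for topic, partition in self.partitions:
            known = self.leases.get((topic, partition))
            pid = known.pid if known is not None else -1  # -1 only before first lease
            self.leases[(topic, partition)] = self.broker.lease_pid(
                topic, partition, pid, self.expire_ms)


producer = Producer(FakeBroker(), [("events", 0), ("events", 1)])
producer.on_connect()  # first connection: both partitions request pid=-1
first = dict(producer.leases)
producer.on_connect()  # reconnect: the same PIDs are leased again
second = dict(producer.leases)
```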

The generation value is used to avoid having two producers with the same PID. 
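One way the generation could provide fencing, assuming the server increments it on every lease and rejects any produce request that does not carry the current generation for that PID. This is a sketch of the idea only; `FencingPartition`, `PidState`, `StaleGeneration`, and the method names are hypothetical. After a second instance re-leases PID 5, the first instance's request is refused by the generation check before its sequence number is even considered.

```python
from dataclasses import dataclass, field


class StaleGeneration(Exception):
    """Raised when a produce request does not carry the current generation."""


@dataclass
class PidState:
    generation: int = 0
    highwater: int = -1


@dataclass
class FencingPartition:
    state: dict[int, PidState] = field(default_factory=dict)
    log: list[str] = field(default_factory=list)

    def lease(self, pid: int) -> tuple[int, int]:
        """Bump the generation; returns (generation, highwater) for the new holder."""
        s = self.state.setdefault(pid, PidState())
        s.generation += 1
        return s.generation, s.highwater

    def produce(self, pid: int, generation: int, seq: int, value: str) -> bool:
        s = self.state.get(pid)
        if s is None or generation != s.generation:
            raise StaleGeneration(f"pid {pid}: generation {generation} is fenced")
        if seq != s.highwater + 1:
            return False  # duplicate or gap
        s.highwater = seq
        self.log.append(value)
        return True


part = FencingPartition()
old_gen, _ = part.lease(5)            # first producer instance
part.produce(5, old_gen, 0, "x")
new_gen, hw = part.lease(5)           # second instance re-leases the same PID
part.produce(5, new_gen, hw + 1, "y")
try:
    part.produce(5, old_gen, 1, "z")  # the old instance is now fenced off
    fenced = False
except StaleGeneration:
    fenced = True
```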

The request can also be used to check that