Current state: Under Discussion
Discussion thread: here[TODO]
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Each partition on the broker keeps, as producer state, the 5 most recently appended batches for each producerId (a.k.a. PID). If a producer does not receive a successful response and retries the produce request, the broker can still return the offset of the earlier successful append from this cache, so the client can report it in `RecordMetadata`. This per-producerId, per-partition cache can consume a large amount of broker memory and has caused OOM errors in production. It is also not necessary to always give the client the offset of an already-appended batch in the response. This KIP therefore proposes keeping only the latest appended batch. That reduces both the memory used for producer state and the complexity of managing it on the broker side.
The existing producer configuration max.in.flight.requests.per.connection, whose default value is 5, will eventually become obsolete. We will define a duplicate sequence range threshold on the broker side, called max.in.flight.sequence.number.per.connection. The broker will treat any sequence number that is smaller than the "latest appended sequence" (accounting for wraparound) by no more than max.in.flight.sequence.number.per.connection as already appended, meaning it is a duplicate from a retried producer request. Every sequence number outside this range will be treated as a future sequence number that has not been appended yet (and may not yet have been allocated by the producer). The default value of this configuration will be large enough that a producer can never have more than this many sequence numbers in flight.
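The classification rule above can be sketched in Java. This is a minimal, non-authoritative illustration. It assumes sequence numbers run from 0 to Int.MaxValue (2^31 values) and then wrap to 0. The class `SequenceClassifier`, its `Verdict` values, and the split of "future" into the next expected sequence versus a gap are hypothetical and are not Kafka code.

```java
// Hypothetical sketch (not Kafka code): classify an incoming sequence number relative to
// the latest appended sequence. Assumes sequences run 0..Int.MaxValue and then wrap to 0.
final class SequenceClassifier {

    enum Verdict { DUPLICATE_OF_LATEST, DUPLICATE, NEXT_EXPECTED, FUTURE }

    // Size of the sequence space: 0..Integer.MAX_VALUE inclusive = 2^31 values.
    private static final long SEQ_SPACE = 1L << 31;

    // Steps needed to walk forward from 'from' to 'to', wrapping at 2^31.
    static long forwardDistance(int from, int to) {
        return Math.floorMod((long) to - from, SEQ_SPACE);
    }

    static Verdict classify(int latestSeq, int incomingSeq, int maxInFlightSeq) {
        if (incomingSeq == latestSeq) {
            return Verdict.DUPLICATE_OF_LATEST; // broker can still report this batch's offset
        }
        // At most maxInFlightSeq below the latest sequence: already appended.
        if (forwardDistance(incomingSeq, latestSeq) <= maxInFlightSeq) {
            return Verdict.DUPLICATE;
        }
        // Everything else is a future sequence; only latest + 1 can be appended right now.
        return forwardDistance(latestSeq, incomingSeq) == 1 ? Verdict.NEXT_EXPECTED : Verdict.FUTURE;
    }

    public static void main(String[] args) {
        System.out.println(classify(10, 5, 10));                // DUPLICATE
        System.out.println(classify(2, Integer.MAX_VALUE, 10)); // DUPLICATE (across the wrap)
        System.out.println(classify(Integer.MAX_VALUE, 0, 10)); // NEXT_EXPECTED
    }
}
```

Computing the distance with `Math.floorMod` on a `long` keeps the comparison correct across the wrap from Int.MaxValue back to 0, without any special cases.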
The producer needs to use the same value of max.in.flight.sequence.number.per.connection as the broker. When the producer starts up, it sends an API request to the broker, and the broker piggybacks the value of max.in.flight.sequence.number.per.connection on the API response.
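Once the producer knows the broker's value, it has to throttle its writes so that it never exceeds that value. The following is a minimal sketch of such a gate. It assumes each record in a batch consumes one sequence number and that the advertised value is passed in by the caller. The class `InFlightSequenceLimiter` and its methods are hypothetical, not part of the Kafka producer API.

```java
// Hypothetical sketch (not the Kafka producer): cap unacknowledged sequence numbers at the
// value the broker advertised for max.in.flight.sequence.number.per.connection.
final class InFlightSequenceLimiter {
    private final long maxInFlightSeq; // assumed to come from the broker's response
    private long inFlight = 0;         // sequence numbers sent but not yet acknowledged

    InFlightSequenceLimiter(long maxInFlightSeq) {
        if (maxInFlightSeq <= 0) throw new IllegalArgumentException("limit must be positive");
        this.maxInFlightSeq = maxInFlightSeq;
    }

    // Reserve one sequence number per record in the batch; false means the sender must wait.
    synchronized boolean tryAcquire(int records) {
        if (records <= 0) throw new IllegalArgumentException("records must be positive");
        if (inFlight + records > maxInFlightSeq) return false;
        inFlight += records;
        return true;
    }

    // Release the reservation once the batch is acknowledged (including as a duplicate).
    synchronized void release(int records) {
        if (records <= 0 || records > inFlight) throw new IllegalStateException("bad release: " + records);
        inFlight -= records;
    }

    synchronized long inFlight() {
        return inFlight;
    }

    public static void main(String[] args) {
        InFlightSequenceLimiter limiter = new InFlightSequenceLimiter(10);
        System.out.println(limiter.tryAcquire(6)); // true
        System.out.println(limiter.tryAcquire(6)); // false: 12 would exceed 10
        limiter.release(6);
        System.out.println(limiter.tryAcquire(6)); // true
    }
}
```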
For each producerId, the broker will keep only the latest appended batch's sequence and offset, instead of the 5 most recently appended batches. The broker will accept incoming batches only in sequence order, so it can guarantee that every sequence number less than or equal to the latest appended sequence is durable.
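The following is a sketch of broker-side state that remembers only the latest batch per producerId and appends strictly in order. It is an illustration under stated assumptions: the class `LatestBatchProducerState`, the `Outcome` values, the in-memory offset counter standing in for the partition log, and the rule of accepting the first batch seen for a producer are all hypothetical. `DUPLICATE` and `OUT_OF_ORDER` stand for the DUPLICATE_SEQUENCE_NUMBER and out-of-order sequence errors.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not Kafka code): per-partition producer state keeping only the latest batch.
final class LatestBatchProducerState {
    enum Outcome { APPENDED, DUPLICATE_OF_LATEST, DUPLICATE, OUT_OF_ORDER }

    // Result of a produce attempt; lastOffset is -1 when no offset can be reported.
    static final class Result {
        final Outcome outcome;
        final long lastOffset;
        Result(Outcome outcome, long lastOffset) { this.outcome = outcome; this.lastOffset = lastOffset; }
    }

    // The only per-producer state: last sequence and last offset of the latest appended batch.
    private static final class LatestBatch {
        final int lastSeq;
        final long lastOffset;
        LatestBatch(int lastSeq, long lastOffset) { this.lastSeq = lastSeq; this.lastOffset = lastOffset; }
    }

    private static final long SEQ_SPACE = 1L << 31; // sequences 0..Int.MaxValue, then wrap
    private final Map<Long, LatestBatch> latestByProducer = new HashMap<>();
    private final int maxInFlightSeq;
    private long nextOffset = 0; // stand-in for the partition's log end offset

    LatestBatchProducerState(int maxInFlightSeq) { this.maxInFlightSeq = maxInFlightSeq; }

    private static long distance(int from, int to) {
        return Math.floorMod((long) to - from, SEQ_SPACE);
    }

    Result onProduce(long producerId, int firstSeq, int lastSeq) {
        LatestBatch latest = latestByProducer.get(producerId);
        if (latest != null) {
            if (lastSeq == latest.lastSeq) {
                return new Result(Outcome.DUPLICATE_OF_LATEST, latest.lastOffset); // offset still known
            }
            if (distance(lastSeq, latest.lastSeq) <= maxInFlightSeq) {
                return new Result(Outcome.DUPLICATE, -1L); // older batch: offset no longer cached
            }
            if (distance(latest.lastSeq, firstSeq) != 1) {
                return new Result(Outcome.OUT_OF_ORDER, -1L); // gap: only latest + 1 is accepted
            }
        }
        // Append in order (the first batch for an unknown producer is accepted in this sketch).
        long records = distance(firstSeq, lastSeq) + 1;
        long lastOffset = nextOffset + records - 1;
        nextOffset = lastOffset + 1;
        latestByProducer.put(producerId, new LatestBatch(lastSeq, lastOffset));
        return new Result(Outcome.APPENDED, lastOffset);
    }

    public static void main(String[] args) {
        LatestBatchProducerState state = new LatestBatchProducerState(1000);
        System.out.println(state.onProduce(1L, 0, 4).outcome);    // APPENDED
        System.out.println(state.onProduce(1L, 5, 9).outcome);    // APPENDED
        System.out.println(state.onProduce(1L, 5, 9).lastOffset); // 9 (duplicate of latest)
        System.out.println(state.onProduce(1L, 0, 4).outcome);    // DUPLICATE
        System.out.println(state.onProduce(1L, 20, 24).outcome);  // OUT_OF_ORDER
    }
}
```

Compared with a 5-entry cache, this stores one fixed-size entry per producerId. The cost is that only a retry of the latest batch gets its offset back.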
When the sequence number reaches Int.MaxValue, the client wraps around and starts again from 0.
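The wraparound rule amounts to arithmetic modulo 2^31. The following is a minimal sketch with a hypothetical helper `SequenceWrap.increment`, assuming sequence numbers are non-negative `int`s.

```java
// Hypothetical helper: advance a producer sequence number by 'increment' records,
// wrapping from Int.MaxValue back to 0 (i.e. arithmetic modulo 2^31).
final class SequenceWrap {
    static int increment(int sequence, int increment) {
        if (sequence < 0 || increment < 0) throw new IllegalArgumentException("negative input");
        // Widen to long so the intermediate sum cannot overflow before taking the modulus.
        return (int) (((long) sequence + increment) % (1L << 31));
    }

    public static void main(String[] args) {
        System.out.println(increment(Integer.MAX_VALUE, 1));     // 0
        System.out.println(increment(Integer.MAX_VALUE - 1, 5)); // 3
    }
}
```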
On the broker side:
On the producer side:
When the client receives DUPLICATE_SEQUENCE_NUMBER, it discards the batch and moves on, as before.
When the client receives OUT_OF_ORDER_SEQUENCE_NUMBER, it handles the error as before, by retrying the send request.
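The producer-side reactions above map onto a small dispatch. The following is an illustration only. The error names follow the Kafka protocol errors DUPLICATE_SEQUENCE_NUMBER and OUT_OF_ORDER_SEQUENCE_NUMBER, but the enums, `Action`, and `onResponse` are hypothetical and do not reflect the producer's internal classes.

```java
// Hypothetical sketch (not the Kafka producer): how a client reacts to the two
// sequence-related produce errors.
final class ProduceErrorHandling {
    enum ProduceError { NONE, DUPLICATE_SEQUENCE_NUMBER, OUT_OF_ORDER_SEQUENCE_NUMBER }

    enum Action { COMPLETE, DISCARD_AS_DUPLICATE, RETRY }

    static Action onResponse(ProduceError error) {
        switch (error) {
            case NONE:
                return Action.COMPLETE;
            case DUPLICATE_SEQUENCE_NUMBER:
                // Already appended by an earlier attempt; the offset may not be reported.
                return Action.DISCARD_AS_DUPLICATE;
            case OUT_OF_ORDER_SEQUENCE_NUMBER:
                // An earlier sequence is not appended yet; resend this batch later.
                return Action.RETRY;
            default:
                throw new IllegalArgumentException("unexpected error: " + error);
        }
    }

    public static void main(String[] args) {
        for (ProduceError e : ProduceError.values()) {
            System.out.println(e + " -> " + onResponse(e));
        }
    }
}
```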
When the producer and broker agree on the value of max.in.flight.sequence.number.per.connection and the producer throttles writes accordingly, the broker can be sure that sequence numbers within the red range below the "latest appended sequence number" are duplicates, and that sequence numbers within the green range above it are future sequences that have not been appended yet. The red and green ranges must not overlap. Because the sequence space holds 2^31 values (0 through Int.MaxValue), each range can cover at most about half of it, so max.in.flight.sequence.number.per.connection cannot exceed 2^30. The threshold should also never throttle customer workloads in practice. Based on factors such as maximum payload size, maximum socket buffer size, and compaction rate, we chose a default of 10 million as the maximum number of in-flight sequence numbers. This value should not be reached even in extreme cases.
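The non-overlap bound can be checked with a short calculation. This assumes the red range is the N sequence numbers just below the latest appended sequence s and the green range is the N just above it, with all arithmetic modulo 2^31. Together with s itself, the two ranges need 2N + 1 distinct values.

```latex
% N = max.in.flight.sequence.number.per.connection, s = latest appended sequence,
% all arithmetic modulo 2^{31} (sequence numbers 0 .. Int.MaxValue, then wrap to 0)
\begin{aligned}
\text{red (duplicates):} &\quad \{\, s-N,\ \dots,\ s-1 \,\} \\
\text{green (future):}   &\quad \{\, s+1,\ \dots,\ s+N \,\} \\
\text{no overlap:}       &\quad 2N + 1 \le 2^{31} \;\Longrightarrow\; N \le 2^{30} - 1 \\
\text{default:}          &\quad N = 10^{7} \ll 2^{30} = 1\,073\,741\,824
\end{aligned}
```

Under this reading, the 10 million default uses less than 1% of the allowed range.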
With this proposal, the restriction on max.in.flight.requests.per.connection can be removed, and writes are no longer throttled by the number of in-flight request batches. The broker will not return an offset with a DUPLICATE_SEQUENCE_NUMBER error response unless the duplicate matches the latest appended batch. This change must be documented so that users do not expect the offset to always be reported in `RecordMetadata`. We will deprecate max.in.flight.requests.per.connection and introduce a new configuration, max.in.flight.sequence.number.per.connection, on both the broker and the producer to control in-flight writes.
A mechanism may be needed to tell whether a broker supports the new duplicate detection logic. One option is to bump the Produce API version, so that on connecting a client can tell whether the broker is an old version (which limits the producer to 5 in-flight requests) or a new version (which returns max.in.flight.sequence.number.per.connection for the producer to adopt).
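The following is a sketch of how a producer might choose its throttle after this check, assuming a Produce API version bump is the mechanism. The proposal does not fix a version number, so `newDedupMinVersion` is a placeholder. The class `InFlightLimitNegotiation` and the `advertisedMaxInFlightSeq` argument are also hypothetical.

```java
// Hypothetical sketch: choose what the producer throttles on after learning the broker's
// capabilities. The version threshold and the advertised value are placeholders.
final class InFlightLimitNegotiation {
    // Existing idempotent-producer cap on max.in.flight.requests.per.connection.
    static final int LEGACY_MAX_IN_FLIGHT_REQUESTS = 5;

    static final class Limit {
        final boolean countsSequenceNumbers; // true: new broker, limit is in sequence numbers
        final long value;

        Limit(boolean countsSequenceNumbers, long value) {
            this.countsSequenceNumbers = countsSequenceNumbers;
            this.value = value;
        }

        @Override
        public String toString() {
            return value + (countsSequenceNumbers ? " in-flight sequence numbers" : " in-flight requests");
        }
    }

    static Limit choose(int brokerProduceVersion, int newDedupMinVersion, Long advertisedMaxInFlightSeq) {
        if (brokerProduceVersion >= newDedupMinVersion && advertisedMaxInFlightSeq != null) {
            return new Limit(true, advertisedMaxInFlightSeq);
        }
        // Old broker: keep the existing limit of 5 in-flight requests.
        return new Limit(false, LEGACY_MAX_IN_FLIGHT_REQUESTS);
    }

    public static void main(String[] args) {
        System.out.println(choose(10, 10, 10_000_000L)); // 10000000 in-flight sequence numbers
        System.out.println(choose(9, 10, null));         // 5 in-flight requests
    }
}
```

Falling back to the old request-based limit keeps a new producer safe against an old broker that still caches only 5 batches.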
We will also deliver/migrate this feature in phases:
There are no rejected alternatives. However, we considered bumping the producer epoch to solve the sequence ID wraparound issue, so that we would not need to pick an arbitrary, sufficiently large threshold such as 10 million. There are a couple of concerns with this approach:
Based on the above concerns, we decided to start with the default wraparound solution.