...

Each partition on a broker maintains, as producer state for each producerId (a.k.a. PID), a list of the 5 most recently appended batches with their sequence numbers and corresponding offsets. If a producer fails to receive a successful response and retries the produce request, the broker can still return the offset of the successful append from this cache via `RecordMetadata`. Maintaining this cache per producerId per partition consumes a lot of memory on the broker and has caused OOM in production. It is not necessary to always provide the client with the offset of an already appended batch in the response. Therefore, this KIP proposes to maintain only the latest appended batch, so that both the memory used for producer state and the complexity of managing it are reduced on the broker side.

Public Interfaces

The existing producer configuration max.in.flight.requests.per.connection, with a default value of 5, will eventually become obsolete. We will define a duplicate sequence range threshold on the broker side as max.in.flight.sequence.number.per.connection. The broker will treat every sequence number within (latest appended sequence - max.in.flight.sequence.number.per.connection, latest appended sequence] (accounting for overflow) as already appended, that is, as a duplicate from a producer request. Every sequence number outside this range will be treated as a future sequence number that has not yet been appended or allocated from a producer request. We will define the default value of this configuration large enough that a producer won't be able to have more than this number of in-flight sequence numbers.

The producer needs to keep the same value of max.in.flight.sequence.number.per.connection as decided by the broker. When the producer starts up, it sends an API request to the broker. The broker will piggyback the value of max.in.flight.sequence.number.per.connection on the API response to the producer.

...

  • Brokers will accept a batch whose first sequence number is exactly one higher than (accounting for overflow) the last sequence number of the latest appended batch.
  • Brokers will return DUPLICATE_SEQUENCE_NUMBER for any sequence that is within the max.in.flight.sequence.number.per.connection range of the latest sequence number (accounting for overflow). In this case, the offset of the appended batch is not returned, except when the duplicate is of the latest appended batch.
  • Brokers will return OUT_OF_ORDER_SEQUENCE for any sequence that is outside max.in.flight.sequence.number.per.connection range of the latest sequence number.
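
The three broker rules above can be sketched as a single classification function. This is only an illustrative sketch, not the broker implementation: the function name `classify`, the constant `SEQ_SPACE`, and the returned `"APPEND"` label are hypothetical, and it assumes sequence numbers occupy [0, 2^31 - 1] and wrap to 0 on overflow.

```python
# Assumption: sequence numbers live in [0, 2^31 - 1] and wrap around to 0.
SEQ_SPACE = 2**31


def classify(first_seq, last_appended_seq, window=10_000_000):
    """Classify an incoming batch by its first sequence number.

    `window` stands in for max.in.flight.sequence.number.per.connection.
    """
    # How far the incoming sequence sits behind the last appended one,
    # computed modulo the sequence space so that overflow is accounted for.
    behind = (last_appended_seq - first_seq) % SEQ_SPACE

    if behind == SEQ_SPACE - 1:
        # first_seq is exactly one higher than last_appended_seq.
        return "APPEND"
    if behind < window:
        # Inside (last - window, last]: already appended.
        return "DUPLICATE_SEQUENCE_NUMBER"
    # Anything else is outside the duplicate range.
    return "OUT_OF_ORDER_SEQUENCE"
```

For example, `classify(0, 2**31 - 1)` is treated as the next expected batch after a wraparound, while `classify(2**31 - 1, 0)` falls one position behind the last append and is reported as a duplicate.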

...

  • When clients receive DUPLICATE_SEQUENCE_NUMBER, they will discard it and move on as before.

  • When clients receive OUT_OF_ORDER_SEQUENCE, they will handle it as before: retry the send request.

  • Clients will throttle writes when the number of in-flight sequences reaches max.in.flight.sequence.number.per.connection.
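
The client-side throttling in the last bullet could look roughly like the following. This is a minimal sketch under stated assumptions: the class `InflightWindow` and its methods are hypothetical names, not part of the producer API, and it tracks a single partition only.

```python
class InflightWindow:
    """Hypothetical per-partition tracker of in-flight sequence numbers."""

    def __init__(self, limit=10_000_000):
        # `limit` stands in for max.in.flight.sequence.number.per.connection.
        self.limit = limit
        self.inflight = 0  # sequences allocated but not yet acknowledged

    def try_allocate(self, record_count):
        """Reserve sequence numbers for a batch; False means throttle."""
        if self.inflight + record_count > self.limit:
            return False
        self.inflight += record_count
        return True

    def acknowledge(self, record_count):
        """Release sequence numbers once the broker has responded."""
        self.inflight = max(0, self.inflight - record_count)
```

A sender would call `try_allocate` before assigning sequence numbers to a batch and hold the batch back while it returns `False`.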

[Figure: sequence number allocation and range distribution]

When the producer and broker agree on the max.in.flight.sequence.number.per.connection value and the producer throttles writes accordingly, the broker can be sure that sequence numbers in the red range below the "latest appended sequence number" are duplicates and that sequence numbers in the green range above it are future sequences not yet appended. We also need to make sure the red range and the green range don't overlap, so the maximum value for max.in.flight.sequence.number.per.connection cannot exceed 2^30. On the other hand, the threshold should never throttle customer workloads in the real world. Based on factors such as max payload size, max socket buffer size, and compaction rate, we come up with a default value of 10 million as the maximum possible number of in-flight sequence numbers, which should not hit throttling even in extreme cases. Given the total of about 2 billion sequence numbers, it won't wrap around either. See the figure for sequence number allocation and range distribution.
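
The 2^30 bound follows from simple arithmetic, sketched here under the assumption that the sequence space holds 2^31 values (roughly the 2 billion mentioned above). The duplicate range and the future range each span one window, so together they must fit in the sequence space without overlapping. The names below are illustrative only.

```python
# Assumption: 2^31 distinct sequence numbers (about 2 billion).
SEQ_SPACE = 2**31
MAX_WINDOW = SEQ_SPACE // 2   # 2^30
DEFAULT_WINDOW = 10_000_000   # proposed default from the text


def ranges_disjoint(window):
    """True if the duplicate and future ranges cannot overlap on wraparound."""
    # One window behind the latest sequence plus one window ahead of it
    # must fit inside the whole sequence space.
    return 2 * window <= SEQ_SPACE
```

With these numbers, the proposed default of 10 million is about two orders of magnitude below the 2^30 ceiling.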

Compatibility, Deprecation, and Migration Plan

...

  1. Deliver only the broker-side change: replace the cache of 5 appended batches with a cache of only the latest appended batch. Introduce the broker-side configuration max.in.flight.sequence.number.per.connection with a default value of 10 million as the duplicate check threshold. No producer-side changes at all: max.in.flight.requests.per.connection, with its default of 5, is still enforced. This keeps the broker safe, because the limit on in-flight batches means the producer won't generate more than max.in.flight.sequence.number.per.connection sequence numbers.
  2. Deliver the producer-side changes: introduce the producer-side configuration max.in.flight.sequence.number.per.connection and bump the producer API version to pass the configuration value from broker to producer. Enforce producer workload throttling based on max.in.flight.sequence.number.per.connection. Obsolete the producer-side configuration max.in.flight.requests.per.connection.

Rejected Alternatives

There are no rejected alternatives. However, we considered bumping the producer epoch to solve the sequence ID wraparound issue, so that we would not need to pick an arbitrary, large enough threshold such as 10 million. There are a couple of concerns with this approach:

...