...
To mitigate this, we will automatically force a Task commit if either the total number of uncommitted records, as returned by StateStore#approximateNumUncommittedEntries(), exceeds the threshold configured by statestore.uncommitted.max.records, or the total memory used to buffer uncommitted records, as returned by StateStore#approximateNumUncommittedBytes(), exceeds the threshold configured by statestore.uncommitted.max.bytes. This will roughly bound the memory required per Thread for buffering uncommitted records, irrespective of commit.interval.ms, and will effectively bound the number of records that need to be restored in the event of a failure.
It's possible that some Topologies generate many more new StateStore entries than the records they process. Such a Topology could cross the configured record/memory thresholds mid-processing, potentially causing an OOM error if it exceeds them by a large margin. To mitigate this, the StreamThread will measure the increase in records/bytes written on each iteration, and pre-emptively commit if the next iteration is likely to cross a threshold.
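One way to implement the pre-emptive check is sketched below. The proposal does not specify how "likely to cross a threshold" is estimated, so this sketch assumes a simple heuristic: the next iteration will add as many records/bytes as the last one did. The class and method names (PreemptiveCommitEstimator, recordIterationAndCheck, onCommit) are hypothetical.

```java
public class PreemptiveCommitEstimator {
    private final long maxRecords;
    private final long maxBytes;
    // Uncommitted totals observed at the end of the previous iteration.
    private long lastEntries;
    private long lastBytes;

    public PreemptiveCommitEstimator(long maxRecords, long maxBytes) {
        this.maxRecords = maxRecords;
        this.maxBytes = maxBytes;
    }

    // Call after each processing iteration with the current uncommitted totals.
    // Assumption: the next iteration grows the totals by the same amount as this one.
    // Returns true if that projected growth would exceed either threshold.
    public boolean recordIterationAndCheck(long currentEntries, long currentBytes) {
        long entriesDelta = Math.max(0, currentEntries - lastEntries);
        long bytesDelta = Math.max(0, currentBytes - lastBytes);
        lastEntries = currentEntries;
        lastBytes = currentBytes;
        return currentEntries + entriesDelta > maxRecords
            || currentBytes + bytesDelta > maxBytes;
    }

    // After a commit the uncommitted totals drop back to zero.
    public void onCommit() {
        lastEntries = 0;
        lastBytes = 0;
    }

    public static void main(String[] args) {
        PreemptiveCommitEstimator estimator = new PreemptiveCommitEstimator(1_000, 10_000_000);
        // Each iteration adds 300 uncommitted entries.
        System.out.println(estimator.recordIterationAndCheck(300, 30_000)); // false: 300 + 300 <= 1000
        System.out.println(estimator.recordIterationAndCheck(600, 60_000)); // false: 600 + 300 <= 1000
        System.out.println(estimator.recordIterationAndCheck(900, 90_000)); // true: 900 + 300 > 1000
    }
}
```

In this example the commit happens at 900 entries, before the fourth iteration would have pushed the total to 1,200. A more conservative estimator could track the largest delta seen rather than the most recent one.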
Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.
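A minimal sketch of how such defaults could look, assuming they simply report zero uncommitted entries and bytes; that is one way to guarantee the thresholds are never exceeded, though the exact default bodies are not given here. SimpleStateStore and LegacyCustomStore are hypothetical, simplified stand-ins, not the real Kafka Streams types.

```java
public class DefaultUncommittedMethods {

    // Hypothetical, simplified stand-in for StateStore showing only the new methods.
    interface SimpleStateStore {
        String name();

        // Assumed default: report nothing uncommitted, so the store never
        // contributes towards a forced early commit.
        default long approximateNumUncommittedEntries() { return 0L; }

        default long approximateNumUncommittedBytes() { return 0L; }
    }

    // An existing custom store written before the new methods existed:
    // it compiles unchanged and inherits the zero-returning defaults.
    static class LegacyCustomStore implements SimpleStateStore {
        public String name() { return "legacy-store"; }
    }

    public static void main(String[] args) {
        SimpleStateStore store = new LegacyCustomStore();
        System.out.println(store.approximateNumUncommittedEntries() + " "
            + store.approximateNumUncommittedBytes()); // prints 0 0
    }
}
```

Because these stores always report zero, they can never push the combined totals over a non-negative threshold, so only stores that override the methods influence when early commits occur.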
...