Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Add paragraph on pre-emptively committing based on expected memory usage.

...

To mitigate this, we will automatically force a Task commit if the total uncommitted records returned by StateStore#approximateNumUncommittedEntries()  exceeds a threshold, configured by statestore.uncommitted.max.records; or the total memory used for buffering uncommitted records returned by StateStore#approximateNumUncommittedBytes() exceeds the threshold configured by statestore.uncommitted.max.bytes. This will roughly bound the memory required per-Thread for buffering uncommitted records, irrespective of the commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure.

It's possible that some Topologies can generate many more new StateStore entries than the records they process, in which case, it would be possible for such a Topology to cross the configured record/memory thresholds mid-processing, potentially causing an OOM error if these thresholds are exceeded by a lot. To mitigate this, the StreamThread will measure the increase in records/bytes written on each iteration, and pre-emptively commit if the next iteration is likely to cross a threshold.

Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.

...