...

RocksDB provides WriteBatchWithIndex as a means of accomplishing atomic writes when not using the RocksDB WAL. The performance overhead of doing this should be negligible. The only performance consideration is that the buffer must reside completely in memory until it is committed. To mitigate this, we will automatically force a Task commit if the total number of uncommitted records, as returned by StateStore#approximateNumUncommittedEntries(), exceeds the threshold configured by statestore.uncommitted.max.records, or if the total memory used for buffering uncommitted records, as returned by StateStore#approximateNumUncommittedBytes(), exceeds the threshold configured by statestore.uncommitted.max.bytes. This will roughly bound the memory required per Task for buffering uncommitted records, irrespective of commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure. These limits will be checked in StreamTask#process, and an early commit will be requested via Task#requestCommit().
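The threshold check described above could look roughly like the following sketch. It is not the actual StreamTask code: the UncommittedStateView interface, the UncommittedLimitSketch class, and the maxRecords/maxBytes parameters are hypothetical stand-ins, and summing the values across all of a Task's stores is an assumption about what "total" means here.

```java
import java.util.List;

// Hypothetical stand-in for the two StateStore methods named in the text.
interface UncommittedStateView {
    long approximateNumUncommittedEntries();
    long approximateNumUncommittedBytes();
}

final class UncommittedLimitSketch {

    // Returns true when the stores of one task, taken together, exceed either limit.
    // Assumption: "total" means the sum over all stores belonging to the task.
    static boolean shouldRequestCommit(List<UncommittedStateView> stores,
                                       long maxRecords,
                                       long maxBytes) {
        long entries = 0L;
        long bytes = 0L;
        for (UncommittedStateView store : stores) {
            entries += store.approximateNumUncommittedEntries();
            bytes += store.approximateNumUncommittedBytes();
        }
        return entries > maxRecords || bytes > maxBytes;
    }

    // Helper for the demo: a store view that reports fixed values.
    static UncommittedStateView fixed(final long entries, final long bytes) {
        return new UncommittedStateView() {
            @Override public long approximateNumUncommittedEntries() { return entries; }
            @Override public long approximateNumUncommittedBytes() { return bytes; }
        };
    }

    public static void main(String[] args) {
        List<UncommittedStateView> stores = List.of(fixed(60, 100), fixed(50, 100));
        // 110 uncommitted entries against a limit of 100: an early commit would be requested.
        System.out.println(shouldRequestCommit(stores, 100, 1000));
    }
}
```

In the design described here, a true result would correspond to calling Task#requestCommit() from StreamTask#process; the sketch only models the decision, not the commit itself.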

Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.
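A minimal sketch of how default implementations keep existing stores from triggering early commits is shown below. The SketchStateStore interface and both store classes are hypothetical, and the default return value of 0 is an assumption; this section does not state what the real defaults return. Byte sizes are approximated by character counts purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface mirroring the two new methods; a default of 0 is an assumption.
interface SketchStateStore {
    default long approximateNumUncommittedEntries() { return 0L; }
    default long approximateNumUncommittedBytes() { return 0L; }
}

// An existing, non-transactional store: it does not override the defaults,
// so it always reports nothing uncommitted and never pushes a task over a limit.
final class LegacyInMemorySketchStore implements SketchStateStore {
    private final Map<String, String> data = new HashMap<>();

    void put(String key, String value) { data.put(key, value); }
}

// A store that buffers writes until commit and reports the size of that buffer.
final class BufferingSketchStore implements SketchStateStore {
    private final Map<String, String> uncommitted = new HashMap<>();
    private long uncommittedBytes = 0L;

    void put(String key, String value) {
        String previous = uncommitted.put(key, value);
        if (previous != null) {
            // Overwrite of a buffered key: drop the old entry's size first.
            uncommittedBytes -= key.length() + previous.length();
        }
        uncommittedBytes += key.length() + value.length();
    }

    void commit() {
        // A real store would flush the buffer to the underlying database here.
        uncommitted.clear();
        uncommittedBytes = 0L;
    }

    @Override public long approximateNumUncommittedEntries() { return uncommitted.size(); }
    @Override public long approximateNumUncommittedBytes() { return uncommittedBytes; }
}
```

With this shape, only stores that opt in by overriding the two methods contribute to the per-Task totals.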

...