...

Public Interfaces

New configuration:

| Name | Default | Description |
|---|---|---|
| statestore.uncommitted.max.records | -1 | Maximum number of uncommitted state-store records to buffer per-task. If this limit is exceeded, a task commit will be requested. No limit: -1. |
| statestore.uncommitted.max.bytes | 67108864 (64 MB) | Maximum number of memory bytes to be used to buffer uncommitted state-store records per-task. If this limit is exceeded, a task commit will be requested. No limit: -1. Note: if this is too high or unbounded, it's possible for RocksDB to trigger out-of-memory errors. |
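
As an illustration of how these settings might be supplied, the following minimal sketch builds a plain java.util.Properties object containing both keys. The class name and the chosen values (100000 records, 32 MB) are hypothetical and only show the expected shape of the configuration; how the properties are passed to the application is not covered here.

```java
import java.util.Properties;

// Hypothetical helper; only the two config keys come from the proposal above.
public class UncommittedBufferConfigExample {

    public static Properties buildConfig() {
        Properties props = new Properties();
        // Request a commit once roughly 100,000 uncommitted records are buffered per task
        // (illustrative value; the proposed default is -1, i.e. no limit).
        props.put("statestore.uncommitted.max.records", "100000");
        // Request a commit once roughly 32 MB of uncommitted records are buffered per task
        // (illustrative value; the proposed default is 67108864, i.e. 64 MB).
        props.put("statestore.uncommitted.max.bytes", "33554432");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig();
        System.out.println("max.records = " + props.getProperty("statestore.uncommitted.max.records"));
        System.out.println("max.bytes   = " + props.getProperty("statestore.uncommitted.max.bytes"));
    }
}
```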

Changed:

  • org.apache.kafka.streams.processor.StateStore

...

RocksDB provides WriteBatchWithIndex as a means of accomplishing atomic writes when not using the RocksDB WAL. The performance overhead of doing this should be negligible. The only performance consideration is that the buffer must reside completely in-memory until it is committed.

To mitigate this, under EOS we will automatically force a Task commit if either of two thresholds is exceeded: the total number of uncommitted records, as returned by StateStore#approximateNumUncommittedEntries(), exceeds statestore.uncommitted.max.records; or the total memory used for buffering uncommitted records, as returned by StateStore#approximateNumUncommittedBytes(), exceeds statestore.uncommitted.max.bytes. This will roughly bound the memory required per-Task for buffering uncommitted records, irrespective of commit.interval.ms, and will effectively bound the number of records that need to be restored in the event of a failure.

When running under ALOS, we will instead automatically commit only the Task's state stores, not the entire Task (i.e. the consumer offsets are not committed), as an optimization that preserves the user's desired commit.interval.ms.
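
The decision described above can be sketched roughly as follows. This is not the actual Kafka Streams implementation: the class, enum, and method names are hypothetical, the inputs are assumed to be per-Task totals summed over the Task's stores, and treating any negative limit as "no limit" is an assumption (the proposal only specifies -1).

```java
// Hypothetical sketch of the threshold check; not taken from the Kafka Streams code base.
public class UncommittedLimitCheck {

    public enum Action { NONE, COMMIT_TASK, COMMIT_STATE_STORES_ONLY }

    // A limit of -1 means "no limit". Assumption: any negative limit is treated the same way.
    public static boolean exceeds(long value, long limit) {
        return limit >= 0 && value > limit;
    }

    // totalEntries / totalBytes: assumed to be the sums of approximateNumUncommittedEntries()
    // and approximateNumUncommittedBytes() across all state stores of one Task.
    public static Action decide(boolean eosEnabled,
                                long totalEntries, long totalBytes,
                                long maxRecords, long maxBytes) {
        boolean overLimit = exceeds(totalEntries, maxRecords) || exceeds(totalBytes, maxBytes);
        if (!overLimit) {
            return Action.NONE;
        }
        // Under EOS the whole Task is committed; under ALOS only the state stores are.
        return eosEnabled ? Action.COMMIT_TASK : Action.COMMIT_STATE_STORES_ONLY;
    }

    public static void main(String[] args) {
        long maxRecords = -1L;        // proposed default: no record limit
        long maxBytes = 67108864L;    // proposed default: 64 MB

        System.out.println(decide(true, 1_000L, 1_024L, maxRecords, maxBytes));
        System.out.println(decide(true, 1_000L, 70_000_000L, maxRecords, maxBytes));
        System.out.println(decide(false, 1_000L, 70_000_000L, maxRecords, maxBytes));
    }
}
```

With the default limits, only the byte threshold can trigger an early commit, since the record limit is disabled.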

Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.
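
To illustrate why default implementations keep existing stores from forcing early commits, here is a minimal sketch using a stand-in interface. BufferAwareStore, LegacyStore, and BufferingStore are hypothetical names, not Kafka classes, and the default return value of 0 is an assumption made for this example; the value actually returned by the StateStore defaults is not stated in this section.

```java
// Hypothetical stand-in for the two new StateStore methods; not the real Kafka interface.
public class DefaultUncommittedMethodsSketch {

    public interface BufferAwareStore {
        // Assumption: a default of 0 means "nothing buffered", so no threshold is ever exceeded.
        default long approximateNumUncommittedEntries() { return 0L; }
        default long approximateNumUncommittedBytes() { return 0L; }
    }

    // An existing custom or non-transactional store compiles unchanged and reports nothing buffered.
    public static class LegacyStore implements BufferAwareStore { }

    // A store that buffers writes overrides the defaults to report its uncommitted usage.
    public static class BufferingStore implements BufferAwareStore {
        private long entries;
        private long bytes;

        public void put(byte[] key, byte[] value) {
            entries++;
            bytes += key.length + value.length;
        }

        @Override public long approximateNumUncommittedEntries() { return entries; }
        @Override public long approximateNumUncommittedBytes() { return bytes; }
    }

    public static void main(String[] args) {
        BufferingStore buffering = new BufferingStore();
        buffering.put(new byte[4], new byte[16]);

        System.out.println("legacy entries:    " + new LegacyStore().approximateNumUncommittedEntries());
        System.out.println("buffering entries: " + buffering.approximateNumUncommittedEntries());
        System.out.println("buffering bytes:   " + buffering.approximateNumUncommittedBytes());
    }
}
```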

...