...

This KIP explores an alternative design that should have little or no performance impact, and can thus be enabled for all stores.

Public Interfaces

New configuration

...

| Name | Default | Description |
| --- | --- | --- |
| statestore.uncommitted.max.records | -1 | Maximum number of uncommitted state-store records to buffer per task. If this limit is exceeded, a task commit will be requested. Set to -1 for no limit. |
| statestore.uncommitted.max.bytes | 67108864 (64 MB) | Maximum memory, in bytes, used to buffer uncommitted state-store records per task. If this limit is exceeded, a task commit will be requested. Set to -1 for no limit. Note: if this is too high or unbounded, it's possible for RocksDB to trigger out-of-memory errors. |
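As a rough illustration of how the two limits interact, the sketch below reads both settings from a `java.util.Properties` object and decides whether a task commit should be requested. The class `UncommittedLimits` and its method `shouldRequestCommit` are hypothetical names introduced for this example only; they are not part of Kafka Streams or of this KIP's proposed API. The sketch assumes that "exceeded" means strictly greater than the limit and that any negative value disables a limit.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kafka Streams.
final class UncommittedLimits {
    static final String MAX_RECORDS = "statestore.uncommitted.max.records";
    static final String MAX_BYTES = "statestore.uncommitted.max.bytes";

    final long maxRecords;
    final long maxBytes;

    UncommittedLimits(Properties props) {
        // Fall back to the defaults from the configuration table when a key is absent.
        this.maxRecords = Long.parseLong(props.getProperty(MAX_RECORDS, "-1"));
        this.maxBytes = Long.parseLong(props.getProperty(MAX_BYTES, "67108864"));
    }

    // Returns true when either limit is exceeded.
    // Assumption: a negative value (such as -1) disables that limit.
    boolean shouldRequestCommit(long uncommittedRecords, long uncommittedBytes) {
        boolean recordsExceeded = maxRecords >= 0 && uncommittedRecords > maxRecords;
        boolean bytesExceeded = maxBytes >= 0 && uncommittedBytes > maxBytes;
        return recordsExceeded || bytesExceeded;
    }
}
```

With the defaults, only the byte limit can trigger a commit request, since the record limit is unbounded; setting both gives whichever limit is hit first.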

Changed

...

Interfaces

  • org.apache.kafka.streams.processor.StateStore
  • org.apache.kafka.streams.processor.StateStoreContext

...

Internally, we enable configuration of the level of isolation provided by StateStores via a context-wide IsolationLevel, which can be set to either of the following:

| IsolationLevel | Description | processing.guarantee |
| --- | --- | --- |
| READ_UNCOMMITTED | Records written to any transaction are visible to all other transactions immediately. This level provides no atomicity, consistency, isolation or durability guarantees. | at_least_once |
| READ_COMMITTED | Records written to one transaction are visible to other transactions only once they have been committed. | exactly_once, exactly_once_beta, exactly_once_v2 |
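The mapping in the table above can be sketched as a small lookup. The enum `SketchIsolationLevel` and the class `IsolationLevels` are hypothetical stand-ins defined here so that the example is self-contained; they are not the types the KIP itself uses. The sketch assumes the guarantee is passed as a string and accepts both hyphenated and underscored spellings.

```java
// Hypothetical stand-in for the IsolationLevel described in the table.
enum SketchIsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

// Hypothetical helper for illustration; not part of Kafka Streams.
final class IsolationLevels {
    private IsolationLevels() { }

    // Maps a processing guarantee to the isolation level listed for it in the table.
    static SketchIsolationLevel forProcessingGuarantee(String guarantee) {
        // Normalise "at-least-once" style spellings to "at_least_once".
        String normalised = guarantee.trim().replace('-', '_');
        switch (normalised) {
            case "at_least_once":
                return SketchIsolationLevel.READ_UNCOMMITTED;
            case "exactly_once":
            case "exactly_once_beta":
            case "exactly_once_v2":
                return SketchIsolationLevel.READ_COMMITTED;
            default:
                throw new IllegalArgumentException("Unknown processing guarantee: " + guarantee);
        }
    }
}
```

Because the level is derived from the processing guarantee, it applies context-wide rather than being chosen per store.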

StateStore transactions are committed only once the Kafka transaction for their changelog entries has been committed. This ensures that, under exactly-once, the records written to disk always reflect the state of the store's changelog.

...