Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: IQ: Add config, explain how readCommitted works

...

  • statestore.transactional.enabled (true by default) - enables transactional behavior for the state stores. When false, the state stores behave as they do now.
  • statestore.transactional.mechanism (rockdb_indexbatch by default) - specifies the means to implement transactional writes. rockdb_indexbatch is the only valid configuration value for now.

StoreQueryParameters (IQv1)

  • readCommitted (false by default) - controls whether IQv1 reads uncommitted or committed data. 

StateQueryRequest (IQv2):

  • readCommitted (false by default) - controls whether interactive queries read IQv2 reads uncommitted or committed data. 

...

If statestore.transactional.enabled is true, but the store does not support transactions, the corresponding stream task falls back to the non-transactional behavior.Interactive queries return either committed or uncommitted results, according to enable.iq.read_committed. 

A transactional state store opens the first transaction during initialization. It commits on StateStore#flush - first, the store commits the transaction, then flushes, then starts a new transaction. 

...

The if-EOS condition should only apply in all these cases if the feature flag is off or state stores are not transactional.

IQ

We introduce new readCommitted parameters for IQv1 and IQv2 that control whether queries return committed or uncommitted results if the underlying state store is transactional. If the underlying store is not transactional, then IQs return the most recently written data regardless of the same readCommitted value. 

If readCommitted is false or the store is not transactional, interactive queries work exactly the same as we do now. If readCommitted is true and the state store is transactional, then the query reads directly from the store ignoring records in RecordCache and the current uncommitted transaction.

RocksDB

When statestore.transactional.mechanism=rockdb_indexbatch, Kafka Streams will make the writes to the built-in RocksDB state store transactional by using  WriteBatchWithIndex, which is similar to WriteBatch already used segment stores, except it also allows reading uncommitted data.

...