Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • org.apache.kafka.streams.processor.StateStore
  • org.apache.kafka.streams.state.StoreSupplier

  • org.apache.kafka.streams.state.Stores

  • org.apache.kafka.streams.kstream.Materialized

  • org.apache.kafka.streams.kstream.StreamJoined

  • org.apache.kafka.streams.StoreQueryParameters
  • org.apache.kafka.streams.query.StateQueryRequest

Proposed Changes

Background

...

To sum up, StateStore#recover transitions the state store to the consistent, committed state and returns the changelog corresponding to that state. Given the Streams workloads properties described above and the AK transaction commit order, the returned offset is always either the offset in the checkpoint file or the offset committed to the changelog.

Transactions via Secondary State Store for Uncommitted Changes

New Stores# factory methods described above provide an option to create transactional state stores. Transactionality is guaranteed by batching uncommitted (dirty) writes in a temporary RocksDB instance. Once the task flushes, the temporary store creates a commit file with a changelog offset, indicating that the transaction is ready to commit, and writes dirty records into the regular store. It truncates the temporary store and deletes the commit file once it is finished. 

On crash failure, ProcessorStateManager calls StateStore#recover(offset). The state store checks if the temporary store and the commit file exist. If they do, the task repeats the commit process described above (i.e., rolls forward) and returns the new committed offset. Otherwise, it truncates the temporary store (rolls uncommitted changes back) and returns the previously committed offset.

The major advantage of this approach is that the temporary state store can optionally use the available disk space if the writes do not fit into the in-memory buffer. 

The disadvantages are:

  • It doubles the number of open state stores 
  • It potentially has higher write and read amplification due to uncontrolled flushes of the temporary state store.

Behavior changes

Behavior changes

If StateStore#transactional() returns true, then the store performs writes via the implementation-specific transactional mechanism. Reads via ReadOnlyKeyValueStore methods return uncommitted data from the ongoing transaction.

A transactional state store opens the first transaction during initialization. It commits on StateStore#commit - first, the store commits the transaction, then flushes, then starts a new transaction. 

There are several places where StreamTask, ProcessorStateManager, and TaskManager check if EOS is enabled, and then it deletes the checkpoint file on crash failure, specifically, when:

  • StreamTask resumes processing (link)
  • ProcessorStateManager initializes state stores from offsets from checkpoint (link1, link2)
  • StreamTask writes offsets to the checkpoint file on after committing (link)
  • TaskManager handles revocation (link)

If EOS is enabled, we will remove offset information for non-transactional state stores from the checkpoint file instead of just deleting the file.

Transactions via Secondary State Store for Uncommitted Changes

New Stores# factory methods described above provide an option to create transactional state stores. Transactionality is guaranteed by batching uncommitted (dirty) writes in a temporary RocksDB instance. Once the task flushes, the temporary store creates a commit file with a changelog offset, indicating that the transaction is ready to commit, and writes dirty records into the regular store. It truncates the temporary store and deletes the commit file once it is finished. 

All writes and deletes go to the temporary store. Reads query the temporary store; if the data is missing, query the regular store. Range reads query All writes and deletes go to the temporary store. Reads query the temporary store; if the data is missing, query the regular store. Range reads query both stores and return a KeyValueIterator that merges the results.

Behavior changes

If StateStore#transactional() returns true, then the store performs writes via the implementation-specific transactional mechanism. Reads via ReadOnlyKeyValueStore methods return uncommitted data from the ongoing transaction.

A transactional state store opens the first transaction during initialization. It commits on StateStore#flush - first, the store commits the transaction, then flushes, then starts a new transaction. 

There are several places where StreamTask, ProcessorStateManager, and TaskManager check if EOS is enabled, and then it deletes the checkpoint file on crash failure, specifically, when:

  • StreamTask resumes processing (link)
  • ProcessorStateManager initializes state stores from offsets from checkpoint (link1, link2)
  • StreamTask writes offsets to the checkpoint file on after committing (link)
  • TaskManager handles revocation (link)

If EOS is enabled, we will remove offset information for non-transactional state stores from the checkpoint file instead of just deleting the file.

Interactive Queries (IQ)

We introduce new readCommitted parameters for IQv1 and IQv2 that control whether queries return committed or uncommitted results if the underlying state store is transactional. If the underlying store is not transactional, then IQs return the most recently written data regardless of the same readCommitted value. 

If readCommitted is false or the store is not transactional, interactive queries work exactly the same as we do now. If readCommitted is true and the state store is transactional, the query reads directly from the store ignoring records in RecordCache and the current uncommitted transaction.

Interface Changes

StateStore

On crash failure, ProcessorStateManager calls StateStore#recover(offset). The state store checks if the temporary store and the commit file exist. If they do, the task repeats the commit process described above (i.e., rolls forward) and returns the new committed offset. Otherwise, it truncates the temporary store (rolls uncommitted changes back) and returns the previously committed offset.

The major advantage of this approach is that the temporary state store can optionally use the available disk space if the writes do not fit into the in-memory buffer. 

The disadvantages are:

  • It doubles the number of open state stores 
  • It potentially has higher write and read amplification due to uncontrolled flushes of the temporary state store.

Interface Changes

StateStore

Code Block
language
Code Block
languagejava
titleStateStore.java
/**
* Return true the storage supports transactions.
*
* @return {@code true} if the storage supports transactions, {@code false} otherwise
*/
default boolean transactional() {
   return false;
}       

/**
  * Flush and commit any cached data
  * <p>
  * For transactional state store commit applies all changes atomically. In other words, either the
  * entire commit will be successful or none of the changes will be applied.
  * <p>
  * For non-transactional state store this method flushes cached data.
  *
  * @param changelogOffset the offset of the changelog topic this commit corresponds to. The
  *                        offset can be null if the state store does not have a changelog
  *                        (e.g. a global store).
  * @code null}
  */
void commit(final Long changelogOffset);

/**
  * Recover a transactional state store
  * <p>
  * If a transactional state store shut down with a crash failure, this method can either
  * roll back or forward uncommitted changes. In any case, this method returns the changelog
  * offset it rolls to.
  *
  * @param changelogOffset the checkpointed changelog offset.
  * @return the changelog offset after recovery.
  */
Long recover(final Long changelogOffset) 

...

Code Block
public Materialized<K, V, S> withTransactionalityEnabled();

public Materialized<K, V, S> withTransactionalityDisabled();

public StoreSupplier<S> storeSupplier();

public boolean transactional();

StoreQueryParameters

Code Block
languagejava
/**
 * Enable reading only committed data
 * @return StoreQueryParameters a new {@code StoreQueryParameters} instance configured with reading only committed data
 */
public StoreQueryParameters<T> enableReadCommitted() 

StateQueryRequest

Code Block
/**
 * Specifies that this query should only read committed data
 */
public StateQueryRequest<R> enableReadCommitted() 

Compatibility, Deprecation, and Migration Plan

Transactional state stores will be disabled by default. Both Streams DSL and Processor API users can enable transactional writes in the built-in RocksDB state store by passing a new boolean flag transactional=true to Materialized constructor and Stores factory methods. Custom state stores will have an option to enable transactionality by adjusting their implementation according to the contract StateStore#transactional() contract.

Proposed changes are source compatible and binary incompatible with previous releases.

Test Plan

  1. Changes not committed to the changelog topic are discarded on crash failure.
  2. Changes committed to the changelog topic, but not committed to the state store are rolled forward.

Rejected Alternatives

RocksDB in-memory Indexed Batches

A considered alternative is to make built-in RocksDB state store transactional by using  WriteBatchWithIndex, which is similar to WriteBatch already used segment stores, except it also allows reading uncommitted data.

The advantage of this approach is that it uses the RocksDB built-in mechanism to ensure transactionality and offers the smallest possible write amplification overhead.

The disadvantage of this approach is that all uncommitted writes must fit into memory. In practice, RocksDB developers recommend the batches to be no larger than 3-4 megabytes (link) which might be an issue 

RocksDB Optimistic Transactions

Another considered alternative is OptimisticTransactionDB. This alternative suffers from the same issues as in-memory indexed batches, but also has greater overhead. It offers more guarantees than Kafka Streams needs, specifically - ensures that there were no write conflicts between concurrent transactions before committing. There are no concurrent transactions in Kafka Streams, so there is no reason to pay for the associated overhead. 

Transactions via Secondary State Store for Uncommitted Changes

In this alternative, Kafka Streams opens two stores instead of one - a temporary store and a regular store. All uncommitted writes go to the temporary store. Once the task flushes, the temporary store creates a commit file, an empty file that indicates that the corresponding file is ready to commit, and writes stored records to the regular store. It truncates the store and deletes the commit file once it is finished. 

On crash failure, the task checks if the temporary store and the commit file exist. If they do, the task repeats the commit process described above. Otherwise, it truncates the temporary store.

The major advantage of this approach is that the temporary state store can optionally use the available disk space if the writes do not fit into the in-memory buffer. 

The disadvantages are:

...

Compatibility, Deprecation, and Migration Plan

Transactional state stores will be disabled by default. Both Streams DSL and Processor API users can enable transactional writes in the built-in RocksDB state store by passing a new boolean flag transactional=true to Materialized constructor and Stores factory methods. Custom state stores will have an option to enable transactionality by adjusting their implementation according to the contract StateStore#transactional() contract.

Proposed changes are source compatible and binary incompatible with previous releases.

Test Plan

  1. Changes not committed to the changelog topic are discarded on crash failure.
  2. Changes committed to the changelog topic, but not committed to the state store are rolled forward.

Rejected Alternatives

RocksDB in-memory Indexed Batches

A considered alternative is to make built-in RocksDB state store transactional by using  WriteBatchWithIndex, which is similar to WriteBatch already used segment stores, except it also allows reading uncommitted data.

The advantage of this approach is that it uses the RocksDB built-in mechanism to ensure transactionality and offers the smallest possible write amplification overhead.

The disadvantage of this approach is that all uncommitted writes must fit into memory. In practice, RocksDB developers recommend the batches to be no larger than 3-4 megabytes (link) which might be an issue 

RocksDB Optimistic Transactions

Another considered alternative is OptimisticTransactionDB. This alternative suffers from the same issues as in-memory indexed batches, but also has greater overhead. It offers more guarantees than Kafka Streams needs, specifically - ensures that there were no write conflicts between concurrent transactions before committing. There are no concurrent transactions in Kafka Streams, so there is no reason to pay for the associated overhead. 

...

Method to control transaction lifecycle in StateStore

...