...

Right now, a stream processor with EOS has to delete all data from its local state stores after a crash failure because the stores might be in a partially updated state. A partial update can happen because changes to the local state are not atomic with respect to the Kafka Streams commit. If an application with EOS crashes between commits, it cannot reset the state to the previously committed one, so it wipes the state stores and replays the changelog from scratch.

...

  • org.apache.kafka.streams.processor.StateStore
  • org.apache.kafka.streams.state.StoreSupplier
  • org.apache.kafka.streams.state.Stores
  • org.apache.kafka.streams.kstream.Materialized
  • org.apache.kafka.streams.kstream.StreamJoined
  • org.apache.kafka.streams.StoreQueryParameters
  • org.apache.kafka.streams.query.StateQueryRequest

Proposed Changes

...

Background

This section briefly describes relevant parts of a stateful task's write lifecycle with EOS.

  1. The task (StreamTask, StandbyTask) registers its state stores. State stores load offset metadata from the checkpoint file (link). That step aims to establish a mapping between data in the state store and the offset of the changelog topic.
    1. In case of crash failure, the state store has data, but the checkpoint file does not exist. ProcessorStateManager throws an exception in that case for EOS tasks. This is an indicator to throw away local data and replay the changelog topic (link).
  2. The task processes data and writes its state locally.
  3. The task commits the EOS transaction. TaskExecutor#commitOffsetsOrTransaction calls StreamsProducer#commitTransaction, which sends the new offsets and commits the transaction.
  4. The task runs a postCommit method (StreamTask, StandbyTask) that flushes the state stores and updates the checkpoint file (link) for non-EOS tasks (link).
  5. The task shuts down. It stops processing data, then writes its current offset to the checkpoint file and halts.

In step 4, the task does not flush and checkpoint in the case of EOS, to prevent a partial update on crash failure. Without the risk of partial updates, EOS could follow the same process as the other processing semantics and avoid replaying the changelog.
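
For illustration, here is a minimal sketch of the decision in step 4. The class and method names are hypothetical, not the actual Kafka Streams internals; the sketch only shows the control flow described above.

Code Block
languagejava
import java.util.Map;

// Hypothetical sketch of the post-commit checkpoint decision described in step 4.
// The names are illustrative; the real Kafka Streams classes and call sites differ.
class PostCommitSketch {
    interface LocalStateManager {
        void flush();
        void checkpoint(Map<String, Long> changelogOffsetsByStore);
    }

    void postCommit(final boolean eosEnabled,
                    final LocalStateManager stateManager,
                    final Map<String, Long> committedChangelogOffsets) {
        if (eosEnabled) {
            // Checkpointing here could pair a partially updated local state with a newer
            // offset after a crash, so EOS currently skips the checkpoint and instead
            // wipes the store and replays the changelog on recovery.
            return;
        }
        stateManager.flush();                               // make local writes durable
        stateManager.checkpoint(committedChangelogOffsets); // record the flushed changelog offsets
    }
}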

Overview

This section gives an overview of the proposed changes. The following sections cover the changes in behavior, configuration, and interfaces in detail.

This KIP introduces persistent transactional state stores that guarantee atomic commit, meaning that either all uncommitted (dirty) writes are applied together or none are. The proposal changes public APIs: the StateStore and StoreSupplier interfaces and the Stores factory. StateStore and StoreSupplier get a new transactional() method that returns true if the state store is transactional. In addition, the StateStore#flush method is replaced by StateStore#commit(Long) paired with StateStore#recover(Long), which allow committing the current state at a specified offset and recovering from a crash failure to a previously committed offset. Users can create persistent transactional state stores via the Stores factory.

Transactional state stores do not delete the checkpoint file and the underlying data in the case of EOS. Instead, they roll back or forward from the changelog topic on recovery. 

There are multiple ways to implement state store transactions that present different trade-offs. This proposal includes a single implementation via a secondary RocksDB for uncommitted writes. It also adds a configuration enum RocksDBTransactionalMechanism to allow other implementations in the future in a backward-compatible way.

StateStore changes

This section covers multiple changes to the state store interfaces. This proposal replaces StateStore#flush with two new methods, StateStore#commit(Long) and StateStore#recover(Long), and adds a boolean transactional() method to determine whether a state store is transactional.

The reasoning behind replacing flush with commit/recover is two-fold. First, let's talk about why we don't need both flush and commit:

  1. There is always a single writer in Kafka Streams workloads, and all writes must go to a single currently open transaction. 
  2. There is always a single reader that queries dirty state from a single open transaction.
  3. The state stores already explicitly call flush after the Kafka transaction commits and before writing to the checkpoint file, to make uncommitted changes durable. Adding a separate method would create room for error, such as a missing commit call or executing the two calls in the wrong order.

In this sense, flush and commit are semantically the same thing. The separation of concerns between the two methods would be ambiguous, and the correct call order would be unclear.

Now, why do we need an explicit recover method? Both StateStore#commit and StateStore#recover take a Long changelogOffset parameter. This parameter allows state stores to identify commits by a commit token. A commit token lets the state store distinguish between committed state versions, which leaves room for different implementations of the transactional commit. The StateStore#recover method accepts the checkpointed offset and can do one of three things:

  1. Do nothing and return the checkpointed offset if the StateStore is in a committed state.
  2. Roll back the current dirty changes (e.g., if the state store crashed before committing) and return the checkpointed offset.
  3. Roll forward and return the new committed offset if the state store crashed after the commit started but before it finished.

To sum up, StateStore#recover transitions the state store to a consistent, committed state and returns the changelog offset corresponding to that state. Given the Streams workload properties described above and the Kafka transaction commit order, the returned offset is always either the offset in the checkpoint file or the offset committed to the changelog.
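
As an illustration of this contract, here is a hedged caller-side sketch of how the checkpointed offset could be combined with StateStore#recover on startup. The TransactionalStore interface mirrors only the proposed method, and restoreFromChangelog is a hypothetical placeholder, not an actual Kafka Streams API.

Code Block
languagejava
// Caller-side sketch of the proposed recover contract. Names are illustrative only.
class RecoverUsageSketch {
    interface TransactionalStore {
        Long recover(Long checkpointedChangelogOffset);
    }

    long initialize(final TransactionalStore store, final Long checkpointedOffset) {
        // The store settles on a committed state and reports its offset: either the
        // checkpointed offset (no-op or rollback) or a newer one (roll forward).
        final Long committedOffset = store.recover(checkpointedOffset);
        final long restoreFrom = committedOffset == null ? 0L : committedOffset + 1;
        restoreFromChangelog(restoreFrom); // replay only the missing suffix of the changelog
        return restoreFrom;
    }

    void restoreFromChangelog(final long fromOffset) {
        // placeholder: consume the changelog topic starting at fromOffset
    }
}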

Transactions via Secondary State Store for Uncommitted Changes

The new Stores factory methods described below provide an option to create transactional state stores. Transactionality is guaranteed by batching uncommitted (dirty) writes in a temporary RocksDB instance. On commit, the temporary store creates a commit file with the changelog offset, indicating that the transaction is ready to commit, and writes the dirty records into the regular store. It then truncates the temporary store and deletes the commit file once it is finished.

On crash failure, ProcessorStateManager calls StateStore#recover(offset). The state store checks if the temporary store and the commit file exist. If they do, the task repeats the commit process described above (i.e., rolls forward) and returns the new committed offset. Otherwise, it truncates the temporary store (rolls uncommitted changes back) and returns the previously committed offset.
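
The following is a rough sketch of that commit and recovery protocol. The store abstraction and file layout are invented for illustration; the real implementation operates on RocksDB instances.

Code Block
languagejava
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

// Rough sketch of the commit/recover protocol described above for the secondary-store
// approach. The KeyValueBytes abstraction and the commit-marker file are illustrative.
class SecondaryStoreCommitSketch {
    interface KeyValueBytes {
        Map<byte[], byte[]> drainAll();      // read and clear all entries
        void put(byte[] key, byte[] value);
    }

    private final KeyValueBytes tmpStore;    // uncommitted (dirty) writes
    private final KeyValueBytes mainStore;   // committed data
    private final Path commitMarker;         // holds the changelog offset of an in-flight commit

    SecondaryStoreCommitSketch(KeyValueBytes tmpStore, KeyValueBytes mainStore, Path commitMarker) {
        this.tmpStore = tmpStore;
        this.mainStore = mainStore;
        this.commitMarker = commitMarker;
    }

    void commit(final long changelogOffset) throws IOException {
        // 1. Write the marker first: from now on a crash is rolled forward, not back.
        Files.writeString(commitMarker, Long.toString(changelogOffset));
        // 2. Move the dirty writes into the regular store, truncating the temporary store.
        tmpStore.drainAll().forEach(mainStore::put);
        // 3. The commit is complete; remove the marker.
        Files.deleteIfExists(commitMarker);
    }

    Long recover(final Long checkpointedOffset) throws IOException {
        if (Files.exists(commitMarker)) {
            // Crash after the commit started: repeat it (roll forward) and return its offset.
            final long offset = Long.parseLong(Files.readString(commitMarker).trim());
            commit(offset);
            return offset;
        }
        // Crash before the commit started: discard dirty writes (roll back).
        tmpStore.drainAll();
        return checkpointedOffset;
    }
}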

The major advantage of this approach is that the temporary state store can optionally use the available disk space if the writes do not fit into the in-memory buffer. 

The disadvantages are:

  • It doubles the number of open state stores.
  • It potentially has higher write and read amplification due to uncontrolled flushes of the temporary state store.

All writes and deletes go to the temporary store. Reads query the temporary store first and fall back to the regular store if the key is not found there. Range reads query both stores and return a KeyValueIterator that merges the results, as sketched below.
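
A simplified read-path sketch under these rules follows; in-memory maps stand in for the two RocksDB instances, and tombstone handling for deletes is omitted.

Code Block
languagejava
import java.util.TreeMap;

// Simplified sketch of the merged read path: reads prefer the temporary (uncommitted)
// store and fall back to the regular store; range reads merge both, with uncommitted
// entries shadowing committed ones.
class MergedReadSketch {
    private final TreeMap<String, String> tmpStore = new TreeMap<>();   // uncommitted writes
    private final TreeMap<String, String> mainStore = new TreeMap<>();  // committed data

    String get(final String key) {
        final String dirty = tmpStore.get(key);
        return dirty != null ? dirty : mainStore.get(key);
    }

    TreeMap<String, String> range(final String from, final String to) {
        final TreeMap<String, String> merged = new TreeMap<>(mainStore.subMap(from, true, to, true));
        merged.putAll(tmpStore.subMap(from, true, to, true)); // dirty values win over committed ones
        return merged;
    }
}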

Behavior changes

If StateStore#transactional() returns true, then the store performs writes via the implementation-specific transactional mechanism. Reads via ReadOnlyKeyValueStore methods return uncommitted data from the ongoing transaction.

A transactional state store opens the first transaction during initialization. It commits on StateStore#commit: the store commits the currently open transaction and then starts a new one.

There are several places where StreamTask, ProcessorStateManager, and TaskManager check whether EOS is enabled and, if so, delete the checkpoint file on crash failure, specifically when:

  • StreamTask resumes processing (link)
  • ProcessorStateManager initializes state stores from offsets from checkpoint (link1, link2)
  • StreamTask writes offsets to the checkpoint file after committing (link)
  • TaskManager handles revocation (link)

If EOS is enabled, we will remove offset information for non-transactional state stores from the checkpoint file instead of just deleting the file.
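
A sketch of this checkpointing rule follows; the maps and the transactionality lookup are illustrative placeholders, not the actual ProcessorStateManager code.

Code Block
languagejava
import java.util.HashMap;
import java.util.Map;

// Sketch of the proposed checkpointing rule under EOS: instead of deleting the whole
// checkpoint file, keep offsets only for transactional stores.
class CheckpointFilterSketch {
    Map<String, Long> offsetsToCheckpoint(final boolean eosEnabled,
                                          final Map<String, Long> changelogOffsetsByStore,
                                          final Map<String, Boolean> isTransactional) {
        if (!eosEnabled) {
            return changelogOffsetsByStore; // ALOS: checkpoint everything, as today
        }
        final Map<String, Long> result = new HashMap<>();
        changelogOffsetsByStore.forEach((store, offset) -> {
            if (Boolean.TRUE.equals(isTransactional.get(store))) {
                result.put(store, offset); // transactional stores survive a crash consistently
            }
        });
        return result;
    }
}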

Interactive Queries (IQ)

We introduce new readCommitted parameters for IQv1 and IQv2 that control whether queries return committed or uncommitted results when the underlying state store is transactional. If the underlying store is not transactional, IQs return the most recently written data regardless of the readCommitted value.

If readCommitted is false or the store is not transactional, interactive queries work exactly as they do now. If readCommitted is true and the state store is transactional, the query reads directly from the store, ignoring records in the RecordCache and the current uncommitted transaction.
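
For example, an IQv2 caller could opt into committed-only results roughly as follows. KeyQuery, StateQueryRequest, and KafkaStreams#query are existing APIs; enableReadCommitted is the method proposed in this KIP, and the store name and types are placeholders.

Code Block
languagejava
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;

// Sketch of an IQv2 point lookup that only reads committed data.
class ReadCommittedIqv2Sketch {
    Long committedValue(final KafkaStreams streams, final String key) {
        final StateQueryRequest<Long> request =
            StateQueryRequest.inStore("counts-store")        // placeholder store name
                .withQuery(KeyQuery.<String, Long>withKey(key))
                .enableReadCommitted();                      // proposed in this KIP

        final StateQueryResult<Long> result = streams.query(request);
        final QueryResult<Long> partitionResult = result.getOnlyPartitionResult();
        return partitionResult == null ? null : partitionResult.getResult();
    }
}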

Interface Changes

StateStore

Code Block
languagejava
titleStateStore.java
/**
* Return true if the storage supports transactions.
*
* @return {@code true} if the storage supports transactions, {@code false} otherwise
*/
default boolean transactional() {
   return false;
}       

/**
  * Flush and commit any cached data
  * <p>
  * For a transactional state store, commit applies all changes atomically. In other words, either the
  * entire commit will be successful or none of the changes will be applied.
  * <p>
  * For a non-transactional state store, this method flushes cached data.
  *
  * @param changelogOffset the offset of the changelog topic this commit corresponds to. The
  *                        offset can be {@code null} if the state store does not have a changelog
  */
void commit(final Long changelogOffset);

/**
  * Recover a transactional state store
  * <p>
  * If a transactional state store shut down with a crash failure, this method can either
  * roll back or forward uncommitted changes. In any case, this method returns the changelog
  * offset it rolls to.
  *
  * @param changelogOffset the checkpointed changelog offset.
  * @return the changelog offset after recovery.
  */
Long recover(final Long changelogOffset);

Together, commit and recover replace StateStore#flush, as described in the StateStore changes section above.

StoreSupplier

Code Block
languagejava
/**
 * Return true if a call to {@link StoreSupplier#get} returns a transactional state
 * store.
 *
 * @return {@code true} if a call to {@link StoreSupplier#get} returns a transactional state
 * store, {@code false} otherwise.
*/
boolean transactional();

Stores

Code Block
languagejava
/**
 * Create a persistent transactional {@link KeyValueBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link TimestampedKeyValueStore} you should use
 * {@link #persistentTransactionalTimestampedKeyValueStore(String)} to create a store supplier instead.
 *
 * @param name  name of the store (cannot be {@code null})
 * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
 * to build a persistent key-value store
 */
public static KeyValueBytesStoreSupplier persistentTransactionalKeyValueStore(final String name)

/**
 * Create a persistent transactional {@link KeyValueBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a
 * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link KeyValueStore} you should use
 * {@link #persistentKeyValueStore(String)} to create a store supplier instead.
 *
 * @param name  name of the store (cannot be {@code null})
 * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
 * to build a persistent key-(timestamp/value) store
 */
public static KeyValueBytesStoreSupplier persistentTransactionalTimestampedKeyValueStore(final String name)

/**
 * Create a persistent transactional {@link WindowBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link TimestampedWindowStore} you should use
 * {@link #persistentTransactionalTimestampedWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
 *
 * @param name                  name of the store (cannot be {@code null})
 * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
 *                              (note that the retention period must be at least long enough to contain the
 *                              windowed data's entire life cycle, from window-start through window-end,
 *                              and for the entire grace period)
 * @param windowSize            size of the windows (cannot be negative)
 * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
 *                              caching and means that null values will be ignored.
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */
public static WindowBytesStoreSupplier persistentTransactionalWindowStore(final String name,
                                                                          final Duration retentionPeriod,
                                                                          final Duration windowSize,
                                                                          final boolean retainDuplicates) throws IllegalArgumentException

/**
 * Create a persistent transactional {@link WindowBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a
 * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link WindowStore} you should use
 * {@link #persistentTransactionalWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
 *
 * @param name                  name of the store (cannot be {@code null})
 * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
 *                              (note that the retention period must be at least long enough to contain the
 *                              windowed data's entire life cycle, from window-start through window-end,
 *                              and for the entire grace period)
 * @param windowSize            size of the windows (cannot be negative)
 * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
 *                              caching and means that null values will be ignored.
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */
public static WindowBytesStoreSupplier persistentTransactionalTimestampedWindowStore(final String name,
                                                                                     final Duration retentionPeriod,
                                                                                     final Duration windowSize,
                                                                                     final boolean retainDuplicates) throws IllegalArgumentException

/**
 * Create a persistent transactional {@link SessionBytesStoreSupplier}.
 *
 * @param name              name of the store (cannot be {@code null})
 * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
 *                          (note that the retention period must be at least as long enough to
 *                          contain the inactivity gap of the session and the entire grace period.)
 * @return an instance of a {@link SessionBytesStoreSupplier}
 */
public static SessionBytesStoreSupplier persistentTransactionalSessionStore(final String name,
                                                                            final Duration retentionPeriod)
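
As a usage sketch, a Processor API topology could wire in one of these transactional stores through the existing store builders. The store name is a placeholder; keyValueStoreBuilder and addStateStore are existing APIs, while persistentTransactionalKeyValueStore is the method proposed above.

Code Block
languagejava
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Sketch: wiring a transactional key-value store into a topology via the proposed
// Stores#persistentTransactionalKeyValueStore.
class TransactionalStoreWiringSketch {
    StreamsBuilder addTransactionalStore(final StreamsBuilder builder) {
        final KeyValueBytesStoreSupplier supplier =
            Stores.persistentTransactionalKeyValueStore("counts-store"); // proposed in this KIP

        final StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
            Stores.keyValueStoreBuilder(supplier, Serdes.String(), Serdes.Long());

        builder.addStateStore(storeBuilder); // existing Processor API wiring
        return builder;
    }
}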

Materialized

Code Block
public Materialized<K, V, S> withTransactionalityEnabled();

public Materialized<K, V, S> withTransactionalityDisabled();

public StoreSupplier<S> storeSupplier();

public boolean transactional();
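
A DSL usage sketch with the proposed method; the topic, store name, and serdes are placeholders.

Code Block
languagejava
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch: enabling transactionality from the DSL via the proposed
// Materialized#withTransactionalityEnabled.
class MaterializedUsageSketch {
    KTable<String, Long> buildTable(final StreamsBuilder builder) {
        return builder.table(
            "input-topic",
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("table-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long())
                .withTransactionalityEnabled()); // proposed in this KIP
    }
}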

StoreQueryParameters

Code Block
languagejava
    /**
     * Enable reading only committed data
     * @return StoreQueryParameters a new {@code StoreQueryParameters} instance configured with reading only committed data
     */
    public StoreQueryParameters<T> enableReadCommitted()

StateQueryRequest

Code Block
/**
 * Specifies that this query should only read committed data
 */
public StateQueryRequest<R> enableReadCommitted()
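
And an IQv1 usage sketch: fromNameAndType, QueryableStoreTypes, and KafkaStreams#store are existing APIs, while enableReadCommitted is the method proposed above; the store name and types are placeholders.

Code Block
languagejava
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Sketch of an IQv1 lookup that opts into committed-only reads.
class ReadCommittedIqv1Sketch {
    Long committedValue(final KafkaStreams streams, final String key) {
        final StoreQueryParameters<ReadOnlyKeyValueStore<String, Long>> params =
            StoreQueryParameters
                .fromNameAndType("counts-store", QueryableStoreTypes.<String, Long>keyValueStore())
                .enableReadCommitted(); // proposed in this KIP

        final ReadOnlyKeyValueStore<String, Long> store = streams.store(params);
        return store.get(key); // committed value, ignoring the open transaction and RecordCache
    }
}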

Compatibility, Deprecation, and Migration Plan

Transactional state stores will be disabled by default. Both Streams DSL and Processor API users can enable transactional writes in the built-in RocksDB state store by passing a new boolean flag transactional=true to the Materialized constructor and the Stores factory methods. Custom state stores will have an option to enable transactionality by adjusting their implementation according to the StateStore#transactional() contract.

Proposed changes are source compatible and binary incompatible with previous releases.

Test Plan

  1. Changes not committed to the changelog topic are discarded on crash failure.
  2. Changes committed to the changelog topic, but not committed to the state store are rolled forward.

Rejected Alternatives

Transactions via Secondary State Store for Uncommitted Changes

In this alternative, Kafka Streams opens two stores instead of one - a temporary store and a regular store. All uncommitted writes go to the temporary store. Once the task flushes, the temporary store creates a commit file, an empty file that indicates that the corresponding file is ready to commit, and writes stored records to the regular store. It truncates the store and deletes the commit file once it is finished. 

On crash failure, the task checks if the temporary store and the commit file exist. If they do, the task repeats the commit process described above. Otherwise, it truncates the temporary store.

The major advantage of this approach is that the temporary state store can optionally use the available disk space if the writes do not fit into the in-memory buffer. 

The disadvantages are:

  • It doubles the number of open state stores 
  • It potentially has higher write and read amplification due to uncontrolled flushes of the temporary state store.

For completeness, below are the details on the changes required to support this implementation.

Interface changes

StoreSupplier:

...

TransactionalKeyValueByteStore:

  • A constructor accepts a DB and a store supplier for the tmp store.
  • On init, the state store configures depending on the state of the feature flag. If it is set to false, then TransactionalKeyValueByteStore just forwards all methods to inner. Otherwise, it creates a temporary store and implements behavior changes described below.

Behavior changes

All writes and deletes go to the temporary store. Reads query the temporary store and if the data is missing, query the regular store. Range reads query both stores and return a KeyValueIterator that merges the results.

...

   size of the windows (cannot be negative)
 * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
 *                              caching and means that null values will be ignored.
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */      
 public static WindowBytesStoreSupplier persistentTransactionalTimestampedWindowStore(final String name,
                                                                                      final Duration retentionPeriod,
                                                                                      final Duration windowSize,
                                                                                      final boolean retainDuplicates) throws IllegalArgumentException 

/**
 * Create a persistent transactional {@link SessionBytesStoreSupplier}.
 *
 * @param name              name of the store (cannot be {@code null})
 * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
 *                          (note that the retention period must be at least as long enough to
 *                          contain the inactivity gap of the session and the entire grace period.)
 * @return an instance of a {@link  SessionBytesStoreSupplier}
 */
public static SessionBytesStoreSupplier persistentTransactionalSessionStore(final String name,
                                                               		 	    final Duration retentionPeriod,
                                                              				final boolean transactional)

Materialized

Code Block
public Materialized<K, V, S> withTransactionalityEnabled();

public Materialized<K, V, S> withTransactionalityDisabled();

public StoreSupplier<S> storeSupplier();

public boolean transactional();

StoreQueryParameters

Code Block
languagejava
/**
 * Enable reading only committed data
 * @return StoreQueryParameters a new {@code StoreQueryParameters} instance configured with reading only committed data
 */
public StoreQueryParameters<T> enableReadCommitted() 

StateQueryRequest

Code Block
/**
 * Specifies that this query should only read committed data
 */
public StateQueryRequest<R> enableReadCommitted() 

Compatibility, Deprecation, and Migration Plan

Transactional state stores will be disabled by default. Both Streams DSL and Processor API users can enable transactional writes in the built-in RocksDB state store by passing a new boolean flag transactional=true to Materialized constructor and Stores factory methods. Custom state stores will have an option to enable transactionality by adjusting their implementation according to the contract StateStore#transactional() contract.

Proposed changes are source compatible and binary incompatible with previous releases.

Test Plan

  1. Changes not committed to the changelog topic are discarded on crash failure.
  2. Changes committed to the changelog topic, but not committed to the state store are rolled forward.

Rejected Alternatives

RocksDB in-memory Indexed Batches

A considered alternative is to make the built-in RocksDB state store transactional by using WriteBatchWithIndex, which is similar to the WriteBatch already used by the segment stores, except that it also allows reading uncommitted data.

The advantage of this approach is that it uses the RocksDB built-in mechanism to ensure transactionality and offers the smallest possible write amplification overhead.

The disadvantage of this approach is that all uncommitted writes must fit into memory. In practice, RocksDB developers recommend that batches stay no larger than 3-4 megabytes (link), which might be an issue for workloads that accumulate many uncommitted writes between commits.
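
For reference, a minimal RocksJava sketch of the mechanism this alternative refers to; the database path is a placeholder.

Code Block
languagejava
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;

// Sketch of the RocksDB (RocksJava) primitives behind this alternative: uncommitted writes
// are buffered in a WriteBatchWithIndex, reads see them via getFromBatchAndDB, and the
// whole batch is applied atomically on commit.
class IndexedBatchSketch {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (final Options options = new Options().setCreateIfMissing(true);
             final RocksDB db = RocksDB.open(options, "/tmp/indexed-batch-demo");
             final WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
             final ReadOptions readOptions = new ReadOptions();
             final WriteOptions writeOptions = new WriteOptions()) {

            batch.put("key".getBytes(), "dirty-value".getBytes()); // uncommitted write, kept in memory

            // Reads that should see uncommitted data consult the batch first, then the DB.
            final byte[] uncommitted = batch.getFromBatchAndDB(db, readOptions, "key".getBytes());
            System.out.println(new String(uncommitted));

            db.write(writeOptions, batch); // "commit": apply the whole batch atomically
        }
    }
}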

RocksDB Optimistic Transactions

Another considered alternative is OptimisticTransactionDB. This alternative suffers from the same issues as in-memory indexed batches, but also has greater overhead. It offers more guarantees than Kafka Streams needs; specifically, it ensures that there were no write conflicts between concurrent transactions before committing. There are no concurrent transactions in Kafka Streams, so there is no reason to pay for the associated overhead.

Transactions via Secondary State Store for Uncommitted Changes

In this alternative, Kafka Streams opens two stores instead of one - a temporary store and a regular store. All uncommitted writes go to the temporary store. Once the task flushes, the temporary store creates a commit file, an empty file that indicates that the corresponding file is ready to commit, and writes stored records to the regular store. It truncates the store and deletes the commit file once it is finished. 

On crash failure, the task checks if the temporary store and the commit file exist. If they do, the task repeats the commit process described above. Otherwise, it truncates the temporary store.

The major advantage of this approach is that the temporary state store can optionally use the available disk space if the writes do not fit into the in-memory buffer. 

The disadvantages are:

  • It doubles the number of open state stores 
  • It potentially has higher write and read amplification due to uncontrolled flushes of the temporary state store

...

  • .

Method to control transaction lifecycle in StateStore

...