Status

Current state: Accepted

Discussion thread: here

JIRA: here

...

  • org.apache.kafka.streams.processor.StateStore
  • org.apache.kafka.streams.state.StoreSupplier
  • org.apache.kafka.streams.state.Stores

  • org.apache.kafka.streams.kstream.Materialized
  • org.apache.kafka.streams.kstream.StreamJoined.StoreType

Proposed Changes

Background

...

  1. The task (StreamTask, StandbyTask) registers its state stores. State stores load offset metadata from the checkpoint file (link). That step aims to establish a mapping between data in the state store and the offset of the changelog topic.
    1. In case of a crash failure, if the state store has data but the checkpoint file does not exist, ProcessorStateManager throws an exception for EOS tasks. This is an indicator to throw away local data and replay the changelog topic (link).
  2. The task processes data and writes its state locally.
  3. The task commits the EOS transaction. TaskExecutor#commitOffsetsOrTransaction calls StreamsProducer#commitTransaction, which sends the new offsets and commits the transaction.
  4. The task runs a postCommit method (StreamTask, StandbyTask) that:
    1. flushes the state stores and
    2. updates the checkpoint file (link) for non-EOS tasks (link).
  5. Go to step 2 until the task shuts down. During shutdown, the task stops processing data, then writes its current offset to the checkpoint file and halts.
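The ordering of steps 3 and 4 is what the crash scenarios below depend on. The following is a minimal sketch of one commit interval. It assumes that the checkpoint is written on every commit, as the crash scenarios do. EosProducer, CommittableStore, CheckpointWriter and TaskCommitLoop are hypothetical stand-ins, not Kafka Streams types.

```java
import java.util.ArrayList;
import java.util.List;

final class TaskCommitLoop {
    // Hypothetical stand-ins for StreamsProducer, the state store and the checkpoint file.
    interface EosProducer { void commitTransaction(long consumedOffset); }
    interface CommittableStore { void commit(Long changelogOffset); }
    interface CheckpointWriter { void write(long changelogOffset); }

    private final EosProducer producer;
    private final CommittableStore store;
    private final CheckpointWriter checkpoint;

    TaskCommitLoop(EosProducer producer, CommittableStore store, CheckpointWriter checkpoint) {
        this.producer = producer;
        this.store = store;
        this.checkpoint = checkpoint;
    }

    /** Steps 3-4 for one interval; processing (step 2) is assumed to have happened already. */
    void commit(long consumedOffset, long changelogOffset) {
        producer.commitTransaction(consumedOffset); // 3: commit the EOS transaction
        store.commit(changelogOffset);              // 4a: commit (flush) the state store
        checkpoint.write(changelogOffset);          // 4b: record the offset in the checkpoint file
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        TaskCommitLoop loop = new TaskCommitLoop(
                offset -> calls.add("txn@" + offset),
                offset -> calls.add("store@" + offset),
                offset -> calls.add("checkpoint@" + offset));
        loop.commit(10L, 42L);
        System.out.println(calls); // [txn@10, store@42, checkpoint@42]
    }
}
```

A crash can interrupt this sequence between any two of the calls. Each crash point leaves the store and checkpoint in one of the states enumerated below.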

...

  • The crash happens between steps 1 and 3. The uncommitted data will be discarded. The input records were not committed via the EOS transaction, so the task will re-process them.
  • The crash happens between steps 3 and 4a. The EOS transaction has already been committed, but the state store has not. The state store will replay the uncommitted records from the changelog topic.
  • The crash happens between steps 4a and 4b. The state store has already committed the new records, but they are not yet reflected in the checkpoint file. The state store will replay the last committed records from the changelog topic. This operation is idempotent and does not violate correctness.
  • The crash happens after step 4b. The state store does nothing during recovery.
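The two middle scenarios both end in a replay from the checkpointed offset. Replay is safe because re-applying a put or delete leaves the same value. The following is a small sketch of that argument. It assumes a changelog modeled as a list of offset/key/value records, with null as a tombstone. ChangelogReplay and Rec are hypothetical names, and records require Java 16 or later.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ChangelogReplay {
    /** One changelog record; a null value is a tombstone (delete). */
    record Rec(long offset, String key, String value) {}

    /** Re-applies every record after the checkpointed offset; puts and deletes are idempotent. */
    static void replay(Map<String, String> store, List<Rec> changelog, long checkpointedOffset) {
        for (Rec r : changelog) {
            if (r.offset() <= checkpointedOffset) {
                continue;
            }
            if (r.value() == null) {
                store.remove(r.key());
            } else {
                store.put(r.key(), r.value());
            }
        }
    }

    public static void main(String[] args) {
        List<Rec> changelog = List.of(new Rec(0, "a", "1"), new Rec(1, "b", "2"), new Rec(2, "a", "3"));

        // Crash between 3 and 4a: the store only reflects offset 0 and the checkpoint says 0.
        Map<String, String> behind = new HashMap<>(Map.of("a", "1"));
        replay(behind, changelog, 0);

        // Crash between 4a and 4b: the store already reflects offset 2, the checkpoint still says 0.
        Map<String, String> ahead = new HashMap<>(Map.of("a", "3", "b", "2"));
        replay(ahead, changelog, 0);

        System.out.println(behind.equals(ahead)); // true
    }
}
```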

There are multiple ways to implement state store transactions that present different trade-offs. This proposal includes a single reference implementation via a secondary RocksDB for uncommitted writes. 

StateStore changes

This section covers multiple changes to the state store interfaces. This proposal deprecates StateStore#flush. It introduces two new methods, StateStore#commit(Long) (which takes over the role of flush) and StateStore#recover(long). It also adds a boolean transactional() method that reports whether a state store is transactional.

...

If EOS is enabled, we will remove offset information for non-transactional state stores from the checkpoint file instead of just deleting the file.
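The following is a minimal sketch of that checkpoint rule. It assumes the checkpoint is a map from changelog partition name to offset; the real file is keyed by TopicPartition. It also assumes that the set of partitions owned by transactional stores is known. EosCheckpointFilter and retainTransactional are hypothetical names. Under this reading, only the non-transactional stores lose their offsets, so presumably only they fall back to restoring from the changelog.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class EosCheckpointFilter {
    /** Keeps only the offsets of changelog partitions that belong to transactional stores. */
    static Map<String, Long> retainTransactional(Map<String, Long> checkpoint,
                                                 Set<String> transactionalChangelogs) {
        Map<String, Long> kept = new TreeMap<>();
        checkpoint.forEach((partition, offset) -> {
            if (transactionalChangelogs.contains(partition)) {
                kept.put(partition, offset);
            }
        });
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Long> checkpoint =
                Map.of("app-txn-store-changelog-0", 5L, "app-plain-store-changelog-0", 7L);
        System.out.println(retainTransactional(checkpoint, Set.of("app-txn-store-changelog-0")));
        // {app-txn-store-changelog-0=5}
    }
}
```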

...


Configuration changes

StreamsConfig

default.dsl.store has a new valid value, txn_rocksDB, that enables the transactional RocksDB state store.
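A sketch of how an application might opt in through its Streams properties. The key "default.dsl.store" is the one behind StreamsConfig.DEFAULT_DSL_STORE_CONFIG. The value "txn_rocksDB" is the one proposed here, and Kafka versions that do not implement this proposal may reject it. The application id and broker address are placeholders.

```java
import java.util.Properties;

final class TxnStoreConfigExample {
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "txn-store-example");  // placeholder application id
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
        // Proposed value; not accepted by releases that lack transactional stores.
        props.put("default.dsl.store", "txn_rocksDB");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties().getProperty("default.dsl.store")); // txn_rocksDB
    }
}
```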

Interface Changes

StateStore

Code Block
languagejava
titleStateStore.java
/**
 * Return true if the storage supports transactions.
 *
 * @return {@code true} if the storage supports transactions, {@code false} otherwise
 */
@Evolving
default boolean transactional() {
    return false;
}

/**
 * Flush any cached data.
 *
 * @deprecated use {@link #commit(Long)} instead
 */
@Deprecated
default void flush() {}

/**
 * Flush and commit any cached data.
 * <p>
 * For a transactional state store, commit applies all changes atomically. In other words, either the
 * entire commit will be successful or none of the changes will be applied.
 * <p>
 * For a non-transactional state store, this method flushes cached data.
 *
 * @param changelogOffset the offset of the changelog topic this commit corresponds to. The
 *                        offset can be {@code null} if the state store does not have a changelog
 *                        (e.g. a global store).
 */
@Evolving
default void commit(final Long changelogOffset) {
    if (transactional()) {
        throw new UnsupportedOperationException("Transactional state store must implement StateStore#commit");
    } else {
        flush();
    }
}

/**
 * Recovers the state store after a crash failure.
 * <p>
 * The state store recovers by discarding any writes that are not committed to the changelog
 * and rolling to the state that corresponds to {@code changelogOffset} or a greater offset of
 * the changelog topic.
 *
 * @param changelogOffset the checkpointed changelog offset.
 * @return {@code true} if the state store recovered, {@code false} otherwise.
 */
@Evolving
default boolean recover(final long changelogOffset) {
    if (transactional()) {
        throw new UnsupportedOperationException("Transactional state store must implement StateStore#recover");
    }
    return false;
}
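The default methods above determine what existing and new stores must do. A non-transactional store inherits commit(), which falls back to flush(). A transactional store must override both commit() and recover(). The sketch below mirrors that contract in a hypothetical ProposedStateStore interface so that it compiles without Kafka. BufferedStore is a toy in-memory store, not the RocksDB-based implementation. It assumes that recover() returns true when the committed state is at or past the checkpointed offset.

```java
import java.util.HashMap;
import java.util.Map;

final class TxnStoreContractSketch {
    /** Hypothetical mirror of the proposed StateStore default methods. */
    interface ProposedStateStore {
        default boolean transactional() { return false; }
        default void flush() {}
        default void commit(final Long changelogOffset) {
            if (transactional()) {
                throw new UnsupportedOperationException("Transactional state store must implement commit");
            }
            flush();
        }
        default boolean recover(final long changelogOffset) {
            if (transactional()) {
                throw new UnsupportedOperationException("Transactional state store must implement recover");
            }
            return false;
        }
    }

    /** Non-transactional store: inherits the defaults, so commit() just flushes. */
    static final class PlainStore implements ProposedStateStore {
        int flushes = 0;
        @Override public void flush() { flushes++; }
    }

    /** Toy transactional store: buffers writes and applies them together on commit. */
    static final class BufferedStore implements ProposedStateStore {
        private final Map<String, String> committed = new HashMap<>();
        private final Map<String, String> uncommitted = new HashMap<>();
        private Long committedOffset = null;

        @Override public boolean transactional() { return true; }

        void put(String key, String value) { uncommitted.put(key, value); }

        String get(String key) {
            return uncommitted.containsKey(key) ? uncommitted.get(key) : committed.get(key);
        }

        @Override public void commit(final Long changelogOffset) {
            committed.putAll(uncommitted);   // all buffered writes become visible together
            uncommitted.clear();
            committedOffset = changelogOffset;
        }

        @Override public boolean recover(final long changelogOffset) {
            uncommitted.clear();             // discard writes that were never committed
            return committedOffset != null && committedOffset >= changelogOffset;
        }
    }

    public static void main(String[] args) {
        BufferedStore store = new BufferedStore();
        store.put("a", "1");
        store.commit(10L);
        store.put("a", "2");                 // dirty write, discarded on recovery
        System.out.println(store.recover(10L) + " " + store.get("a")); // true 1
    }
}
```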


Stores

Code Block
languagejava
/**
 * Create a persistent {@link KeyValueBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link TimestampedKeyValueStore} you should use
 * {@link #persistentTimestampedKeyValueStore(String)} to create a store supplier instead.
 *
 * @param name          name of the store (cannot be {@code null})
 * @param transactional whether the store should be transactional
 * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
 * to build a persistent key-value store
 */
public static KeyValueBytesStoreSupplier persistentKeyValueStore(final String name, final boolean transactional)

/**
 * Create a persistent {@link KeyValueBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a
 * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link KeyValueStore} you should use
 * {@link #persistentKeyValueStore(String)} to create a store supplier instead.
 *
 * @param name          name of the store (cannot be {@code null})
 * @param transactional whether the store should be transactional
 * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
 * to build a persistent key-(timestamp/value) store
 */
public static KeyValueBytesStoreSupplier persistentTimestampedKeyValueStore(final String name, final boolean transactional)

/**
 * Create a persistent {@link WindowBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link TimestampedWindowStore} you should use
 * {@link #persistentTimestampedWindowStore(String, Duration, Duration, boolean, boolean)} to create a store supplier instead.
 *
 * @param name                  name of the store (cannot be {@code null})
 * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
 *                              (note that the retention period must be at least long enough to contain the
 *                              windowed data's entire life cycle, from window-start through window-end,
 *                              and for the entire grace period)
 * @param windowSize            size of the windows (cannot be negative)
 * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
 *                              caching and means that null values will be ignored.
 * @param transactional         whether the store should be transactional
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */
public static WindowBytesStoreSupplier persistentWindowStore(final String name,
                                                             final Duration retentionPeriod,
                                                             final Duration windowSize,
                                                             final boolean retainDuplicates,
                                                             final boolean transactional) throws IllegalArgumentException

/**
 * Create a persistent {@link WindowBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a
 * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link WindowStore} you should use
 * {@link #persistentWindowStore(String, Duration, Duration, boolean, boolean)} to create a store supplier instead.
 *
 * @param name                  name of the store (cannot be {@code null})
 * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
 *                              (note that the retention period must be at least long enough to contain the
 *                              windowed data's entire life cycle, from window-start through window-end,
 *                              and for the entire grace period)
 * @param windowSize            size of the windows (cannot be negative)
 * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
 *                              caching and means that null values will be ignored.
 * @param transactional         whether the store should be transactional
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */
public static WindowBytesStoreSupplier persistentTimestampedWindowStore(final String name,
                                                                        final Duration retentionPeriod,
                                                                        final Duration windowSize,
                                                                        final boolean retainDuplicates,
                                                                        final boolean transactional) throws IllegalArgumentException

/**
 * Create a persistent {@link SessionBytesStoreSupplier}.
 *
 * @param name              name of the store (cannot be {@code null})
 * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
 *                          (note that the retention period must be at least long enough to
 *                          contain the inactivity gap of the session and the entire grace period.)
 * @param transactional     whether the store should be transactional
 * @return an instance of a {@link SessionBytesStoreSupplier}
 */
public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                               final Duration retentionPeriod,
                                                               final boolean transactional)

Materialized

Code Block
public StoreSupplier<S> storeSupplier();

public final boolean transactional();

Materialized.StoreType

Materialized.StoreType enum has a new value TXN_ROCKS_DB that corresponds to a transactional state store implementation based on RocksDB.
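The following sketch shows how the new constant could line up with the default.dsl.store values. StoreTypeSketch.StoreType is a hypothetical mirror of Materialized.StoreType, not the real enum. "rocksDB" and "in_memory" are the existing config values, and "txn_rocksDB" is the proposed one. The lookup method fromConfig is an assumption for illustration.

```java
final class StoreTypeSketch {
    /** Hypothetical mirror of Materialized.StoreType with the proposed TXN_ROCKS_DB value. */
    enum StoreType {
        ROCKS_DB("rocksDB"),
        IN_MEMORY("in_memory"),
        TXN_ROCKS_DB("txn_rocksDB"); // proposed value

        private final String configValue;

        StoreType(String configValue) { this.configValue = configValue; }

        /** Maps a default.dsl.store value to a store type (case-sensitive). */
        static StoreType fromConfig(String value) {
            for (StoreType type : values()) {
                if (type.configValue.equals(value)) {
                    return type;
                }
            }
            throw new IllegalArgumentException("Unknown default.dsl.store value: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(StoreType.fromConfig("txn_rocksDB")); // TXN_ROCKS_DB
    }
}
```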

Compatibility, Deprecation, and Migration Plan

...

Proposed changes are source compatible and binary incompatible with previous releases.

Test Plan

  1. Add a variation for all existing stateful tests to run with enabled transactional state stores.
  2. Add tests to ensure that changes not committed to the changelog topic are discarded on crash failure, and that changes committed to the changelog topic but not to the state store are rolled forward.
  3. Add tests to ensure that transactional state stores replay missing changes from the changelog topic on recovery.

Transactions via Secondary State Store for Uncommitted Changes

This proposal comes with a reference implementation used by the Stores factory methods that create transactional state stores. In this implementation, transactionality is guaranteed by batching uncommitted (dirty) writes in a temporary RocksDB instance. On commit, the state store copies uncommitted writes from the temporary store to the main store, then truncates the temporary store.

All writes and deletes go to the temporary store. Reads query the temporary store first and fall back to the main store if the data is missing. Range reads query both stores and return a KeyValueIterator that merges the results. On crash failure, ProcessorStateManager calls StateStore#recover(offset), which truncates the temporary store and thereby rolls uncommitted changes back. Changes that were already committed to the changelog topic are then restored by replaying the changelog from the checkpointed offset.
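The following is a minimal in-memory sketch of the temporary-store read, write and commit paths. It models recovery as truncating the temporary store and then replaying the changelog, the path described in the crash scenarios in the Background section; it does not model a commit file. Two TreeMaps stand in for the main and temporary RocksDB instances. Optional.empty() models a delete, which is the extra per-write value copy listed among the drawbacks. SecondaryStoreSketch is a hypothetical name. commitPartially is a hypothetical test hook that simulates a crash partway through the copy.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

final class SecondaryStoreSketch {
    private final TreeMap<String, String> main = new TreeMap<>();           // committed data
    private final TreeMap<String, Optional<String>> temp = new TreeMap<>(); // dirty writes

    void put(String key, String value) { temp.put(key, Optional.of(value)); }

    void delete(String key) { temp.put(key, Optional.empty()); }

    /** Point read: check the temporary store first, then fall back to the main store. */
    String get(String key) {
        Optional<String> dirty = temp.get(key);
        return dirty != null ? dirty.orElse(null) : main.get(key);
    }

    /** Inclusive range read; materializes the merge instead of returning a lazy iterator. */
    NavigableMap<String, String> range(String from, String to) {
        TreeMap<String, String> merged = new TreeMap<>(main.subMap(from, true, to, true));
        for (Map.Entry<String, Optional<String>> e : temp.subMap(from, true, to, true).entrySet()) {
            if (e.getValue().isPresent()) {
                merged.put(e.getKey(), e.getValue().get());
            } else {
                merged.remove(e.getKey()); // a delete in the temp store hides the main entry
            }
        }
        return merged;
    }

    /** Commit: copy dirty writes into the main store, then truncate the temporary store. */
    void commit() { commitPartially(Integer.MAX_VALUE); }

    /** Test hook: stops after maxWrites copies, simulating a crash in the middle of a commit. */
    void commitPartially(int maxWrites) {
        int copied = 0;
        for (Map.Entry<String, Optional<String>> e : temp.entrySet()) {
            if (copied == maxWrites) {
                return; // simulated crash: the temporary store is not truncated
            }
            if (e.getValue().isPresent()) {
                main.put(e.getKey(), e.getValue().get());
            } else {
                main.remove(e.getKey());
            }
            copied++;
        }
        temp.clear();
    }

    /** Recovery: drop uncommitted writes; the caller then replays the changelog. */
    void recover() { temp.clear(); }

    public static void main(String[] args) {
        SecondaryStoreSketch store = new SecondaryStoreSketch();
        store.put("a", "1");
        store.put("b", "2");
        store.commit();

        store.delete("a");        // these writes were committed to the changelog...
        store.put("c", "3");
        store.commitPartially(1); // ...but the crash hits after copying only the delete
        store.recover();

        store.delete("a");        // replay the changelog records after the checkpoint
        store.put("c", "3");
        store.commit();
        System.out.println(store.range("a", "z")); // {b=2, c=3}
    }
}
```

The main store ends up correct even though it had already received part of the interrupted copy. This works because replaying puts and deletes is idempotent.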

The major advantage of this approach is that the temporary state store can optionally use the available disk space if the writes do not fit into the in-memory buffer. 

...

  • It doubles the number of open state stores 
  • It potentially has higher write and read amplification due to uncontrolled flushes of the temporary state store.
  • It requires an additional value copy per write to model deletions.

Rejected Alternatives

RocksDB in-memory Indexed Batches

...