Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

There are multiple ways to implement state store transactions that present different trade-offs. This proposal includes a single reference implementation via a secondary RocksDB for uncommitted writes.  In addition, it provides a RocksDBTransactionalMechanism enum to control the specific implementation used in Stores# and Materialized APIs.

StateStore changes

This section covers multiple changes to the state store interfaces.  This proposal replaces StateStore#flush with 2 new methods - StateStore#commit(Long) and StateStore#recover(long) and adds a boolean transactional() method to determine if a state store is transactional.

...

Code Block
languagejava
/**
 * Return true if a call to {@link StoreSupplier#get} returns a transactional state
 * store.
 *
 * @return {@code true} if a call to {@link StoreSupplier#get} returns a transactional state
 * store, {@code false} otherwise.
*/
boolean transactional();

RocksDBTransactionalMechanism

Code Block
languagejava
public enum RocksDBTransactionalMechanism {
    // RocksDB will use a secondary store for the uncommitted writes
    SECONDARY_STORE,
}


Stores

Code Block
languagejava
/**
  * Create a persistent transactional {@link KeyValueBytesStoreSupplier}.
  * <p>
  * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
  * If you want to create a transactional {@link TimestampedKeyValueStore} you should use
  * {@link #persistentTransactionalTimestampedKeyValueStore(String, RocksDBTransactionalMechanism)} to create a store supplier instead.
  *
  * @param name  name of the store (cannot be {@code null})
  * @param txnMechanism the transactional mechanism to use for the store (cannot be {@code null})
  * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
  * to build a persistent transactional key-value store
  */
public static KeyValueBytesStoreSupplier persistentTransactionalKeyValueStore(final String name) 

/**
 * Create a persistent transactional {@link KeyValueBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a
 * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link KeyValueStore} you should use
 * {@link #persistentKeyValueStore(String)} to create a store supplier instead.
 *
 * @param name  name of the store (cannot be {@code null})
 * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
 * to build a persistent key-(timestamp/value) store
 */
public static KeyValueBytesStoreSupplier persistentTransactionalTimestampedKeyValueStore(final String name)

  /**
 ,
                                                                              final RocksDBTransactionalMechanism txnMechanism)

/**
  * Create a persistent transactional {@link WindowBytesStoreSupplierKeyValueBytesStoreSupplier}.
  * <p>
  * This store supplier can be passed into a {@link #windowStoreBuilder#keyValueStoreBuilder(WindowBytesStoreSupplierKeyValueBytesStoreSupplier, Serde, Serde)}.
  * If you want to create a transactional {@link TimestampedWindowStoreTimestampedKeyValueStore} you should use
  * {@link #persistentTimestampedWindowStore#persistentTransactionalTimestampedKeyValueStore(String, Duration, Duration, boolean, booleanRocksDBTransactionalMechanism)} to create a store supplier instead.
  *
  * @param name  name of the store (cannot be {@code null})
  * @param txnMechanism the transactional mechanism to nameuse offor the store (cannot be {@code null})
  * @param@return retentionPeriodan instance of a {@link KeyValueBytesStoreSupplier} that lengthcan ofbe timeused
 to retain* datato inbuild thea storepersistent (cannot be negative)transactional key-value store
  */
public static KeyValueBytesStoreSupplier persistentTransactionalKeyValueStore(final String name,
                         (note that the retention period must be at least long enough to contain the
 *                              windowed data's entire life cycle, from window-start through window-end,
 *final RocksDBTransactionalMechanism txnMechanism)

/**
  * Create a persistent transactional {@link KeyValueBytesStoreSupplier}.
  * <p>
  * This store supplier can be passed into a
  * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
 and for* theIf entireyou gracewant period)
to *create @parama windowSize{@link KeyValueStore} you should use
  * {@link #persistentTransactionalKeyValueStore(String)} to create a sizestore ofsupplier theinstead.
 windows (cannot*
 be negative)
 * @param retainDuplicatesname  name of the store whether(cannot orbe not to retain duplicates. Turning this on will automatically disable
 *                              caching and means that null values will be ignored.
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */ 
 public static WindowBytesStoreSupplier persistentTransactionalWindowStore(final String name,
                                                       {@code null})
  * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
  * to build a persistent transactional key-(timestamp/value) store
  */
public static KeyValueBytesStoreSupplier persistentTransactionalTimestampedKeyValueStore(final String name)

/**
  * Create a persistent transactional {@link KeyValueBytesStoreSupplier}.
  * <p>
  * This store supplier can be passed into a
  * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
  * If you want to create a {@link KeyValueStore} you should use
  * {@link #persistentTransactionalKeyValueStore(String, RocksDBTransactionalMechanism)} to create a store supplier instead.
  *
  * @param name  name of the store (cannot be {@code null})
  * @param txnMechanism the transactional mechanism to use for the store (cannot be {@code null})
  * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
  * to build a persistent key-(timestamp/value) store
 */
public static KeyValueBytesStoreSupplier persistentTransactionalTimestampedKeyValueStore(final String name,
                    final Duration retentionPeriod,
                                                                   final RocksDBTransactionalMechanism   txnMechanism)    final Duration windowSize,
                                                                           final boolean retainDuplicates) throws IllegalArgumentException  


/**
  * Create a persistent transactional {@link WindowBytesStoreSupplier}.
  * <p>
  * This store supplier can be passed into a
 * {@link #timestampedWindowStoreBuilder#windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
  * If you want to create a {@link WindowStoreTimestampedWindowStore} you should use
  * {@link #persistentWindowStore#persistentTransactionalTimestampedWindowStore(String, Duration, Duration, boolean, boolean)} to create a store supplier instead.
  *
  * @param name                  name of the store (cannot be {@code null})
  * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
  *                              (note that the retention period must be at least long enough to contain the
  *                              windowed data's entire life cycle, from window-start through window-end,
  *                              and for the entire grace period)
  * @param windowSize            size of the windows (cannot be negative)
  * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
  *                              caching and means that null values will be ignored.
  * @return an instance of {@link WindowBytesStoreSupplier}
  * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
  * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
  */      
 public static WindowBytesStoreSupplier persistentTransactionalTimestampedWindowStorepersistentTransactionalWindowStore(final String name,
                                                                          final Duration retentionPeriod,
          final Duration retentionPeriod,
                                                              final Duration windowSize,
                      final Duration windowSize,
                                                  final boolean retainDuplicates) throws IllegalArgumentException                                final boolean retainDuplicates) throws IllegalArgumentException 

/**
 * Create a persistent transactional {@link SessionBytesStoreSupplier}.
 *
 * @param name              name of the store (cannot be {@code null})
 * @param retentionPeriod         length of time to retain data in the store (cannot be negative)
 *   

/**
  * Create a persistent transactional {@link WindowBytesStoreSupplier}.
  * <p>
  * This store supplier can be passed into a {@link #windowStoreBuilder(noteWindowBytesStoreSupplier, that the retention period must be at least as long enough to
 *           Serde, Serde)}.
  * If you want to create a {@link TimestampedWindowStore} you should use
  * {@link #persistentTransactionalTimestampedWindowStore(String, Duration, Duration, boolean, RocksDBTransactionalMechanism)} to create a store supplier instead.
  *
  * @param name               contain the inactivity gapname of the sessionstore and(cannot thebe entire grace period.{@code null})
  * @return@param anretentionPeriod instance of a {@link  SessionBytesStoreSupplier}
 */
public static SessionBytesStoreSupplier persistentTransactionalSessionStore(final String name,
   length of time to retain data in the store (cannot be negative)
  *                              (note that the retention period must be at least long enough to contain the
  *                         		 	    finalwindowed Duration retentionPeriod)

Materialized

data's entire life cycle, from window-start through window-end,
  *                              and for the entire grace period)
  * @param windowSize            size of the windows (cannot be negative)
  * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
  *                              caching and means that null values will be ignored.
  * @param txnMechanism          the transactional mechanism to use for the store (cannot be {@code null})
  * @return an instance of {@link WindowBytesStoreSupplier}
  * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
  * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
  */
public static WindowBytesStoreSupplier persistentTransactionalWindowStore(final String name,
                                                                          final Duration retentionPeriod,
                                                                          final Duration windowSize,
                                                                          final boolean retainDuplicates,
                                                                          final RocksDBTransactionalMechanism txnMechanism) throws IllegalArgumentException

/**
  * Create a persistent transactional {@link WindowBytesStoreSupplier}.
  * <p>
  * This store supplier can be passed into a
  * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
  * If you want to create a {@link WindowStore} you should use
  * {@link #persistentTransactionalWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
  *
  * @param name                  name of the store (cannot be {@code null})
  * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
  *                              (note that the retention period must be at least long enough to contain the
  *                              windowed data's entire life cycle, from window-start through window-end,
  *                              and for the entire grace period)
  * @param windowSize            size of the windows (cannot be negative)
  * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
  *                              caching and means that null values will be ignored.
  * @return an instance of {@link WindowBytesStoreSupplier}
  * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
  * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
  */
public static WindowBytesStoreSupplier persistentTransactionalTimestampedWindowStore(final String name,
                                                                                     final Duration retentionPeriod,
                                                                                     final Duration windowSize,
                                                                                     final boolean retainDuplicates) throws IllegalArgumentException                                                                          

/**
  * Create a persistent transactional {@link WindowBytesStoreSupplier}.
  * <p>
  * This store supplier can be passed into a
  * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
  * If you want to create a {@link WindowStore} you should use
  * {@link #persistentTransactionalWindowStore(String, Duration, Duration, boolean, RocksDBTransactionalMechanism)} to create a store supplier instead.
  *
  * @param name                  name of the store (cannot be {@code null})
  * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
  *                              (note that the retention period must be at least long enough to contain the
  *                              windowed data's entire life cycle, from window-start through window-end,
  *                              and for the entire grace period)
  * @param windowSize            size of the windows (cannot be negative)
  * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
  *                              caching and means that null values will be ignored.
  * @param txnMechanism          the transactional mechanism to use for the store (cannot be {@code null})
  * @return an instance of {@link WindowBytesStoreSupplier}
  * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
  * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
  */
public static WindowBytesStoreSupplier persistentTransactionalTimestampedWindowStore(final String name,
                                                                                     final Duration retentionPeriod,
                                                                                     final Duration windowSize,
                                                                                     final boolean retainDuplicates,
                                                                                     final RocksDBTransactionalMechanism txnMechanism) throws IllegalArgumentException                                                                                     

/**
  * Create a persistent transactional {@link SessionBytesStoreSupplier}.
  *
  * @param name              name of the store (cannot be {@code null})
  * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
  *                          (note that the retention period must be at least as long enough to
  *                          contain the inactivity gap of the session and the entire grace period.)
  * @return an instance of a {@link  SessionBytesStoreSupplier}
  */
public static SessionBytesStoreSupplier persistentTransactionalSessionStore(final String name,
                                                                            final Duration retentionPeriod)                                                                                     

/**
  * Create a persistent transactional {@link SessionBytesStoreSupplier}.
  *
  * @param name              name of the store (cannot be {@code null})
  * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
  *                          (note that the retention period must be at least as long enough to
  *                          contain the inactivity gap of the session and the entire grace period.)
  * @param txnMechanism      the transactional mechanism to use for the store (cannot be {@code null})
  * @return an instance of a {@link  SessionBytesStoreSupplier}
  */
public static SessionBytesStoreSupplier persistentTransactionalSessionStore(final String name,
                                                                            final Duration retentionPeriod,
                                                                            final RocksDBTransactionalMechanism txnMechanism)

Materialized

Code Block
public StoreSupplier<S> storeSupplier();

public boolean transactional();

/**
 * Enable transactionality of the materialized {@link StateStore}.
 *
 * @return itself
 * @throws IllegalArgumentException if store supplier is also pre-configured
 */
public Materialized<K, V, S> withTransactionsEnabled()

/**
  * Disable transactionality of the materialized {@link StateStore}.
  *
  * @return itself
  * @throws IllegalArgumentException if store supplier is also pre-configured
  */
public Materialized<K, V, S> withTransactionsDisabled()
Code Block
public StoreSupplier<S> storeSupplier();

public boolean transactional();

Compatibility, Deprecation, and Migration Plan

...

  • It doubles the number of open state stores 
  • It potentially has higher write and read amplification due to uncontrolled flushes of the temporary state store.

Rejected Alternatives

RocksDB in-memory Indexed Batches

...