...

StateStore.java
/**
 * Return true if the storage supports transactions.
 *
 * @return {@code true} if the storage supports transactions, {@code false} otherwise
 */
default boolean transactional() {
    return false;
}

/**
  * Flush and commit any cached data
  * <p>
  * For transactional state store commit applies all changes atomically. In other words, either the
  * entire commit will be successful or none of the changes will be applied.
  * <p>
  * For non-transactional state store this method flushes cached data.
  *
  * @param changelogOffset the offset of the changelog topic this commit corresponds to. The
  *                        offset can be null if the state store does not have a changelog
  *                        (e.g. a global store).
  */
default void commit(final Long changelogOffset) {
    if (transactional()) {
        throw new UnsupportedOperationException("Transactional state store must implement StateStore#commit");
    } else {
        flush();
    }
}

/**
  * Recover a transactional state store
  * <p>
  * If a transactional state store shut down with a crash failure, this method can either
  * roll back or forward uncommitted changes. In any case, this method returns the changelog
  * offset it rolls to.
  *
  * @param changelogOffset the checkpointed changelog offset.
  * @return the changelog offset after recovery or {@code null} if recovery failed.
  */
default Long recover(final Long changelogOffset) {
    if (transactional()) {
        throw new UnsupportedOperationException("Transactional state store must implement StateStore#recover");
    }
    return changelogOffset;
}
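
To illustrate how a custom store might satisfy this contract, here is a minimal sketch. It does not use the real Kafka interface: `ProposedStateStore` is a hypothetical stand-in that copies only the three default methods above, and `InMemoryTransactionalStore` (with its `put` and `get` helpers) is an invented example. It assumes that rolling back on recovery is acceptable and that staging writes in a map is a fair model of an uncommitted transaction.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed StateStore additions (not the real Kafka interface).
interface ProposedStateStore {
    void flush();

    default boolean transactional() {
        return false;
    }

    default void commit(final Long changelogOffset) {
        if (transactional()) {
            throw new UnsupportedOperationException("Transactional state store must implement StateStore#commit");
        } else {
            flush();
        }
    }

    default Long recover(final Long changelogOffset) {
        if (transactional()) {
            throw new UnsupportedOperationException("Transactional state store must implement StateStore#recover");
        }
        return changelogOffset;
    }
}

// Illustrative in-memory store: writes are staged and only become durable state on commit.
class InMemoryTransactionalStore implements ProposedStateStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> staged = new HashMap<>();
    private Long committedOffset = null;

    void put(final String key, final String value) {
        staged.put(key, value);
    }

    String get(final String key) {
        // Reads see staged (uncommitted) writes first, then committed data.
        return staged.containsKey(key) ? staged.get(key) : committed.get(key);
    }

    @Override
    public boolean transactional() {
        return true;
    }

    @Override
    public void flush() {
        // Nothing to flush in this sketch; commit() is the only way to persist changes.
    }

    @Override
    public void commit(final Long changelogOffset) {
        // Apply all staged writes together and remember the matching changelog offset.
        committed.putAll(staged);
        staged.clear();
        committedOffset = changelogOffset;
    }

    @Override
    public Long recover(final Long changelogOffset) {
        // This sketch always rolls back: uncommitted writes are discarded and the
        // offset of the last successful commit is returned (null if there was none).
        // The checkpointed offset argument is ignored here for simplicity.
        staged.clear();
        return committedOffset;
    }
}

class TransactionalStoreSketch {
    public static void main(final String[] args) {
        final InMemoryTransactionalStore store = new InMemoryTransactionalStore();
        store.put("k", "v1");
        store.commit(10L);
        store.put("k", "v2"); // never committed, simulating a crash before commit
        final Long offset = store.recover(10L);
        System.out.println(offset + " " + store.get("k")); // prints "10 v1"
    }
}
```

A store that rolls forward instead would apply the staged writes in `recover` and return the offset they correspond to. Either choice fits the contract as long as the returned offset matches the recovered state.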

StoreSupplier

/**
 * Return true if a call to {@link StoreSupplier#get} returns a transactional state
 * store.
 *
 * @return {@code true} if a call to {@link StoreSupplier#get} returns a transactional state
 * store, {@code false} otherwise.
*/
boolean transactional();

...

/**
 * Create a persistent transactional {@link KeyValueBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a {@link #keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link TimestampedKeyValueStore} you should use
 * {@link #persistentTransactionalTimestampedKeyValueStore(String)} to create a store supplier instead.
 *
 * @param name  name of the store (cannot be {@code null})
 * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
 * to build a persistent key-value store
 */
public static KeyValueBytesStoreSupplier persistentTransactionalKeyValueStore(final String name) 

/**
 * Create a persistent transactional {@link KeyValueBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a
 * {@link #timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link KeyValueStore} you should use
 * {@link #persistentTransactionalKeyValueStore(String)} to create a store supplier instead.
 *
 * @param name  name of the store (cannot be {@code null})
 * @return an instance of a {@link KeyValueBytesStoreSupplier} that can be used
 * to build a persistent key-(timestamp/value) store
 */
public static KeyValueBytesStoreSupplier persistentTransactionalTimestampedKeyValueStore(final String name)

  /**
 * Create a persistent transactional {@link WindowBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a {@link #windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link TimestampedWindowStore} you should use
 * {@link #persistentTransactionalTimestampedWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
 *
 * @param name                  name of the store (cannot be {@code null})
 * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
 *                              (note that the retention period must be at least long enough to contain the
 *                              windowed data's entire life cycle, from window-start through window-end,
 *                              and for the entire grace period)
 * @param windowSize            size of the windows (cannot be negative)
 * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
 *                              caching and means that null values will be ignored.
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */ 
 public static WindowBytesStoreSupplier persistentTransactionalWindowStore(final String name,
                                                                           final Duration retentionPeriod,
                                                                           final Duration windowSize,
                                                                           final boolean retainDuplicates) throws IllegalArgumentException  

/**
 * Create a persistent transactional {@link WindowBytesStoreSupplier}.
 * <p>
 * This store supplier can be passed into a
 * {@link #timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde)}.
 * If you want to create a {@link WindowStore} you should use
 * {@link #persistentTransactionalWindowStore(String, Duration, Duration, boolean)} to create a store supplier instead.
 *
 * @param name                  name of the store (cannot be {@code null})
 * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
 *                              (note that the retention period must be at least long enough to contain the
 *                              windowed data's entire life cycle, from window-start through window-end,
 *                              and for the entire grace period)
 * @param windowSize            size of the windows (cannot be negative)
 * @param retainDuplicates      whether or not to retain duplicates. Turning this on will automatically disable
 *                              caching and means that null values will be ignored.
 * @return an instance of {@link WindowBytesStoreSupplier}
 * @throws IllegalArgumentException if {@code retentionPeriod} or {@code windowSize} can't be represented as {@code long milliseconds}
 * @throws IllegalArgumentException if {@code retentionPeriod} is smaller than {@code windowSize}
 */      
 public static WindowBytesStoreSupplier persistentTransactionalTimestampedWindowStore(final String name,
                                                                                      final Duration retentionPeriod,
                                                                                      final Duration windowSize,
                                                                                      final boolean retainDuplicates) throws IllegalArgumentException 

/**
 * Create a persistent transactional {@link SessionBytesStoreSupplier}.
 *
 * @param name              name of the store (cannot be {@code null})
 * @param retentionPeriod   length of time to retain data in the store (cannot be negative)
 *                          (note that the retention period must be at least long enough to
 *                          contain the inactivity gap of the session and the entire grace period)
 * @return an instance of a {@link SessionBytesStoreSupplier}
 */
public static SessionBytesStoreSupplier persistentTransactionalSessionStore(final String name,
                                                                            final Duration retentionPeriod)

Materialized

public Materialized<K, V, S> withTransactionalityEnabled();

public Materialized<K, V, S> withTransactionalityDisabled();

public StoreSupplier<S> storeSupplier();

public boolean transactional();

...

Transactional state stores will be disabled by default. Both Streams DSL and Processor API users can enable transactional writes in the built-in RocksDB state store by passing a new boolean flag transactional=true to the Materialized constructor and the Stores factory methods. Custom state stores can opt in to transactionality by adjusting their implementation according to the StateStore#transactional() contract.

The StateStore#flush() method is deprecated. For non-transactional state stores, the new StateStore#commit(offset) method falls back to StateStore#flush() by default.
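
The fallback described above can be sketched as follows. This is an illustration only: `FlushableStore` is a hypothetical stand-in that copies the proposed default `commit` method, and `LegacyFlushOnlyStore` with its `flushCount` field is an invented example of an existing store that implements only `flush()`.

```java
// Hypothetical stand-in for the relevant part of the proposed StateStore interface.
interface FlushableStore {
    void flush();

    default boolean transactional() {
        return false;
    }

    default void commit(final Long changelogOffset) {
        if (transactional()) {
            throw new UnsupportedOperationException("Transactional state store must implement StateStore#commit");
        } else {
            flush();
        }
    }
}

// An existing, non-transactional store that knows nothing about commit().
class LegacyFlushOnlyStore implements FlushableStore {
    int flushCount = 0; // counts flushes so the fallback can be observed

    @Override
    public void flush() {
        flushCount++;
    }
}

class CommitFallbackSketch {
    public static void main(final String[] args) {
        final LegacyFlushOnlyStore store = new LegacyFlushOnlyStore();
        store.commit(42L);  // delegates to flush()
        store.commit(null); // a null offset (e.g. no changelog) also delegates to flush()
        System.out.println(store.flushCount); // prints "2"
    }
}
```

Because the default method delegates to `flush()`, a store written against the old interface keeps its behavior without source changes.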

Proposed changes are source compatible and binary incompatible with previous releases.

...