...

This KIP introduces persistent transactional state stores that guarantee atomic commit, meaning that either all uncommitted (dirty) writes will be applied together or none will. The proposal changes public APIs: the StateStore and StoreSupplier interfaces and the Stores factory. StateStore and StoreSupplier have a new transactional() method that returns true if the state store is transactional. In addition, the StateStore#flush method is replaced by StateStore#commit(Long) paired with StateStore#recover(long), which allow committing the current state at a specified changelog offset and recovering from a crash failure to a previously committed offset. Users can create persistent transactional state stores via the Stores factory.

...

This section covers multiple changes to the state store interfaces. This proposal replaces StateStore#flush with 2 new methods, StateStore#commit(Long) and StateStore#recover(long), and adds a boolean transactional() method to determine if a state store is transactional.

...

In this sense, flush and commit are semantically the same thing. The separation of concerns between these 2 methods would be ambiguous, and it would be unclear what the correct call order is.

Now, why do we need an explicit recover method? Both StateStore#commit and StateStore#recover take a changelogOffset parameter. This parameter allows state stores to identify commits by a commit token, which distinguishes between committed state versions and permits different possible implementations of the transactional commit.

The purpose of the StateStore#recover(long changelogOffset) method is to transition the state store to a consistent state after crash failure. The method accepts the checkpointed offset and can do 3 possible things:

  1. Do nothing if the state store is already in a committed state.
  2. Roll back the current dirty changes if, e.g., the state store crashed before committing.
  3. Roll forward if the state store crashed after the commit started but before it finished.

To sum up, this method discards any changes that are not yet committed to the changelog topic and ensures that its state corresponds to an offset that is greater than or equal to the checkpointed changelogOffset. Given the Streams workload properties described above and the AK transaction commit order, the resulting state corresponds either to the offset in the checkpoint file or to the offset committed to the changelog. The method returns true if the state store recovered and false otherwise.

Behavior changes

If StateStore#transactional() returns true, then the store performs writes via the implementation-specific transactional mechanism. Reads via ReadOnlyKeyValueStore methods return uncommitted data from the ongoing transaction.
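
The read behavior can be illustrated with a minimal sketch. The class below is hypothetical and is not the Kafka Streams StateStore interface: two in-memory maps stand in for the committed state and the ongoing transaction, and the changelog offset is accepted but not persisted. It only aims to show that reads consult uncommitted writes first.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified store; not the Kafka Streams StateStore interface.
class DirtyReadStoreSketch {
    private final Map<String, String> committed = new HashMap<>();
    // Uncommitted (dirty) writes of the ongoing transaction; a null value marks a delete.
    private final Map<String, String> dirty = new HashMap<>();

    boolean transactional() {
        return true;
    }

    void put(final String key, final String value) {
        dirty.put(key, value);
    }

    void delete(final String key) {
        dirty.put(key, null);
    }

    // Reads consult the ongoing transaction first, so they observe uncommitted data.
    String get(final String key) {
        if (dirty.containsKey(key)) {
            return dirty.get(key);
        }
        return committed.get(key);
    }

    // Applies all dirty writes together, then clears the transaction buffer.
    // The offset is ignored here; a real store would persist it with the data.
    void commit(final Long changelogOffset) {
        for (final Map.Entry<String, String> entry : dirty.entrySet()) {
            if (entry.getValue() == null) {
                committed.remove(entry.getKey());
            } else {
                committed.put(entry.getKey(), entry.getValue());
            }
        }
        dirty.clear();
    }

    // Discards writes that were never committed.
    boolean recover(final long changelogOffset) {
        dirty.clear();
        return true;
    }
}
```

In this sketch, a put followed by get returns the new value before any commit, and a recover call issued before commit makes that value disappear again.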

...

Transactions via Secondary State Store for Uncommitted Changes

This proposal comes with a reference implementation, used by the Stores# factory methods that create transactional state stores. In this implementation, transactionality is guaranteed by batching uncommitted (dirty) writes in a temporary RocksDB instance. Once the task flushes, the temporary store creates a commit file with a changelog offset, indicating that the transaction is ready to commit, and writes dirty records into the regular store. It truncates the temporary store and deletes the commit file once it is finished.
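
The commit sequence described above can be sketched as follows. This is not the reference implementation: in-memory maps stand in for the temporary and regular RocksDB instances, and the class name, method names, and commit file location are hypothetical. The recovery rule (roll forward when the commit file exists, otherwise discard dirty writes) is an assumption made for this sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: maps stand in for the temporary and regular RocksDB instances.
class SecondaryStoreSketch {
    private final Map<String, String> regular = new HashMap<>();
    private final Map<String, String> temporary = new HashMap<>();
    private final Path commitFile; // hypothetical marker file location

    SecondaryStoreSketch(final Path commitFile) {
        this.commitFile = commitFile;
    }

    // Dirty writes are batched in the temporary store.
    void put(final String key, final String value) {
        temporary.put(key, value);
    }

    String get(final String key) {
        final String dirtyValue = temporary.get(key);
        return dirtyValue != null ? dirtyValue : regular.get(key);
    }

    // Step 1: record the changelog offset, marking the transaction as ready to commit.
    void writeCommitFile(final long changelogOffset) {
        try {
            Files.write(commitFile, Long.toString(changelogOffset).getBytes(StandardCharsets.UTF_8));
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Steps 2 and 3: copy dirty records, truncate the temporary store, delete the commit file.
    void applyAndCleanUp() {
        regular.putAll(temporary);
        temporary.clear();
        try {
            Files.deleteIfExists(commitFile);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void commit(final long changelogOffset) {
        writeCommitFile(changelogOffset);
        applyAndCleanUp();
    }

    // Assumed recovery rule; the checkpointed offset is unused in this simplified sketch.
    boolean recover(final long changelogOffset) {
        if (Files.exists(commitFile)) {
            applyAndCleanUp(); // roll forward; re-applying the same records is idempotent
        } else {
            temporary.clear(); // roll back writes that never reached the commit point
        }
        return true;
    }
}
```

Splitting commit into writeCommitFile and applyAndCleanUp makes it possible to simulate a crash between the two steps: calling writeCommitFile and then recover rolls the dirty records forward, while calling recover without a commit file drops them.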

...

StateStore.java
/**
 * Return true if the storage supports transactions.
 *
 * @return {@code true} if the storage supports transactions, {@code false} otherwise
 */
default boolean transactional() {
   return false;
}

/**
 * Flush any cached data.
 *
 * @deprecated Use {@link #commit(Long)} instead.
 */
@Deprecated
default void flush() {}

/**
 * Flush and commit any cached data.
 * <p>
 * For a transactional state store, commit applies all changes atomically. In other words,
 * either the entire commit will be successful or none of the changes will be applied.
 * <p>
 * For a non-transactional state store, this method flushes cached data.
 *
 * @param changelogOffset the offset of the changelog topic this commit corresponds to. The
 *                        offset can be null if the state store does not have a changelog
 *                        (e.g. a global store).
 */
default void commit(final Long changelogOffset) {
    if (transactional()) {
        throw new UnsupportedOperationException("Transactional state store must implement StateStore#commit");
    } else {
        flush();
    }
}

/**
 * Recovers the state store after crash failure.
 * <p>
 * The state store recovers by discarding any writes that are not committed to the changelog
 * and rolling to the state that corresponds to {@code changelogOffset} or greater offset of
 * the changelog topic.
 *
 * @param changelogOffset the checkpointed changelog offset.
 * @return {@code true} if the state store recovered, {@code false} otherwise.
 */
default boolean recover(final long changelogOffset) {
    if (transactional()) {
        throw new UnsupportedOperationException("Transactional state store must implement StateStore#recover");
    }
    return false;
}

StoreSupplier

/**
 * Return true if a call to {@link StoreSupplier#get} returns a transactional state
 * store.
 *
 * @return {@code true} if a call to {@link StoreSupplier#get} returns a transactional state
 * store, {@code false} otherwise.
 */
boolean transactional();

...

The StateStore#flush() method is deprecated. The new StateStore#commit(changelogOffset) method will by default fall back to StateStore#flush() for non-transactional state stores.
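
To illustrate the fallback, here is a minimal sketch. StateStoreSketch is a stand-in that copies only the default methods shown in this proposal; it is not the real StateStore interface, and LegacyStore and IncompleteTransactionalStore are hypothetical implementations invented for the example.

```java
// Simplified stand-in for the relevant part of StateStore; not the real interface.
interface StateStoreSketch {
    default boolean transactional() {
        return false;
    }

    @Deprecated
    default void flush() {}

    default void commit(final Long changelogOffset) {
        if (transactional()) {
            throw new UnsupportedOperationException("Transactional state store must implement StateStore#commit");
        } else {
            flush();
        }
    }
}

// Hypothetical pre-existing store that only overrides flush().
class LegacyStore implements StateStoreSketch {
    int flushCount = 0;

    @Override
    public void flush() {
        flushCount++;
    }
}

// Hypothetical store that declares itself transactional but does not override commit().
class IncompleteTransactionalStore implements StateStoreSketch {
    @Override
    public boolean transactional() {
        return true;
    }
}
```

Calling commit on a LegacyStore ends up in its flush() override, so an existing non-transactional store keeps working without source changes. Calling commit on an IncompleteTransactionalStore throws UnsupportedOperationException, which signals that transactional stores must provide their own commit.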

Proposed changes are source compatible and binary incompatible with previous releases.

...