Status

Current state: Accepted

Discussion thread: here

JIRA: here

...

  1. The task (StreamTask, StandbyTask) registers its state stores. State stores load offset metadata from the checkpoint file (link). That step aims to establish a mapping between data in the state store and the offset of the changelog topic.
    1. After a crash failure, if the state store has data but the checkpoint file does not exist, ProcessorStateManager throws an exception for EOS tasks. This is an indicator to throw away local data and replay the changelog topic (link).
  2. The task processes data and writes its state locally.
  3. The task commits the EOS transaction. TaskExecutor#commitOffsetsOrTransaction calls StreamsProducer#commitTransaction, which sends the new offsets and commits the transaction.
  4. The task runs a postCommit method (StreamTask, StandbyTask) that:
    1. flushes the state stores and
    2. updates the checkpoint file (link) for non-EOS tasks (link).
  5. Go to step 2 until the task shuts down. During shutdown, the task stops processing data, then writes its current offset to the checkpoint file and halts.
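The steps above can be sketched as a toy model. This is a simplified illustration, not Kafka Streams code: `ToyEosTask` and all of its members are hypothetical names, the changelog topic is modeled as an in-memory list, and the checkpoint file as a nullable field. The sketch also assumes that the checkpoint is consumed once it has been read during registration, so that a stale checkpoint cannot survive a later crash.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of an EOS task; hypothetical names, not Kafka Streams classes. */
final class ToyEosTask {
    /** Stand-in for the changelog topic: committed {key, value} records, indexed by offset. */
    final List<String[]> changelog = new ArrayList<>();
    /** Stand-in for the local state store; its contents survive a crash. */
    final Map<String, String> store = new HashMap<>();
    /** Stand-in for the checkpoint file; null means the file does not exist. */
    Long checkpoint = null;
    /** Records written in the open transaction, not yet visible in the changelog. */
    private final List<String[]> pending = new ArrayList<>();

    /** Step 1: register the store and restore it from the changelog. */
    void register() {
        if (checkpoint == null && !store.isEmpty()) {
            // Step 1.1: local data without a checkpoint cannot be trusted, so wipe it.
            store.clear();
        }
        int from = checkpoint == null ? 0 : checkpoint.intValue();
        for (String[] r : changelog.subList(from, changelog.size())) {
            store.put(r[0], r[1]);
        }
        // Assumption of this sketch: the checkpoint is consumed once it has been read.
        checkpoint = null;
    }

    /** Step 2: process a record and write the state locally. */
    void process(String key, String value) {
        store.put(key, value);
        pending.add(new String[] {key, value});
    }

    /** Step 3: commit the transaction, which makes pending records visible in the changelog. */
    void commitTransaction() {
        changelog.addAll(pending);
        pending.clear();
    }

    /** Step 5: clean shutdown commits, then writes the current offset to the checkpoint. */
    void shutdown() {
        commitTransaction();
        checkpoint = (long) changelog.size();
    }

    /** Crash: the open transaction is aborted; store contents and checkpoint stay as they are. */
    void crash() {
        pending.clear();
    }
}
```

In this model, a crash after an uncommitted `process` call leaves the write in `store` with no checkpoint, so the next `register` call wipes the store and rebuilds it from the committed changelog records only.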

...

  • The crash happens between steps 1 and 3. The uncommitted data will be discarded. The input records were not committed via the EOS transaction, so the task will re-process them.
  • The crash happens between steps 3 and 4a. The EOS transaction has already been committed, but the state store has not. The state store will replay the uncommitted records from the changelog topic.
  • The crash happens between steps 4a and 4b. The state store has already committed the new records, but they are not yet reflected in the checkpoint file. The state store will replay the last committed records from the changelog topic. This operation is idempotent and does not violate correctness.
  • The crash happens after step 4b. The state store does nothing during recovery.
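The idempotence claim for a crash between steps 4a and 4b can be illustrated with a small sketch. It assumes changelog records are plain upserts and deletes applied in offset order; `ChangelogReplaySketch` and `Change` are hypothetical names used only for this illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper showing that re-applying a changelog suffix is idempotent. */
final class ChangelogReplaySketch {
    /** One changelog record; a null value is treated as a delete (tombstone). */
    static final class Change {
        final String key;
        final String value;

        Change(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Applies the records from fromOffset (inclusive) to the end of the changelog, in order. */
    static void replay(Map<String, String> store, List<Change> changelog, int fromOffset) {
        for (Change c : changelog.subList(fromOffset, changelog.size())) {
            if (c.value == null) {
                store.remove(c.key);
            } else {
                store.put(c.key, c.value);
            }
        }
    }
}
```

If the store already reflects every record up to the end of the changelog, replaying from an older checkpointed offset rewrites each touched key with the value it already holds, so the final contents do not change.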

...

StateStore.java
/**
 * Return true if the storage supports transactions.
 *
 * @return {@code true} if the storage supports transactions, {@code false} otherwise
 */
@Evolving
default boolean transactional() {
   return false;
}

/**
 * Flush any cached data
 *
 */
@Deprecated
default void flush() {}

/**
  * Flush and commit any cached data
  * <p>
  * For transactional state store commit applies all changes atomically. In other words, either the
  * entire commit will be successful or none of the changes will be applied.
  * <p>
  * For non-transactional state store this method flushes cached data.
  *
  * @param changelogOffset the offset of the changelog topic this commit corresponds to. The
  *                        offset can be {@code null} if the state store does not have a
  *                        changelog (e.g. a global store).
  */
@Evolving
default void commit(final Long changelogOffset) {
    if (transactional()) {
        throw new UnsupportedOperationException("Transactional state store must implement StateStore#commit");
    } else {
        flush();
    }
}

/**
 * Recovers the state store after crash failure.
 * <p>
 * The state store recovers by discarding any writes that are not committed to the changelog
 * and rolling to a state that corresponds to {@code changelogOffset} or a greater offset of
 * the changelog topic.
 *
 * @param changelogOffset the checkpointed changelog offset.
 * @return {@code true} if the state store recovered, {@code false} otherwise.
 */
@Evolving
default boolean recover(final long changelogOffset) {
    if (transactional()) {
        throw new UnsupportedOperationException("Transactional state store must implement StateStore#recover");
    }
    return false;
}
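As a rough illustration of the contract described by these Javadocs, the sketch below keeps uncommitted writes in a separate in-memory map. It is not an implementation of the real StateStore interface: `ToyTransactionalStore` and its `put`/`get` methods are hypothetical, and the return value of `recover` reflects one reading of the Javadoc (the store reports success only if its committed state corresponds to `changelogOffset` or a greater offset).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory store; it does not implement the real StateStore interface. */
final class ToyTransactionalStore {
    private final Map<String, String> committed = new HashMap<>();
    /** Writes since the last commit; a null value marks a delete. */
    private final Map<String, String> uncommitted = new HashMap<>();
    /** Changelog offset of the last commit; null if nothing has been committed with an offset. */
    private Long committedOffset = null;

    boolean transactional() {
        return true;
    }

    void put(String key, String value) {
        uncommitted.put(key, value);
    }

    /** Reads see uncommitted writes first, then committed data. */
    String get(String key) {
        return uncommitted.containsKey(key) ? uncommitted.get(key) : committed.get(key);
    }

    /** Applies all uncommitted writes and records the changelog offset they correspond to. */
    void commit(Long changelogOffset) {
        for (Map.Entry<String, String> e : uncommitted.entrySet()) {
            if (e.getValue() == null) {
                committed.remove(e.getKey());
            } else {
                committed.put(e.getKey(), e.getValue());
            }
        }
        uncommitted.clear();
        if (changelogOffset != null) {
            committedOffset = changelogOffset;
        }
    }

    /** Discards uncommitted writes; true if the committed state is at changelogOffset or later. */
    boolean recover(long changelogOffset) {
        uncommitted.clear();
        return committedOffset != null && committedOffset >= changelogOffset;
    }
}
```

A persistent store would need the commit step to be durable and atomic across a process crash, which a single-threaded in-memory map does not demonstrate. The sketch only shows how the offset passed to `commit` relates to the offset checked in `recover`.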

...

Proposed changes are source compatible and binary incompatible with previous releases.

Test Plan

  1. Add a variation for all existing stateful tests to run with enabled transactional state stores.
  2. Add tests to ensure that transactional state stores discard uncommitted changes after crash failure.
  3. Add tests to ensure that transactional state stores replay missing changes from the changelog topic on recovery.

Transactions via Secondary State Store for Uncommitted Changes

...