Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Right now, a stream processor with EOS has to delete all data from the local state stores after a crash failure because the state stores might be in a partially updated state. A partial update can happen during a crash failure because changes to the local state are not atomic with respect to the Kafka Streams commit. If an application with EOS crashes between commits, it cannot reset the state to the previously committed state, so it wipes the state stores and replays the changelog from scratch.

This KIP proposes making writes to the state stores transactional so that they atomically commit only after the corresponding changes are committed to the changelog topic. As a result, Streams applications configured with EOS will no longer need to wipe the state stores on crash failure.

Public Interfaces

Changed:

  • org.apache.kafka.streams.processor.StateStore
  • org.apache.kafka.streams.state.StoreSupplier
  • org.apache.kafka.streams.state.Stores
  • org.apache.kafka.streams.kstream.Materialized
  • org.apache.kafka.streams.kstream.StreamJoined

Proposed Changes

Background

This section briefly describes relevant parts of a stateful task's write lifecycle with EOS.

...

In step 4, the task does not flush and checkpoint in the case of EOS, to prevent a partial update on crash failure. Without the risk of partial updates, EOS could follow the same process as the other processing semantics and avoid replaying the changelog.

Overview

This section gives an overview of the proposed changes. The following sections cover the changes in behavior, configuration, and interfaces in detail.

...

There are multiple ways to implement state store transactions, each with different trade-offs. This proposal includes a single implementation that uses a secondary RocksDB instance for uncommitted writes.

StateStore changes

This section covers multiple changes to the state store interfaces. This proposal replaces StateStore#flush with two new methods, StateStore#commit(Long) and StateStore#recover(Long), and adds a boolean transactional() method that reports whether a state store is transactional.

...

To sum up, StateStore#recover transitions the state store to a consistent, committed state and returns the changelog offset corresponding to that state. Given the Streams workload properties described above and the Apache Kafka transaction commit order, the returned offset is always either the offset in the checkpoint file or the offset committed to the changelog.
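The two possible outcomes of recovery can be illustrated with a small decision function. This is only a sketch under stated assumptions: RecoverSketch and the commitMarkerOffset parameter are hypothetical names, and the marker stands in for whatever durable record an implementation keeps of a commit that was in flight when the crash happened.

```java
import java.util.Optional;

/** Toy model of the recovery decision; not the Kafka Streams implementation. */
final class RecoverSketch {

    /**
     * @param checkpointedOffset offset read from the checkpoint file (may be null)
     * @param commitMarkerOffset hypothetical: offset recorded by a commit that was
     *                           interrupted by the crash, if such a record survived
     * @return the changelog offset the store is consistent with after recovery
     */
    static Long recover(final Long checkpointedOffset, final Optional<Long> commitMarkerOffset) {
        if (commitMarkerOffset.isPresent()) {
            // The changelog commit went through before the crash, so the pending
            // writes are rolled forward and the store advances to that offset.
            return commitMarkerOffset.get();
        }
        // No commit was in flight: uncommitted writes are discarded (rolled back)
        // and the store stays at the checkpointed offset.
        return checkpointedOffset;
    }

    public static void main(final String[] args) {
        System.out.println(recover(100L, Optional.empty())); // prints 100
        System.out.println(recover(100L, Optional.of(120L))); // prints 120
    }
}
```

In both branches the caller receives the offset from which changelog restoration can resume, which is why the task no longer needs to replay the changelog from the beginning.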

Behavior changes

If StateStore#transactional() returns true, then the store performs writes via an implementation-specific transactional mechanism. Reads via ReadOnlyKeyValueStore methods return uncommitted data from the ongoing transaction.

...

If EOS is enabled, we will remove offset information for non-transactional state stores from the checkpoint file instead of just deleting the file.
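As a rough illustration of this checkpoint handling, the sketch below filters a checkpoint map so that only transactional stores keep their offsets. The class name, the method name, and the use of store names as map keys are assumptions made for brevity; the actual checkpoint file format is not shown here.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper; store names stand in for the real checkpoint keys. */
final class CheckpointFilterSketch {

    /** Returns a copy of the checkpoint that keeps offsets only for transactional stores. */
    static Map<String, Long> retainTransactional(final Map<String, Long> checkpoint,
                                                 final Set<String> transactionalStores) {
        final Map<String, Long> retained = new LinkedHashMap<>();
        for (final Map.Entry<String, Long> entry : checkpoint.entrySet()) {
            if (transactionalStores.contains(entry.getKey())) {
                retained.put(entry.getKey(), entry.getValue());
            }
        }
        return retained;
    }

    public static void main(final String[] args) {
        final Map<String, Long> checkpoint = new LinkedHashMap<>();
        checkpoint.put("txn-store", 120L);
        checkpoint.put("plain-store", 95L);
        final Set<String> transactional = new HashSet<>();
        transactional.add("txn-store");
        System.out.println(retainTransactional(checkpoint, transactional)); // prints {txn-store=120}
    }
}
```

A non-transactional store that loses its checkpoint entry is restored from scratch as before, while transactional stores in the same task keep their offsets.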

Transactions via Secondary State Store for Uncommitted Changes

The new Stores factory methods described above provide an option to create transactional state stores. Transactionality is guaranteed by batching uncommitted (dirty) writes in a temporary RocksDB instance. Once the task flushes, the temporary store creates a commit file with a changelog offset, indicating that the transaction is ready to commit, and writes the dirty records into the regular store. It truncates the temporary store and deletes the commit file once it is finished.
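The write path described above can be modeled in memory as follows. This is a simplified sketch, not the proposed implementation: plain maps stand in for the two RocksDB instances, a nullable field stands in for the commit file, deletes are not modeled, and all names (SecondaryStoreSketch, discardUncommitted, and so on) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory toy model of the two-store scheme; plain maps stand in for RocksDB. */
final class SecondaryStoreSketch {
    private final Map<String, String> regular = new HashMap<>();   // committed data
    private final Map<String, String> temporary = new HashMap<>(); // dirty writes
    private Long commitFileOffset;  // stands in for the on-disk commit file
    private Long committedOffset;

    void put(final String key, final String value) {
        temporary.put(key, value); // uncommitted writes only touch the temporary store
    }

    String get(final String key) {
        // Reads see the ongoing transaction first, then fall back to committed data.
        return temporary.containsKey(key) ? temporary.get(key) : regular.get(key);
    }

    String getCommitted(final String key) {
        return regular.get(key);
    }

    Long committedOffset() {
        return committedOffset;
    }

    void commit(final Long changelogOffset) {
        commitFileOffset = changelogOffset;  // create the commit file with the offset
        regular.putAll(temporary);           // write dirty records into the regular store
        temporary.clear();                   // truncate the temporary store
        committedOffset = commitFileOffset;
        commitFileOffset = null;             // delete the commit file
    }

    /** Drops the ongoing transaction, as a rollback would. */
    void discardUncommitted() {
        temporary.clear();
    }

    public static void main(final String[] args) {
        final SecondaryStoreSketch store = new SecondaryStoreSketch();
        store.put("a", "1");
        System.out.println(store.get("a"));          // prints 1
        System.out.println(store.getCommitted("a")); // prints null
        store.commit(5L);
        System.out.println(store.getCommitted("a")); // prints 1
    }
}
```

Because the regular store is only modified inside commit, a crash at any other point leaves it in its last committed state, and the dirty writes can simply be dropped.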

...

  • It doubles the number of open state stores.
  • It potentially has higher write and read amplification due to uncontrolled flushes of the temporary state store.

Interface Changes

StateStore

Code Block
language: java
title: StateStore.java
/**
 * Return true if the storage supports transactions.
 *
 * @return {@code true} if the storage supports transactions, {@code false} otherwise
 */
default boolean transactional() {
    return false;
}

/**
 * Flush any cached data
 *
 * @deprecated use {@link #commit(Long)} instead
 */
@Deprecated
default void flush() {}

/**
 * Flush and commit any cached data
 * <p>
 * For a transactional state store, commit applies all changes atomically. In other words, either the
 * entire commit will be successful or none of the changes will be applied.
 * <p>
 * For a non-transactional state store, this method flushes cached data.
 *
 * @param changelogOffset the offset of the changelog topic this commit corresponds to. The
 *                        offset can be {@code null} if the state store does not have a changelog
 *                        (e.g. a global store).
 */
default void commit(final Long changelogOffset) {
    if (transactional()) {
        throw new UnsupportedOperationException("Transactional state store must implement StateStore#commit");
    } else {
        flush();
    }
}

/**
 * Recover a transactional state store
 * <p>
 * If a transactional state store shut down with a crash failure, this method can either
 * roll back or roll forward uncommitted changes. In any case, this method returns the changelog
 * offset it rolls to.
 *
 * @param changelogOffset the checkpointed changelog offset.
 * @return the changelog offset after recovery or {@code null} if recovery failed.
 */
default Long recover(final Long changelogOffset) {
    if (transactional()) {
        throw new UnsupportedOperationException("Transactional state store must implement StateStore#recover");
    }
    return changelogOffset;
}
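To see how these defaults behave for existing and new store implementations, consider the following self-contained sketch. SketchStateStore is a trimmed, hypothetical copy of the three default methods above (it is not the real org.apache.kafka.streams.processor.StateStore), and both implementing classes are invented for illustration.

```java
/** Demonstrates the default-method behavior on a trimmed, hypothetical interface copy. */
final class DefaultMethodsSketch {

    /** Hypothetical stand-in that reproduces only the proposed default methods. */
    interface SketchStateStore {
        default boolean transactional() {
            return false;
        }

        default void flush() {}

        default void commit(final Long changelogOffset) {
            if (transactional()) {
                throw new UnsupportedOperationException("Transactional state store must implement commit");
            } else {
                flush();
            }
        }

        default Long recover(final Long changelogOffset) {
            if (transactional()) {
                throw new UnsupportedOperationException("Transactional state store must implement recover");
            }
            return changelogOffset;
        }
    }

    /** An existing store that only knows about flush(); it needs no code changes. */
    static final class LegacyStore implements SketchStateStore {
        int flushCount = 0;

        @Override
        public void flush() {
            flushCount++;
        }
    }

    /** A store that claims to be transactional but forgot to override commit and recover. */
    static final class IncompleteTransactionalStore implements SketchStateStore {
        @Override
        public boolean transactional() {
            return true;
        }
    }

    public static void main(final String[] args) {
        final LegacyStore legacy = new LegacyStore();
        legacy.commit(42L);                      // delegates to flush()
        System.out.println(legacy.flushCount);   // prints 1
        System.out.println(legacy.recover(42L)); // prints 42

        try {
            new IncompleteTransactionalStore().commit(42L);
        } catch (final UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The defaults therefore keep non-transactional stores working unchanged, while a store that opts into transactions without implementing both methods fails fast instead of silently flushing.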

...

Code Block
public StoreSupplier<S> storeSupplier();

public boolean transactional();

Compatibility, Deprecation, and Migration Plan

Transactional state stores will be disabled by default. Both Streams DSL and Processor API users can enable transactional writes in the built-in RocksDB state store by passing a new boolean flag transactional=true to the Materialized constructor and the Stores factory methods. Custom state stores can enable transactionality by adjusting their implementation to follow the StateStore#transactional() contract.

...

The proposed changes are source compatible but binary incompatible with previous releases.

Test Plan

  1. Changes not committed to the changelog topic are discarded on crash failure.
  2. Changes committed to the changelog topic, but not committed to the state store, are rolled forward.

Rejected Alternatives

RocksDB in-memory Indexed Batches

A considered alternative is to make the built-in RocksDB state store transactional by using WriteBatchWithIndex, which is similar to the WriteBatch already used in segment stores, except that it also allows reading uncommitted data.

...

The disadvantage of this approach is that all uncommitted writes must fit into memory. In practice, RocksDB developers recommend that batches be no larger than 3-4 megabytes (link), which might be an issue.

RocksDB Optimistic Transactions

Another considered alternative is OptimisticTransactionDB. This alternative suffers from the same issues as in-memory indexed batches, but also has greater overhead. It offers more guarantees than Kafka Streams needs: specifically, it ensures that there were no write conflicts between concurrent transactions before committing. There are no concurrent transactions in Kafka Streams, so there is no reason to pay for the associated overhead.

Method to control transaction lifecycle in StateStore

A considered alternative is to introduce methods like StateStore#beginTxn and StateStore#commitTxn to manage the transaction lifecycle. I don’t think they are necessary given the specifics of Streams workloads: there is always a single transaction for a given task, and that transaction commits only after the commit to the changelog. Moreover, explicit method calls to begin and commit a transaction introduce possible invalid states, such as skipping beginTxn before committing, beginning a transaction multiple times, or committing after flushing.

...