Status

Current state: Under Discussion

Discussion thread: Thread

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As described in KIP-844, under EOS, crash failures cause all Task state to be wiped out on restart. This is because, currently, data is written to the StateStore before the commit to its changelog has completed, so it's possible that records are written to disk that were not committed to the store changelog.

In KIP-844, it was proposed to create an alternative type of StateStore, which would enable users to opt in to "transactional" behaviour, ensuring that data is only persisted once the changelog commit has succeeded. However, the design and approach outlined in KIP-844 unfortunately did not perform well when tested (with a write throughput that was only approximately 4% of the regular RocksDB StateStore!).

This KIP explores an alternative design that should have little/no performance impact, potentially performing better than the status quo, and can thus be enabled for all stores.

Public Interfaces

New configuration

Name | Default | Description
statestore.uncommitted.max.bytes | 67108864 (64 MB) | Maximum number of bytes of memory to be used to buffer uncommitted state-store records. If this limit is exceeded, a task commit will be requested. No limit: -1. Note: if this is too high or unbounded, it's possible for RocksDB to trigger out-of-memory errors.
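As a rough illustration of how an application might set this limit, the sketch below builds a configuration with a 32 MB threshold. It assumes the key is passed as a plain string, since the name is only proposed by this KIP; the class name, application id and broker address are hypothetical.

```java
import java.util.Properties;

// Sketch only: "statestore.uncommitted.max.bytes" is the name proposed in this KIP,
// so it is set here as a plain string key rather than via a StreamsConfig constant.
final class UncommittedBytesConfigExample {

    static final String UNCOMMITTED_MAX_BYTES = "statestore.uncommitted.max.bytes";

    static Properties withUncommittedLimit(final long maxBytes) {
        final Properties props = new Properties();
        props.put("application.id", "example-app");        // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker address
        props.put("processing.guarantee", "exactly_once_v2");
        // Use -1 to disable the limit entirely.
        props.put(UNCOMMITTED_MAX_BYTES, Long.toString(maxBytes));
        return props;
    }
}
```

Calling `UncommittedBytesConfigExample.withUncommittedLimit(32L * 1024 * 1024)` yields properties that could then be handed to the Streams application in the usual way.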

Changed Interfaces

  • org.apache.kafka.streams.processor.StateStore
  • org.apache.kafka.streams.processor.StateStoreContext

Changes:

org.apache.kafka.streams.processor.StateStore
    /**
     * Creates a new transaction for reading/writing to this state store.
     * <p>
     * State stores that do not support transactions will return {@code this} instead, which should be considered a
     * transaction that doesn't provide any isolation or atomicity guarantees.
     * <p>
     * Transactions are <em>not thread-safe</em>, and should not be shared among threads. New threads should use this
     * method to create a new transaction, instead of sharing an existing one.
     * <p>
     * Transactions created by this method should have the same {@link IsolationLevel} as the {@link StateStoreContext}
     * that this store was {@link #init(StateStoreContext, StateStore) initialized with}.
     * <p>
     * To avoid resource leaks, it is recommended to extend {@link AbstractTransactionalStore}, which will track open
     * transactions and automatically close all open transactions when the store is closed.
     *
     * @return A new transaction to isolate reads/writes to this {@link StateStore}. The Transaction
     *         <strong>MUST</strong> be {@link #flush() flushed} or {@link #close() closed} when you are finished
     *         with it, to prevent resource leaks.
     */
    @Evolving
    default StateStore newTransaction() {
        return this;
    }

    /**
     * Return an approximate count of memory used by records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the memory that would be freed by the next call to {@link #flush()}.
     * <p>
     * If this StateStore is unable to approximately count uncommitted memory usage, it will return {@code -1}.
     * If this StateStore does not support atomic transactions, it will return {@code 0}, because records will always be
     * immediately written to a non-transactional store, so there will be none awaiting a {@link #flush()}.
     *
     * @return The approximate size of all records awaiting {@link #flush()}, {@code -1} if the size of uncommitted
     *         records can't be counted, or {@code 0} if this StateStore does not support transactions.
     */
    @Evolving
    default long approximateNumUncommittedBytes() {
        return 0;
    }


org.apache.kafka.streams.processor.StateStoreContext
    /**
     * The {@link IsolationLevel} that every transaction created by {@link StateStore#newTransaction()} should use.
     * <p>
     * In the context of a {@link org.apache.kafka.streams.processor.StateStore} transaction, these Isolation Levels
     * adhere to the <a href="https://en.wikipedia.org/wiki/Isolation_(database_systems)#Read_phenomena">ANSI SQL 92
     * definitions</a>.
     * <p>
     * All isolation levels guarantee "read-your-own-writes", i.e. that writes in the transaction will be seen by
     * subsequent reads <em>from within the same transaction</em>. Other guarantees vary by isolation level:
     * <p>
     * <table>
     *     <tr>
     *         <th>Isolation Level</th>
     *         <th>Description</th>
     *         <th>Permitted Read Phenomena</th>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_UNCOMMITTED}</td> // todo: ALOS
     *         <td>Allows queries to read writes from all ongoing transactions that have not-yet been committed.</td>
     *         <td>dirty reads, non-repeatable reads, phantom reads</td>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_COMMITTED}</td> // todo: EOS
     *         <td>Allows queries to only read writes that have been committed to the StateStore. Writes by an ongoing
     *         transaction are not visible <em>until that transaction commits</em>.</td>
     *         <td>non-repeatable reads, phantom reads</td>
     *     </tr>
     * </table>
     * <p>
     * Under {@link IsolationLevel#READ_UNCOMMITTED}, there are no guarantees on when records from other transactions
     * become visible, therefore implementations <em>may</em> refrain from making uncommitted records visible to other
     * transactions until they're committed, if doing so would improve performance.
     * <p>
     * The default implementation of this method will use {@link IsolationLevel#READ_COMMITTED READ_COMMITTED} if the
     * app is {@link #appConfigs() configured} to use an {@link StreamsConfig#EXACTLY_ONCE_V2 exactly-once} {@link
     * StreamsConfig#PROCESSING_GUARANTEE_CONFIG processing guarantee}. Otherwise, it will be {@link
     * IsolationLevel#READ_UNCOMMITTED READ_UNCOMMITTED}. 
     */
    default IsolationLevel isolationLevel() {
        return StreamsConfigUtils.eosEnabled(new StreamsConfig(appConfigs())) ?
                IsolationLevel.READ_COMMITTED : IsolationLevel.READ_UNCOMMITTED;
    }

New Interfaces

  • org.apache.kafka.streams.state.AbstractTransactionalStore


New Base Implementations

  • org.apache.kafka.streams.state.AbstractTransaction
  • org.apache.kafka.streams.state.AbstractTransactionalStore


org.apache.kafka.streams.state.AbstractTransaction
/**
 * Base implementation for transactions created by an implementation of {@link AbstractTransactionalStore}.
 * <p>
 * This base implementation provides the following functionality:
 * <ul>
 *     <li>Registration of callbacks that are invoked after this transaction has closed. Used by {@link
 *     AbstractTransactionalStore} to track open transactions that need to be closed when the parent store closes.</li>
 *     <li>Tracking/delegation of {@link #isOpen()}, {@link #persistent()}, {@link #name()} and {@link
 *     #isolationLevel()} methods.</li>
 *     <li>Provides a {@link #validateIsOpen()} method, useful for ensuring the transaction state in query methods.</li>
 *     <li>Provides base implementations of {@link #flush()} and {@link #close()} that properly handle idempotence and
 *     invoking any registered callbacks.</li>
 * </ul>
 * <p>Note: none of the above methods are marked {@code final}, to enable power-users to provide alternatives. However,
 * this should only be done with great care, and only if absolutely necessary.</p>
 * <p>
 * For resource-safety, AbstractTransactions implement the {@link AutoCloseable} interface, enabling try-with-resources:
 * <pre>
 *     try (final AbstractTransaction<MyStore> transaction = (AbstractTransaction<MyStore>) store.newTransaction()) {
 *         transaction.put("foo", "bar");
 *         transaction.flush();
 *     }
 * </pre>
 * If you are not using try-with-resources, you <em>must</em> call either {@link #flush()} exactly once, or {@link
 * #close()} at least once, or risk possible resource leaks.
 *
 * @see AbstractTransactionalStore
 * @param <S> The type of the {@link StateStore} that spawned this transaction.
 */
public abstract class AbstractTransaction<S extends StateStore> implements StateStore, AutoCloseable {

    /**
     * Commit and close this Transaction.
     * 
     * @see StateStore#flush()
     */
    public abstract void commitTransaction();

    /**
     * Closes this Transaction, without committing records.
     *
     * @see StateStore#close()
     */
    public abstract void closeTransaction();
}

org.apache.kafka.streams.state.AbstractTransactionalStore
/**
 * Base class for transactional stores, that tracks open transactions and can close them all.
 * <p>
 * Transactions created using {@link #newTransaction()} will be automatically tracked and closed when this store is
 * closed.
 *
 * @see AbstractTransaction
 */
public abstract class AbstractTransactionalStore implements StateStore {

    /**
     * Creates a new transaction.
     * <p>
     * Implementations of this class should implement this method, instead of {@link #newTransaction()}, which
     * will call this method to produce the new transaction.
     * <p>
     * Transactions produced by this method must be an instance of {@link AbstractTransaction}, to enable the
     * automatic management of transaction resources.
     * 
     * @return A new {@link AbstractTransaction}.
     */
    public abstract AbstractTransaction<? extends AbstractTransactionalStore> createTransaction();
}

Proposed Changes

To ensure that data is not written to a state store until it has been committed to the changelog, we need to isolate writes from the underlying database until changelog commit. To achieve this, we introduce the concept of a Transaction, which is a view of the underlying state store, and buffers writes until flush() is called. Transactions implement the StateStore interface, for compatibility, so they can be used anywhere an existing StateStore is expected.
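The buffering behaviour described above can be sketched with a simplified in-memory model. This is not the proposed implementation: SketchStore and SketchTransaction are hypothetical stand-ins that do not implement the real StateStore interface, and the sketch assumes that flush() both commits and closes the transaction.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for a StateStore; not the Kafka Streams API.
final class SketchStore {
    private final Map<String, String> committed = new HashMap<>();

    String get(final String key) {
        return committed.get(key);
    }

    SketchTransaction newTransaction() {
        return new SketchTransaction(this);
    }

    // Applies buffered writes to the committed state; Optional.empty() is a buffered delete.
    void apply(final Map<String, Optional<String>> writes) {
        writes.forEach((key, value) -> {
            if (value.isPresent()) {
                committed.put(key, value.get());
            } else {
                committed.remove(key);
            }
        });
    }
}

// A view of the store that buffers writes until flush() is called.
final class SketchTransaction implements AutoCloseable {
    private final SketchStore store;
    private final Map<String, Optional<String>> buffer = new HashMap<>();
    private boolean open = true;

    SketchTransaction(final SketchStore store) {
        this.store = store;
    }

    void put(final String key, final String value) {
        validateIsOpen();
        buffer.put(key, Optional.of(value));
    }

    void delete(final String key) {
        validateIsOpen();
        buffer.put(key, Optional.empty());
    }

    // Read-your-own-writes: consult the buffer first, then the committed store.
    String get(final String key) {
        validateIsOpen();
        final Optional<String> buffered = buffer.get(key);
        return buffered != null ? buffered.orElse(null) : store.get(key);
    }

    // Commits the buffered writes to the store, then closes this transaction.
    void flush() {
        validateIsOpen();
        store.apply(buffer);
        close();
    }

    // Discards any uncommitted writes; safe to call more than once.
    @Override
    public void close() {
        buffer.clear();
        open = false;
    }

    private void validateIsOpen() {
        if (!open) {
            throw new IllegalStateException("Transaction is closed");
        }
    }
}
```

In this model, a value written through a transaction is readable from that transaction immediately, but only becomes readable from the store itself after flush().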

Internally, we enable configuration of the level of isolation provided by StateStores via a context-wide IsolationLevel, which can be configured to either:

IsolationLevel | Description | processing.guarantee
READ_UNCOMMITTED | Records written to any transaction are visible to all other transactions immediately. This level provides no atomicity, consistency, isolation or durability guarantees. | at_least_once
READ_COMMITTED | Records written to one transaction are only visible to other transactions once they have been committed. | exactly_once, exactly_once_beta, exactly_once_v2

StateStore transactions are only committed once the Kafka transaction for their changelog entries has been committed. This ensures that, under exactly-once, the records written to disk always reflect the state of the store's changelog.

The at-least-once processing mode does not need the isolation guarantees that transactions provide. Indeed, providing them could cause problems: the default commit.interval.ms under at-least-once is 30 seconds, so records would have to be buffered for up to 30 seconds, unnecessarily. For this reason, under at-least-once, Streams will automatically configure all StateStores to use the READ_UNCOMMITTED isolation level, which provides no isolation guarantees and ensures that records are immediately written to disk.
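The mapping from processing guarantee to isolation level can be sketched as a small standalone function. The class and enum below are hypothetical and local to the example; only the processing.guarantee values are taken from Kafka Streams.

```java
// Sketch of the guarantee-to-isolation-level mapping; names here are illustrative only.
final class IsolationLevelMappingSketch {

    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    static IsolationLevel forProcessingGuarantee(final String guarantee) {
        switch (guarantee) {
            case "exactly_once":
            case "exactly_once_beta":
            case "exactly_once_v2":
                // EOS: writes stay buffered until the changelog transaction commits.
                return IsolationLevel.READ_COMMITTED;
            case "at_least_once":
                // ALOS: writes go straight to the store, with no buffering.
                return IsolationLevel.READ_UNCOMMITTED;
            default:
                throw new IllegalArgumentException("Unknown processing.guarantee: " + guarantee);
        }
    }
}
```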

RocksDB Transactions

When the isolation level is READ_COMMITTED, we will use RocksDB's WriteBatchWithIndex as a means of accomplishing atomic writes when not using the RocksDB WAL. When reading records, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities to ensure that uncommitted writes are available to query. The performance of this is expected to actually be better than the existing, non-batched write path. The main performance concern is that the buffer must reside completely in-memory until it is committed.
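To illustrate what reading "through" an indexed write batch means, the sketch below overlays a sorted batch of uncommitted writes on a sorted base. It deliberately does not use the RocksDB API: the hypothetical OverlayReadSketch class models the batch and the database as TreeMaps, and it materialises the merged view eagerly, whereas a real iterator would merge lazily.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Models the read semantics of a batch-over-base view; this is not RocksDB code.
final class OverlayReadSketch {

    // Point lookup: the batch wins over the base; Optional.empty() in the batch is a delete.
    static String get(final TreeMap<String, String> base,
                      final TreeMap<String, Optional<String>> batch,
                      final String key) {
        final Optional<String> buffered = batch.get(key);
        return buffered != null ? buffered.orElse(null) : base.get(key);
    }

    // Ordered view of the base with the batch applied on top of it.
    static TreeMap<String, String> mergedView(final TreeMap<String, String> base,
                                              final TreeMap<String, Optional<String>> batch) {
        final TreeMap<String, String> view = new TreeMap<>(base);
        for (final Map.Entry<String, Optional<String>> entry : batch.entrySet()) {
            if (entry.getValue().isPresent()) {
                view.put(entry.getKey(), entry.getValue().get());
            } else {
                view.remove(entry.getKey());
            }
        }
        return view;
    }
}
```

The base map is never modified, which mirrors the property that matters here: uncommitted writes are queryable by their owner without being applied to the underlying database.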

To mitigate this, we will automatically force a Task commit if the total memory used for buffering uncommitted records returned by StateStore#approximateNumUncommittedBytes() exceeds the threshold configured by statestore.uncommitted.max.bytes. This will roughly bound the memory required for buffering uncommitted records, irrespective of the commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure. Each StreamThread will be given 1/num.stream.threads of the configured limits, dividing it fairly between them.
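A minimal sketch of this check follows. The UncommittedBytesLimitSketch class is hypothetical, and the sketch assumes that stores reporting -1 (unable to count) or 0 (non-transactional) simply contribute nothing to the total; the KIP text does not spell out how -1 is handled.

```java
// Sketch of the forced-commit check; names and the handling of -1 are assumptions.
final class UncommittedBytesLimitSketch {

    // Each StreamThread gets an equal share of statestore.uncommitted.max.bytes; -1 means no limit.
    static long perThreadLimit(final long maxBytes, final int numStreamThreads) {
        if (maxBytes < 0) {
            return -1L;
        }
        return maxBytes / Math.max(1, numStreamThreads);
    }

    // storeUncommittedBytes holds one approximateNumUncommittedBytes() result per store on the thread.
    static boolean shouldForceCommit(final long[] storeUncommittedBytes, final long perThreadLimit) {
        if (perThreadLimit < 0) {
            return false;
        }
        long total = 0L;
        for (final long bytes : storeUncommittedBytes) {
            if (bytes > 0) {
                total += bytes;
            }
        }
        return total > perThreadLimit;
    }
}
```

With the default of 67108864 bytes and 4 stream threads, each thread would request a commit once its stores buffer more than 16777216 bytes.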

It's possible that some Topologies can generate many more new StateStore entries than the records they process, in which case, it would be possible for such a Topology to cross the configured record/memory thresholds mid-processing, potentially causing an OOM error if these thresholds are exceeded by a lot. To mitigate this, the StreamThread will measure the increase in records/bytes written on each iteration, and pre-emptively commit if the next iteration is likely to cross a threshold.
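One simple way to make such a prediction is to assume the next iteration will add as many bytes as the previous one did. The sketch below uses that heuristic; it is an assumption made for illustration, not necessarily the estimator Streams would use, and the PreemptiveCommitSketch class is hypothetical.

```java
// Sketch of a pre-emptive commit heuristic: project the next iteration from the last one's growth.
final class PreemptiveCommitSketch {
    private long previousBytes = 0L;

    // Call once per processing iteration with the current uncommitted byte count.
    boolean shouldCommitBeforeNextIteration(final long currentBytes, final long limitBytes) {
        final long growth = Math.max(0L, currentBytes - previousBytes);
        previousBytes = currentBytes;
        // A negative limit means "no limit", so never commit early.
        return limitBytes >= 0 && currentBytes + growth > limitBytes;
    }

    // Call after a commit, when the uncommitted buffers have been emptied.
    void onCommit() {
        previousBytes = 0L;
    }
}
```

For example, with a limit of 100 bytes and iterations that each add 30 bytes, the third iteration (90 bytes buffered, 120 projected) triggers an early commit even though the limit has not yet been crossed.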

Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.

Transaction Management

A wrapper TransactionalKeyValueStore will be provided, which takes care of automatically creating a new transaction when it's initialized, and starting a new transaction after the existing one has been committed. This is the principal way that transactions will be managed by the Streams engine. A wrapper TransactionalSegment will also be provided, which extends TransactionalKeyValueStore, providing automatic transaction handling for Segments.

The AbstractTransaction and AbstractTransactionalStore classes detailed above provide a base implementation for transactionality that will automatically track and manage open transactions, to ensure that they are properly closed when their StateStore closes, ensuring no resource leaks. These are the base classes for RocksDBTransaction and RocksDBStore, respectively.
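The tracking mechanism can be sketched with close callbacks, as below. TrackedTransaction and TrackingStoreSketch are hypothetical, heavily simplified stand-ins for the two base classes; they hold no data and are not thread-safe.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Simplified stand-in for AbstractTransaction: runs registered callbacks when closed.
final class TrackedTransaction implements AutoCloseable {
    private final List<Runnable> closeCallbacks = new ArrayList<>();
    private boolean open = true;

    void onClose(final Runnable callback) {
        closeCallbacks.add(callback);
    }

    boolean isOpen() {
        return open;
    }

    // Idempotent: callbacks run only on the first close.
    @Override
    public void close() {
        if (!open) {
            return;
        }
        open = false;
        closeCallbacks.forEach(Runnable::run);
    }
}

// Simplified stand-in for AbstractTransactionalStore: tracks open transactions.
final class TrackingStoreSketch implements AutoCloseable {
    private final Set<TrackedTransaction> openTransactions = new LinkedHashSet<>();

    TrackedTransaction newTransaction() {
        final TrackedTransaction transaction = new TrackedTransaction();
        openTransactions.add(transaction);
        // Stop tracking the transaction as soon as it closes.
        transaction.onClose(() -> openTransactions.remove(transaction));
        return transaction;
    }

    int openTransactionCount() {
        return openTransactions.size();
    }

    @Override
    public void close() {
        // Copy first: closing a transaction removes it from the set via its callback.
        for (final TrackedTransaction transaction : new ArrayList<>(openTransactions)) {
            transaction.close();
        }
    }
}
```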

Interactive Queries

Interactive queries currently see every record as soon as it is written to a StateStore. This can cause some consistency issues, as interactive queries can read records before they're committed to the Kafka changelog, which may be rolled back. To address this, interactive queries will query the underlying StateStore, and will not be routed through a Transaction. This ensures that interactive queries see a consistent view of the store, as they will not be able to read any uncommitted records.

When operating under IsolationLevel.READ_COMMITTED (i.e. EOS), the maximum time for records to become visible to interactive queries will be commit.interval.ms. However, this is automatically set to a low value, and it's not recommended for users to use high intervals when operating under EOS.

When operating under IsolationLevel.READ_UNCOMMITTED (i.e. ALOS), all records will be immediately visible to interactive queries, so the high default commit.interval.ms of 30s will have no impact on interactive query latency.

Error Handling

Kafka Streams currently generates a TaskCorruptedException when a Task needs to have its state wiped (under EOS) and be re-initialized. There are currently several different situations that generate this exception:

  1. No offsets for the store can be found when opening it under EOS.
  2. OutOfRangeException during restoration, usually caused by the changelog being wiped on application reset.
  3. TimeoutException under EOS, when writing to or committing a Kafka transaction.

The first two of these are extremely rare, and make sense to keep. However, timeouts are much more frequent. They currently require the store to be wiped under EOS because when a timeout occurs, the data in the local StateStore will have been written, but the data in the Kafka changelog will have failed to be written, causing a mismatch in consistency.

With Transactional StateStores, we can guarantee that the local state is consistent with the changelog. It will therefore no longer be necessary to reset the local state on a TimeoutException.

Compatibility, Deprecation, and Migration Plan

The above changes will retain compatibility for all existing StateStores, including user-defined custom implementations. Any StateStore that extends RocksDBStore will automatically inherit its behaviour, although users that directly write via the db RocksDB instance may need to switch to using the dbAccessor to ensure consistent results.

All new methods on existing classes will have defaults set to ensure compatibility.

Test Plan

Testing will be accomplished by both the existing tests and by writing some new unit tests that verify atomicity, durability and consistency guarantees that this KIP provides.

Rejected Alternatives

The design outlined in KIP-844, sadly, does not perform well (as described above), and requires users to opt-in to transactionality, instead of being a guarantee provided out-of-the-box.

Atomic Checkpointing

Kafka Streams currently stores the changelog offsets for a StateStore in a per-Task on-disk file, .checkpoint, which, under EOS, is written only when Streams shuts down successfully. There are two major problems with this approach:

  • If this file is only written on a clean exit, we won't know the offsets our StateStore contains, and therefore will need to wipe and restore on any error, nullifying many of the improvements gained by Transactional State Stores.
  • If we write to this file on every successful commit, there's a race condition, where it's possible the application exits after data has been committed to RocksDB, but before the checkpoint file has been updated, causing a consistency violation.

To resolve this, we move the responsibility for offset management to the StateStore itself. The new commit method takes a map of all the changelog offsets that correspond to the state of the Transaction being committed.

RocksDBStore will store these offsets in a separate Column Family, and will be configured to atomically flush all its Column Families. This guarantees that the changelog offsets will always be flushed to disk together with the data they represent, irrespective of how that flush is triggered. This allows us to remove the explicit memtable flush(), enabling RocksDB to dictate when memtables are flushed to disk. This will likely improve performance and reduce compactions, as it will no longer be required to flush new SST files every 10,000 records.

The existing .checkpoint files will be retained for any StateStore that does not set managesOffsets() to true, and the existing offsets will be automatically migrated into StateStores that manage their own offsets, iff there is no offset returned by StateStore#getCommittedOffset.

Offsets for Consumer Rebalances

During Consumer rebalance, Streams directly checks all on-disk .checkpoint files, including those for Tasks/StateStores that are not currently "open". This is done to ensure that Tasks are assigned to the instance with the most complete local state. With Atomic Checkpointing, any StateStore that manages its own offsets (i.e. managesOffsets() is true) will first need to be opened in order to read its offsets, and then closed again.

The additional latency this would introduce to the rebalance is unacceptable. Instead, we will continue to write the offsets of all StateStores, including those that manage their own offsets, to the .checkpoint file. For StateStores that manage their own offsets, this file will only be read if that store is closed (i.e. not yet initialized). Since the store is not open, the above race condition does not apply, making this safe. If a StateStore returns true for both managesOffsets and isOpen, then the store will be queried for its offsets directly, via getCommittedOffset.
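The lookup rule described above can be sketched as a single decision function. The OffsetAwareStore interface below is a hypothetical, minimal view of a store using the method names from this section; the .checkpoint file is modelled as a plain map from changelog partition name to offset.

```java
import java.util.Map;
import java.util.OptionalLong;

// Sketch of where a changelog offset is read from; types here are illustrative only.
final class OffsetLookupSketch {

    // Hypothetical minimal view of a store, using the method names from the text above.
    interface OffsetAwareStore {
        boolean managesOffsets();
        boolean isOpen();
        Long getCommittedOffset(String changelogPartition);
    }

    static OptionalLong resolveOffset(final OffsetAwareStore store,
                                      final String changelogPartition,
                                      final Map<String, Long> checkpointFile) {
        final Long offset;
        if (store.managesOffsets() && store.isOpen()) {
            // An open store that manages its own offsets is the source of truth.
            offset = store.getCommittedOffset(changelogPartition);
        } else {
            // Closed stores, and stores that don't manage offsets, fall back to .checkpoint.
            offset = checkpointFile.get(changelogPartition);
        }
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }
}
```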
