...

| Name | Default | Description |
| --- | --- | --- |
| statestore.uncommitted.max.records | -1 | Maximum number of uncommitted state-store records to buffer per-task. If this limit is exceeded, a task commit will be requested. No limit: -1. |
| statestore.uncommitted.max.bytes | 67108864 (64 MB) | Maximum number of memory bytes to be used to buffer uncommitted state-store records per-task. If this limit is exceeded, a task commit will be requested. No limit: -1. Note: if this is too high or unbounded, it's possible for RocksDB to trigger out-of-memory errors. |
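
As a rough illustration of how an application might set the two options in the table above, the sketch below builds a plain `Properties` object. The option names are the ones proposed here; the application id, bootstrap server and chosen limits are hypothetical values picked only for the example.

```java
import java.util.Properties;

public class UncommittedBufferConfigExample {

    // Config names as proposed in this document; they are written as plain strings
    // because this sketch does not assume matching constants exist in StreamsConfig.
    static final String MAX_RECORDS = "statestore.uncommitted.max.records";
    static final String MAX_BYTES = "statestore.uncommitted.max.bytes";

    static Properties boundedBufferConfig() {
        final Properties props = new Properties();
        // Hypothetical application settings, for illustration only.
        props.put("application.id", "example-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Request a task commit once more than 100,000 records are buffered per task...
        props.put(MAX_RECORDS, "100000");
        // ...or once more than 32 MiB of uncommitted data is buffered per task.
        props.put(MAX_BYTES, String.valueOf(32L * 1024 * 1024));
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = boundedBufferConfig();
        System.out.println(MAX_RECORDS + " = " + props.getProperty(MAX_RECORDS));
        System.out.println(MAX_BYTES + " = " + props.getProperty(MAX_BYTES));
    }
}
```

Setting either option to -1 would leave that dimension unbounded, as described in the table.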

Changed:

  • org.apache.kafka.streams.processor.StateStore
  • org.apache.kafka.streams.processor.StateStoreContext

Changes:

Code Block
languagejava
titleorg.apache.kafka.streams.processor.StateStore
    /**
     * Creates a new {@link Transaction} for reading/writing to this state store.
     * <p>
     * State stores that do not support transactions will return {@code this} instead, which should be considered a
     * transaction that doesn't provide any isolation or atomicity guarantees.
     * <p>
     * {@link Transaction Transactions} are <em>not thread-safe</em>, and should not be shared among threads. New
     * threads should use this method to create a new transaction, instead of sharing an existing one.
     * <p>
     * Transactions created by this method should have the same {@link IsolationLevel} as the {@link StateStoreContext}
     * that this store was {@link #init(StateStoreContext, StateStore) initialized with}.
     * <p>
     * To avoid resource leaks, it is recommended to extend {@link AbstractTransactionalStore}, which will track open
     * transactions and automatically close all open transactions when the store is closed.
     *
     * @return A new {@link Transaction} to isolate reads/writes to this {@link StateStore}. The Transaction
     *         <strong>MUST</strong> be {@link Transaction#flush() committed} or {@link Transaction#close() closed}
     *         when you are finished with it, to prevent resource leaks.
     */
    @Evolving
    default StateStore newTransaction() {
        return this;
    }

    /**
     * Return an approximate count of records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the number of records that would be committed by the next call to
     * {@link #commit(Map)}.
     * <p>
     * If this StateStore is unable to approximately count uncommitted records, it will return {@code -1}.
     * If this StateStore does not support atomic transactions, it will return {@code 0}, because records will always be
     * immediately written to a non-transactional store, so there will be none awaiting a {@link #commit(Map)}.
     *
     * @return The approximate number of records awaiting {@link #commit(Map)}, {@code -1} if the number of
     *         uncommitted records can't be counted, or {@code 0} if this StateStore does not support transactions.
     */
    @Evolving
    default long approximateNumUncommittedEntries() {
        return 0;
    }

    /**
     * Return an approximate count of memory used by records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the memory that would be freed by the next call to
     * {@link #commit(Map)}.
     * <p>
     * If this StateStore is unable to approximately count uncommitted memory usage, it will return {@code -1}.
     * If this StateStore does not support atomic transactions, it will return {@code 0}, because records will always be
     * immediately written to a non-transactional store, so there will be none awaiting a {@link #commit(Map)}.
     *
     * @return The approximate size of all records awaiting {@link #commit(Map)}, {@code -1} if the size of uncommitted
     *         records can't be counted, or {@code 0} if this StateStore does not support transactions.
     */
    @Evolving
    default long approximateNumUncommittedBytes() {
        return 0;
    }


Code Block
languagejava
titleorg.apache.kafka.streams.processor.StateStoreContext
    /**
     * Returns the {@link IsolationLevel} that every {@link Transaction} created by {@link StateStore#newTransaction()}
     * should use.
     * <p>
     * The default implementation of this method will use {@link IsolationLevel#READ_COMMITTED READ_COMMITTED} if the
     * app is {@link #appConfigs() configured} to use an {@link StreamsConfig#EXACTLY_ONCE_V2 exactly-once} {@link
     * StreamsConfig#PROCESSING_GUARANTEE_CONFIG processing guarantee}. Otherwise, it will be {@link
     * IsolationLevel#READ_UNCOMMITTED READ_UNCOMMITTED}.
     *
     * @return The isolation level for every transaction created by state stores for this context.
     */
    default IsolationLevel isolationLevel() {
        return StreamsConfigUtils.eosEnabled(new StreamsConfig(appConfigs())) ?
                IsolationLevel.READ_COMMITTED : IsolationLevel.READ_UNCOMMITTED;
    }
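
The default above can be restated as a small standalone function. This is only a sketch: the `IsolationLevel` enum is redeclared locally so the example compiles on its own, and the check against the three exactly-once values is an assumption about what `StreamsConfigUtils.eosEnabled` evaluates, not a copy of it.

```java
import java.util.Collections;
import java.util.Map;

public class IsolationLevelSelection {

    // Local stand-in for Kafka's IsolationLevel, redeclared to keep the sketch self-contained.
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    // Maps the application's processing guarantee to the isolation level used by its transactions.
    static IsolationLevel isolationLevelFor(final Map<String, ?> appConfigs) {
        final Object guarantee = appConfigs.get("processing.guarantee");
        // Assumption: any of the exactly-once settings counts as "EOS enabled".
        final boolean eosEnabled = "exactly_once".equals(guarantee)
                || "exactly_once_beta".equals(guarantee)
                || "exactly_once_v2".equals(guarantee);
        return eosEnabled ? IsolationLevel.READ_COMMITTED : IsolationLevel.READ_UNCOMMITTED;
    }

    public static void main(final String[] args) {
        System.out.println(isolationLevelFor(
                Collections.singletonMap("processing.guarantee", "exactly_once_v2")));
        // An unset guarantee falls through to the at-least-once behaviour.
        System.out.println(isolationLevelFor(Collections.<String, Object>emptyMap()));
    }
}
```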

New Interfaces

  • org.apache.kafka.streams.state.Transaction
  • org.apache.kafka.streams.state.AbstractTransactionalStore


Code Block
languagejava
titleorg.apache.kafka.streams.state.Transaction
/**
 * Represents a read/write transaction on a state store.
 * <p>
 * For compatibility, transactions implement the entire {@link StateStore} interface; however, they only represent a
 * <em>view</em> on an underlying {@link StateStore}; therefore methods that explicitly act on the entire store, such
 * as {@link #init} will do nothing, and others like {@link #flush()} and {@link #close()} will act upon this
 * transaction only.
 * <p>
 * Transactions are <em>NOT</em> thread-safe, and they should not be shared among multiple threads. Threads should
 * create their own {@link StateStore#newTransaction() new transaction}, which will ensure proper isolation of
 * concurrent changes.
 * <p>
 * For resource-safety, Transactions implement the {@link AutoCloseable} interface, enabling try-with-resources:
 * <pre>
 *     try (final Transaction transaction = store.newTransaction()) {
 *         transaction.put("foo", "bar");
 *         transaction.flush();
 *     }
 * </pre>
 * If you are not using try-with-resources, you <em>must</em> call either {@link #flush()} exactly once, or {@link
 * #close()} at least once, or risk possible resource leaks.
 */
public interface Transaction extends StateStore, AutoCloseable {

    /**
     * The {@link IsolationLevel} that reads and writes in this transaction are subject to.
     * <p>
     * In the context of a {@link org.apache.kafka.streams.processor.StateStore} transaction, these Isolation Levels
     * adhere to the <a href="https://en.wikipedia.org/wiki/Isolation_(database_systems)#Read_phenomena">ANSI SQL 92
     * definitions</a>.
     * <p>
     * All isolation levels guarantee "read-your-own-writes", i.e. that writes in the transaction will be seen by
     * subsequent reads <em>from within the same transaction</em>. Other guarantees vary by isolation level:
     * <p>
     * <table>
     *     <tr>
     *         <th>Isolation Level</th>
     *         <th>Description</th>
     *         <th>Permitted Read Phenomena</th>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_UNCOMMITTED}</td> // todo: ALOS
     *         <td>Allows queries to read writes from all ongoing transactions that have not yet been committed.</td>
     *         <td>dirty reads, non-repeatable reads, phantom reads</td>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_COMMITTED}</td> // todo: EOS
     *         <td>Allows queries to only read writes that have been committed to the StateStore. Writes by an ongoing
     *         transaction are not visible <em>until that transaction commits</em>.</td>
     *         <td>non-repeatable reads, phantom reads</td>
     *     </tr>
     * </table>
     * <p>
     * Under {@link IsolationLevel#READ_UNCOMMITTED}, there are no guarantees on when records from other transactions
     * become visible, therefore implementations <em>may</em> refrain from making uncommitted records visible to other
     * transactions until they're committed, if doing so would improve performance.
     */
    IsolationLevel isolationLevel();

    /**
     * Initializes this Transaction.
     * <p>
     * Most transactions require no explicit initialization, therefore the default implementation of this method does
     * nothing.
     *
     * @deprecated Since 2.7.0, the parent method has been deprecated in favor of {@link
     *             #init(StateStoreContext, StateStore)}. Use that method instead.
     */
    @Override @Deprecated
    default void init(final ProcessorContext context, final StateStore root) {
        init((StateStoreContext) context, root);
    }

    /**
     * Initializes this Transaction.
     * <p>
     * Most transactions require no explicit initialization, therefore the default implementation of this method does
     * nothing.
     */
    @Override
    default void init(final StateStoreContext context, final StateStore root) {
        // do nothing?
    }

    /**
     * Whether this transaction is open for reading and writing.
     * @return {@code true} if this Transaction can be read from/written to, or {@code false} if it is no longer
     *         accessible.
     */
    @Override
    boolean isOpen();

    /**
     * Commit and close this Transaction.
     * <p>
     * Any records held by this Transaction will be written to the underlying state store.
     * <p>
     * Once this method returns successfully, this Transaction will no longer be available to use for reads/writes.
     *
     * @see #close() to close the Transaction without writing data to the underlying state store.
     */
    @Override
    void flush();

    /**
     * Closes this Transaction, without committing records.
     * <p>
     * Any uncommitted records will <em>not</em> be written to the underlying state store, and will instead be
     * discarded.
     * <p>
     * This method should be used to "rollback" a Transaction that should not be {@link #flush() committed}.
     * <p>
     * The underlying {@link StateStore} will <em>not</em> be closed by this method.
     * <p>
     * This method is idempotent: repeatedly calling {@code close()} will not produce an error.
     *
     * @see #flush() to close this Transaction by writing its records to the underlying state store.
     */
    @Override
    void close();
}

New Base Implementations

  • org.apache.kafka.streams.state.AbstractTransaction
  • org.apache.kafka.streams.state.AbstractTransactionalStore


Code Block
languagejava
titleorg.apache.kafka.streams.state.AbstractTransaction
/**
 * Base implementation of {@link Transaction transactions} created by an implementation of {@link
 * AbstractTransactionalStore}.
 * <p>
 * This base implementation provides the following functionality:
 * <ul>
 *     <li>Registration of callbacks that are invoked after this Transaction has closed. Used by {@link
 *     AbstractTransactionalStore} to track open transactions that need to be closed when the parent store closes.</li>
 *     <li>Tracking/delegation of {@link #isOpen()}, {@link #persistent()}, {@link #name()} and {@link
 *     #isolationLevel()} methods.</li>
 *     <li>Provides a {@link #validateIsOpen()} method, useful for ensuring the Transaction state in query methods.</li>
 *     <li>Provides base implementations of {@link #flush()} and {@link #close()} that properly handle idempotence and
 *     invoking any registered callbacks.</li>
 * </ul>
 * <p>Note: none of the above methods are marked {@code final}, to enable power-users to provide alternatives. However,
 * this should only be done with great care, and only if absolutely necessary.</p>
 *
 * @param <S> The type of the {@link StateStore} that spawned this transaction.
 */
public abstract class AbstractTransaction<S extends StateStore> implements Transaction {

    /**
     * Commit and close this Transaction.
     * 
     * @see Transaction#flush()
     */
    public abstract void commitTransaction();

    /**
     * Closes this Transaction, without committing records.
     *
     * @see Transaction#close()
     */
    public abstract void closeTransaction();
}


Code Block
languagejava
titleorg.apache.kafka.streams.state.AbstractTransactionalStore
/**
 * Base class for transactional stores, that tracks open transactions and can close them all.
 * <p>
 * Transactions created using {@link #newTransaction()} will be automatically tracked and closed when this store is
 * closed.
 *
 * @see Transaction
 * @see AbstractTransaction
 */
public abstract class AbstractTransactionalStore implements StateStore {

    /**
     * Creates a new {@link Transaction}.
     * <p>
     * Implementations of this class should implement this method, instead of {@link #newTransaction()}, which
     * will call this method to produce the new transaction.
     * <p>
     * Transactions produced by this method must be an instance of {@link AbstractTransaction}, to enable the
     * automatic management of transaction resources.
     * 
     * @return A new {@link Transaction}.
     */
    public abstract AbstractTransaction<? extends AbstractTransactionalStore> createTransaction();
}
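
One way the tracking described above could work is sketched below. The class and method names (`TrackedTransaction`, `TrackingStore`, `addOnCloseCallback`) are hypothetical simplifications and are not the proposed `AbstractTransaction` or `AbstractTransactionalStore` APIs; the sketch only shows close callbacks being used to keep a set of open transactions up to date.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TransactionTrackingSketch {

    /** Hypothetical, simplified transaction: runs its callbacks once, when it is first closed. */
    static class TrackedTransaction implements AutoCloseable {
        private final List<Runnable> onClose = new ArrayList<>();
        private boolean open = true;

        void addOnCloseCallback(final Runnable callback) {
            onClose.add(callback);
        }

        boolean isOpen() {
            return open;
        }

        @Override
        public void close() {
            if (!open) {
                return; // idempotent: a second close() does nothing
            }
            open = false;
            onClose.forEach(Runnable::run);
        }
    }

    /** Hypothetical store that tracks the transactions it created. Not thread-safe. */
    static class TrackingStore implements AutoCloseable {
        private final Set<TrackedTransaction> openTransactions = new LinkedHashSet<>();

        TrackedTransaction newTransaction() {
            final TrackedTransaction txn = new TrackedTransaction();
            openTransactions.add(txn);
            // When the transaction closes, it removes itself from the tracked set.
            txn.addOnCloseCallback(() -> openTransactions.remove(txn));
            return txn;
        }

        int openTransactionCount() {
            return openTransactions.size();
        }

        @Override
        public void close() {
            // Iterate over a copy: closing a transaction mutates the tracked set via its callback.
            for (final TrackedTransaction txn : new ArrayList<>(openTransactions)) {
                txn.close();
            }
        }
    }

    public static void main(final String[] args) {
        final TrackingStore store = new TrackingStore();
        final TrackedTransaction first = store.newTransaction();
        final TrackedTransaction second = store.newTransaction();
        first.close();
        System.out.println("open after one close: " + store.openTransactionCount());
        store.close();
        System.out.println("second still open? " + second.isOpen());
    }
}
```

A real implementation would also need to consider transactions being created and closed from interactive query threads, which this single-threaded sketch ignores.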

Proposed Changes

To ensure that data is not written to a state store until it has been committed to the changelog, we need to isolate writes from the underlying database until changelog commit. To achieve this, we introduce the concept of a Transaction, which is a view of the underlying state store, and buffers writes until flush() is called. Transactions implement the StateStore interface, for compatibility, so they can be used anywhere an existing StateStore is expected.
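
The buffering behaviour described here can be illustrated with a deliberately small, map-backed sketch. `MapStore` and `BufferedTransaction` are hypothetical names that do not implement the real `StateStore` interface, and treating a `null` value as a delete is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class BufferedTransactionSketch {

    /** Hypothetical stand-in for the underlying state store: a map of committed records. */
    static class MapStore {
        private final Map<String, String> committed = new HashMap<>();

        String get(final String key) {
            return committed.get(key);
        }

        BufferedTransaction newTransaction() {
            return new BufferedTransaction(committed);
        }
    }

    /** A view over the store that buffers writes until flush() is called. Not thread-safe. */
    static class BufferedTransaction implements AutoCloseable {
        private final Map<String, String> committed;
        // Optional.empty() records a buffered delete, so that it can shadow a committed value.
        private final Map<String, Optional<String>> buffer = new HashMap<>();
        private boolean open = true;

        BufferedTransaction(final Map<String, String> committed) {
            this.committed = committed;
        }

        void put(final String key, final String value) {
            validateIsOpen();
            buffer.put(key, Optional.ofNullable(value)); // assumption: a null value means "delete"
        }

        String get(final String key) {
            validateIsOpen();
            final Optional<String> buffered = buffer.get(key);
            // Read-your-own-writes: the buffer takes precedence over committed data.
            return buffered != null ? buffered.orElse(null) : committed.get(key);
        }

        /** Commit: apply every buffered write to the underlying store, then close. */
        void flush() {
            validateIsOpen();
            buffer.forEach((key, value) -> {
                if (value.isPresent()) {
                    committed.put(key, value.get());
                } else {
                    committed.remove(key);
                }
            });
            close();
        }

        /** Rollback: discard anything still buffered. Idempotent. */
        @Override
        public void close() {
            buffer.clear();
            open = false;
        }

        boolean isOpen() {
            return open;
        }

        private void validateIsOpen() {
            if (!open) {
                throw new IllegalStateException("Transaction is closed");
            }
        }
    }

    public static void main(final String[] args) {
        final MapStore store = new MapStore();
        try (BufferedTransaction txn = store.newTransaction()) {
            txn.put("foo", "bar");
            System.out.println("inside txn: " + txn.get("foo"));
            System.out.println("store before flush: " + store.get("foo"));
            txn.flush();
        }
        System.out.println("store after flush: " + store.get("foo"));
    }
}
```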

Internally, we enable configuration of the level of isolation provided by StateStores via a context-wide IsolationLevel, which can be configured to either:

| IsolationLevel | Description | processing.guarantee |
| --- | --- | --- |
| READ_UNCOMMITTED | Records written to any transaction are visible to all other transactions immediately. This level provides no atomicity, consistency, isolation or durability guarantees. | at_least_once |
| READ_COMMITTED | Records written to one transaction are only visible to other transactions once they have been committed. | exactly_once, exactly_once_beta, exactly_once_v2 |

StateStore Transactions are only committed once the Kafka transaction for their changelog entries has been committed. This ensures that, under exactly-once, the records written to disk always reflect the state of the store's changelog.

The at-least-once processing mode does not need the isolation guarantees that transactions provide, and indeed, providing them could cause problems: the default commit.interval.ms under at-least-once is 30 seconds, so buffering writes until commit would add a latency of up to 30 seconds before records become visible to Interactive Queries. For this reason, under at-least-once, Streams will automatically configure all StateStores to use the READ_UNCOMMITTED isolation level, which provides no isolation guarantees, and ensures that records are immediately available for query.

RocksDB Transactions

When the isolation level is READ_COMMITTED, we will use RocksDB's WriteBatchWithIndex to accomplish atomic writes when not using the RocksDB WAL. When reading records, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities to ensure that uncommitted writes are available to query. The performance of this is expected to be better than the existing, non-batched write path. The main performance concern is that the buffer must reside completely in memory until it is committed.
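
To make the read path concrete, the following sketch mimics the overlay semantics with sorted maps from the Java standard library instead of RocksDB. It is not the RocksDB API: `getFromBatchAndBase` and `scan` are hypothetical helpers standing in, in spirit only, for `getFromBatchAndDB` and `newIteratorWithBase`, and the range scan materialises its result rather than iterating lazily.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

public class BatchOverlayReadSketch {

    // Point lookup: consult the uncommitted batch first, then fall back to committed data.
    // An empty Optional in the batch is a buffered delete and hides the committed value.
    static String getFromBatchAndBase(final NavigableMap<String, Optional<String>> batch,
                                      final NavigableMap<String, String> base,
                                      final String key) {
        final Optional<String> buffered = batch.get(key);
        if (buffered != null) {
            return buffered.orElse(null);
        }
        return base.get(key);
    }

    // Range scan over [from, to]: committed entries overlaid with buffered writes and deletes.
    static NavigableMap<String, String> scan(final NavigableMap<String, Optional<String>> batch,
                                             final NavigableMap<String, String> base,
                                             final String from,
                                             final String to) {
        final NavigableMap<String, String> merged = new TreeMap<>(base.subMap(from, true, to, true));
        for (final Map.Entry<String, Optional<String>> entry : batch.subMap(from, true, to, true).entrySet()) {
            if (entry.getValue().isPresent()) {
                merged.put(entry.getKey(), entry.getValue().get());
            } else {
                merged.remove(entry.getKey());
            }
        }
        return merged;
    }

    public static void main(final String[] args) {
        final NavigableMap<String, String> base = new TreeMap<>();
        base.put("a", "1");
        base.put("b", "2");
        base.put("c", "3");

        final NavigableMap<String, Optional<String>> batch = new TreeMap<>();
        batch.put("b", Optional.of("20"));  // uncommitted overwrite
        batch.put("c", Optional.empty());   // uncommitted delete
        batch.put("d", Optional.of("4"));   // uncommitted insert

        System.out.println(scan(batch, base, "a", "z"));
    }
}
```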

To mitigate this, we will automatically force a Task commit if the total uncommitted records returned by StateStore#approximateNumUncommittedEntries()  exceeds a threshold, configured by statestore.uncommitted.max.records; or the total memory used for buffering uncommitted records returned by StateStore#approximateNumUncommittedBytes() exceeds the threshold configured by statestore.uncommitted.max.bytes. This will roughly bound the memory required per-Task for buffering uncommitted records, irrespective of the commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure.

Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.
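
The commit check described above might look roughly like the sketch below. `UncommittedStats` is a hypothetical interface exposing only the two proposed methods, and treating a return value of -1 (cannot be counted) as contributing nothing to the totals is an assumption, since the text does not say how uncountable stores are handled.

```java
import java.util.Arrays;
import java.util.List;

public class UncommittedThresholdSketch {

    /** Hypothetical view of a store, exposing only the two proposed methods. */
    interface UncommittedStats {
        long approximateNumUncommittedEntries();
        long approximateNumUncommittedBytes();
    }

    static UncommittedStats stats(final long entries, final long bytes) {
        return new UncommittedStats() {
            @Override
            public long approximateNumUncommittedEntries() {
                return entries;
            }

            @Override
            public long approximateNumUncommittedBytes() {
                return bytes;
            }
        };
    }

    // Returns true if the task's stores together exceed either configured limit.
    // A limit of -1 disables that check, matching the "No limit: -1" convention.
    static boolean shouldForceCommit(final List<? extends UncommittedStats> stores,
                                     final long maxRecords,
                                     final long maxBytes) {
        long records = 0;
        long bytes = 0;
        for (final UncommittedStats store : stores) {
            // Assumption: stores reporting -1 (uncountable) are treated as contributing 0.
            records += Math.max(0, store.approximateNumUncommittedEntries());
            bytes += Math.max(0, store.approximateNumUncommittedBytes());
        }
        final boolean recordsExceeded = maxRecords >= 0 && records > maxRecords;
        final boolean bytesExceeded = maxBytes >= 0 && bytes > maxBytes;
        return recordsExceeded || bytesExceeded;
    }

    public static void main(final String[] args) {
        final List<UncommittedStats> stores = Arrays.asList(stats(10, 100), stats(0, 0));
        System.out.println(shouldForceCommit(stores, -1, 64));  // byte limit exceeded
        System.out.println(shouldForceCommit(stores, -1, -1));  // no limits configured
    }
}
```

Because the default implementations return 0, a task made up only of non-transactional stores never reaches either limit in this sketch.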

Concurrent Access by Interactive Queries

While the Task that owns a StateStore is processed by a single thread, concurrent access is possible when other threads perform an interactive query on the StateStore. While RocksDB itself is thread-safe, and optimized for concurrent access, WriteBatchWithIndex is not thread-safe. Care will need to be taken to ensure the consistency and safety of the batch in the presence of concurrent access from interactive query threads.

Transaction Management

A wrapper TransactionalKeyValueStore will be provided, which takes care of automatically creating a new transaction when it's initialized, and starting a new transaction after the existing one has been flushed. This is the principal way that transactions will be managed by the Streams engine. An analogue will be created to automatically manage Segment transactions.

Interactive queries will be modified to automatically call newTransaction() and use that in place of the StateStore being queried.

The AbstractTransaction and AbstractTransactionalStore classes detailed above provide a base implementation for transactionality that will automatically track and manage open transactions, to ensure that they are properly closed when their StateStore closes, ensuring no resource leaks. These are the base classes for RocksDBTransaction and RocksDBStore, respectively.

All RocksDBStore access methods (get, put, etc.) are already synchronized; however, it's possible for iterators to escape the synchronization locking, enabling a RocksDBIterator to be in use while the underlying WriteBatchWithIndex is being modified. In addition to tackling this problem, we will evaluate whether the access methods need to be synchronized, as this may be an unnecessary performance penalty.

Compatibility, Deprecation, and Migration Plan

...