...

Code Block
languagejava
firstline106
titleorg.apache.kafka.streams.processor.StateStore
linenumberstrue
    /**
     * Return an approximate count of memory used by records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the memory that would be freed by the next call to {@link #flush()}.
     * <p>
     * If this StateStore is unable to approximately count uncommitted memory usage, it will return {@code -1}.
     * If this StateStore does not support atomic transactions, it will return {@code 0}, because records will always be
     * immediately written to a non-transactional store, so there will be none awaiting a {@link #flush()}.
     *
     * @return The approximate size of all records awaiting {@link #flush()}, {@code -1} if the size of uncommitted
     *         records can't be counted, or {@code 0} if this StateStore does not support transactions.
     */
    @Evolving
    default long approximateNumUncommittedBytes() {
        return 0;
    }


Code Block
languagejava
titleorg.apache.kafka.streams.processor.StateStoreContext
    /**
     * The {@link IsolationLevel} that every transaction created by {@link StateStore#newTransaction()} should use.
     * <p>
     * In the context of a {@link org.apache.kafka.streams.processor.StateStore} transaction, these Isolation Levels
     * adhere to the <a href="https://en.wikipedia.org/wiki/Isolation_(database_systems)#Read_phenomena">ANSI SQL 92
     * definitions</a>.
     * <p>
     * All isolation levels guarantee "read-your-own-writes", i.e. that writes in the transaction will be seen by
     * subsequent reads <em>from within the same transaction</em>. Other guarantees vary by isolation level:
     * <p>
     * <table>
     *     <tr>
     *         <th>Isolation Level</th>
     *         <th>Description</th>
     *         <th>Permitted Read Phenomena</th>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_UNCOMMITTED}</td> // todo: ALOS
     *         <td>Allows queries to read writes from all ongoing transactions that have not-yet been committed.</td>
     *         <td>dirty reads, non-repeatable reads, phantom reads</td>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_COMMITTED}</td> // todo: EOS
     *         <td>Allows queries to only read writes that have been committed to the StateStore. Writes by an ongoing
     *         transaction are not visible <em>until that transaction commits</em>.</td>
     *         <td>non-repeatable reads, phantom reads</td>
     *     </tr>
     * </table>
     * <p>
     * Under {@link IsolationLevel#READ_UNCOMMITTED}, there are no guarantees on when records from other transactions
     * become visible, therefore implementations <em>may</em> refrain from making uncommitted records visible to other
     * transactions until they're committed, if doing so would improve performance.
     * <p>
     * The default implementation of this method will use {@link IsolationLevel#READ_COMMITTED READ_COMMITTED} if the
     * app is {@link #appConfigs() configured} to use an {@link StreamsConfig#EXACTLY_ONCE_V2 exactly-once} {@link
     * StreamsConfig#PROCESSING_GUARANTEE_CONFIG processing guarantee}. Otherwise, it will be {@link
     * IsolationLevel#READ_UNCOMMITTED READ_UNCOMMITTED}.
     */
    default IsolationLevel isolationLevel() {
        return StreamsConfigUtils.eosEnabled(new StreamsConfig(appConfigs())) ?
                IsolationLevel.READ_COMMITTED : IsolationLevel.READ_UNCOMMITTED;
    }

New Base Implementations

  • org.apache.kafka.streams.state.AbstractTransaction
  • org.apache.kafka.streams.state.AbstractTransactionalStore
Code Block
languagejava
titleorg.apache.kafka.streams.state.AbstractTransaction
/**
 * Base implementation for transactions created by an implementation of {@link AbstractTransactionalStore}.
 * <p>
 * This base implementation provides the following functionality:
 * <ul>
 *     <li>Registration of callbacks that are invoked after this transaction has closed. Used by {@link
 *     AbstractTransactionalStore} to track open transactions that need to be closed when the parent store closes.</li>
 *     <li>Tracking/delegation of {@link #isOpen()}, {@link #persistent()}, {@link #name()} and {@link
 *     #isolationLevel()} methods.</li>
 *     <li>Provides a {@link #validateIsOpen()} method, useful for ensuring the transaction state in query methods.</li>
 *     <li>Provides base implementations of {@link #flush()} and {@link #close()} that properly handle idempotence and
 *     invoking any registered callbacks.</li>
 * </ul>
 * <p>Note: none of the above methods are marked {@code final}, to enable power-users to provide alternatives. However,
 * this should only be done with great care, and only if absolutely necessary.</p>
 * <p>
 * For resource-safety, AbstractTransactions implement the {@link AutoCloseable} interface, enabling try-with-resources:
 * <pre>
 *     try (final AbstractTransaction<MyStore> transaction = (AbstractTransaction<MyStore>) store.newTransaction()) {
 *         transaction.put("foo", "bar");
 *         transaction.flush();
 *     }
 * </pre>
 * If you are not using try-with-resources, you <em>must</em> call either {@link #flush()} exactly-once, or {@link
 * #close()} at least once, or risk possible resource leaks.
 *
 * @see AbstractTransactionalStore
 * @param <S> The type of the {@link StateStore} that spawned this transaction.
 */
public abstract class AbstractTransaction<S extends StateStore> implements StateStore, AutoCloseable {

    /**
     * Commit and close this Transaction.
     * 
     * @see StateStore#flush()
     */
    public abstract void commitTransaction();

    /**
     * Closes this Transaction, without committing records.
     *
     * @see StateStore#close()
     */
    public abstract void closeTransaction();
}
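To illustrate the contract, the following is a minimal sketch of a concrete transaction built on this base class. Only the two abstract methods shown above are real; the store type, the put/putAll methods, and the buffering strategy are illustrative assumptions, and the remaining StateStore methods are omitted.

Code Block
languagejava
titleExample: a concrete AbstractTransaction (illustrative sketch)
// A minimal sketch of a concrete transaction over a hypothetical in-memory store.
public class InMemoryTransaction extends AbstractTransaction<InMemoryStore> {

    private final Map<String, String> buffer = new HashMap<>();
    private final InMemoryStore store;

    public InMemoryTransaction(final InMemoryStore store) {
        this.store = store;
    }

    public void put(final String key, final String value) {
        validateIsOpen();        // provided by AbstractTransaction
        buffer.put(key, value);  // buffered; not applied to the store until commit
    }

    @Override
    public void commitTransaction() {
        store.putAll(buffer);    // hypothetical bulk-write on the wrapped store
        buffer.clear();
    }

    @Override
    public void closeTransaction() {
        buffer.clear();          // discard buffered writes without committing
    }

    // ... remaining StateStore methods (init, get, etc.) omitted for brevity ...
}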
Code Block
languagejava
titleorg.apache.kafka.streams.state.AbstractTransactionalStore
/**
 * Base class for transactional stores, that tracks open transactions and can close them all.
 * <p>
 * Transactions created using {@link #newTransaction()} will be automatically tracked and closed when this store is
 * closed.
 *
 * @see AbstractTransaction
 */
public abstract class AbstractTransactionalStore implements StateStore {

    /**
     * Creates a new transaction.
     * <p>
     * Implementations of this class should implement this method, instead of {@link #newTransaction()}, which
     * will call this method to produce the new transaction.
     * <p>
     * Transactions produced by this method must be an instance of {@link AbstractTransaction}, to enable the
     * automatic management of transaction resources.
     * 
     * @return A new {@link Transaction}.
     */
    public abstract AbstractTransaction<? extends AbstractTransactionalStore> createTransaction();
}
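On the store side, an implementation might look like the following sketch. MyTransactionalStore and MyTransaction are hypothetical names; the comment describes the tracking behaviour documented above.

Code Block
languagejava
titleExample: extending AbstractTransactionalStore (illustrative sketch)
// Implementations override createTransaction(); the inherited newTransaction() calls it,
// registers the result so it can be closed when the store closes, and returns it.
public class MyTransactionalStore extends AbstractTransactionalStore {

    @Override
    public AbstractTransaction<MyTransactionalStore> createTransaction() {
        return new MyTransaction(this); // MyTransaction would extend AbstractTransaction
    }

    // ... remaining StateStore methods (init, flush, close, etc.) omitted for brevity ...
}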

Proposed Changes

To ensure that data is not written to a state store until it has been committed to the changelog, we need to isolate writes from the underlying database until changelog commit. To achieve this, we introduce the concept of transaction Isolation Levels, which dictate the visibility of records written by different threads.

Internally, we enable configuration of the level of isolation provided by StateStores via a context-wide IsolationLevel, which can be configured to either:

...

IsolationLevel | Description | processing.mode
READ_UNCOMMITTED | Records written by any thread are visible to all other threads immediately. This level provides no atomicity, consistency, isolation or durability guarantees. | at-least-once
READ_COMMITTED | Records written by one thread are only visible by other threads once they have been committed. | exactly-once, exactly-once-beta, exactly-once-v2

In Kafka Streams, all StateStores are written to by a single StreamThread (this is the Single Writer principle). However, multiple other threads may concurrently read from StateStores, principally to service Interactive Queries. In practice, this means that under READ_COMMITTED, writes by the StreamThread that owns the StateStore will only become visible to Interactive Query threads once flush() has been called.
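As an illustration of these READ_COMMITTED semantics, consider the following interleaving. The transaction and store handles are hypothetical KeyValueStore<String, String> views of the same underlying store, used here purely for demonstration.

Code Block
languagejava
titleExample: READ_COMMITTED visibility (illustrative sketch)
// StreamThread (the single writer), writing through its transaction:
transaction.put("key", "v2");   // buffered in the transaction; not yet visible elsewhere

// Interactive Query thread, concurrently reading the underlying store:
store.get("key");               // still returns the last committed value, "v1"

// StreamThread, once the changelog entries are committed to Kafka:
transaction.flush();            // atomically applies the buffered writes

// Interactive Query thread, afterwards:
store.get("key");               // now returns "v2"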

StateStores are only flushed once the Kafka transaction for their changelog entries has been committed. This ensures that, under exactly-once, the records written to disk always reflect the state of the store's changelog.

The at-least-once processing mode does not need the isolation guarantees that READ_COMMITTED provides, and indeed, providing them could cause problems: the default commit.interval.ms under at-least-once is 30 seconds, which would require buffering records for up to 30 seconds, unnecessarily. For this reason, under at-least-once, Streams will automatically configure all StateStores to use the READ_UNCOMMITTED isolation level, which provides no isolation guarantees, and ensures that records are immediately written to disk.

In-memory Transaction Buffers

Many StateStore implementations, including RocksDB, will buffer records written to a transaction entirely in-memory, which could cause issues, either with JVM heap or native memory. To mitigate this, we will automatically force a Task commit if the total memory used for buffering uncommitted records returned by StateStore#approximateNumUncommittedBytes() exceeds the threshold configured by statestore.uncommitted.max.bytes. This will roughly bound the memory required for buffering uncommitted records, irrespective of the commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure. Each StreamThread will be given 1/num.stream.threads of the configured limits, dividing it fairly between them.

It's possible that some Topologies can generate many more new StateStore entries than the records they process, in which case, it would be possible for such a Topology to cross the configured record/memory thresholds mid-processing, potentially causing an OOM error if these thresholds are exceeded by a lot. To mitigate this, the StreamThread will measure the increase in records/bytes written on each iteration, and pre-emptively commit if the next iteration is likely to cross a threshold.

Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.
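A sketch of how a StreamThread might apply these limits follows. The method names and the fair-share division are illustrative of the behaviour described above, not the actual Streams internals; the fragment assumes the proposed StateStore#approximateNumUncommittedBytes() method.

Code Block
languagejava
titleExample: uncommitted-bytes commit heuristic (illustrative sketch)
// Sum the uncommitted bytes across a Task's stores, skipping stores that can't count
// them (-1) or that are non-transactional (0), per approximateNumUncommittedBytes().
long uncommittedBytes(final Collection<StateStore> stores) {
    long total = 0;
    for (final StateStore store : stores) {
        final long bytes = store.approximateNumUncommittedBytes();
        if (bytes > 0) {
            total += bytes;
        }
    }
    return total;
}

// Force an early commit if the uncommitted bytes, plus the growth observed during the
// last iteration, would cross this StreamThread's fair share of the configured limit.
boolean shouldForceCommit(final long uncommitted, final long lastIterationDelta,
                          final long maxUncommittedBytes, final int numStreamThreads) {
    final long threadLimit = maxUncommittedBytes / numStreamThreads;
    return uncommitted + lastIterationDelta > threadLimit;
}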

Transaction Management

A wrapper TransactionalKeyValueStore will be provided, which takes care of automatically creating a new transaction when it's initialized, and starting a new transaction after the existing one has been committed. This is the principal way that transactions will be managed by the Streams engine. A wrapper TransactionalSegment will also be provided, which extends TransactionalKeyValueStore, providing automatic transaction handling for Segments.

The AbstractTransaction and AbstractTransactionalStore classes detailed above provide a base implementation for transactionality that will automatically track and manage open transactions, to ensure that they are properly closed when their StateStore closes, ensuring no resource leaks. These are the base classes for RocksDBTransaction and RocksDBStore, respectively.
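The following sketch shows the lifecycle such a wrapper manages. The class shape and method names beyond createTransaction()/flush() are assumptions for illustration, not the actual TransactionalKeyValueStore code.

Code Block
languagejava
titleExample: transaction lifecycle in a wrapper store (illustrative sketch)
// On init, open a transaction; on commit, flush it and immediately start a new one,
// so the StreamThread always writes through an open transaction.
public class MyTransactionalWrapper {

    private final AbstractTransactionalStore inner;
    private StateStore transaction;

    public MyTransactionalWrapper(final AbstractTransactionalStore inner) {
        this.inner = inner;
    }

    public void init() {
        transaction = inner.createTransaction();
    }

    public void commit() {
        transaction.flush();                     // commit the current transaction
        transaction = inner.createTransaction(); // and start the next one
    }

    public StateStore current() {
        return transaction;                      // all reads/writes route through here
    }
}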

Interactive Queries

Interactive queries currently see every record, as soon as they are written to a StateStore. This can cause some consistency issues, as interactive queries can read records before they're committed to the Kafka changelog, which may be rolled-back. To address this, interactive queries will query the underlying StateStore, irrespective of the configured IsolationLevel. This ensures that interactive queries see a consistent view of the store, as they will not be able to read any uncommitted records. This is safe to do, because Interactive Queries have a read-only view of the StateStore, so they only need to avoid reading writes that have not yet been flushed.

When operating under IsolationLevel.READ_COMMITTED (i.e. EOS), the maximum time for records to become visible to interactive queries will be commit.interval.ms. However, this is automatically set to a low value, and it's not recommended for users to configure high intervals when operating under EOS.

When operating under IsolationLevel.READ_UNCOMMITTED (i.e. ALOS), all records will be immediately visible to interactive queries, so the high default commit.interval.ms of 30s will have no impact on interactive query latency.

Error Handling

Kafka Streams currently generates a TaskCorruptedException when a Task needs to have its state wiped (under EOS) and be re-initialized. There are currently several different situations that generate this exception:

  1. No offsets for the store can be found when opening it under EOS.
  2. OutOfRangeException during restoration, usually caused by the changelog being wiped on application reset.
  3. TimeoutException under EOS, when writing to or committing a Kafka transaction.

The first two of these are extremely rare, and make sense to keep. However, timeouts are much more frequent. They currently require the store to be wiped under EOS because when a timeout occurs, the data in the local StateStore will have been written, but the data in the Kafka changelog will have failed to be written, causing a mismatch in consistency.

With Transactional StateStores, we can guarantee that the local state is consistent with the changelog, therefore, it will no longer be necessary to reset the local state on a TimeoutException.

RocksDB Transactions

When the isolation level is READ_COMMITTED, we will use RocksDB's WriteBatchWithIndex as a means of accomplishing atomic writes when not using the RocksDB WAL. When reading records from the StreamThread, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities to ensure that uncommitted writes are available to query. When reading records from Interactive Queries, we will use the regular RocksDB#get and RocksDB#newIterator methods, to ensure we see only records that have been flushed (see above). The performance of this is expected to actually be better than the existing, non-batched write path. The main performance concern is that the buffer must reside completely in-memory until it is committed, which is addressed by statestore.uncommitted.max.bytes, see above.
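The following is a self-contained sketch of this dual read path using RocksDB's Java API directly. The path, keys, and class name are illustrative, while WriteBatchWithIndex, getFromBatchAndDB and newIteratorWithBase are the real RocksJava APIs named above.

Code Block
languagejava
titleExample: WriteBatchWithIndex read/write paths (illustrative sketch)
import org.rocksdb.*;

import static java.nio.charset.StandardCharsets.UTF_8;

public class WriteBatchWithIndexExample {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (final Options options = new Options().setCreateIfMissing(true);
             final RocksDB db = RocksDB.open(options, "/tmp/wbwi-example");
             final WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
             final ReadOptions readOptions = new ReadOptions()) {

            // Uncommitted write: buffered in the (in-memory) batch, not the DB.
            batch.put("key".getBytes(UTF_8), "value".getBytes(UTF_8));

            // StreamThread read path: batch first, then the DB, so uncommitted
            // writes are readable by their writer ("read-your-own-writes").
            batch.getFromBatchAndDB(db, readOptions, "key".getBytes(UTF_8));
            try (final RocksIterator base = db.newIterator();
                 final RocksIterator merged = batch.newIteratorWithBase(base)) {
                merged.seekToFirst(); // range scans see committed + uncommitted records
            }

            // Interactive Query read path: plain DB read; sees only committed records.
            db.get("key".getBytes(UTF_8)); // returns null until the batch is written

            // Commit: atomically apply the whole batch, with the WAL disabled.
            try (final WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) {
                db.write(writeOptions, batch);
            }
        }
    }
}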

Compatibility, Deprecation, and Migration Plan

The above changes will retain compatibility for all existing StateStores, including user-defined custom implementations. Any StateStore that extends RocksDBStore will automatically inherit its behaviour, although users that directly write via the db RocksDB instance may need to switch to using the dbAccessor to ensure consistent results.

All new methods on existing classes will have defaults set to ensure compatibility.

Presently, under EOS, Kafka Streams will only checkpoint StateStores when they close (e.g. due to Task migration or application shutdown). This means that StateStores are very rarely explicitly flushed to disk, leaving e.g. RocksDB to flush data to disk on its own schedule. Under EOS we will now need to regularly flush StateStore records to disk, to ensure the recorded checkpoints match the data on-disk. This may have a performance impact for users that expect infrequent StateStore flushes. To mitigate this, instead of checkpointing on every commit (which with the default EOS commit.interval.ms of 100ms would be extremely frequent), we will checkpoint whenever it is likely that the unflushed data will exceed statestore.uncommitted.max.bytes before the next Task commit.

This should reduce the frequency of memtable flushes, reducing the performance impact for users.

A more complete solution would be to de-couple store flush from store checkpointing, however, that has been pushed back to a later KIP due to the significant amount of work required (see "Atomic Checkpointing").

Test Plan

Testing will be accomplished by both the existing tests and by writing some new unit tests that verify atomicity, durability and consistency guarantees that this KIP provides.

...

Kafka Streams currently stores the changelog offsets for a StateStore in a per-Task on-disk file, .checkpoint, which, under EOS, is written only when Streams shuts down successfully. There are several major problems with this approach:

  • To ensure that the data on-disk matches the checkpoint offsets in the .checkpoint file, we must flush the StateStores whenever we update the offsets in .checkpoint. This is a performance regression, as it causes a significant increase in the frequency of RocksDB memtable flushes, which increases load on RocksDB's compaction threads.
  • If this file is only written on a clean exit, we won't know the offsets our StateStore contains, and therefore will need to wipe and restore on any error, nullifying many of the improvements gained by Transactional State Stores.
  • If we write to this file on every successful commit, there's a race condition, where it's possible the application exits after data has been committed to RocksDB, but before the checkpoint file has been updated, causing a consistency violation.

To resolve this, we move the responsibility for offset management to the StateStore itself. The new commit method takes a map of all the changelog offsets that correspond to the state of the transaction buffer being committed.

RocksDBStore will store these offsets in a separate Column Family, and will be configured to atomically flush all its Column Families. This guarantees that the changelog offsets will always be flushed to disk together with the data they represent, irrespective of how that flush is triggered. This allows us to remove the explicit memtable flush(), enabling RocksDB to dictate when memtables are flushed to disk. This will likely improve performance and reduce compactions, as it will no longer be required to flush new SST files every 10,000 records.
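The sketch below shows the underlying RocksDB mechanism. setAtomicFlush and the column-family API are real RocksJava calls; the path, keys, and offset encoding are illustrative assumptions.

Code Block
languagejava
titleExample: offsets column family with atomic flush (illustrative sketch)
import org.rocksdb.*;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static java.nio.charset.StandardCharsets.UTF_8;

public class AtomicOffsetsExample {
    public static void main(final String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        final List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY),
                new ColumnFamilyDescriptor("offsets".getBytes(UTF_8)));
        final List<ColumnFamilyHandle> handles = new ArrayList<>();
        try (final DBOptions dbOptions = new DBOptions()
                     .setCreateIfMissing(true)
                     .setCreateMissingColumnFamilies(true)
                     .setAtomicFlush(true); // all column families flush together, or not at all
             final RocksDB db = RocksDB.open(dbOptions, "/tmp/offsets-example", descriptors, handles)) {
            final ColumnFamilyHandle data = handles.get(0);
            final ColumnFamilyHandle offsets = handles.get(1);

            db.put(data, "some-key".getBytes(UTF_8), "some-value".getBytes(UTF_8));
            db.put(offsets, "changelog-topic-0".getBytes(UTF_8),
                   ByteBuffer.allocate(Long.BYTES).putLong(12345L).array());
            // No explicit flush(): RocksDB decides when to flush memtables, and atomic
            // flush guarantees the offsets land on disk together with the data they describe.
        }
    }
}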

The existing .checkpoint files will be retained for any StateStore that does not set managesOffsets() to true, and the existing offsets will be automatically migrated into StateStores that manage their own offsets, iff there is no offset returned by StateStore#getCommittedOffset.

Required interface changes:

  • Add methods void commit(Map<TopicPartition, Long> changelogOffsets), boolean managesOffsets() and Long getCommittedOffset(TopicPartition) to StateStore, as sketched below.
  • Deprecate method flush() on StateStore.
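A hedged sketch of what these additions might look like follows. The method names come from this KIP; the default bodies are assumptions consistent with the compatibility guarantees described earlier, not the KIP's final code.

Code Block
languagejava
titleExample: proposed StateStore additions (illustrative sketch)
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

public interface StateStore /* existing interface; additions only */ {

    /** Commit written records, persisting the given changelog offsets alongside them. */
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush(); // assumed default: non-transactional stores just flush;
                 // their offsets stay in the .checkpoint file
    }

    /** Whether this store persists its own changelog offsets, instead of .checkpoint. */
    default boolean managesOffsets() {
        return false;
    }

    /** The offset persisted by the last successful commit, or {@code null} if unknown. */
    default Long getCommittedOffset(final TopicPartition partition) {
        return null;
    }

    /** @deprecated use {@link #commit(Map)} instead. */
    @Deprecated
    void flush();
}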

Offsets for Consumer Rebalances

...

The additional latency this would introduce to the rebalance is unacceptable. Instead, we will continue to write the offsets of all StateStores, including those that manage their own offsets, to the .checkpoint file. For StateStores that manage their own offsets, this file will only be read if that store is closed (i.e. not yet initialized). Since the store is not open, the above race condition does not apply, making this safe. If a StateStore returns true for both managesOffsets() and isOpen(), then the store will be queried for its offsets directly, via getCommittedOffset(TopicPartition).
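A small sketch of this resolution rule follows; offsetForRebalance and readCheckpointFile are hypothetical helpers standing in for the Streams internals and the per-Task .checkpoint file read.

Code Block
languagejava
titleExample: resolving offsets for rebalance (illustrative sketch)
// Open, offset-managing stores are queried directly, avoiding the stale-file race;
// for closed stores, the .checkpoint file cannot race with in-flight commits.
Long offsetForRebalance(final StateStore store, final TopicPartition partition) {
    if (store.managesOffsets() && store.isOpen()) {
        return store.getCommittedOffset(partition);
    }
    return readCheckpointFile(partition); // hypothetical: reads the .checkpoint file
}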

This approach of Atomic Checkpointing, while beneficial, is out of scope, as it is not strictly necessary to make StateStores Transactional and to end unnecessary state restoration. It will likely be addressed in a follow-up KIP.

...