Comment: Revise to include Atomic Checkpointing and a section detailing changes for interactive queries.

...

This KIP explores an alternative design that should have little/no performance impact, potentially performing better than the status quo, and can thus be enabled for all stores.

...

Code Block
languagejava
titleorg.apache.kafka.streams.processor.StateStore
    /**
     * Flush any cached data
     * @deprecated Use {@link #commit(Map)} instead.
     */
    @Deprecated
    default void flush() {
        commit(Collections.emptyMap());
    }

    /**
     * Commits any uncommitted records to this StateStore with the given {@code changelogOffsets}.
     * <p>
     * If this StateStore has any open {@link Transaction transactions}, they will <em>not</em> be {@link Transaction#commit(Map)
     * committed}.
     * <p>
     * The given {@code changelogOffsets} will be committed to the StateStore along with the uncommitted {@link
     * Transaction transactions}. If this StateStore supports atomic transactions, offsets are guaranteed to be
     * committed atomically with the records they correspond to.
     *
     * @param changelogOffsets The input/changelog topic offsets of the records being committed.
     */
    @Evolving
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }

    /**
     * Returns whether this StateStore manages its own changelog offsets.
     * <p>
     * This can only return {@code true} if {@link #persistent()} also returns {@code true}, as non-persistent stores
     * have no changelog offsets to manage.
     *
     * @return Whether this StateStore manages its own changelog offsets.
     */
    @Evolving
    default boolean managesOffsets() {
        return false;
    }

    /**
     * Returns the committed offset for the given partition.
     * <p>
     * If this store is not {@link #persistent()} or does not {@link #managesOffsets() manage its own offsets},
     * it will always yield {@code null}.
     * <p>
     * If {@code topicPartition} is a changelog partition or Topology input partition for this StateStore, this method
     * will return the committed offset for that partition.
     * <p>
     * If no committed offset exists for the given partition, or if the partition is not a changelog or input partition
     * for the store, {@code null} will be returned.
     *
     * @param topicPartition The changelog/input partition to get the committed offset for.
     * @return The committed offset for the given partition, or {@code null} if no committed offset exists, or if this
     *         store does not contain committed offsets.
     */
    @Evolving
    default Long getCommittedOffset(final TopicPartition topicPartition) {
        return null;
    }

    /**
     * Creates a new {@link Transaction} for reading/writing to this state store.
     * <p>
     * State stores that do not support transactions will return {@code this} instead, which should be considered a
     * transaction that doesn't provide any isolation or atomicity guarantees.
     * <p>
     * {@link Transaction Transactions} are <em>not thread-safe</em>, and should not be shared among threads. New
     * threads should use this method to create a new transaction, instead of sharing an existing one.
     * <p>
     * Transactions created by this method should have the same {@link IsolationLevel} as the {@link StateStoreContext}
     * that this store was {@link #init(StateStoreContext, StateStore) initialized with}.
     * <p>
     * To avoid resource leaks, it is recommended to extend {@link AbstractTransactionalStore}, which will track open
     * transactions and automatically close all open transactions when the store is closed.
     *
     * @return A new {@link Transaction} to isolate reads/writes to this {@link StateStore}. The Transaction
     *         <strong>MUST</strong> be {@link Transaction#commit(Map) committed} or {@link Transaction#close() closed}
     *         when you are finished with it, to prevent resource leaks.
     */
    @Evolving
    default StateStore newTransaction() {
        return this;
    }

    /**
     * Return an approximate count of records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the number of records that would be committed by the next call to
     * {@link #commit(Map)}.
     * <p>
     * If this StateStore is unable to approximately count uncommitted records, it will return {@code -1}.
     * If this StateStore does not support atomic transactions, it will return {@code 0}, because records will always be
     * immediately written to a non-transactional store, so there will be none awaiting a {@link #commit(Map)}.
     *
     * @return The approximate number of records awaiting {@link #commit(Map)}, {@code -1} if the number of
     *         uncommitted records can't be counted, or {@code 0} if this StateStore does not support transactions.
     */
    @Evolving
    default long approximateNumUncommittedEntries() {
        return 0;
    }

    /**
     * Return an approximate count of memory used by records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the memory that would be freed by the next call to {@link #commit(Map)}.
     * <p>
     * If this StateStore is unable to approximately count uncommitted memory usage, it will return {@code -1}.
     * If this StateStore does not support atomic transactions, it will return {@code 0}, because records will always be
     * immediately written to a non-transactional store, so there will be none awaiting a {@link #commit(Map)}.
     *
     * @return The approximate size of all records awaiting {@link #commit(Map)}, {@code -1} if the size of uncommitted
     *         records can't be counted, or {@code 0} if this StateStore does not support transactions.
     */
    @Evolving
    default long approximateNumUncommittedBytes() {
        return 0;
    }


Code Block
languagejava
titleorg.apache.kafka.streams.processor.StateStoreContext
    /**
     * Returns the {@link IsolationLevel} that every {@link Transaction} created by {@link StateStore#newTransaction()}
     * should use.
     * <p>
     * The default implementation of this method will use {@link IsolationLevel#READ_COMMITTED READ_COMMITTED} if the
     * app is {@link #appConfigs() configured} to use an {@link StreamsConfig#EXACTLY_ONCE_V2 exactly-once} {@link
     * StreamsConfig#PROCESSING_GUARANTEE_CONFIG processing guarantee}. Otherwise, it will be {@link
     * IsolationLevel#READ_UNCOMMITTED READ_UNCOMMITTED}.
     *
     * @return The isolation level for every transaction created by state stores for this context.
     */
    default IsolationLevel isolationLevel() {
        return StreamsConfigUtils.eosEnabled(new StreamsConfig(appConfigs())) ?
                IsolationLevel.READ_COMMITTED : IsolationLevel.READ_UNCOMMITTED;
    }

New Interfaces

  • org.apache.kafka.streams.state.Transaction
  • org.apache.kafka.streams.state.AbstractTransactionalStore


Code Block
languagejava
titleorg.apache.kafka.streams.state.Transaction
/**
 * Represents a read/write transaction on a state store.
 * <p>
 * For compatibility, transactions implement the entire {@link StateStore} interface, however, they only represent a
 * <em>view</em> on an underlying {@link StateStore}; therefore methods that explicitly act on the entire store, such
 * as {@link #init} will do nothing, and others like {@link #commit(Map)} and {@link #close()} will act upon this
 * transaction only.
 * <p>
 * Transactions are <em>NOT</em> thread-safe, and they should not be shared among multiple threads. Threads should
 * create their own {@link StateStore#newTransaction() new transaction}, which will ensure proper isolation of
 * concurrent changes.
 * <p>
 * For resource-safety, Transactions implement the {@link AutoCloseable} interface, enabling try-with-resources:
 * <pre>
 *     try (final Transaction transaction = store.newTransaction()) {
 *         transaction.put("foo", "bar");
 *         transaction.commit(changelogOffsets);
 *     }
 * </pre>
 * If you are not using try-with-resources, you <em>must</em> call either {@link #commit(Map)} exactly once, or {@link
 * #close()} at least once, or risk possible resource leaks.
 */
public interface Transaction extends StateStore, AutoCloseable {

    /**
     * The {@link IsolationLevel} that reads and writes in this transaction are subject to.
     * <p>
     * In the context of a {@link org.apache.kafka.streams.processor.StateStore} transaction, these Isolation Levels
     * adhere to the <a href="https://en.wikipedia.org/wiki/Isolation_(database_systems)#Read_phenomena">ANSI SQL 92
     * definitions</a>.
     * <p>
     * All isolation levels guarantee "read-your-own-writes", i.e. that writes in the transaction will be seen by
     * subsequent reads <em>from within the same transaction</em>. Other guarantees vary by isolation level:
     * <p>
     * <table>
     *     <tr>
     *         <th>Isolation Level</th>
     *         <th>Description</th>
     *         <th>Permitted Read Phenomena</th>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_UNCOMMITTED}</td> // todo: ALOS
     *         <td>Allows queries to read writes from all ongoing transactions that have not-yet been committed.</td>
     *         <td>dirty reads, non-repeatable reads, phantom reads</td>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_COMMITTED}</td> // todo: EOS
     *         <td>Allows queries to only read writes that have been committed to the StateStore. Writes by an ongoing
     *         transaction are not visible <em>until that transaction commits</em>.</td>
     *         <td>non-repeatable reads, phantom reads</td>
     *     </tr>
     * </table>
     * <p>
     * Under {@link IsolationLevel#READ_UNCOMMITTED}, there are no guarantees on when records from other transactions
     * become visible, therefore implementations <em>may</em> refrain from making uncommitted records visible to other
     * transactions until they're committed, if doing so would improve performance.
     */
    IsolationLevel isolationLevel();

    /**
     * Initializes this Transaction.
     * <p>
     * Most transactions require no explicit initialization, therefore the default implementation of this method does
     * nothing.
     *
     * @deprecated Since 2.7.0, the parent method has been deprecated in favor of {@link
     *             #init(StateStoreContext, StateStore)}. Use that method instead.
     */
    @Override @Deprecated
    default void init(final ProcessorContext context, final StateStore root) {
    }

    /**
     * Initializes this Transaction.
     * <p>
     * Most transactions require no explicit initialization, therefore the default implementation of this method does
     * nothing.
     */
    @Override
    default void init(final StateStoreContext context, final StateStore root) {
    }

    /**
     * Whether this transaction is open for reading and writing.
     * @return {@code true} if this Transaction can be read from/written to, or {@code false} if it is no longer
     *         accessible.
     */
    @Override
    boolean isOpen();

    /**
     * Creates a new {@link Transaction} from an existing transaction.
     * <p>
     * This enables potentially re-using resources from an existing, no longer in-use transaction, instead of creating
     * a new one.
     * <p>
     * This method should only be called if the current transaction is guaranteed to no longer be in-use.
     * Implementations may return either a new Transaction instance, or a reference to themselves, only if they are able
     * to reset to being available for use.
     * <p>
     * {@link Transaction Transactions} are <em>not thread-safe</em>, and should not be shared among threads. New
     * threads should use this method to create a new transaction, instead of sharing an existing one.
     * <p>
     * Transactions created by this method will have the same {@link IsolationLevel} as the {@link StateStoreContext}
     * that this transaction was {@link #init(StateStoreContext, StateStore) initialized with}.
     *
     * @return A new {@link Transaction} to control reads/writes to the same {@link StateStore}. The Transaction
     *         <em>MUST</em> be {@link Transaction#commit(Map) committed} or {@link Transaction#close() closed} when you
     *         are finished with it, to prevent resource leaks.
     */
    @Override
    StateStore newTransaction();

    /**
     * Commit and close this Transaction.
     * <p>
     * Any records held by this Transaction will be written to the underlying state store.
     * <p>
     * Once this method returns successfully, this Transaction will no longer be available to use for reads/writes.
     *
     * @see #close() to close the Transaction without writing data to the underlying state store.
     */
    @Override
    void commit(final Map<TopicPartition, Long> changelogOffsets);

    /**
     * Closes this Transaction, without committing records.
     * <p>
     * Any uncommitted records will <em>not</em> be written to the underlying state store, and will instead be
     * discarded.
     * <p>
     * This method should be used to "rollback" a Transaction that should not be {@link #commit(Map) committed}.
     * <p>
     * The underlying {@link StateStore} will <em>not</em> be closed by this method.
     * <p>
     * This method is idempotent: repeatedly calling {@code close()} will not produce an error.
     *
     * @see #commit(Map) to close this Transaction by writing its records to the underlying state store.
     */
    @Override
    void close();
}

New Base Implementations

  • org.apache.kafka.streams.state.AbstractTransaction
  • org.apache.kafka.streams.state.AbstractTransactionalStore


Code Block
languagejava
titleorg.apache.kafka.streams.state.AbstractTransaction
/**
 * Base implementation of {@link Transaction transactions} created by an implementation of {@link
 * AbstractTransactionalStore}.
 * <p>
 * This base implementation provides the following functionality:
 * <ul>
 *     <li>Registration of callbacks that are invoked after this Transaction has closed. Used by {@link
 *     AbstractTransactionalStore} to track open transactions that need to be closed when the parent store closes.</li>
 *     <li>Tracking/delegation of {@link #isOpen()}, {@link #persistent()}, {@link #name()} and {@link
 *     #isolationLevel()} methods.</li>
 *     <li>Provides a {@link #validateIsOpen()} method, useful for ensuring the Transaction state in query methods.</li>
 *     <li>Provides base implementations of {@link #commit(Map)} and {@link #close()} that properly handle idempotence and
 *     invoking any registered callbacks.</li>
 * </ul>
 * <p>Note: none of the above methods are marked {@code final}, to enable power-users to provide alternatives. However,
 * this should only be done with great care, and only if absolutely necessary.</p>
 *
 * @param <S> The type of the {@link StateStore} that spawned this transaction.
 */
public abstract class AbstractTransaction<S extends StateStore> implements Transaction {

    /**
     * Commit and close this Transaction.
     * 
     * @see Transaction#commit(Map)
     */
    public abstract void commitTransaction(final Map<TopicPartition, Long> offsets);

    /**
     * Closes this Transaction, without committing records.
     *
     * @see Transaction#close()
     */
    public abstract void closeTransaction();
}


Code Block
languagejava
titleorg.apache.kafka.streams.state.AbstractTransactionalStore
/**
 * Base class for transactional stores, that tracks open transactions and can close them all.
 * <p>
 * Transactions created using {@link #newTransaction()} will be automatically tracked and closed when this store is
 * closed.
 *
 * @see Transaction
 * @see AbstractTransaction
 */
public abstract class AbstractTransactionalStore implements StateStore {

    /**
     * Creates a new {@link Transaction}.
     * <p>
     * Implementations of this class should implement this method, instead of {@link #newTransaction()}, which
     * will call this method to produce the new transaction.
     * <p>
     * Transactions produced by this method must be an instance of {@link AbstractTransaction}, to enable the
     * automatic management of transaction resources.
     * 
     * @return A new {@link Transaction}.
     */
    public abstract AbstractTransaction<? extends AbstractTransactionalStore> createTransaction();
}

Proposed Changes

To ensure that data is not written to a state store until it has been committed to the changelog, we need to isolate writes from the underlying database until changelog commit. To achieve this, we introduce the concept of a Transaction, which is a view of the underlying state store, and buffers writes until commit(Map) is called. Transactions implement the StateStore interface, for compatibility, so they can be used anywhere an existing StateStore is expected.

Internally, we enable configuration of the level of isolation provided by StateStores via a context-wide IsolationLevel, which can be configured to either:

| IsolationLevel | Description | processing.guarantee |
| --- | --- | --- |
| READ_UNCOMMITTED | Records written to any transaction are visible to all other transactions immediately. This level provides no atomicity, consistency, isolation or durability guarantees. | at-least-once |
| READ_COMMITTED | Records written to one transaction are only visible to other transactions once they have been committed. | exactly-once, exactly-once-beta, exactly-once-v2 |

StateStore Transactions are only committed once the Kafka transaction for their changelog entries has been committed. This ensures that, under exactly-once, the records written to disk always reflect the state of the store's changelog.

The at-least-once processing mode does not need the isolation guarantees that transactions provide. Providing them could in fact cause problems: the default commit.interval.ms under at-least-once is 30 seconds, so records would be buffered for up to 30 seconds unnecessarily. For this reason, under at-least-once, Streams will automatically configure all StateStores to use the READ_UNCOMMITTED isolation level, which provides no isolation guarantees, and ensures that records are immediately written to disk.
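
The mapping from processing guarantee to isolation level can be sketched in isolation. This is a minimal, self-contained illustration rather than the proposed implementation: the `IsolationLevel` enum is a local stand-in for the one this KIP introduces, `IsolationLevels` is a hypothetical helper, and the string values are those accepted by the existing `processing.guarantee` config. Treating a missing value as at-least-once mirrors that config's default.

```java
import java.util.Map;

/** Local stand-in for the IsolationLevel enum proposed in this KIP. */
enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

/** Hypothetical helper; not part of the proposed API. */
final class IsolationLevels {
    private IsolationLevels() { }

    /** Picks the isolation level for a set of application configs. */
    static IsolationLevel forAppConfigs(final Map<String, ?> appConfigs) {
        final Object guarantee = appConfigs.get("processing.guarantee");
        if (guarantee == null) {
            // The config defaults to at-least-once, which needs no isolation.
            return IsolationLevel.READ_UNCOMMITTED;
        }
        switch (guarantee.toString()) {
            case "exactly_once":
            case "exactly_once_beta":
            case "exactly_once_v2":
                return IsolationLevel.READ_COMMITTED;
            default:
                return IsolationLevel.READ_UNCOMMITTED;
        }
    }
}
```
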

RocksDB Transactions

When the isolation level is READ_COMMITTED, we will use RocksDB's WriteBatchWithIndex to achieve atomic writes when not using the RocksDB WAL. When reading records, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities to ensure that uncommitted writes are available to query. This is expected to perform better than the existing, non-batched write path. The main performance concern is that the buffer must reside completely in-memory until it is committed.
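
The read-through behaviour of an indexed write batch can be illustrated without RocksDB. The sketch below is a plain-Java analogy, not the proposed RocksDBStore code: `BufferedStoreSketch` and all of its members are hypothetical, with one `TreeMap` standing in for the database and another for the batch. Reads consult staged writes first, a merged view overlays staged writes on committed data, and a commit applies the whole batch in one step.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Plain-Java analogy for a store whose writes are staged in an indexed batch. */
final class BufferedStoreSketch {
    // Committed data; stands in for the RocksDB database.
    private final TreeMap<String, String> committed = new TreeMap<>();
    // Staged writes; stands in for the indexed batch. Optional.empty() marks a delete.
    private final TreeMap<String, Optional<String>> batch = new TreeMap<>();

    void put(final String key, final String value) { batch.put(key, Optional.of(value)); }

    void delete(final String key) { batch.put(key, Optional.empty()); }

    /** Reads the batch first, then falls back to committed data. */
    String get(final String key) {
        final Optional<String> staged = batch.get(key);
        return staged != null ? staged.orElse(null) : committed.get(key);
    }

    /** Reads committed data only, bypassing the batch. */
    String getCommitted(final String key) { return committed.get(key); }

    /** Ordered view of committed data overlaid with staged writes. */
    Map<String, String> mergedView() {
        final TreeMap<String, String> view = new TreeMap<>(committed);
        applyBatchTo(view);
        return view;
    }

    /** Applies every staged write, then clears the batch. */
    void commit() {
        applyBatchTo(committed);
        batch.clear();
    }

    /** Discards staged writes without applying them. */
    void rollback() { batch.clear(); }

    /** Number of staged writes that a commit would apply. */
    int uncommittedEntries() { return batch.size(); }

    private void applyBatchTo(final Map<String, String> target) {
        for (final Map.Entry<String, Optional<String>> write : batch.entrySet()) {
            if (write.getValue().isPresent()) {
                target.put(write.getKey(), write.getValue().get());
            } else {
                target.remove(write.getKey());
            }
        }
    }
}
```

Because the staged map grows until `commit()` or `rollback()` is called, the sketch also shows why the buffer's size has to be bounded.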

To mitigate this, we will automatically force a Task commit if the total number of uncommitted records reported by StateStore#approximateNumUncommittedEntries() exceeds the threshold configured by statestore.uncommitted.max.records, or if the total memory used for buffering uncommitted records reported by StateStore#approximateNumUncommittedBytes() exceeds the threshold configured by statestore.uncommitted.max.bytes. This will roughly bound the memory required per-Thread for buffering uncommitted records, irrespective of the commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure.

Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.
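
The early-commit decision described above can be sketched as a sum over stores compared against the two thresholds. This is only an illustration under stated assumptions: `UncommittedStats`, `FixedStats` and `EarlyCommitCheck` are hypothetical names, a store reporting `-1` is skipped rather than counted, and a negative threshold is taken to disable that check. The KIP text above does not specify either of the last two behaviours.

```java
import java.util.List;

/** Minimal view of the two proposed StateStore methods used by the check. */
interface UncommittedStats {
    long approximateNumUncommittedEntries();
    long approximateNumUncommittedBytes();
}

/** Fixed-value implementation, used only to exercise the sketch. */
final class FixedStats implements UncommittedStats {
    private final long entries;
    private final long bytes;

    FixedStats(final long entries, final long bytes) {
        this.entries = entries;
        this.bytes = bytes;
    }

    @Override public long approximateNumUncommittedEntries() { return entries; }
    @Override public long approximateNumUncommittedBytes() { return bytes; }
}

final class EarlyCommitCheck {
    private EarlyCommitCheck() { }

    /** Returns true if either summed total exceeds its threshold. */
    static boolean shouldForceCommit(final List<? extends UncommittedStats> stores,
                                     final long maxRecords,
                                     final long maxBytes) {
        long totalEntries = 0;
        long totalBytes = 0;
        for (final UncommittedStats store : stores) {
            // Assumption: -1 ("cannot be counted") contributes nothing to the totals.
            totalEntries += Math.max(0L, store.approximateNumUncommittedEntries());
            totalBytes += Math.max(0L, store.approximateNumUncommittedBytes());
        }
        // Assumption: a negative threshold disables the corresponding check.
        final boolean tooManyRecords = maxRecords >= 0 && totalEntries > maxRecords;
        final boolean tooManyBytes = maxBytes >= 0 && totalBytes > maxBytes;
        return tooManyRecords || tooManyBytes;
    }
}
```

A store that keeps the default return value of `0` never pushes the totals over a threshold, which matches the note above that existing stores do not force early commits.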

Transaction Management

A wrapper TransactionalKeyValueStore will be provided, which takes care of automatically creating a new transaction when it's initialized, and starting a new transaction after the existing one has been committed. This is the principal way that transactions will be managed by the Streams engine. A wrapper TransactionalSegment will also be provided, which extends TransactionalKeyValueStore, providing automatic transaction handling for Segments.

The AbstractTransaction and AbstractTransactionalStore classes detailed above provide a base implementation for transactionality that will automatically track and manage open transactions, to ensure that they are properly closed when their StateStore closes, ensuring no resource leaks. These are the base classes for RocksDBTransaction and RocksDBStore, respectively.
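
The tracking behaviour can be sketched with a close callback. This is an illustrative stand-in, not the proposed base classes: `TrackedTransaction` and `TrackingStore` are hypothetical names, and committing, reading and writing are omitted so that only the bookkeeping remains. Each transaction deregisters itself when closed, and closing the store closes whatever is still open.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Transaction that notifies its creator when it is closed. */
final class TrackedTransaction implements AutoCloseable {
    private final Consumer<TrackedTransaction> onClose;
    private boolean open = true;

    TrackedTransaction(final Consumer<TrackedTransaction> onClose) {
        this.onClose = onClose;
    }

    boolean isOpen() { return open; }

    @Override
    public void close() {
        if (!open) {
            return; // idempotent: closing again is a no-op
        }
        open = false;
        onClose.accept(this);
    }
}

/** Store that tracks its open transactions and closes them with itself. */
final class TrackingStore implements AutoCloseable {
    private final Set<TrackedTransaction> openTransactions = new LinkedHashSet<>();

    TrackedTransaction newTransaction() {
        // The callback removes the transaction from the tracked set on close.
        final TrackedTransaction transaction = new TrackedTransaction(openTransactions::remove);
        openTransactions.add(transaction);
        return transaction;
    }

    int openTransactionCount() { return openTransactions.size(); }

    @Override
    public void close() {
        // Iterate over a copy, because close() mutates the tracked set.
        for (final TrackedTransaction transaction : new ArrayList<>(openTransactions)) {
            transaction.close();
        }
    }
}
```

Like the transactions it models, this sketch is not thread-safe.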

Atomic Checkpointing

Kafka Streams currently stores the changelog offsets for a StateStore in a per-Task on-disk file, .checkpoint, which, under EOS, is written only when Streams shuts down successfully. There are two major problems with this approach:

  • If this file is only written on a clean exit, we won't know the offsets our StateStore contains, and therefore will need to wipe and restore on any error, nullifying many of the improvements gained by Transactional State Stores.
  • If we write to this file on every successful commit, there's a race condition, where it's possible the application exits after data has been committed to RocksDB, but before the checkpoint file has been updated, causing a consistency violation.

To resolve this, we move the responsibility for offset management to the StateStore itself. The new commit method takes a map of all the changelog offsets that correspond to the state of the Transaction being committed.

RocksDBStore will store these offsets in a separate Column Family, and will be configured to atomically flush all its Column Families. This guarantees that the changelog offsets will always be flushed to disk together with the data they represent, irrespective of how that flush is triggered. This allows us to remove the explicit memtable flush(), enabling RocksDB to dictate when memtables are flushed to disk. This will likely improve performance and reduce compactions, as it will no longer be required to flush new SST files every 10,000 records.
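The guarantee being sought, that records and their changelog offsets become durable together or not at all, can be modelled as follows. This is a sketch under stated assumptions and not the RocksDB mechanism itself: the class name is hypothetical, plain strings stand in for topic partitions, and a single reference assignment stands in for the atomic flush of all Column Families.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a store that commits records and offsets in one step. */
final class OffsetTrackingStoreSketch {
    /** Immutable pairing of committed records with the offsets they reflect. */
    private static final class Snapshot {
        final Map<String, String> records;
        final Map<String, Long> offsets;

        Snapshot(Map<String, String> records, Map<String, Long> offsets) {
            this.records = Collections.unmodifiableMap(records);
            this.offsets = Collections.unmodifiableMap(offsets);
        }
    }

    private volatile Snapshot committed =
        new Snapshot(new HashMap<>(), new HashMap<>());
    private final Map<String, String> uncommitted = new HashMap<>();

    void put(String key, String value) {
        uncommitted.put(key, value);
    }

    /**
     * Builds the next snapshot off to the side, then publishes it with one
     * assignment. A failure before that assignment leaves both the records and
     * the offsets at their previous values.
     */
    void commit(Map<String, Long> changelogOffsets) {
        Map<String, String> records = new HashMap<>(committed.records);
        records.putAll(uncommitted);
        Map<String, Long> offsets = new HashMap<>(committed.offsets);
        offsets.putAll(changelogOffsets);
        committed = new Snapshot(records, offsets);
        uncommitted.clear();
    }

    String get(String key) {
        return committed.records.get(key);
    }

    /** Returns null when no offset has been committed for the given partition. */
    Long getCommittedOffset(String topicPartition) {
        return committed.offsets.get(topicPartition);
    }
}
```

Because the offsets travel with the records they describe, a reader of this model can never observe committed records paired with stale offsets, which is the race the separate .checkpoint file cannot rule out.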

The existing .checkpoint files will be retained for any StateStore that does not set managesOffsets() to true, and the existing offsets will be automatically migrated into StateStores that manage their own offsets, if and only if there is no offset returned by StateStore#getCommittedOffset.
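The migration rule can be sketched as a small pure function. The helper below is hypothetical: it assumes the checkpoint file has already been parsed into a map keyed by partition name, and that a missing committed offset is reported as null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper illustrating which checkpoint offsets would be migrated. */
final class CheckpointMigrationSketch {
    private CheckpointMigrationSketch() {
    }

    /**
     * Returns the checkpoint-file offsets that should be copied into the store:
     * none if the store does not manage its own offsets, otherwise only those
     * partitions for which the store has no committed offset (assumed null).
     */
    static Map<String, Long> offsetsToMigrate(boolean managesOffsets,
                                              Map<String, Long> checkpointOffsets,
                                              Function<String, Long> getCommittedOffset) {
        Map<String, Long> toMigrate = new HashMap<>();
        if (!managesOffsets) {
            return toMigrate;
        }
        for (Map.Entry<String, Long> entry : checkpointOffsets.entrySet()) {
            if (getCommittedOffset.apply(entry.getKey()) == null) {
                toMigrate.put(entry.getKey(), entry.getValue());
            }
        }
        return toMigrate;
    }
}
```

An offset already held by the store always takes precedence over the one in the checkpoint file, so running the migration more than once changes nothing.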

Interactive Queries

Interactive queries currently see every record as soon as it is written to a StateStore. This can cause consistency issues, as interactive queries can read records before they're committed to the Kafka changelog, and those records may later be rolled back. To address this, interactive queries will query the underlying StateStore, and will not be routed through a Transaction. This ensures that interactive queries see a consistent view of the store, as they will not be able to read any uncommitted records.

When operating under IsolationLevel.READ_COMMITTED (i.e. EOS), the maximum time for records to become visible to interactive queries will be commit.interval.ms. However, this is automatically set to a low value under EOS, and users are not recommended to configure high intervals when operating under EOS.

When operating under IsolationLevel.READ_UNCOMMITTED (i.e. ALOS), all records will be immediately visible to interactive queries, so the high default commit.interval.ms of 30s will have no impact on interactive query latency.

StateStore Transactions are only committed once the Kafka transaction for their changelog entries has been committed. This ensures that, under exactly-once, the records written to disk always reflect the state of the store's changelog.

The at-least-once processing mode does not need the isolation guarantees that transactions provide. Indeed, providing them could cause problems: the default commit.interval.ms under at-least-once is 30 seconds, which would add a latency of up to 30 seconds before records become visible to Interactive Queries. For this reason, under at-least-once, Streams will automatically configure all StateStores to use the READ_UNCOMMITTED isolation level, which provides no isolation guarantees, and ensures that records are immediately available for query.
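The mapping from processing mode to isolation level can be sketched as below. The enum and helper are hypothetical stand-ins written for illustration; the string values are those accepted by the existing processing.guarantee configuration.

```java
/** Hypothetical stand-in for the isolation levels discussed above. */
enum IsolationLevelSketch {
    READ_UNCOMMITTED,
    READ_COMMITTED
}

final class IsolationSelectionSketch {
    private IsolationSelectionSketch() {
    }

    /** Maps a processing.guarantee value to the isolation level Streams would configure. */
    static IsolationLevelSketch forProcessingGuarantee(String processingGuarantee) {
        switch (processingGuarantee) {
            case "exactly_once":
            case "exactly_once_beta":
            case "exactly_once_v2":
                // EOS: queries only see records whose changelog writes are committed.
                return IsolationLevelSketch.READ_COMMITTED;
            case "at_least_once":
                // ALOS: records are visible to queries as soon as they are written.
                return IsolationLevelSketch.READ_UNCOMMITTED;
            default:
                throw new IllegalArgumentException(
                    "Unknown processing guarantee: " + processingGuarantee);
        }
    }
}
```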

Compatibility, Deprecation, and Migration Plan

The above changes will retain compatibility for all existing StateStores, including user-defined custom implementations. Any StateStore that extends RocksDBStore will automatically inherit its behaviour, although users who write directly via the db RocksDB instance may need to switch to using the dbAccessor to ensure consistent results.

All new methods on existing classes will have defaults set to ensure compatibility.

As noted above, any store for which managesOffsets() returns true will have its existing offset(s) migrated from the existing .checkpoint file when the store is first opened.

Test Plan

Testing will be accomplished both by the existing tests and by new unit tests that verify the atomicity, durability and consistency guarantees that this KIP provides.

...