Comment: Clarified changes necessary for Segment stores, and fixed a few minor typos.

...

Code Block
language: java
title: org.apache.kafka.streams.state.Transaction
/**
 * Represents a read/write transaction on a state store.
 * <p>
 * For compatibility, transactions implement the entire {@link StateStore} interface; however, they only represent a
 * <em>view</em> on an underlying {@link StateStore}; therefore methods that explicitly act on the entire store, such
 * as {@link #init} will do nothing, and others like {@link #flush()} and {@link #close()} will act upon this
 * transaction only.
 * <p>
 * Transactions are <em>NOT</em> thread-safe, and they should not be shared among multiple threads. Threads should
 * create their own {@link StateStore#newTransaction() new transaction}, which will ensure proper isolation of
 * concurrent changes.
 * <p>
 * For resource-safety, Transactions implement the {@link AutoCloseable} interface, enabling try-with-resources:
 * <pre>
 *     try (final Transaction transaction = store.newTransaction()) {
 *         transaction.put("foo", "bar");
 *         transaction.flush();
 *     }
 * </pre>
 * If you are not using try-with-resources, you <em>must</em> call either {@link #flush()} exactly once, or {@link
 * #close()} at least once, or risk possible resource leaks.
 */
public interface Transaction extends StateStore, AutoCloseable {

    /**
     * The {@link IsolationLevel} that reads and writes in this transaction are subject to.
     * <p>
     * In the context of a {@link org.apache.kafka.streams.processor.StateStore} transaction, these Isolation Levels
     * adhere to the <a href="https://en.wikipedia.org/wiki/Isolation_(database_systems)#Read_phenomena">ANSI SQL 92
     * definitions</a>.
     * <p>
     * All isolation levels guarantee "read-your-own-writes", i.e. that writes in the transaction will be seen by
     * subsequent reads <em>from within the same transaction</em>. Other guarantees vary by isolation level:
     * <p>
     * <table>
     *     <tr>
     *         <th>Isolation Level</th>
     *         <th>Description</th>
     *         <th>Permitted Read Phenomena</th>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_UNCOMMITTED}</td> // todo: ALOS
     *         <td>Allows queries to read writes from all ongoing transactions that have not yet been committed.</td>
     *         <td>dirty reads, non-repeatable reads, phantom reads</td>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_COMMITTED}</td> // todo: EOS
     *         <td>Allows queries to only read writes that have been committed to the StateStore. Writes by an ongoing
     *         transaction are not visible <em>until that transaction commits</em>.</td>
     *         <td>non-repeatable reads, phantom reads</td>
     *     </tr>
     * </table>
     * <p>
     * Under {@link IsolationLevel#READ_UNCOMMITTED}, there are no guarantees on when records from other transactions
     * become visible; therefore, implementations <em>may</em> refrain from making uncommitted records visible to other
     * transactions until they're committed, if doing so would improve performance.
     */
    IsolationLevel isolationLevel();

    /**
     * Initializes this Transaction.
     * <p>
     * Most transactions require no explicit initialization, therefore the default implementation of this method
     * delegates to {@link #init(StateStoreContext, StateStore)}, which does nothing by default.
     *
     * @deprecated Since 2.7.0, the parent method has been deprecated in favor of {@link
     *             #init(StateStoreContext, StateStore)}. Use that method instead.
     */
    @Override @Deprecated
    default void init(final ProcessorContext context, final StateStore root) {
        init((StateStoreContext) context, root);
    }

    /**
     * Initializes this Transaction.
     * <p>
     * Most transactions require no explicit initialization, therefore the default implementation of this method does
     * nothing.
     */
    @Override
    default void init(final StateStoreContext context, final StateStore root) {
        // do nothing?
    }

    /**
     * Whether this transaction is open for reading and writing.
     * @return {@code true} if this Transaction can be read from/written to, or {@code false} if it is no longer
     *         accessible.
     */
    @Override
    boolean isOpen();

    /**
     * Commit and close this Transaction.
     * <p>
     * Any records held by this Transaction will be written to the underlying state store.
     * <p>
     * Once this method returns successfully, this Transaction will no longer be available to use for reads/writes.
     *
     * @see #close() to close the Transaction without writing data to the underlying state store.
     */
    @Override
    void flush();

    /**
     * Closes this Transaction, without committing records.
     * <p>
     * Any uncommitted records will <em>not</em> be written to the underlying state store, and will instead be
     * discarded.
     * <p>
     * This method should be used to "rollback" a Transaction that should not be {@link #flush() committed}.
     * <p>
     * The underlying {@link StateStore} will <em>not</em> be closed by this method.
     * <p>
     * This method is idempotent: repeatedly calling {@code close()} will not produce an error.
     *
     * @see #flush() to close this Transaction by writing its records to the underlying state store.
     */
    @Override
    void close();
}

...

Code Block
language: java
title: org.apache.kafka.streams.state.AbstractTransaction
/**
 * Base implementation of {@link Transaction transactions} created by an implementation of {@link
 * AbstractTransactionalStore}.
 * <p>
 * This base implementation provides the following functionality:
 * <ul>
 *     <li>Registration of callbacks that are invoked after this Transaction has closed. Used by {@link
 *     AbstractTransactionalStore} to track open transactions that need to be closed when the parent store closes.</li>
 *     <li>Tracking/delegation of {@link #isOpen()}, {@link #persistent()}, {@link #name()} and {@link
 *     #isolationLevel()} methods.</li>
 *     <li>Provides a {@link #validateIsOpen()} method, useful for verifying that the Transaction is open in query methods.</li>
 *     <li>Provides base implementations of {@link #flush()} and {@link #close()} that properly handle idempotence and
 *     invoking any registered callbacks.</li>
 * </ul>
 * <p>Note: none of the above methods are marked {@code final}, to enable power-users to provide alternatives. However,
 * this should only be done with great care, and only if absolutely necessary.</p>
 *
 * @param <S> The type of the {@link StateStore} that spawned this transaction.
 */
public abstract class AbstractTransaction<S extends StateStore> implements Transaction {

    /**
     * Commit and close this Transaction.
     * 
     * @see Transaction#flush()
     */
    public abstract void commitTransaction();

    /**
     * Closes this Transaction, without committing records.
     *
     * @see Transaction#close()
     */
    public abstract void closeTransaction();
}

...

A wrapper TransactionalKeyValueStore will be provided, which takes care of automatically creating a new transaction when it's initialized, and starting a new transaction after the existing one has been flushed. This is the principal way that transactions will be managed by the Streams engine. An analogous wrapper, TransactionalSegment, will also be provided, which extends TransactionalKeyValueStore, providing automatic transaction handling for Segments.
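
The wrapper behaviour described above could look roughly like the following. This is only a minimal sketch: SimpleTransaction, SimpleTransactionalStore, InMemoryStore and TransactionalWrapper are hypothetical, simplified stand-ins invented for illustration, not the proposed Kafka Streams classes, and the in-memory map merely stands in for a real store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the proposed interfaces; NOT the real Kafka Streams API.
interface SimpleTransaction extends AutoCloseable {
    void put(String key, String value);
    String get(String key);
    void flush();            // commit and close
    @Override void close();  // roll back and close (idempotent)
    boolean isOpen();
}

interface SimpleTransactionalStore {
    SimpleTransaction newTransaction();
}

// In-memory store whose transactions buffer writes until flush().
class InMemoryStore implements SimpleTransactionalStore {
    final Map<String, String> committed = new HashMap<>();

    @Override
    public SimpleTransaction newTransaction() {
        return new SimpleTransaction() {
            private final Map<String, String> pending = new HashMap<>();
            private boolean open = true;

            @Override public void put(String key, String value) {
                requireOpen();
                pending.put(key, value);
            }

            @Override public String get(String key) {
                requireOpen();
                // read-your-own-writes: pending writes shadow committed ones
                return pending.containsKey(key) ? pending.get(key) : committed.get(key);
            }

            @Override public void flush() {
                requireOpen();
                committed.putAll(pending);
                open = false;
            }

            @Override public void close() {
                pending.clear();
                open = false;
            }

            @Override public boolean isOpen() { return open; }

            private void requireOpen() {
                if (!open) throw new IllegalStateException("transaction is closed");
            }
        };
    }
}

// Wrapper: opens a transaction on init() and replaces it after every flush().
class TransactionalWrapper {
    private final SimpleTransactionalStore store;
    private SimpleTransaction current;

    TransactionalWrapper(SimpleTransactionalStore store) { this.store = store; }

    void init() { current = store.newTransaction(); }

    void put(String key, String value) { current.put(key, value); }

    String get(String key) { return current.get(key); }

    void flush() {
        current.flush();                  // commit and close the existing transaction
        current = store.newTransaction(); // immediately start the next one
    }

    void close() { current.close(); }     // discard anything not yet flushed
}
```

Callers of such a wrapper never see a Transaction directly: they call init() once, then put(), get() and flush() as they would on an ordinary store, and each flush() marks a commit boundary.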

Interactive queries will be modified to automatically call newTransaction() and use that in place of the StateStore being queried.
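
As an illustration of this query path, the sketch below runs each lookup in its own short-lived transaction using try-with-resources. ReadableTransaction, QueryableStore and InteractiveQuery are hypothetical names invented for this example, and the openTransactions counter exists only to show that nothing is leaked.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; NOT the real Kafka Streams types.
interface ReadableTransaction extends AutoCloseable {
    String get(String key);
    @Override void close();
}

class QueryableStore {
    private final Map<String, String> committed = new HashMap<>();
    int openTransactions = 0; // for illustration only

    void commit(String key, String value) { committed.put(key, value); }

    ReadableTransaction newTransaction() {
        openTransactions++;
        return new ReadableTransaction() {
            private boolean open = true;

            @Override public String get(String key) {
                if (!open) throw new IllegalStateException("transaction is closed");
                return committed.get(key);
            }

            @Override public void close() {
                if (open) {           // idempotent: only the first call has an effect
                    open = false;
                    openTransactions--;
                }
            }
        };
    }
}

class InteractiveQuery {
    // Each query reads through its own transaction instead of touching the store directly.
    static String lookup(QueryableStore store, String key) {
        try (ReadableTransaction txn = store.newTransaction()) {
            return txn.get(key);
        }
    }
}
```

Because the transaction is closed when the try block exits, a query that throws part-way through still releases its transaction.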

The AbstractTransaction and AbstractTransactionalStore classes detailed above provide a base implementation for transactionality that automatically tracks and manages open transactions, so that they are properly closed when their StateStore closes and no resources are leaked. These are the base classes for RocksDBTransaction and RocksDBStore, respectively.
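
One way the tracking could work is sketched below, assuming a callback registered on each transaction that removes it from the store's set of open transactions. TrackedTransaction and TrackingStore are hypothetical simplifications invented for this example; the actual AbstractTransaction and AbstractTransactionalStore may be structured differently.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simplification of the callback-based tracking described above.
abstract class TrackedTransaction implements AutoCloseable {
    private final List<Runnable> onClose = new ArrayList<>();
    private boolean open = true;

    void addOnCloseCallback(Runnable callback) { onClose.add(callback); }

    boolean isOpen() { return open; }

    // Subclasses release their own resources here (e.g. discard buffered writes).
    protected abstract void closeTransaction();

    @Override
    public void close() {
        if (!open) return;   // idempotent: later calls are no-ops
        open = false;
        closeTransaction();
        onClose.forEach(Runnable::run);
    }
}

abstract class TrackingStore {
    private final Set<TrackedTransaction> openTransactions = new HashSet<>();

    // Subclasses decide which concrete transaction type to create.
    protected abstract TrackedTransaction createTransaction();

    TrackedTransaction newTransaction() {
        TrackedTransaction txn = createTransaction();
        openTransactions.add(txn);
        txn.addOnCloseCallback(() -> openTransactions.remove(txn)); // untrack once closed
        return txn;
    }

    int openTransactionCount() { return openTransactions.size(); }

    void close() {
        // Copy first: closing a transaction removes it from the set via its callback.
        for (TrackedTransaction txn : new ArrayList<>(openTransactions)) {
            txn.close();
        }
    }
}
```

A concrete store would subclass TrackingStore and return its own transaction type from createTransaction(). Closing the store then closes every transaction that callers forgot to close themselves.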

...