Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Add section on updated metrics.

...

Code Block
languagejava
titleorg.apache.kafka.streams.processor.StateStoreContext
    /**
     * The {@link IsolationLevel} that every transaction created by {@link StateStore#newTransaction()} should use.
     * <p>
     * In the context of a {@link org.apache.kafka.streams.processor.StateStore} transaction, these Isolation Levels
     * adhere to the <a href="https://en.wikipedia.org/wiki/Isolation_(database_systems)#Read_phenomena">ANSI SQL 92
     * definitions</a>.
     * <p>
     * All isolation levels guarantee "read-your-own-writes", i.e. that writes in the transaction will be seen by
     * subsequent reads <em>from within the same transaction</em>. Other guarantees vary by isolation level:
     * <p>
     * <table>
     *     <tr>
     *         <th>Isolation Level</th>
     *         <th>Description</th>
     *         <th>Permitted Read Phenomena</th>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_UNCOMMITTED}</td> // todo: ALOS
     *         <td>Allows queries to read writes from all ongoing transactions that have not-yet been committed.</td>
     *         <td>dirty reads, non-repeatable reads, phantom reads</td>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_COMMITTED}</td> // todo: EOS
     *         <td>Allows queries to only read writes that have been committed to the StateStore. Writes by an ongoing
     *         transaction are not visible <em>until that transaction commits</em>.</td>
     *         <td>non-repeatable reads, phantom reads</td>
     *     </tr>
     * </table>
     * <p>
     * Under {@link IsolationLevel#READ_UNCOMMITTED}, there are no guarantees on when records from other transactions
     * become visible, therefore implementations <em>may</em> refrain from making uncommitted records visible to other
     * transactions until they're committed, if doing so would improve performance.
     * <p>
     * The default implementation of this method will use {@link IsolationLevel#READ_COMMITTED READ_COMMITTED} if the
     * app is {@link #appConfigs() configured} to use an {@link StreamsConfig#EXACTLY_ONCE_V2 exactly-once} {@link
     * StreamsConfig#PROCESSING_GUARANTEE_CONFIG processing guarantee}. Otherwise, it will be {@link
     * IsolationLevel#READ_UNCOMMITTED READ_UNCOMMITTED}. 
     */
    default IsolationLevel isolationLevel() {
        return StreamsConfigUtils.eosEnabled(new StreamsConfig(appConfigs())) ?
                IsolationLevel.READ_COMMITTED : IsolationLevel.READ_UNCOMMITTED;
    }

Changed Metrics

  • stream-state-metrics 
    • flush-rate is renamed to commit-rate 
    • flush-latency-avg  is renamed to commit-latency-avg 
    • flush-latency-max is renamed to commit-latency-max 

These changes are necessary to ensure these metrics are not confused with orthogonal operations, like RocksDB memtable flushes or cache flushes. They will be measuring the invocation of StateStore#commit, which replaces StateStore#flush.

Proposed Changes

To ensure that data is not written to a state store until it has been committed to the changelog, we need to isolate writes from the underlying database until changelog commit. To achieve this, introduce the concept of transaction Isolation Levels, that dictate the visibility of records written by different threads.

...