Comment: Move IsolationLevel to a global config and revise Proposed Changes

...

New configuration

Name | Default | Description
default.state.isolation.level | READ_UNCOMMITTED | The default isolation level for Interactive Queries against StateStores. Supported values are READ_UNCOMMITTED and READ_COMMITTED.
statestore.uncommitted.max.bytes | 67108864 (64 MB) | Maximum number of bytes of memory used to buffer uncommitted state-store records. If this limit is exceeded, a task commit will be requested. Set to -1 for no limit. Note: if this is too high or unbounded, RocksDB may trigger out-of-memory errors.
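The following sketch shows how the two settings above interact. It reads them from a plain java.util.Properties and decides when a task commit should be requested. The key strings are the names proposed in the table. Several details are assumptions for illustration only and do not describe the Kafka Streams implementation:

- the helper class and its method names;
- rejecting any value below -1;
- treating "exceeded" as strictly greater than the limit.

```java
import java.util.Properties;
import java.util.Set;

// Hypothetical helper: the keys are the config names proposed above, written as plain
// strings because they are not constants in any released StreamsConfig.
final class ProposedStateStoreConfigs {
    static final String DEFAULT_STATE_ISOLATION_LEVEL = "default.state.isolation.level";
    static final String STATESTORE_UNCOMMITTED_MAX_BYTES = "statestore.uncommitted.max.bytes";

    static final Set<String> SUPPORTED_LEVELS = Set.of("READ_UNCOMMITTED", "READ_COMMITTED");
    static final long DEFAULT_MAX_UNCOMMITTED_BYTES = 64L * 1024 * 1024; // 67108864
    static final long NO_LIMIT = -1L;

    private ProposedStateStoreConfigs() {}

    // Returns the configured isolation level, or the documented default READ_UNCOMMITTED.
    static String isolationLevel(final Properties props) {
        final String level = props.getProperty(DEFAULT_STATE_ISOLATION_LEVEL, "READ_UNCOMMITTED");
        if (!SUPPORTED_LEVELS.contains(level)) {
            throw new IllegalArgumentException("Unsupported isolation level: " + level);
        }
        return level;
    }

    // Returns the uncommitted-bytes limit; -1 means "no limit" (assumed: values below -1 are rejected).
    static long maxUncommittedBytes(final Properties props) {
        final String raw = props.getProperty(STATESTORE_UNCOMMITTED_MAX_BYTES);
        final long limit = raw == null ? DEFAULT_MAX_UNCOMMITTED_BYTES : Long.parseLong(raw.trim());
        if (limit < NO_LIMIT) {
            throw new IllegalArgumentException("Expected -1 or a non-negative byte count: " + limit);
        }
        return limit;
    }

    // True when the buffered bytes exceed the limit, i.e. a task commit should be requested.
    static boolean shouldRequestCommit(final long uncommittedBytes, final long limit) {
        return limit != NO_LIMIT && uncommittedBytes > limit;
    }
}
```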

...

org.apache.kafka.streams.processor.StateStore (Java excerpt)
    /**
     * Flush any cached data
     * 
     * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
     *             instead.
     */
    @Deprecated
    default void flush() {
        // no-op
    }

    /**
     * Commit all written records to this StateStore.
     * <p>
     * This method MUST NOT be called by users from {@link org.apache.kafka.streams.processor.api.Processor processors},
     * as doing so may violate the consistency guarantees provided by this store, and expected by Kafka Streams.
     * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() 
     * ProcessorContext#commit()} to request a Task commit.
     * <p>
     * When called, every record written since the last call to {@link #commit(Map)}, or since this store was {@link
     * #init(StateStoreContext, StateStore) opened}, will be made available to readers using the {@link
     * org.apache.kafka.common.IsolationLevel#READ_COMMITTED READ_COMMITTED} {@link
     * org.apache.kafka.common.IsolationLevel IsolationLevel}.
     * <p>
     * If {@link #persistent()} returns {@code true}, after this method returns, all records written since the last call
     * to {@link #commit(Map)} are guaranteed to be persisted to disk, and available to read, even if this {@link
     * StateStore} is {@link #close() closed} and subsequently {@link #init(StateStoreContext, StateStore) re-opened}.
     * <p>
     * If {@link #managesOffsets()} <em>also</em> returns {@code true}, the given {@code changelogOffsets} will be
     * guaranteed to be persisted to disk along with the written records.
     * <p>
     * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,
     * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided
     * partitions <em>MUST</em> be persisted to disk.
     * <p>
     * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
     * records they represent.
     * 
     * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
     */
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }

    /**
     * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}.
     * <p>
     * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
     * offset that corresponds to the changelog record most recently written to this store, for the given {@code
     * partition}.
     * <p>
     * This method provides readers using the {@link org.apache.kafka.common.IsolationLevel#READ_COMMITTED} {@link
     * org.apache.kafka.common.IsolationLevel} a means to determine the point in the changelog that this StateStore
     * currently represents.
     * 
     * @param partition The partition to get the committed offset for.
     * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
     *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
     *         return {@code false}.
     */
    default Long getCommittedOffset(final TopicPartition partition) {
        return null;
    }

    /**
     * Determines if this StateStore manages its own offsets.
     * <p>
     * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
     * {@link #getCommittedOffset(TopicPartition)}, even if the store is {@link #close() closed} and later re-opened.
     * <p>
     * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
     * #getCommittedOffset(TopicPartition)} will be expected to always return {@code null}.
     * <p>
     * This method is provided to enable custom StateStores to opt in to managing their own offsets. This is highly
     * recommended, if possible, to ensure that custom StateStores provide the consistency guarantees that Kafka Streams
     * expects when operating under the {@code exactly-once} {@code processing.mode}.
     * 
     * @return Whether this StateStore manages its own offsets.
     */
    default boolean managesOffsets() {
        return false;
    }

    /**
     * Return an approximate count of memory used by records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the memory that would be freed by the next call to {@link
     * #commit(Map)}.
     * <p>
     * If no records have been written to this store since {@link #init(StateStoreContext, StateStore) opening}, or
     * since the last {@link #commit(Map)}; or if this store does not support atomic transactions, it will return {@code
     * 0}, as no records are currently being buffered.
     *
     * @return The approximate size of all records awaiting {@link #commit(Map)}; or {@code 0} if this store does not
     *         support transactions, or has not been written to since {@link #init(StateStoreContext, StateStore)} or
     *         last {@link #commit(Map)}.
     */
    @Evolving
    default long approximateNumUncommittedBytes() {
        return 0;
    }
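The contract described by these Javadocs can be shown with a deliberately simplified sketch. It covers four behaviours:

- writes are buffered until commit(Map);
- changelog offsets are stored together with the records;
- getCommittedOffset returns null when offsets are not managed;
- approximateNumUncommittedBytes drops to 0 after a commit.

It does not implement the real StateStore interface. The simplifications are assumptions:

- partitions are plain Strings rather than TopicPartition;
- data lives in HashMaps;
- the class behaves as if persistent() returned true;
- deletes (null values) are not handled.

SketchTransactionalStore, getCommitted and getUncommitted are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, in-memory illustration of the commit(Map) contract; not the real StateStore.
final class SketchTransactionalStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();    // writes since last commit
    private final Map<String, Long> committedOffsets = new HashMap<>(); // keyed by partition name
    private final boolean managesOffsets;

    SketchTransactionalStore(final boolean managesOffsets) {
        this.managesOffsets = managesOffsets;
    }

    void put(final String key, final String value) {
        uncommitted.put(key, value);
    }

    // What a READ_COMMITTED reader sees: only committed records.
    String getCommitted(final String key) {
        return committed.get(key);
    }

    // What the writer (read-your-own-writes) or a READ_UNCOMMITTED reader sees.
    String getUncommitted(final String key) {
        return uncommitted.containsKey(key) ? uncommitted.get(key) : committed.get(key);
    }

    // Records and offsets are applied in one step. That is trivially atomic in this
    // single-threaded sketch; a real store would need e.g. a single RocksDB write batch.
    void commit(final Map<String, Long> changelogOffsets) {
        committed.putAll(uncommitted);
        uncommitted.clear();
        if (managesOffsets) {
            committedOffsets.putAll(changelogOffsets);
        }
    }

    // null when offsets are not managed, or when none was committed for the partition.
    Long getCommittedOffset(final String partition) {
        return managesOffsets ? committedOffsets.get(partition) : null;
    }

    boolean managesOffsets() {
        return managesOffsets;
    }

    // Approximation: UTF-8 size of buffered keys and values, ignoring per-object overhead.
    long approximateNumUncommittedBytes() {
        long bytes = 0;
        for (final Map.Entry<String, String> e : uncommitted.entrySet()) {
            bytes += e.getKey().getBytes(StandardCharsets.UTF_8).length;
            bytes += e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        return bytes;
    }
}
```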


...

org.apache.kafka.streams.processor.StateStoreContext (Java excerpt)

...

    /**
     * The {@link IsolationLevel} that every transaction created by {@link StateStore#newTransaction()} should use.
     * <p>
     * In the context of a {@link org.apache.kafka.streams.processor.StateStore} transaction, these Isolation Levels
     * adhere to the <a href="https://en.wikipedia.org/wiki/Isolation_(database_systems)#Read_phenomena">ANSI SQL 92
     * definitions</a>.
     * <p>
     * All isolation levels guarantee "read-your-own-writes", i.e. that writes in the transaction will be seen by
     * subsequent reads <em>from within the same transaction</em>. Other guarantees vary by isolation level:
     * <p>
     * <table>
     *     <tr>
     *         <th>Isolation Level</th>
     *         <th>Description</th>
     *         <th>Permitted Read Phenomena</th>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_UNCOMMITTED}</td> <!-- todo: ALOS -->
     *         <td>Allows queries to read writes from all ongoing transactions that have not yet been committed.</td>
     *         <td>dirty reads, non-repeatable reads, phantom reads</td>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_COMMITTED}</td> <!-- todo: EOS -->
     *         <td>Allows queries to only read writes that have been committed to the StateStore. Writes by an ongoing
     *         transaction are not visible <em>until that transaction commits</em>.</td>
     *         <td>non-repeatable reads, phantom reads</td>
     *     </tr>
     * </table>
     * <p>
     * Under {@link IsolationLevel#READ_UNCOMMITTED}, there are no guarantees on when records from other transactions
     * become visible, therefore implementations <em>may</em> refrain from making uncommitted records visible to other
     * transactions until they're committed, if doing so would improve performance.
     * <p>
     * The default implementation of this method will use {@link IsolationLevel#READ_COMMITTED READ_COMMITTED} if the
     * app is {@link #appConfigs() configured} to use an {@link StreamsConfig#EXACTLY_ONCE_V2 exactly-once} {@link
     * StreamsConfig#PROCESSING_GUARANTEE_CONFIG processing guarantee}. Otherwise, it will be {@link
     * IsolationLevel#READ_UNCOMMITTED READ_UNCOMMITTED}. 
     */
    default IsolationLevel isolationLevel() {
        return StreamsConfigUtils.eosEnabled(new StreamsConfig(appConfigs())) ?
                IsolationLevel.READ_COMMITTED : IsolationLevel.READ_UNCOMMITTED;
    }

Metrics

New

  • stream-state-metrics 
    • commit-rate - the number of calls to StateStore#commit(Map)
    • commit-latency-avg - the average time taken to call StateStore#commit(Map)
    • commit-latency-max - the maximum time taken to call StateStore#commit(Map)
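The following hand-rolled recorder makes the three proposed metrics concrete. It times each commit call and derives a rate, an average and a maximum. It is only a sketch, since Kafka Streams would register these metrics through its own metrics framework. The following are assumptions made so the example is self-contained and testable:

- the name CommitMetricsSketch;
- the injected nanosecond clock;
- measuring the rate over the time since the recorder was created.

```java
import java.util.function.LongSupplier;

// Hypothetical recorder for commit-rate, commit-latency-avg and commit-latency-max.
final class CommitMetricsSketch {
    private final LongSupplier nanoClock;
    private final long createdAtNanos;
    private long commitCount = 0;
    private long totalLatencyNanos = 0;
    private long maxLatencyNanos = 0;

    CommitMetricsSketch(final LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        this.createdAtNanos = nanoClock.getAsLong();
    }

    // Times one call to StateStore#commit(Map), passed in here as a Runnable.
    void recordCommit(final Runnable commit) {
        final long start = nanoClock.getAsLong();
        commit.run();
        final long latency = nanoClock.getAsLong() - start;
        commitCount++;
        totalLatencyNanos += latency;
        maxLatencyNanos = Math.max(maxLatencyNanos, latency);
    }

    // commit-rate: commits per second since this recorder was created.
    double commitRate() {
        final double elapsedSeconds = (nanoClock.getAsLong() - createdAtNanos) / 1e9;
        return elapsedSeconds <= 0 ? 0.0 : commitCount / elapsedSeconds;
    }

    // commit-latency-avg, in nanoseconds.
    double commitLatencyAvg() {
        return commitCount == 0 ? 0.0 : (double) totalLatencyNanos / commitCount;
    }

    // commit-latency-max, in nanoseconds.
    long commitLatencyMax() {
        return maxLatencyNanos;
    }
}
```

In production code the clock would simply be System::nanoTime. Injecting it lets a test advance time deterministically.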

...

To ensure that data is not written to a state store until it has been committed to the changelog, we need to isolate writes from the underlying database until the changelog commit. To achieve this, we introduce the concept of transaction isolation levels, which dictate the visibility of records written by different threads.

We enable configuration of the level of isolation provided by StateStores via default.state.isolation.level, which can be set to either:

default.state.isolation.level | Description | processing.mode
READ_UNCOMMITTED | Records written by the StreamThread are immediately visible to all Interactive Query threads. This level provides no atomicity, consistency, isolation or durability guarantees. | at-least-once
READ_COMMITTED | Records written by the StreamThread are only visible to Interactive Query threads once they have been committed. | at-least-once, exactly-once, exactly-once-beta, exactly-once-v2

In Kafka Streams, every StateStore is written to by a single StreamThread (this is the Single Writer principle). However, multiple other threads may concurrently read from StateStores, principally to service Interactive Queries. In practice, this means that under READ_COMMITTED, writes by the StreamThread that owns the StateStore will only become visible to Interactive Query threads once commit() has been called.

StateStores are only committed once the Kafka transaction for their changelog entries has been committed. This ensures that, under exactly-once, the records written to disk always reflect the state of the store's changelog.
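The single-writer rule above can be sketched as follows. This is a hypothetical illustration, not how RocksDB-backed stores are implemented. The owning StreamThread is assumed to be the only caller of put() and commit(), and Interactive Query threads call read(). A commit publishes an immutable snapshot through a volatile field, so READ_COMMITTED readers never observe a half-applied commit. The class name, the copy-on-commit strategy (chosen for clarity, not performance) and the lack of delete support are all assumptions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-writer / many-reader store illustrating the two isolation levels.
final class SingleWriterVisibilitySketch {
    enum Level { READ_UNCOMMITTED, READ_COMMITTED }

    // Every write lands here immediately, so READ_UNCOMMITTED readers see it at once.
    private final Map<String, String> latest = new ConcurrentHashMap<>();
    // Writer-private buffer of writes since the last commit; never read by other threads.
    private final Map<String, String> pending = new HashMap<>();
    // Immutable snapshot replaced on each commit; the volatile write publishes it safely.
    private volatile Map<String, String> committedSnapshot = Map.of();

    void put(final String key, final String value) {  // StreamThread only
        latest.put(key, value);
        pending.put(key, value);
    }

    void commit() {                                     // StreamThread only
        final Map<String, String> next = new HashMap<>(committedSnapshot);
        next.putAll(pending);
        pending.clear();
        committedSnapshot = Map.copyOf(next);
    }

    String read(final String key, final Level level) { // any Interactive Query thread
        return level == Level.READ_COMMITTED ? committedSnapshot.get(key) : latest.get(key);
    }
}
```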

The READ_UNCOMMITTED isolation level will only be available under the at-least-once processing.mode. If READ_UNCOMMITTED is selected with an EOS processing.mode, it will be automatically upgraded to READ_COMMITTED and a warning will be produced. This is due to the complexity of making uncommitted writes in the RocksDB transaction buffer available to other threads, and this restriction is expected to be removed in a later KIP.

The at-least-once processing mode does not need the isolation guarantees that READ_COMMITTED provides. Indeed, using them could cause problems: the default commit.interval.ms under at-least-once is 30 seconds, which would require buffering records for up to 30 seconds unnecessarily. Under this mode, transactions are not used for writes to the changelog, so it is acceptable to write directly to the database, even if those writes fail to append to the changelog. This is the same behaviour we have today, where stores are not wiped on error under at-least-once. For this reason, under at-least-once, Streams will automatically configure all StateStores to use the READ_UNCOMMITTED isolation level, which provides no isolation guarantees and ensures that records are immediately written to disk.

The default value for default.state.isolation.level will be READ_UNCOMMITTED, to mirror the behaviour we have today; but this will be automatically set to READ_COMMITTED if the processing.mode has been set to an EOS mode (see above).
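The defaulting and upgrade rules above could be resolved along the following lines. The sketch works on raw config strings. The processing.mode discussed in this document corresponds to Kafka Streams' processing.guarantee config, whose real values are at_least_once, exactly_once, exactly_once_beta and exactly_once_v2. The following are hypothetical:

- the name IsolationLevelResolver;
- its local Level enum;
- the warning printed to System.err, which stands in for a real logger.

```java
import java.util.Set;

// Hypothetical resolution of default.state.isolation.level against processing.guarantee.
final class IsolationLevelResolver {
    enum Level { READ_UNCOMMITTED, READ_COMMITTED }

    // processing.guarantee values that enable exactly-once in Kafka Streams.
    private static final Set<String> EOS_GUARANTEES =
            Set.of("exactly_once", "exactly_once_beta", "exactly_once_v2");

    // configuredLevel: value of default.state.isolation.level, or null if unset.
    static Level resolve(final String configuredLevel, final String processingGuarantee) {
        final Level requested = configuredLevel == null
                ? Level.READ_UNCOMMITTED          // documented default
                : Level.valueOf(configuredLevel); // throws IllegalArgumentException if unsupported
        if (requested == Level.READ_UNCOMMITTED && EOS_GUARANTEES.contains(processingGuarantee)) {
            if (configuredLevel != null) {
                // Only warn when the user explicitly asked for READ_UNCOMMITTED.
                System.err.println("WARN: READ_UNCOMMITTED is not available under "
                        + processingGuarantee + "; using READ_COMMITTED instead");
            }
            return Level.READ_COMMITTED;
        }
        return requested;
    }
}
```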

In-memory Transaction Buffers

...

Interactive queries currently see every record as soon as it is written to a StateStore. This can cause consistency issues, as interactive queries can read records before they're committed to the Kafka changelog, which may be rolled back. To address this, we have introduced configurable isolation levels, configured globally via default.state.isolation.level (see above).

When operating under the READ_COMMITTED isolation level, the maximum time for records to become visible to interactive queries will be commit.interval.ms. Under EOS, this defaults to a low value (100 ms), but under at-least-once, the default is 30 seconds. Users may need to adjust their commit.interval.ms to meet the visibility latency goals for their use-case.

When operating under the READ_UNCOMMITTED isolation level, all records will be immediately visible to interactive queries, so the high default commit.interval.ms of 30 s will have no impact on interactive query latency.
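Taken together, the last two paragraphs give a rough worst-case estimate of how long a write can stay invisible to interactive queries. This is an approximation under stated assumptions:

- it ignores the time the commit itself takes;
- it treats an early commit triggered by statestore.uncommitted.max.bytes as something that can only shorten the delay;
- the class and method names are hypothetical.

The two constants are Kafka Streams' default commit.interval.ms values.

```java
// Hypothetical estimate of the maximum time a write stays invisible to Interactive Queries.
final class IqVisibilityEstimate {
    // Kafka Streams' default commit.interval.ms values.
    static final long AT_LEAST_ONCE_DEFAULT_COMMIT_INTERVAL_MS = 30_000L;
    static final long EXACTLY_ONCE_DEFAULT_COMMIT_INTERVAL_MS = 100L;

    // readCommitted: true for READ_COMMITTED; exactlyOnce: true under an EOS guarantee;
    // commitIntervalMsOrNull: an explicit commit.interval.ms, or null to use the default.
    static long maxVisibilityDelayMs(final boolean readCommitted, final boolean exactlyOnce,
                                     final Long commitIntervalMsOrNull) {
        if (!readCommitted) {
            return 0L; // READ_UNCOMMITTED: writes are visible immediately
        }
        if (commitIntervalMsOrNull != null) {
            return commitIntervalMsOrNull; // visible after the next commit at the latest
        }
        return exactlyOnce ? EXACTLY_ONCE_DEFAULT_COMMIT_INTERVAL_MS
                           : AT_LEAST_ONCE_DEFAULT_COMMIT_INTERVAL_MS;
    }
}
```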

...

When an interactive query is made under the READ_COMMITTED isolation level, the PositionBound will constrain the committed Position map, whereas under READ_UNCOMMITTED, the PositionBound will constrain the uncommitted Position map.
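The choice of Position map can be sketched with plain nested maps (topic, then partition, then offset). Kafka Streams has its own Position and PositionBound classes, and this sketch does not reproduce their API. The following are simplifying assumptions:

- the map representation;
- the name PositionBoundSketch;
- the rule that a bound is satisfied only when the chosen position has reached every offset the bound lists.

```java
import java.util.Map;

// Hypothetical check of a position bound against the committed or uncommitted position.
final class PositionBoundSketch {
    // True if `position` has reached at least every offset listed in `bound`.
    static boolean satisfies(final Map<String, Map<Integer, Long>> position,
                             final Map<String, Map<Integer, Long>> bound) {
        for (final Map.Entry<String, Map<Integer, Long>> topic : bound.entrySet()) {
            final Map<Integer, Long> have = position.getOrDefault(topic.getKey(), Map.of());
            for (final Map.Entry<Integer, Long> partition : topic.getValue().entrySet()) {
                final Long offset = have.get(partition.getKey());
                if (offset == null || offset < partition.getValue()) {
                    return false;
                }
            }
        }
        return true;
    }

    // READ_COMMITTED checks the committed position; READ_UNCOMMITTED the uncommitted one.
    static boolean satisfies(final boolean readCommitted,
                             final Map<String, Map<Integer, Long>> committedPosition,
                             final Map<String, Map<Integer, Long>> uncommittedPosition,
                             final Map<String, Map<Integer, Long>> bound) {
        return satisfies(readCommitted ? committedPosition : uncommittedPosition, bound);
    }
}
```

With the same bound, a query can therefore be satisfied under READ_UNCOMMITTED but not yet under READ_COMMITTED, until the next commit advances the committed position.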

...