Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Change status to Adopted

Table of Contents

Status

Current state: Under DiscussionAdopted

Discussion thread: Thread

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-14412

...

As described in KIP-844, under EOS, crash failures cause all Task state to be wiped out on restart. This is because, currently, data is written to the StateStore before the commit to its changelog has completed, so it's possible that records are written to disk that were not committed to the store changelog.

In KIP-844, it was proposed to create an alternative type of StateStore, which would enable users to opt-in to "transactional" behaviour, that ensured data was only persisted once the changelog commit has succeeded. However, the design and approach outlined in KIP-844 unfortunately did not perform well when tested (with a write throughput that was approximately only 4% of the regular RocksDB StateStore!).

This KIP explores an alternative design that should have little/no performance impact, potentially performing better than the status quo, and can thus be enabled for all stores.

Public Interfaces

New configuration

...

Maximum number of memory bytes to be used to buffer uncommitted state-store records. If this limit is exceeded, a task commit will be requested. No limit: -1.

Note: if this is too high or unbounded, it's possible for RocksDB to trigger out-of-memory errors.

This ensures consistency of local stores with their changelog topics, but can cause long delays in processing while it rebuilds the local state from the changelog. These delays are proprotional to the number of records in the changelog topic, which for highly active tables, or those with a very high cardinality, can be very large. Real-world use-cases have been observed where these delays can span multiple days, where both processing, and interactive queries, are paused.

In KIP-844, it was proposed to create an alternative type of StateStore, which would enable users to opt-in to "transactional" behaviour, that ensured data was only persisted once the changelog commit has succeeded. However, the design and approach outlined in KIP-844 unfortunately did not perform well when tested (with a write throughput that was approximately only 4% of the regular RocksDB StateStore!).

This KIP explores an alternative design that should have little/no performance impact, potentially performing better than the status quo, and can thus be enabled for all stores. This should bound state restore under EOS to less than 1 second, irrespective of the size of the changelogs.

Public Interfaces

New configuration

NameDefaultDescription
default.state.isolation.levelREAD_UNCOMMITTED

The default isolation level for Interactive Queries against StateStores. Supported values are READ_UNCOMMITTED and READ_COMMITTED.

statestore.uncommitted.max.bytes67108864 (64 MB)

Maximum number of memory bytes to be used to buffer uncommitted state-store records. If this limit is exceeded, a task commit will be requested. No limit: -1.

Note: if this is too high or unbounded, it's possible for RocksDB to trigger out-of-memory errors.

Changed Interfaces

  • org.apache.kafka.streams.processor.StateStore
  • org.apache.kafka.streams.processor.StateStoreContext

Changes:

Code Block
languagejava
firstline106
titleorg.apache.kafka.streams.processor.

Changed Interfaces

  • org.apache.kafka.streams.processor.StateStore
  • org.apache.kafka.streams.processor.StateStoreContext

Changes:

Code Block
languagejava
firstline106
titleorg.apache.kafka.streams.processor.StateStore
linenumberstrue
   
    /**
     * Flush any cached data
     * 
     * @deprecated Use {@link #commit(Map)} org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
     *             instead.
       */
    @Deprecated
    default void flush() {
         commit(Collections.emptyMap());// no-op
    }

    /**
     * CommitsCommit anyall uncommittedwritten records to this StateStore with the given {@code changelogOffsets}.
     * <p>
     * IfThis thismethod StateStoreMUST hasNOT any openbe called by users from {@link Transaction transactionsorg.apache.kafka.streams.processor.api.Processor processors},
 they will be {@link Transaction#commit(Map)
     * committed}.
     * <p>
     * The given {@code changelogOffsets} will be committed to the StateStore along with the uncommitted {@link * as doing so may violate the consistency guarantees provided by this store, and expected by Kafka Streams.
     * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() 
     * Transaction transactions}. If this StateStore supports atomic transactions, offsets are guaranteed to beProcessorContext#commit()} to request a Task commit.
     * <p>
     * committedWhen atomicallycalled, withevery thewrite recordswritten theysince correspondthe to.
last call to {@link  *
     * @param changelogOffsets The input/changelog topic offsets of the records being committed.
     */
    @Evolving#commit(Map)}, or since this store was {@link
     * #init(StateStoreContext, StateStore) opened} will be made available to readers using the {@link
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {* org.apache.kafka.common.IsolationLevel#READ_COMMITTED READ_COMMITTED} {@link
     *   flush();
    }

org.apache.kafka.common.IsolationLevel IsolationLevel}.
     * <p>
     /**
 If {@link #persistent()} returns * Returns whether{@code true}, after this StateStoremethod managesreturns, itsall ownrecords changelogwritten offsets.
since the last   * <p>call
     * This can only return {@code true} if {@link #persistent(to {@link #commit(Map)} alsoare returnsguaranteed {@code true}, as non-persistent stores
     * have no changelog offsets to manage.to be persisted to disk, and available to read, even if this {@link
     *
 StateStore} is {@link #close() closed} *and @returnsubsequently Whether this{@link #init(StateStoreContext, StateStore manages its own changelog offsets) re-opened}.
     */ <p>
    @Evolving
 * If  default boolean managesOffsets{@link #managesOffsets()} {
<em>also</em> returns {@code true}, the given {@code changelogOffsets} returnwill false;be
    }

    /**
 guaranteed to be persisted *to Returnsdisk the committed offset foralong with the givenwritten partitionrecords.
     * <p>
     * If{@code thischangelogOffsets} storewill isusually notcontain {@link #persistent()} or does not {@link #managesOffsets() manage its own offsets}a single partition, in the case of a regular StateStore. However,
     * itthey may willcontain alwaysmultiple yieldpartitions {@code null}.
     * <p>
     * If {@code topicPartition} is a changelog partition or Topology input partition for this StateStore, this methodin the case of a Global StateStore with multiple partitions. All provided
     * partitions <em>MUST</em> be persisted to disk.
     * <p>
     * willImplementations return<em>SHOULD</em> theensure committedthat offset{@code forchangelogOffsets} thatare partition.
committed to disk atomically with * <p>the
     * If no committed offset exists for the given partition, or if the partition is not a changelog or input partition
     * for the store, {@code null} will be returnedrecords they represent.
     * 
     * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
     */
    default * @param topicPartition The changelog/input partition to get the committed offset for.void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }

    /**
  @return The committed offset* forReturns the givenmost partition,recently or {@code null} if no committed@link #commit(Map) committed} offset exists, or if thisfor the given {@link TopicPartition}.
     * <p>
     * If  store does not contain committed offsets.
     */{@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
    @Evolving
 * offset that defaultcorresponds Longto getCommittedOffset(finalthe TopicPartitionchangelog topicPartition)record {
most recently written to this store, for the returngiven null;{@code
     * partition}.

     /** <p>
     * CreatesThis amethod new provides readers using the {@link Transactionorg.apache.kafka.common.IsolationLevel#READ_COMMITTED} for reading/writing to this state store.
     * <p>
     * State stores that do not support transactions will return {@code this} instead, which should be considered a
     * transaction that doesn't provide any isolation or atomicity guarantees{@link
     * org.apache.kafka.common.IsolationLevel} a means to determine the point in the changelog that this StateStore
     * currently represents.
     * 
     * @param partition The partition to get the committed offset for.
     * <p>
@return The last   * {@link Transaction Transactions#commit(Map) committed} areoffset <em>not thread-safe</em>, and should not be shared among threads. Newfor the {@code partition}; or {@code null} if no offset
     * threads should use this method to create a newhas transaction,been insteadcommitted offor sharingthe anpartition, existingor one.
if either {@link #persistent()} or * <p>{@link #managesOffsets()}
     * Transactions crated by this method should have the samereturn {@link@code IsolationLevel} as the {@link StateStoreContext}false}.
     */
 that this store wasdefault {@linkLong #init(StateStoreContext, StateStore) initialized with}.committedOffset(final TopicPartition partition) {
     * <p>
     * To avoid resource leaks, it is recommended to extend {@link AbstractTransactionalStore}, which will track openreturn null;
    }

    /**
     * transactionsDetermines andif automaticallythis closeStateStore allmanages openits transactions when the store is closedown offsets.
     * <p>
     * @returnIf this Amethod newreturns {@link@code Transactiontrue}, tothen isolateoffsets reads/writesprovided to this {@link StateStore}. The Transaction#commit(Map)} will be retrievable using
     * {@link #committedOffset(TopicPartition)}, even if the store is  <strong>MUST</strong> be {@link Transaction#commit#close(Map) committedclosed} orand {@link Transaction#close() closed}later re-opened.
     * <p>
     * If this whenmethod youreturns are finished with it, to prevent resource leaks.
     */{@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
    @Evolving
    default StateStore newTransaction() {
        return this;
    }

* #committedOffset(TopicPartition)} will be expected to always return {@code null}.
     /** <p>
     * ReturnThis method anis approximateprovided countto ofenable recordscustom notStateStores yetto committedopt-in to this StateStore.
     * <p>
     * This method will return an approximation of the number of records that would be committed by the next call to
     * {@link #commit(Map)managing their own offsets. This is highly
     * recommended, if possible, to ensure that custom StateStores provide the consistency guarantees that Kafka Streams
     * expects when operating under the {@code exactly-once} {@code processing.mode}.
     * <p>
     * @return IfWhether this StateStore ismanages unableits toown approximatelyoffsets.
 count uncommitted records, it will return {@code -1}. */
    default boolean managesOffsets() {
     * If this StateStorereturn doesfalse;
 not support atomic transactions,}

 it will return/** {@code 0}, because records will always be
     * immediatelyReturn writtenan toapproximate acount non-transactional store, so there will be none awaiting a {@link #commit(Map)}of memory used by records not yet committed to this StateStore.
     * <p>
     * This method @returnwill Thereturn approximatean numberapproximation of recordsthe awaitingmemory {@link #commit(Map)}, {@code -1} if the number ofthat would be freed by the next call to {@link
     * #commit(Map)}.
     * <p>
   uncommitted  records* can'tIf beno counted,records orhave {@codebeen 0}written ifto this StateStore does not support transactions.store since {@link #init(StateStoreContext, StateStore) opening}, or
     */
    @Evolving
    default long approximateNumUncommittedEntries() { since the last {@link #commit(Map)}; or if this store does not support atomic transactions, it will return {@code
     * 0}, as returnno 0;
records are currently being }buffered.

     /**
     * Return@return anThe approximate countsize of memory used byall records notawaiting yet committed to this StateStore.
     * <p>{@link #commit(Map)}; or {@code 0} if this store does not
     * This method will return an approximation of the memory wouldsupport betransactions, freedor byhas thenot nextbeen callwritten to since {@link #commit(Map#init(StateStoreContext, StateStore)}. or
     * <p>
     * If this StateStorelast is unable to approximately count uncommitted memory usage, it will return {@code -1{@link #commit(Map)}.
       */
 If this StateStore does@Evolving
 not support atomic transactions,default it will returnlong approximateNumUncommittedBytes() {@code
 0}, because records will always be
  return 0;
  * immediately written to a non-transactional store, so there will be none awaiting a {@link #commit(Map)}.
     *
     * @return The approximate size of all records awaiting {@link #commit(Map)}, {@code -1} if the size of uncommitted
     *         records can't be counted, or {@code 0} if this StateStore does not support transactions.
     */
    @Evolving
    default long approximateNumUncommittedBytes() {
        return 0;
    }
Code Block
languagejava
titleorg.apache.kafka.streams.processor.StateStoreContext
    /**
     * Returns the {@link IsolationLevel} that every {@link Transaction} created by {@link StateStore#newTransaction()}
     * should use.
     * <p>
     * The default implementation of this method will use {@link IsolationLevel#READ_COMMITTED READ_COMMITTED} if the
     * app is {@link #appConfigs() configured} to use an {@link StreamsConfig#EXACTLY_ONCE_V2 exactly-once} {@link
     * StreamsConfig#PROCESSING_GUARANTEE_CONFIG processing guarantee}. Otherwise, it will be {@link
     * IsolationLevel#READ_UNCOMMITTED READ_UNCOMMITTED}.
     *
     * @return The isolation level for every transaction created by state stores for this context.
     */
    default IsolationLevel isolationLevel() {
        return StreamsConfigUtils.eosEnabled(new StreamsConfig(appConfigs())) ?
                IsolationLevel.READ_COMMITTED : IsolationLevel.READ_UNCOMMITTED;
    }

New Interfaces

  • org.apache.kafka.streams.state.Transaction
  • org.apache.kafka.streams.state.AbstractTransactionalStore
Code Block
languagejava
titleorg.apache.kafka.streams.state.Transaction
/**
 * Represents a read/write transaction on a state store.
 * <p>
 * For compatibility, transactions implement the entire {@link StateStore} interface, however, they only represent a
 * <em>view</em> on an underlying {@link StateStore}; therefore methods that explicitly act on the entire store, such
 * as {@link #init} will do nothing, and others like {@link #commit()} and {@link #close()} will act upon this
 * transaction only.
 * <p>
 * Transactions are <em>NOT</em> thread-safe, and they should not be shared among multiple threads. Threads should
 * create their own {@link StateStore#newTransaction() new transaction}, which will ensure proper isolation of
 * concurrent changes.
 * <p>
 * For resource-safety, Transactions implement the {@link AutoCloseable} interface, enabling try-with-resources:
 * <pre>
 *     try (final Transaction transaction = store.newTransaction()) {
 *         transaction.put("foo", "bar");
 *         transaction.commit(Map);
 *     }
 * </pre>
 * If you are not using try-with-resources, you <em>must</em> call either {@link #commit(Map)} exactly-once, or {@link
 * #close()} at least once, or risk possible resource leaks.
 */
public interface Transaction extends StateStore, AutoCloseable {

    /**
     * The {@link IsolationLevel} that reads and writes in this transaction are subject to.
     * <p>
     * In the context of a {@link org.apache.kafka.streams.processor.StateStore} transaction, these Isolation Levels
     * adhere to the <a href="https://en.wikipedia.org/wiki/Isolation_(database_systems)#Read_phenomena">ANSI SQL 92
     * definitions</a>.
     * <p>
     * All isolation levels guarantee "read-your-own-writes", i.e. that writes in the transaction will be seen by
     * subsequent reads <em>from within the same transaction</em>. Other guarantees vary by isolation level:
     * <p>
     * <table>
     *     <tr>
     *         <th>Isolation Level</th>
     *         <th>Description</th>
     *         <th>Permitted Read Phenomena</th>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_UNCOMMITTED}</td> // todo: ALOS
     *         <td>Allows queries to read writes from all ongoing transactions that have not-yet been committed.</td>
     *         <td>dirty reads, non-repeatable reads, phantom reads</td>
     *     </tr>
     *     <tr>
     *         <td>{@link IsolationLevel#READ_COMMITTED}</td> // todo: EOS
     *         <td>Allows queries to only read writes that have been committed to the StateStore. Writes by an ongoing
     *         transaction are not visible <em>until that transaction commits</em>.</td>
     *         <td>non-repeatable reads, phantom reads</td>
     *     </tr>
     * </table>
     * <p>
     * Under {@link IsolationLevel#READ_UNCOMMITTED}, there are no guarantees on when records from other transactions
	 * become visible, therefore implementations <em>may</em> refrain from making uncommitted records visible to other
     * transactions until they're committed, if doing so would improve performance.
     */
    IsolationLevel isolationLevel();

    /**
     * Initializes this Transaction.
     * <p>
     * Most transactions require no explicit initialization, therefore the default implementation of this method does
     * nothing.
     *
     * @deprecated Since 2.7.0, the parent method has been deprecated in favor of {@link
     *             #init(StateStoreContext, StateStore)}. Use that method instead.
     */
    @Override @Deprecated
    default void init(final ProcessorContext context, final StateStore root) {
    }

    /**
     * Initializes this Transaction.
     * <p>
     * Most transactions require no explicit initialization, therefore the default implementation of this method does
     * nothing.
     */
    @Override
    default void init(final StateStoreContext context, final StateStore root) {
    }

    /**
     * Is this transaction is open for reading and writing.
     * @return {@code true} if this Transaction can be read from/written to, or {@code false} if it is no longer
     *         accessible.
     */
    @Override
    boolean isOpen();

    /**
     * Creates a new {@link Transaction} from an existing transaction.
     * <p>
     * This enables potentially re-using resources from an existing, no longer in-use transaction, instead of creating
     * a new one.
     * <p>
     * This method should only be called if the current transaction is guaranteed to no longer be in-use.
     * Implementations may return either a new Transaction instance, or a reference to themselves, only if they are able
     * to reset to being available for use.
     * <p>
     * {@link Transaction Transactions} are <em>not thread-safe</em>, and should not be shared among threads. New
     * threads should use this method to create a new transaction, instead of sharing an existing one.
     * <p>
     * Transactions created by this method will have the same {@link IsolationLevel} as the {@link StateStoreContext}
     * that this transaction was {@link #init(StateStoreContext, StateStore) initialized with}.
     *
     * @return A new {@link Transaction} to control reads/writes to the same {@link StateStore}. The Transaction
     *         <em>MUST</em> be {@link Transaction#commit(Map) committed} or {@link Transaction#close() closed} when you
     *         are finished with it, to prevent resource leaks.
     */
    @Override
    StateStore newTransaction();

    /**
     * Commit and close this Transaction.
     * <p>
     * Any records held by this Transaction will be written to the underlying state store.
     * <p>
     * Once this method returns successfully, this Transaction will no longer be available to use for reads/writes.
     *
     * @see #close() to close the Transaction without writing data to the underlying state store.
     */
    @Override
    void commit(final Map<TopicPartition, Long> changelogOffsets);

    /**
     * Closes this Transaction, without committing records.
     * <p>
     * Any uncommitted records will <em>not</em> be written to the underlying state store, and will instead be
     * discarded.
     * <p>
     * This method should be used to "rollback" a Transaction that should not be {@link #commit(Map) committed}.
     * <p>
     * The underlying {@link StateStore} will <em>not</em> be closed by this method.
     * <p>
     * This method is idempotent: repeatedly calling {@code close()} will not produce an error.
     *
     * @see #commit(Map) to close this Transaction by writing its records to the underlying state store.
     */
    @Override
    void close();
}

New Base Implementations

  • org.apache.kafka.streams.state.AbstractTransaction
  • org.apache.kafka.streams.state.AbstractTransactionalStore
Code Block
languagejava
titleorg.apache.kafka.streams.state.AbstractTransaction
/**
 * Base implementation of {@link Transaction transactions} created by an implementation of {@link
 * AbstractTransactionalStore}.
 * <p>
 * This base implementation provides the following functionality:
 * <ul>
 *     <li>Registration of callbacks that are invoked after this Transaction has closed. Used by {@link
 *     AbstractTransactionalStore} to track open transactions that need to be closed when the parent store closes.</li>
 *     <li>Tracking/delegation of {@link #isOpen()}, {@link #persistent()}, {@link #name()} and {@link
 *     #isolationLevel()} methods.</li>
 *     <li>Provides a {@link #validateIsOpen()} method, useful for ensuring the Transaction state in query methods.</li>
 *     <li>Provides base implementations of {@link #commit(Map)} and {@link #close()} that properly handle idempotence and
 *     invoking any registered callbacks.</li>
 * </ul>
 * <p>Note: none of the above methods are marked {@code final}, to enable power-users to provide alternatives. However,
 * this should only be done with great care, and only if absolutely necessary.</p>

 *
 * @param <S> The type of the {@link StateStore} that spawned this transaction.
 */
public abstract class AbstractTransaction<S extends StateStore> implements Transaction {

    /**
     * Commit and close this Transaction.
     * 
     * @see Transaction#commit(Map)
     */
    public abstract void commitTransaction(final Map<TopicPartition, Long> offsets);

    /**
	 * Closes this Transaction, without committing records.
     *
     * @see Transaction#close()
     */
    public abstract void closeTransaction();
}
Code Block
languagejava
titleorg.apache.kafka.streams.state.AbstractTransactionalStore
/**
 * Base class for transactional stores, that tracks open transactions and can close them all.
 * <p>
 * Transactions created using {@link #newTransaction()} will be automatically tracked and closed when this store is
 * closed.
 *
 * @see Transaction
 * @see AbstractTransaction
 */
public abstract class AbstractTransactionalStore implements StateStore {

    /**
     * Creates a new {@link Transaction}.
     * <p>
     * Implementations of this class should implement this method, instead of {@link #newTransaction()}, which
     * will call this method to produce the new transaction.
     * <p>
     * Transactions produced by this method must be an instance of {@link AbstractTransaction}, to enable the
     * automatic management of transaction resources.
     * 
     * @return A new {@link Transaction}.
     */
    public abstract AbstractTransaction<? extends AbstractTransactionalStore> createTransaction();
}

Proposed Changes

To ensure that data is not written to a state store until it has been committed to the changelog, we need to isolate writes from the underlying database until changelog commit. To achieve this, we introduce the concept of a Transaction, which is a view of the underlying state store, and buffers writes until commit(Map) is called. Transactions implement the StateStore interface, for compatibility, so they can be used anywhere an existing StateStore is expected.

Internally, we enable configuration of the level of isolation provided by StateStores via a context-wide IsolationLevel, which can be configured to either:

...

 }


Metrics

New

  • stream-state-metrics 
    • commit-rate - the number of calls to StateStore#commit(Map)
    • commit-latency-avg - the average time taken to call StateStore#commit(Map)
    • commit-latency-max - the maximum time taken to call StateStore#commit(Map)

Deprecated

  • stream-state-metrics 
    • flush-rate
    • flush-latency-avg 
    • flush-latency-max 

These changes are necessary to ensure these metrics are not confused with orthogonal operations, like RocksDB memtable flushes or cache flushes. They will be measuring the invocation of StateStore#commit, which replaces StateStore#flush.

While the flush metrics are only deprecated, they will no longer record any data under normal use, as Kafka Streams will no longer call StateStore#flush().

Proposed Changes

To ensure that data is not written to a state store until it has been committed to the changelog, we need to isolate writes from the underlying database until changelog commit. To achieve this, we introduce the concept of transaction Isolation Levels, that dictate the visibility of records, written by processing threads, to Interactive Query threads.

We enable configuration of the level of isolation provided by StateStores via a default.state.isolation.level, which can be configured to either:

default.state.isolation.levelDescription
READ_UNCOMMITTED

Records written by the StreamThread are visible to all Interactive Query threads immediately. This level provides no atomicity, consistency, isolation or durability guarantees.

Under this Isolation Level, Streams behaves as it currently does, wiping state stores on-error when the processing.mode is one of exactly-once, exactly-once-v2  or exactly-once-beta.

READ_COMMITTED

Records written by the StreamThread are only visible to Interactive Query threads once they have been committed.

Under this Isolation Level, Streams will isolate writes from state stores until commit. This guarantees consistency of the on-disk data with the store changelog, so Streams will not need to wipe stores on-error.

In Kafka Streams, all StateStore s are written to by a single StreamThread  (this is the Single Writer principle). However, multiple other threads may concurrently read from StateStore s, principally to service Interactive Queries. In practice, this means that under READ_COMMITTED, writes by the StreamThread  that owns the StateStore  will only become visible to Interactive Query threads once commit()  has been called.

The default value for default.state.isolation.level will be READ_UNCOMMITTED, to mirror the behaviour we have today; but this will be automatically set to READ_COMMITTED if the processing.mode has been set to an EOS mode, and the user has not explicitly set deafult.state.isolation.level to READ_UNCOMMITTED. This will provide EOS users with the most useful behaviour out-of-the-box, but ensures that they may choose to sacrifice the benefits of transactionality to ensure that Interactive Queries can read records before they are committed, which is required by a minority of use-cases.

In-memory Transaction Buffers

Many StateStore implementations, including RocksDB, will buffer records written to a transaction entirely in-memory, which could cause issues, either with JVM heap or native memory. To mitigate this, we will automatically force a Task commit if the total memory used for buffering uncommitted records returned by StateStore#approximateNumUncommittedBytes() exceeds the threshold configured by statestore.uncommitted.max.bytes. This will roughly bound the memory required for buffering uncommitted records, irrespective of the commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure. Each StreamThread will be given 1/num.stream.threads of the configured limits, dividing it fairly between them.

It's possible that some Topologies can generate many more new StateStore entries than the records they process, in which case, it would be possible for such a Topology to cross the configured record/memory thresholds mid-processing, potentially causing an OOM error if these thresholds are exceeded by a lot. To mitigate this, the StreamThread will measure the increase in records/bytes written on each iteration, and pre-emptively commit if the next iteration is likely to cross the threshold.

Note that this new method provides default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.

Interactive Queries

Interactive queries currently see every record, as soon as they are written to a StateStore. This can cause some consistency issues, as interactive queries can read records before they're committed to the Kafka changelog, which may be rolled-back. To address this, we have introduced configurable isolation levels, configured globally via default.state.isolation.level (see above).

When operating under the READ_COMMITTED isolation level, the maximum time for records to become visible to interactive queries will be commit.interval.ms. Under EOS, this is by default a low value (100 ms), but under at-least-once, the default is 30 seconds. Users may need to adjust their commit.interval.ms to meet the visibility latency goals for their use-case.

When operating under the READ_UNCOMMITTED isolation level, (i.e. ALOS), all records will be immediately visible to interactive queries, so the high default commit.interval.ms of 30s will have no impact on interactive query latency.

Error Handling

Kafka Streams currently generates a TaskCorruptedException when a Task needs to have its state wiped (under EOS) and be re-initialized. There are currently several different situations that generate this exception:

  1. No offsets for the store can be found when opening it under EOS.
  2. OutOfRangeException during restoration, usually caused by the changelog being wiped on application reset.
  3. TimeoutException under EOS, when writing to or committing a Kafka transaction.

The first two of these are extremely rare, and make sense to keep. However, timeouts are much more frequent. They currently require the store to be wiped under EOS because when a timeout occurs, the data in the local StateStore will have been written, but the data in the Kafka changelog will have failed to be written, causing a mismatch in consistency.

With Transactional StateStores, we can guarantee that the local state is consistent with the changelog, therefore, it will no longer be necessary to reset the local state on a TimeoutException when operating under the READ_COMMITTED isolation level.

Atomic Checkpointing

Kafka Streams currently stores the changelog offsets for a StateStore in a per-Task on-disk file, .checkpoint, which under EOS, is written only when Streams shuts down successfully. There are two major problems with this approach:

  • To ensure that the data on-disk matches the checkpoint offsets in the .checkpoint file, we must flush the StateStores whenever we update the offsets in .checkpoint. This is a performance regression, as it causes a significant increase in the frequency of RocksDB memtable flushes, which increases load on RocksDB's compaction threads.
  • There's a race condition, where it's possible the application exits after data has been committed to RocksDB, but before the checkpoint file has been updated, causing a consistency violation.

To resolve this, we move the responsibility for offset management to the StateStore itself. The new commit method takes a map of all the changelog offsets that correspond to the state of the transaction buffer being committed.

RocksDBStore will store these offsets in a separate Column Family, and will be configured to atomically flush all its Column Families. This guarantees that the changelog offsets will always be flushed to disk together with the data they represent, irrespective of how that flush is triggered. This allows us to remove the explicit memtable flush(), enabling RocksDB to dictate when memtables are flushed to disk.

The existing .checkpoint files will be retained for any StateStore that does not set managesOffsets()  to true , and to ensure managed offsets are available when the store is closed. Existing offsets will be automatically migrated into StateStores that manage their own offsets, iff there is no offset returned by StateStore#committedOffset.

Required interface changes:

  • Add methods void commit(Map<TopicPartition, Long> changelogOffsets), boolean managesOffsets() and Long committedOffset(TopicPartition) to StateStore .
  • Deprecate method flush() on StateStore.

Offsets for Consumer Rebalances

Kafka Streams directly reads from the Task .checkpoint file during Consumer rebalance, in order to optimize assignments of stateful Tasks by assigning them to the instance with the most up-to-date copy of the data, which minimises restoration. To allow this to continue functioning, Kafka Streams will continue to write the changelog offsets to the .checkpoint file, even for stores that manage their own offsets.

Offsets will be written to .checkpoint at the following times:

  1. During StateStore initialization, in order to synchronize the offsets in .checkpoint  with the offsets returned by StateStore#committedOffset(TopicPartition), which are the source of truth for stores that manage their own offsets.
  2. When the StateStore is closed, in order to ensure that the offsets used for Task assignment reflect the state persisted to disk.
  3. At the end of every Task commit, if-and-only-if at least one StateStore in the Task is persistent and does not manage its own offsets. This ensures that stores that don't manage their offsets continue to have their offsets persisted to disk whenever the StateStore data itself is committed.
    • Avoiding writing .checkpoint when every persistent store manages its own offsets ensures we don't pay a significant performance penalty when the commit interval is short, as it is by default under EOS.
    • Since all persistent StateStores provided by Kafka Streams will manage their own offsets, the common case is that the .checkpoint file will not be updated on commit(Map) 

Tasks that are already assigned to an instance, already use the in-memory offsets when calculating partition assignments, so no change is necessary here.

Interactive Query .position Offsets

Input partition "Position" offsets, introduced by KIP-796: Interactive Query v2, are currently stored in a .position file by the RocksDBStore implementation. To ensure consistency with the committed data and changelog offsets, these position offsets will be stored in RocksDB, in the same column family as the changelog offsets, instead of the .position file. When a StateStore that manages its own offsets is first initialized, if a .position file exists in the store directory, its offsets will be automatically migrated into the store, and the file will be deleted.

When writing data to a RocksDBStore (via put, delete, etc.), the input partition offsets will be read from the changelog record metadata (as before), and these offsets will be added to the current transactions WriteBatch. When the StateStore is committed, the position offsets in the current WriteBatch will be written to RocksDB, alongside the records they correspond to. Alongside this, RocksDBStore will maintain two Position maps in-memory, one containing the offsets pending in the current transaction's WriteBatch, and the other containing committed offsets. On commit(Map), the uncommitted Position map will be merged into the committed Position map. In this sense, the two Position maps will diverge during writes, and re-converge on-commit.

When an interactive query is made under the READ_COMMITTED isolation level the PositionBound will constrain the committed Position map, whereas under READ_UNCOMMITTED, the PositionBound will constrain the uncommitted Position map.

RocksDB Transactions

When the isolation level is READ_COMMITTED, we will use RocksDB's WriteBatchWithIndex as a means to accomplishing atomic writes when not using the RocksDB WAL. When reading records from the StreamThread, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities in order to ensure that uncommitted writes are available to query. When reading records from Interactive Queries, we will use the regular RocksDB#get and RocksDB#newIterator methods, to ensure we see only records that have been committed (see above). The performance of this is expected to actually be better than the existing, non-batched write path. The main performance concern is that the WriteBatch must reside completely in-memory until it is committed, which is addressed by statestore.uncommitted.max.bytes, see above.

Compatibility, Deprecation, and Migration Plan

The above changes will retain compatibility for all existing StateStores, including user-defined custom implementations. Any StateStore that extends RocksDBStore will automatically inherit its behaviour, although its internals will change, potentially requiring users that depend on internal behaviour to update their code.

All new methods on existing classes will have defaults set to ensure compatibility.

Kafka Streams will automatically migrate offsets found in an existing .checkpoint file, and/or an existing .position file, to store those offsets directly in the StateStore, if managesOffsets returns true. Users of the in-built store types will not need to make any changes. See Upgrading.

Users may notice a change in the performance/behaviour of Kafka Streams. Most notably, under EOS Kafka Streams will now regularly "commit" StateStores, where it would have only done so when the store was closing in the past. The overall performance of this should be at least as good as before, but the profile will be different, with write latency being substantially faster, and commit latency being a bit higher.

Upgrading

When upgrading to a version of Kafka Streams that includes the changes outlined in this KIP, users will not be required to take any action. Kafka Streams will automatically upgrade any RocksDB stores to manage offsets directly in the RocksDB database, by importing the offsets from any existing .checkpoint and/or .position files.

Users that currently use processing.mode: exactly-once(-v2|-beta) and who wish to continue to read uncommitted records from their Interactive Queries will need to explicitly set default.state.isolation.level: READ_UNCOMMITTED.

Downgrading

When downgrading from a version of Kafka Streams that includes the changes outlined in this KIP to a version that does not contain these changes, users will not be required to take any action. The older Kafka Streams version will be unable to open any RocksDB stores that were upgraded to store offsets (see Upgrading), which will cause Kafka Streams to wipe the state for those Tasks and restore the state, using an older RocksDB store format, from the changelogs.

Since downgrading is a low frequency event, and since restoring state from scratch is already an existing failure mode for older versions of Kafka Streams, we deem this an acceptable automatic downgrade strategy.

Test Plan

Testing will be accomplished by both the existing tests and by writing some new unit tests that verify atomicity, durability and consistency guarantees that this KIP provides.

Rejected Alternatives

Dual-Store Approach (KIP-844)

The design outlined in KIP-844, sadly, does not perform well (as described above), and requires users to opt-in to transactionality, instead of being a guarantee provided out-of-the-box.

Replacing RocksDB memtables with ThreadCache

It was pointed out on the mailing list that Kafka Streams fronts all RocksDB StateStores with a configurable record cache, and that this cache duplicates the function requests for recently written records provided by RocksDB memtables. A suggestion was made to utilize this record cache (the ThreadCache class) as a replacement for memtables, by directly flushing them to SSTables using the RocksDB SstFileWriter.

This is out of scope of this KIP, as its goal would be reducing the duplication (and hence, memory usage) of RocksDB StateStores; whereas this KIP is tasked with improving the consistency of StateStores to reduce the frequency and impact of state restoration, improving their scalability.

It has been recommended to instead pursue this idea in a subsequent KIP, as the interface changes outlined in this KIP should be compatible with this idea.

Transactional support under READ_UNCOMMITTED

When query isolation level is READ_UNCOMMITTED, Interactive Query threads need to read records from the ongoing transaction buffer. Unfortunately, the RocksDB WriteBatch is not thread-safe, causing Iterators created by Interactive Query threads to produce invalid results/throw unexpected errors as the WriteBatch is modified/closed during iteration.

Ideally, we would build an implementation of a transaction buffer that is thread-safe, enabling Interactive Query threads to query it safely. One approach would be to "chain together" WriteBatches, creating a new WriteBatch every time a new Iterator is created by an Interactive Query thread and "freezing" the previous WriteBatch.

It was decided to defer tackling this problem to a later KIP, in order to realise the benefits of transactional state stores to users as quickly as possible.

Query-time Isolation Levels

It was requested that users be able to select the isolation level of queries on a per-query basis. This would require some additional API changes (to the Interactive Query APIs). Such an API would require that state stores are always transactional, and that the transaction buffers can be read from by READ_UNCOMMITTED queries. Due to the problems outlined in the previous section, it was decided to also defer this to a subsequent KIP.

The new configuration option default.state.isolation.level was deliberately named to enable query-time isolation levels in the future, whereby any query that didn't explicitly choose an isolation level would use the configured default. Until then, this configuration option will globally control the isolation level of all queries, with no way to override it per-query

StateStore Transactions are only committed once the Kafka transaction for their changelog entries have been committed. This ensures that, under exactly-once, the records written to disk always reflect the state of the store's changelog.

The at-least-once processing mode does not need the isolation guarantees that transactions provide, and indeed, doing so could cause problems as the default commit.interval.ms under at-least-once is 30 seconds, which would require buffering records for up to 30 seconds, unnecessarily. For this reason, under at-least-once, Streams will automatically configure all StateStores to use the READ_UNCOMMITTED isolation level, which provides no isolation guarantees, and ensures that records are immediately written to disk.

RocksDB Transactions

When the isolation level is READ_COMMITTED, we will use RocksDB's WriteBatchWithIndex as a means to accomplishing atomic writes when not using the RocksDB WAL. When reading records, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities in order to ensure that uncommitted writes are available to query. The performance of this is expected to actually be better than the existing, non-batched write path. The main performance concern is that the buffer must reside completely in-memory until it is committed.

To mitigate this, we will automatically force a Task commit if the total uncommitted records returned by StateStore#approximateNumUncommittedEntries()  exceeds a threshold, configured by statestore.uncommitted.max.records; or the total memory used for buffering uncommitted records returned by StateStore#approximateNumUncommittedBytes() exceeds the threshold configured by statestore.uncommitted.max.bytes. This will roughly bound the memory required per-Thread for buffering uncommitted records, irrespective of the commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure.

It's possible that some Topologies can generate many more new StateStore entries than the records they process, in which case, it would be possible for such a Topology to cross the configured record/memory thresholds mid-processing, potentially causing an OOM error if these thresholds are exceeded by a lot. To mitigate this, the StreamThread will measure the increase in records/bytes written on each iteration, and pre-emptively commit if the next iteration is likely to cross a threshold.

Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.

Transaction Management

A wrapper TransactionalKeyValueStore will be provided, which takes care of automatically creating a new transaction when it's initialized, and starting a new transaction after the existing one has been committed. This is the principle way that transactions will be managed by the Streams engine. A wrapper TransactionalSegment will also be provided, which extends TransactionalKeyValueStore, providing automatic transaction handling for Segments.

The AbstractTransaction and AbstractTransactionalStore classes detailed above provide a base implementation for transactionality that will automatically track and manage open transactions, to ensure that they are properly closed when their StateStore closes, ensuring no resource leaks. These are the base classes for RocksDBStore and RocksDBTransaction, respectively.

Atomic Checkpointing

Kafka Streams currently stores the changelog offsets for a StateStore in a per-Task on-disk file, .checkpoint, which under EOS, is written only when Streams shuts down successfully. There are two major problems with this approach:

  • If this file is only written on a clean exit, we won't know the offsets our StateStore contains, and therefore will need to wipe and restore on any error, nullifying many of the improvements gained by Transactional State Stores.
  • If we write to this file on every successful commit, there's a race condition, where it's possible the application exits after data has been committed to RocksDB, but before the checkpoint file has been updated, causing a consistency violation.

To resolve this, we move the responsibility for offset management to the StateStore itself. The new commit method takes a map of all the changelog offsets that correspond to the state of the Transaction being committed.

RocksDBStore will store these offsets in a separate Column Family, and will be configured to atomically flush all its Column Families. This guarantees that the changelog offsets will always be flushed to disk together with the data they represent, irrespective of how that flush is triggered. This allows us to remove the explicit memtable flush(), enabling RocksDB to dictate when memtables are flushed to disk. This will likely improve performance and reduce compactions, as it will no longer be required to flush new SST files every 10,000 records.

The existing .checkpoint files will be retained for any StateStore that does not set managesOffsets()  to true , and the existing offsets will be automatically migrated into StateStores that manage their own offsets, iff there is no offset returned by StateStore#getCommittedOffset.

Interactive Queries

Interactive queries currently see every record, as soon as they are written to a StateStore. This can cause some consistency issues, as interactive queries can read records before they're committed to the Kafka changelog, which may be rolled-back. To address this, interactive queries will query the underlying StateStore, and will not be routed through a Transaction. This ensures that interactive queries see a consistent view of the store, as they will not be able to read any uncommitted records.

When operating under IsolationLevel.READ_COMMITTED (i.e. EOS), the maximum time for records to become visible to interactive queries will be commit.interval.ms, however, this is automatically set to a low value, and it's not recommended for users to use high intervals when operating under EOS.

When operating under IsolationLevel.READ_UNCOMMITTED, (i.e. ALOS), all records will be immediately visible to interactive queries, so the high default commit.interval.ms of 30s will have no impact on interactive query latency.

Error Handling

Kafka Streams currently generates a TaskCorruptedException when a Task needs to have its state wiped (under EOS) and be re-initialized. There are currently several different situations that generate this exception:

  1. No offsets for the store can be found when opening it under EOS.
  2. OutOfRangeException during restoration, usually caused by the changelog being wiped on application reset.
  3. TimeoutException under EOS, when writing to or committing a Kafka transaction.

The first two of these are extremely rare, and make sense to keep. However, timeouts are much more frequent. They currently require the store to be wiped under EOS because when a timeout occurs, the data in the local StateStore will have been written, but the data in the Kafka changelog will have failed to be written, causing a mismatch in consistency.

With Transactional StateStores and Atomic Checkpointing, we can guarantee that the local state is consistent with the changelog, therefore, it will no longer be necessary to reset the local state on a TimeoutException.

Compatibility, Deprecation, and Migration Plan

The above changes will retain compatibility for all existing StateStores, including user-defined custom implementations. Any StateStore that extends RocksDBStore will automatically inherit its behaviour, although users that directly write via the db RocksDB  instance may need to switch to using the dbAccessor to ensure consistent results.

All new methods on existing classes will have defaults set to ensure compatibility.

As noted above, any store for which managesOffsets() returns true, will have its existing offset(s) migrated from the existing .checkpoint file, when the store is first opened.

Test Plan

Testing will be accomplished by both the existing tests and by writing some new unit tests that verify atomicity, durability and consistency guarantees that this KIP provides.

Rejected Alternatives

The design outlined in KIP-844, sadly, does not perform well (as described above), and requires users to opt-in to transactionality, instead of being a guarantee provided out-of-the-box.