Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Reduced scope of KIP

...

Code Block
languagejava
firstline106
titleorg.apache.kafka.streams.processor.StateStore
linenumberstrue
    /**
     * Flush any cached data
     *
     * @deprecated since KIP-892. Use {@link #commit(Map)} instead.
     */
    @Deprecated
    default void flush() {}

    /**
     * Commit the current transaction to this StateStore with the given changelog offset.
     * <p>
     * This is a convenience method for {@link #commit(Map) commit(Collections.singletonMap(null, changelogOffset))}.
     * <p>
     * This method is used to commit records to a regular, logged StateStore.
     *
     * @see #commit(Map)
     * @param changelogOffset The offset of the changelog topic this commit corresponds to. The offset can be
     *                        {@code null} if this StateStore does not have a changelog.
     */
    @Evolving
    default void commit(final Long changelogOffset) {
        commit(Collections.singletonMap(null, changelogOffset));
    }

    /**
     * Commit the current transaction to this StateStore with the given offsets.
     * <p>
     * All records that were written to this store since the last {@code commit} will be written to the store
     * atomically, i.e. either all will be written, or none of them will.
     * <p>
     * After this method returns, calls to {@link #getCommittedOffset(TopicPartition)} will return the associated value
     * of {@code offsets}.
     * <p>
     * {@code offsets} will contain one of either:
     * <ul>
     *     <li>A {@code Map} of offsets for each input partition of this Global StateStore.</li>
     *     <li>A {@code Map} containing one mapping from {@code null} to the offset of the changelog partition for this
     *     logged StateStore.</li>
     *     <li>A {@code null}, if this StateStore is not {@link StoreBuilder#withLoggingDisabled() logged}.</li>
     * </ul>
     * <p>
     * If this store is {@link #persistent() persistent}, it is required that on-restart, the offsets returned by {@link
     * #getCommittedOffset(TopicPartition)} correspond with the records persisted in the StateStore.
     * 
     * @param offsets The offset(s) for the input topics this commit corresponds to. May be {@code null} if this
     *                StateStore has no input topics or changelog.
     */
    @Evolving
    default void commit(final Map<TopicPartition, Long> offsets) {
        flush();
    }

    /**
     * Returns the latest committed changelog offset that the materialized state in this store represents.
     * <p>
     * The state in the store is guaranteed to reflect exactly the state in the changelog up to this offset.
     * <p>
     * This may be {@code null}, if the store does not contain any metadata on its changelog offset. In this case, the
     * StateStore itself makes no guarantees about its contents.
     * <p>
     * Equivalent to calling {@code getCommittedOffset(null)}.
     * <p>
     * This method should <em>not</em> be called for global StateStores, as they are not backed by a changelog.
     *
     * @return The latest committed changelog offset that the state in this store represents, or {@code null}, if no
     *         changelog offset metadata is available and no guarantees can be made.
     */
    @Evolving
    default Long getCommittedOffset() {
        return getCommittedOffset(null);
    }

    /**
     * Returns the latest committed changelog offset that the materialized state in this store represents.
     * <p>
     * The state in the store is guaranteed to reflect exactly the state in the changelog up to this offset.
     * <p>
     * This may be {@code null}, if the store does not contain any metadata on its changelog offset. In this case, the
     * StateStore itself makes no guarantees about its contents.
     * <p>
     * If {@code topicPartition} is {@code null}, the returned offset will be the offset for the changelog partition of
     * this StateStore, if one exists.
     *
     * @return The latest committed offset that the state in this store represents, or {@code null}, if no
     *         changelog offset metadata is available and no guarantees can be made.
     */
    @Evolving
    default Long getCommittedOffset(final TopicPartition topicPartition) {
        return null;
    }


    /**
     * Return an approximate count of records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the number of records that would be committed by the next call to
     * {@link #commit(Map)}.
     * <p>
     * If this StateStore is unable to approximately count uncommitted records, it will return {@code -1}.
     * If this StateStore does not support atomic transactions, it will return {@code 0}, because records will always be
     * immediately written to a non-transactional store, so there will be none awaiting a {@link #commit(Map)}.
     *
     * @return The approximate number of records awaiting {@link #commit(Map)}, {@code -1} if the number of
     *         uncommitted records can't be counted, or {@code 0} if this StateStore does not support transactions.
     */
    @Evolving
    default long approximateNumUncommittedEntries() {
        return 0;
    }

   /**
     * Return an approximate count of memory used by records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the memory would be freed by the next call to {@link #commit(Map)}.
     * <p>
     * If this StateStore is unable to approximately count uncommitted memory usage, it will return {@code -1}.
     * If this StateStore does not support atomic transactions, it will return {@code 0}, because records will always be
     * immediately written to a non-transactional store, so there will be none awaiting a {@link #commit(Map)}.
     *
     * @return The approximate size of all records awaiting {@link #commit(Map)}, {@code -1} if the size of uncommitted
     *         records can't be counted, or {@code 0} if this StateStore does not support transactions.
     */
    @Evolving
    default long approximateNumUncommittedBytes() {
        return 0;
    }

    /**
     * Return if this implementation manages its own checkpointing during {@link #commit(Map)}.
     * <p>
     * If this method returns {@code true}, and {@link #persistent()} returns {@code true}, then {@link
     * #getCommittedOffset(TopicPartition)} MUST return the offsets provided to the most recent call to {@link
     * #commit(Map)}, even if the application has been restarted since the last {@link #commit(Map)}.
     * <p>
     * If this method returns {@code false}, this StateStore may be wrapped in a default checkpoint implementation
     * by the Kafka Streams engine.
     */
    @Evolving
    default boolean managesCheckpoints() {
        return false;
    }

Proposed Changes

There are two parts to this KIP:

...

Buffering writes with WriteBatchWithIndex

RocksDB provides WriteBatchWithIndex as a means to accomplishing atomic writes when not using the RocksDB WAL. The performance overhead of doing this should be negligible. The only main performance consideration is that the buffer must reside completely in-memory until it is committed.

To mitigate this, under EOS we will automatically force a Task commit if the total uncommitted records returned by StateStore#approximateNumUncommittedEntries()  exceeds a threshold, configured by statestore.uncommitted.max.records; or the total memory used for buffering uncommitted records returned by StateStore#approximateNumUncommittedBytes() exceeds the threshold configured by statestore.uncommitted.max.bytes. This will roughly bound the memory required per-Task for buffering uncommitted records, irrespective of the commit.interval.ms, and will effectively bound the number of records that will need to be restored in the event of a failure.When running under ALOS, we will instead automatically commit only the Task state stores, but not the entire Task itself (i.e. the consumer offsets), as an optimization to preserve the users desired commit.interval.ms.

Note that these new methods provide default implementations that ensure existing custom stores and non-transactional stores (e.g. InMemoryKeyValueStore) do not force any early commits.

When reading records, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities in order to ensure that uncommitted writes are available to query. This minimizes the amount of custom code needed to implement transactionality, and allows RocksDB to perform optimizations.

StateStore ownership of checkpointing

One issue with both the existing RocksDBStore and KIP-844 Transactional StateStores that is not resolved by WriteBatchWithIndex is that to guarantee consistency with the Task checkpointing, it is required to explicitly flush the memtables on every commit. This is not what RocksDB was designed for, and leads to sub-optimal performance, especially for lower throughput stores, where many small sstable files are created. This increased "write-amplification" increases pressure on RocksDB's compaction threads, as they have more sstables to compact together.

The existing Task checkpoints are also only written when the Task is cleanly closed. This would cause stores to be wiped and re-built in the event of a crash, even when unnecessary.

To resolve this, we will move the responsibility for StateStore checkpointing to the StateStore interface itself.

When calling StateStore#commit , the offset(s) for the changelog partition, or input partitions for global stores, will be provided to the store. The StateStore itself will then determine how best to checkpoint the data it's committing.

For RocksDBStore, we will store offsets in a separate column-family, offsetMetadata, which is updated as part of the current batch during commit. We ensure that the memtables for our data and metadata column-families are atomically flushed by RocksDB by enabling Atomic Flushes during store initialization. This will guarantee that all records written to the RocksDB memtables, and subsequent on-disk sstables, will always be accompanied by the changelog/input partition offsets that they correspond to, without the need to explicitly flush memtables.

Query Position data

As part of the Intereactive Query v2 (IQv2) initiative, StateStores already track Position offsets, used to bound queries. To improve atomicity, these offsets will also be written to the offsetsMetadata column-family, and atomically committed along with records and changelog/input partition offsets.

Changes to StateManager

Currently, StateManager implementations (ProcessorStateManager for regular stores and GlobalStateManagerImpl for global stores) manage checkpointing of their stores via an on-disk file. This is done through 3 methods in each class:

  • flush()
  • updateChangelogOffsets(Map)
  • checkpoint()

These methods are always called together, with one exception: checkpoint is only called if the number of records processed since the last commit is more than the hard-coded threshold of 10,000. This is presumably a performance optimization to prevent checkpoint files being written every 100 milliseconds.

We will replace these 3 methods, with one:

  • commit(Map)

Concurrent Access by Interactive Queries

While the Task that owns a StateStore is processed by a single thread, concurrent access is possible when other threads perform an interactive query on the StateStore. While RocksDB itself is thread-safe, and optimized for concurrent access, WriteBatchWithIndex is not thread-safe. Care will need to be taken to ensure the consistency and safety of the batch in the presence of concurrent access from interactive query threads.

All RocksDBStore access methods (get, put , etc.) are already synchronized, however, it's possible for iterators to escape the synchronization locking, enabling a RocksDBIterator to be in-use while the underlying WriteBatchWithIndex is being modified. In addition to tackling this problem, we will evaluate whether the access methods need to be syncrhonized, as this may be an unnecessary performance penaltyThis will delegate the checkpointing procedure to the underlying StateStore.  The StateStoreMetadata , which currently stores the offset for each changelog partition in-memory, will be updated to instead delegate to StateStore#getCommittedOffset().

Compatibility, Deprecation, and Migration Plan

Existing stores maintain their current checkpoints and position offsets in files. These files will still be read, if present, and used to automatically migrate an existing store to being transactional:

For .checkpoint files:

  1. If a checkpoint offset file exists with an offset for a changelog partition of an existing store:
    1. The store will be checked for its own offset via StateStore#getCommittedOffset() 
      1. If one exists, the checkpoint file will be ignored.
      2. If one doesn't exist, StateStore#commit(Map) will be called, with the offset(s) from the checkpoint file.
  2. If no checkpoint offsets exist in the store, and no checkpoint offset file exists, the store data will be deleted and restored from the changelog.
    • This ensures that corrupt stores will also be properly handled during migration.
    • Only the corrupt store(s) will be deleted. The Task directory itself will not be deleted.
  3. The checkpoint offset file will be deleted.

And for .position files:

  1. If a position offset file exists for a store:
    1. The store metadata column-family will be checked for existing position offsets.
      1. If none exist, the data in the position file will be written to the metadata column-family.
  2. The position offset file will be deleted.

Custom StateStore implementations that have not been updated to manage their own offsets will be automatically adapted (via CheckpointingStateStoreAdapter) to use the legacy (per-Task checkpoint file) checkpoint implementation. This is not expected to be optimal, but will ensure consistency for these stores. It is recommended that custom StateStore implementations be upgraded to manage their own offsets using the most optimal approach available to them.

Note: custom implementations that extend an internal implementation, like RocksDBStore, will automatically assume the checkpointing behaviour of that implementation, and should automatically function as expectedThe above changes will retain compatibility for all existing StateStores, including user-defined custom implementations. Any StateStore that extends RocksDBStore will automatically inherit its behaviour, although users that directly write via the db RocksDB  instance may need to switch to using the dbAccessor to ensure consistent results.

Test Plan

Testing will be accomplished by both the existing tests and by writing some new unit tests that verify atomicity, durability and consistency guarantees that this KIP provides.

...