Status

Current state: Under Discussion

Discussion thread: Thread

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As described in KIP-844, under EOS, crash failures cause all Task state to be wiped out on restart. This is because, currently, data is written to the StateStore before the commit to its changelog has completed, so it's possible that records are written to disk that were not committed to the store changelog.

KIP-844 proposed an alternative type of StateStore that would let users opt in to "transactional" behaviour, ensuring that data is only persisted once the changelog commit has succeeded. Unfortunately, the design and approach outlined in KIP-844 did not perform well when tested: write throughput was only approximately 4% of that of the regular RocksDB StateStore.

This KIP explores an alternative design that should have little/no performance impact, and can thus be enabled for all stores.

Public Interfaces

Changed:

  • org.apache.kafka.streams.processor.StateStore

Changes:

org.apache.kafka.streams.processor.StateStore
    /**
     * Flush any cached data
     *
     * @deprecated since KIP-892. Use {@link #commit(Map)} instead.
     */
    @Deprecated
    default void flush() {}

    /**
     * Commit the current transaction to this StateStore with the given changelog offset.
     * <p>
     * This is a convenience method for {@link #commit(Map) commit(Collections.singletonMap(null, changelogOffset))}.
     * <p>
     * This method is used to commit records to a regular, logged StateStore.
     *
     * @see #commit(Map)
     * @param changelogOffset The offset of the changelog topic this commit corresponds to. The offset can be
     *                        {@code null} if this StateStore does not have a changelog.
     */
    @Evolving
    default void commit(final Long changelogOffset) {
        commit(Collections.singletonMap(null, changelogOffset));
    }

    /**
     * Commit the current transaction to this StateStore with the given offsets.
     * <p>
     * All records that were written to this store since the last {@code commit} will be written to the store
     * atomically, i.e. either all will be written, or none of them will.
     * <p>
     * After this method returns, calls to {@link #getCommittedOffset(TopicPartition)} will return the associated value
     * of {@code offsets}.
     * <p>
     * {@code offsets} will contain one of either:
     * <ul>
     *     <li>A {@code Map} of offsets for each input partition of this Global StateStore.</li>
     *     <li>A {@code Map} containing one mapping from {@code null} to the offset of the changelog partition for this
     *     logged StateStore.</li>
     *     <li>A {@code null}, if this StateStore is not {@link StoreBuilder#withLoggingDisabled() logged}.</li>
     * </ul>
     * <p>
     * If this store is {@link #persistent() persistent}, it is required that on-restart, the offsets returned by {@link
     * #getCommittedOffset(TopicPartition)} correspond with the records persisted in the StateStore.
     * 
     * @param offsets The offset(s) for the input topics this commit corresponds to. May be {@code null} if this
     *                StateStore has no input topics or changelog.
     */
    @Evolving
    default void commit(final Map<TopicPartition, Long> offsets) {
        flush();
    }

    /**
     * Returns the latest committed changelog offset that the materialized state in this store represents.
     * <p>
     * The state in the store is guaranteed to reflect exactly the state in the changelog up to this offset.
     * <p>
     * This may be {@code null}, if the store does not contain any metadata on its changelog offset. In this case, the
     * StateStore itself makes no guarantees about its contents.
     * <p>
     * Equivalent to calling {@code getCommittedOffset(null)}.
     * <p>
     * This method should <em>not</em> be called for global StateStores, as they are not backed by a changelog.
     *
     * @return The latest committed changelog offset that the state in this store represents, or {@code null}, if no
     *         changelog offset metadata is available and no guarantees can be made.
     */
    @Evolving
    default Long getCommittedOffset() {
        return getCommittedOffset(null);
    }

    /**
     * Returns the latest committed changelog offset that the materialized state in this store represents.
     * <p>
     * The state in the store is guaranteed to reflect exactly the state in the changelog up to this offset.
     * <p>
     * This may be {@code null}, if the store does not contain any metadata on its changelog offset. In this case, the
     * StateStore itself makes no guarantees about its contents.
     * <p>
     * If {@code topicPartition} is {@code null}, the returned offset will be the offset for the changelog partition of
     * this StateStore, if one exists.
     *
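     * @param topicPartition The input partition to look up the offset for, or {@code null} for the changelog
     *                       partition of this StateStore.
     * @return The latest committed offset that the state in this store represents, or {@code null}, if no
     *         changelog offset metadata is available and no guarantees can be made.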
     */
    @Evolving
    default Long getCommittedOffset(final TopicPartition topicPartition) {
        return null;
    }
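
To illustrate the contract between commit and getCommittedOffset for custom stores, here is a minimal in-memory sketch. It is not part of the proposed interface: OffsetTrackingStoreSketch and its nested TopicPartition are hypothetical stand-ins (the latter for org.apache.kafka.common.TopicPartition) so that the example compiles without Kafka on the classpath, and a real persistent store would also need to make the data and offsets durable together.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: an in-memory store that honours the commit/getCommittedOffset contract.
final class OffsetTrackingStoreSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption, for self-containment).
    static final class TopicPartition {
        private final String topic;
        private final int partition;

        TopicPartition(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(final Object o) {
            if (!(o instanceof TopicPartition)) return false;
            final TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    private final Map<String, String> committedData = new HashMap<>();
    // HashMap permits a null key, which is how the changelog offset is keyed.
    private final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();

    void put(final String key, final String value) {
        uncommitted.put(key, value);
    }

    String get(final String key) {
        final String buffered = uncommitted.get(key);
        return buffered != null ? buffered : committedData.get(key);
    }

    // Convenience overload for a regular, logged store: the changelog offset is keyed by null.
    void commit(final Long changelogOffset) {
        commit(Collections.singletonMap(null, changelogOffset));
    }

    // Applies buffered records and their offsets in one step; offsets may be null for unlogged stores.
    void commit(final Map<TopicPartition, Long> offsets) {
        committedData.putAll(uncommitted);
        uncommitted.clear();
        if (offsets != null) {
            committedOffsets.putAll(offsets);
        }
    }

    Long getCommittedOffset() {
        return getCommittedOffset(null);
    }

    // Returns null when no offset has been committed for the given partition.
    Long getCommittedOffset(final TopicPartition topicPartition) {
        return committedOffsets.get(topicPartition);
    }
}
```

Note that the null-keyed map only works with collections that accept null keys, such as HashMap or Collections.singletonMap; Map.of and ConcurrentHashMap reject them.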

Proposed Changes

There are two parts to this KIP:

  1. Buffering writes to RocksDB using WriteBatchWithIndex.
  2. Moving responsibility for store checkpointing inside the StateStore itself.

Buffering writes with WriteBatchWithIndex

RocksDB provides WriteBatchWithIndex as a means of achieving atomic writes when not using the RocksDB WAL. The performance overhead of doing this should be negligible. The only performance consideration is that the buffer must reside completely in-memory until it is committed. RocksDB recommends buffers of no more than 3-4 MiB for optimal performance. With a commit.interval.ms of 100 milliseconds, which is the default under EOS, and an average record size of 1 KiB, a 4 MiB buffer holds 4,096 records per commit interval, which allows for a throughput of ~40,960 records/second. This is a worst-case estimate, as most use-cases will have a considerably smaller record size, providing for significantly increased throughput. For use-cases with larger record sizes, higher throughput could be sustained by using a larger buffer, at the cost of higher memory usage.
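
The estimate above can be reproduced with a few lines of arithmetic. The class and method names below are illustrative only, and the calculation assumes that the buffer is emptied exactly once per commit interval and ignores any per-entry overhead inside the batch.

```java
// Worked version of the buffer-size arithmetic; names are hypothetical.
final class BufferThroughputEstimate {

    // Records per second a buffer can absorb if it is committed (and emptied) once per commit interval.
    static long recordsPerSecond(final long bufferBytes, final long recordBytes, final long commitIntervalMs) {
        final long recordsPerCommit = bufferBytes / recordBytes; // records that fit in one buffer
        final double commitsPerSecond = 1000.0 / commitIntervalMs;
        return (long) (recordsPerCommit * commitsPerSecond);
    }

    public static void main(final String[] args) {
        final long fourMiB = 4L * 1024 * 1024;
        final long oneKiB = 1024L;
        // 4 MiB buffer, 1 KiB records, 100 ms commit interval
        System.out.println(recordsPerSecond(fourMiB, oneKiB, 100)); // prints 40960
    }
}
```

With 256-byte records and the same buffer and interval, the same calculation gives 163,840 records/second.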

It is therefore not generally expected that this will cause the out-of-memory errors or memory contention that were raised as a problem in KIP-844. If this does become a problem, a later KIP could resolve it by tracking the size of the uncommitted records in-memory, and forcing an early commit of the Task when it crosses a configurable threshold. This is outside the scope of this KIP, as it may not be necessary.

When reading records, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities in order to ensure that uncommitted writes are available to query. This minimizes the amount of custom code needed to implement transactionality, and allows RocksDB to perform optimizations.
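
The read path described above can be pictured with a plain-Java analogy. The sketch below does not use the RocksDB API: BufferedStoreSketch is a hypothetical class in which one TreeMap stands in for the database and another for the uncommitted batch, with get playing the role of getFromBatchAndDB and keys the role of an iterator created by newIteratorWithBase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical analogy for batch-over-database reads; not RocksDB code.
final class BufferedStoreSketch {
    // Stands in for the committed database contents.
    private final TreeMap<String, String> committed = new TreeMap<>();
    // Stands in for the uncommitted batch; an empty Optional marks a pending delete.
    private final TreeMap<String, Optional<String>> pending = new TreeMap<>();

    void put(final String key, final String value) {
        pending.put(key, Optional.of(value));
    }

    void delete(final String key) {
        pending.put(key, Optional.empty());
    }

    // Point lookup: the batch takes precedence over the database.
    String get(final String key) {
        final Optional<String> buffered = pending.get(key);
        if (buffered != null) {
            return buffered.orElse(null);
        }
        return committed.get(key);
    }

    // Ordered key view with the batch overlaid on the database.
    List<String> keys() {
        final TreeMap<String, String> view = new TreeMap<>(committed);
        for (final Map.Entry<String, Optional<String>> e : pending.entrySet()) {
            if (e.getValue().isPresent()) {
                view.put(e.getKey(), e.getValue().get());
            } else {
                view.remove(e.getKey());
            }
        }
        return new ArrayList<>(view.keySet());
    }

    // Applies every buffered write and delete, then empties the batch.
    void commit() {
        for (final Map.Entry<String, Optional<String>> e : pending.entrySet()) {
            if (e.getValue().isPresent()) {
                committed.put(e.getKey(), e.getValue().get());
            } else {
                committed.remove(e.getKey());
            }
        }
        pending.clear();
    }

    // Discards the batch, leaving the database untouched.
    void abort() {
        pending.clear();
    }
}
```

The point of the analogy is that uncommitted writes and deletes are visible to readers immediately, while discarding the batch leaves the committed data exactly as it was.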

StateStore ownership of checkpointing

One issue with both the existing RocksDBStore and the KIP-844 transactional StateStores that is not resolved by WriteBatchWithIndex is that, to guarantee consistency with the Task checkpoint, the memtables must be explicitly flushed on every commit. This is not what RocksDB was designed for, and leads to sub-optimal performance, especially for lower-throughput stores, where many small sstable files are created. This increased "write-amplification" puts more pressure on RocksDB's compaction threads, as they have more sstables to compact together.

The existing Task checkpoints are also only written when the Task is cleanly closed. This would cause stores to be wiped and re-built in the event of a crash, even when unnecessary.

To resolve this, we will move the responsibility for StateStore checkpointing to the StateStore interface itself.

When calling StateStore#commit, the offset(s) for the changelog partition, or input partitions for global stores, will be provided to the store. The StateStore itself will then determine how best to checkpoint the data it's committing.

For RocksDBStore, we will store offsets in a separate column-family, offsetMetadata, which is updated as part of the current batch during commit. We ensure that the memtables for our data and metadata column-families are atomically flushed by RocksDB by enabling Atomic Flushes during store initialization. This will guarantee that all records written to the RocksDB memtables, and subsequent on-disk sstables, will always be accompanied by the changelog/input partition offsets that they correspond to, without the need to explicitly flush memtables.

Query Position data

As part of the Interactive Query v2 (IQv2) initiative, StateStores already track Position offsets, used to bound queries. To improve atomicity, these offsets will also be written to the offsetMetadata column-family, and atomically committed along with records and changelog/input partition offsets.

Changes to StateManager

Currently, StateManager implementations (ProcessorStateManager for regular stores and GlobalStateManagerImpl for global stores) manage checkpointing of their stores via an on-disk file. This is done through 3 methods in each class:

  • flush()
  • updateChangelogOffsets(Map)
  • checkpoint()

These methods are always called together, with one exception: checkpoint is only called if the number of records processed since the last commit is more than the hard-coded threshold of 10,000. This is presumably a performance optimization to prevent checkpoint files being written every 100 milliseconds.

We will replace these 3 methods with one:

  • commit(Map)

This will delegate the checkpointing procedure to the underlying StateStore. The StateStoreMetadata, which currently stores the offset for each changelog partition in-memory, will be updated to instead delegate to StateStore#getCommittedOffset().

Compatibility, Deprecation, and Migration Plan

Existing stores maintain their current checkpoints and position offsets in files. These files will still be read, if present, and used to automatically migrate an existing store to being transactional:

For .checkpoint files:

  1. If a checkpoint offset file exists with an offset for a changelog partition of an existing store:
    1. The store will be checked for its own offset via StateStore#getCommittedOffset() 
      1. If one exists, the checkpoint file will be ignored.
      2. If one doesn't exist, StateStore#commit(Map) will be called, with the offset(s) from the checkpoint file.
  2. If no checkpoint offsets exist in the store, and no checkpoint offset file exists, the store data will be deleted and restored from the changelog.
    • This ensures that corrupt stores will also be properly handled during migration.
    • Only the corrupt store(s) will be deleted. The Task directory itself will not be deleted.
  3. The checkpoint offset file will be deleted.
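
The decision made for each store in the steps above can be summarised as a small function. This is only a sketch of the logic as described here: CheckpointMigrationSketch, Action and decide are hypothetical names, and a null argument stands for "no offset available".

```java
// Hypothetical sketch of the .checkpoint migration decision for a single store.
final class CheckpointMigrationSketch {

    enum Action {
        USE_STORE_OFFSET,   // the store already tracks its own offset; the checkpoint file is ignored
        COMMIT_FILE_OFFSET, // call StateStore#commit(Map) with the offset from the checkpoint file
        WIPE_AND_RESTORE    // no offset anywhere: delete the store data and restore from the changelog
    }

    // storeOffset: result of StateStore#getCommittedOffset(), or null if the store has none.
    // checkpointFileOffset: offset read from the .checkpoint file, or null if there is none.
    static Action decide(final Long storeOffset, final Long checkpointFileOffset) {
        if (storeOffset != null) {
            return Action.USE_STORE_OFFSET;
        }
        if (checkpointFileOffset != null) {
            return Action.COMMIT_FILE_OFFSET;
        }
        return Action.WIPE_AND_RESTORE;
    }
}
```

Whichever action is taken, the checkpoint offset file itself is deleted afterwards, as in step 3.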

And for .position files:

  1. If a position offset file exists for a store:
    1. The store metadata column-family will be checked for existing position offsets.
      1. If none exist, the data in the position file will be written to the metadata column-family.
  2. The position offset file will be deleted.


Custom StateStore implementations will continue to operate. However, since Task checkpoints are no longer written, they will be expected to manage their own offsets. If they do not, they will be considered corrupt and wiped. It is up to users to upgrade custom implementations to ensure that this does not happen.

Note: custom implementations that extend an internal implementation, like RocksDBStore, will automatically assume the checkpointing behaviour of that implementation, and should automatically function as expected.

Test Plan

Testing will be accomplished by both the existing tests and new unit tests that verify the atomicity, durability and consistency guarantees that this KIP provides.

Rejected Alternatives

The design outlined in KIP-844, sadly, does not perform well (as described above), and requires users to opt-in to transactionality, instead of being a guarantee provided out-of-the-box.

