Versions Compared

Comment: Add mitigation for buffering OOM, by forcing premature Task commits.

...

This KIP explores an alternative design that should have little or no performance impact, and can thus be enabled for all stores.

Public Interfaces

New configuration:

  • max.uncommitted.state.entries.per.task: the maximum total number of uncommitted StateStore entries permitted across a Task's stores before a Task commit is forced prematurely. Set to -1 for no maximum, so that commits are controlled exclusively by commit.interval.ms.
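The following is a minimal sketch of setting the proposed option alongside commit.interval.ms. It assumes the option is supplied as an ordinary string property, like other Streams configs. The key string comes from this KIP. The class name, helper method, and the value 10000 are hypothetical and purely illustrative.

```java
import java.util.Properties;

public class UncommittedEntriesConfigExample {
    // Key string as proposed in this KIP; the constant name itself is hypothetical.
    static final String MAX_UNCOMMITTED = "max.uncommitted.state.entries.per.task";

    /** Reads the proposed limit; -1 means "no maximum", leaving commits to commit.interval.ms. */
    static long maxUncommittedEntries(Properties props) {
        return Long.parseLong(props.getProperty(MAX_UNCOMMITTED));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("commit.interval.ms", "100");   // regular commit cadence
        props.put(MAX_UNCOMMITTED, "10000");       // illustrative value, not a recommended default
        long max = maxUncommittedEntries(props);
        System.out.println(max == -1
                ? "commits controlled by commit.interval.ms only"
                : "forced commit above " + max + " uncommitted entries per Task");
    }
}
```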

Changed:

  • org.apache.kafka.streams.processor.StateStore

...

Code block (Java): org.apache.kafka.streams.processor.StateStore
    /**
     * Flush any cached data
     *
     * @deprecated since KIP-892. Use {@link #commit(Map)} instead.
     */
    @Deprecated
    default void flush() {}

    /**
     * Commit the current transaction to this StateStore with the given changelog offset.
     * <p>
     * This is a convenience method for {@link #commit(Map) commit(Collections.singletonMap(null, changelogOffset))}.
     * <p>
     * This method is used to commit records to a regular, logged StateStore.
     *
     * @see #commit(Map)
     * @param changelogOffset The offset of the changelog topic this commit corresponds to. The offset can be
     *                        {@code null} if this StateStore does not have a changelog.
     */
    @Evolving
    default void commit(final Long changelogOffset) {
        commit(Collections.singletonMap(null, changelogOffset));
    }

    /**
     * Commit the current transaction to this StateStore with the given offsets.
     * <p>
     * All records that were written to this store since the last {@code commit} will be written to the store
     * atomically, i.e. either all will be written, or none of them will.
     * <p>
     * After this method returns, calls to {@link #getCommittedOffset(TopicPartition)} will return the associated value
     * of {@code offsets}.
     * <p>
     * {@code offsets} will be one of:
     * <ul>
     *     <li>A {@code Map} of offsets for each input partition of this Global StateStore.</li>
     *     <li>A {@code Map} containing one mapping from {@code null} to the offset of the changelog partition for this
     *     logged StateStore.</li>
     *     <li>{@code null}, if this StateStore is not {@link StoreBuilder#withLoggingDisabled() logged}.</li>
     * </ul>
     * <p>
     * If this store is {@link #persistent() persistent}, it is required that on-restart, the offsets returned by {@link
     * #getCommittedOffset(TopicPartition)} correspond with the records persisted in the StateStore.
     * 
     * @param offsets The offset(s) for the input topics this commit corresponds to. May be {@code null} if this
     *                StateStore has no input topics or changelog.
     */
    @Evolving
    default void commit(final Map<TopicPartition, Long> offsets) {
        flush();
    }

    /**
     * Returns the latest committed changelog offset that the materialized state in this store represents.
     * <p>
     * The state in the store is guaranteed to reflect exactly the state in the changelog up to this offset.
     * <p>
     * This may be {@code null}, if the store does not contain any metadata on its changelog offset. In this case, the
     * StateStore itself makes no guarantees about its contents.
     * <p>
     * Equivalent to calling {@code getCommittedOffset(null)}.
     * <p>
     * This method should <em>not</em> be called for global StateStores, as they are not backed by a changelog.
     *
     * @return The latest committed changelog offset that the state in this store represents, or {@code null}, if no
     *         changelog offset metadata is available and no guarantees can be made.
     */
    @Evolving
    default Long getCommittedOffset() {
        return getCommittedOffset(null);
    }

    /**
     * Returns the latest committed changelog offset that the materialized state in this store represents.
     * <p>
     * The state in the store is guaranteed to reflect exactly the state in the changelog up to this offset.
     * <p>
     * This may be {@code null}, if the store does not contain any metadata on its changelog offset. In this case, the
     * StateStore itself makes no guarantees about its contents.
     * <p>
     * If {@code topicPartition} is {@code null}, the returned offset will be the offset for the changelog partition of
     * this StateStore, if one exists.
     *
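     * @param topicPartition The partition to return the committed offset for, or {@code null} for the changelog
     *                       partition of this StateStore.
     * @return The latest committed offset that the state in this store represents, or {@code null}, if no
     *         offset metadata is available and no guarantees can be made.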
     */
    @Evolving
    default Long getCommittedOffset(final TopicPartition topicPartition) {
        return null;
    }
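The following hypothetical in-memory store (not a real Kafka class) makes the commit/offset contract concrete by mirroring the proposed methods. It uses String in place of TopicPartition so that it compiles without Kafka on the classpath. It models only the visible behaviour:

- Writes stay buffered until a commit.
- commit(Long) delegates to commit(Map) using a null key.
- getCommittedOffset() reads the entry stored under the null key.

A persistent store would additionally have to write data and offsets to disk atomically.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy store; String stands in for TopicPartition, null key = own changelog. */
public class ToyTransactionalStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();
    private final Map<String, Long> committedOffsets = new HashMap<>();

    public void put(String key, String value) {
        uncommitted.put(key, value); // buffered until the next commit
    }

    public String get(String key) {
        // Uncommitted writes are visible to reads before they are committed.
        return uncommitted.containsKey(key) ? uncommitted.get(key) : committed.get(key);
    }

    /** Logged-store convenience: maps the null key to the changelog offset. */
    public void commit(Long changelogOffset) {
        commit(Collections.singletonMap(null, changelogOffset));
    }

    public void commit(Map<String, Long> offsets) {
        // Data and offsets are applied together; a real persistent store must do this atomically.
        committed.putAll(uncommitted);
        uncommitted.clear();
        if (offsets != null) {
            committedOffsets.putAll(offsets);
        }
    }

    public Long getCommittedOffset(String partition) {
        return committedOffsets.get(partition);
    }

    public Long getCommittedOffset() {
        return getCommittedOffset(null);
    }
}
```

A Global store would instead call commit(Map) with one entry per input partition, and read each back with getCommittedOffset(partition).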

Proposed Changes

There are two parts to this KIP:

  1. Buffering writes to RocksDB using WriteBatchWithIndex.
  2. Moving responsibility for store checkpointing inside the StateStore itself.

Buffering writes with WriteBatchWithIndex

RocksDB provides WriteBatchWithIndex as a means of accomplishing atomic writes when not using the RocksDB WAL. The performance overhead of doing this should be negligible. The only performance consideration is that the buffer must reside completely in-memory until it is committed. RocksDB recommends buffers of no more than 3-4 MiB for optimal performance. With a commit.interval.ms of 100 milliseconds (the default under EOS) and an average record size of 1 KiB, a 4 MiB buffer holds 4,096 records per commit. At 10 commits per second, this allows a throughput of ~40,960 records/second. This is a worst-case estimate, as most use-cases will have a considerably smaller record size, which significantly increases throughput. For use-cases with larger record sizes, higher throughput could be sustained by allowing a larger buffer, at the cost of higher memory usage.
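The throughput figure above follows from simple arithmetic. A 4 MiB buffer holds 4,096 records of 1 KiB, and a 100 ms commit interval gives 10 commits per second. The following small sketch performs that calculation. The class and method names are illustrative only.

```java
public class BufferThroughputEstimate {
    /** Records per second that fit through a buffer that is committed once per commit interval. */
    static long recordsPerSecond(long bufferBytes, long avgRecordBytes, long commitIntervalMs) {
        long recordsPerCommit = bufferBytes / avgRecordBytes;   // records that fit in one buffer
        return recordsPerCommit * 1000 / commitIntervalMs;      // commits per second = 1000 / interval
    }

    public static void main(String[] args) {
        // 4 MiB buffer, 1 KiB average record, commit.interval.ms = 100
        System.out.println(recordsPerSecond(4L * 1024 * 1024, 1024, 100)); // prints 40960
    }
}
```

Halving the average record size to 512 bytes doubles the estimate to 81,920 records/second. This is consistent with treating 1 KiB records as a pessimistic assumption.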




Code block (Java, continued): org.apache.kafka.streams.processor.StateStore
    /**
     * Return an approximate count of records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the number of records that would be committed by the next call to
     * {@link #commit(Map)}.
     * <p>
     * If this StateStore is unable to approximately count uncommitted records, it will return {@code -1}.
     * If this StateStore does not support atomic transactions, it will return {@code 0}, because records will always be
     * immediately written to a non-transactional store, so there will be none awaiting a {@link #commit(Map)}.
     *
     * @return The approximate number of records awaiting {@link #commit(Map)}, {@code -1} if the number of
     *         uncommitted records can't be counted, or {@code 0} if this StateStore does not support transactions.
     */
    @Evolving
    default long approximateNumUncommittedEntries() {
        return 0;
    }


To bound the memory needed to buffer uncommitted writes, we will automatically force a Task commit if the total number of uncommitted records reported by StateStore#approximateNumUncommittedEntries across the Task's StateStores exceeds a threshold, configured by max.uncommitted.state.entries.per.task. This roughly bounds the memory required per-Task for buffering uncommitted records, irrespective of commit.interval.ms. It also effectively bounds the number of records that need to be restored in the event of a failure. It is therefore not generally expected that this will cause the out-of-memory errors or memory contention initially raised as a problem in KIP-844. If this does become a problem, a later KIP could resolve it by tracking the in-memory size of uncommitted records and prematurely forcing a Task commit when that size crosses a configurable threshold. This is outside the scope of this KIP, as it may not be necessary.
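The forced-commit rule can be sketched as a per-Task check. The check sums approximateNumUncommittedEntries over the Task's stores and compares the total with max.uncommitted.state.entries.per.task. This is a hypothetical helper, not Kafka Streams code. In particular, it ignores stores that report -1 (uncountable). That is an assumption, since the KIP does not say how such stores are counted.

```java
import java.util.List;

public class ForcedCommitCheck {
    /** Hypothetical minimal view of a store, mirroring StateStore#approximateNumUncommittedEntries. */
    interface UncommittedCounter {
        long approximateNumUncommittedEntries();
    }

    /** Returns true if the Task's uncommitted entries exceed the configured per-Task maximum. */
    static boolean shouldForceCommit(List<UncommittedCounter> taskStores, long maxUncommittedEntriesPerTask) {
        if (maxUncommittedEntriesPerTask == -1) {
            return false; // no maximum: only commit.interval.ms triggers commits
        }
        long total = 0;
        for (UncommittedCounter store : taskStores) {
            long n = store.approximateNumUncommittedEntries();
            // Assumption: -1 (uncountable) and 0 (non-transactional) stores contribute nothing.
            if (n > 0) {
                total += n;
            }
        }
        return total > maxUncommittedEntriesPerTask;
    }
}
```

Summing across all of a Task's stores matches the per-Task scope of the configuration name. Because the counts are approximate, the resulting memory bound is also only approximate.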

When reading records, we will use the WriteBatchWithIndex#getFromBatchAndDB and WriteBatchWithIndex#newIteratorWithBase utilities in order to ensure that uncommitted writes are available to query. This minimizes the amount of custom code needed to implement transactionality, and allows RocksDB to perform optimizations.
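The read path can be illustrated without RocksDB by modelling the committed database and the uncommitted WriteBatchWithIndex as two TreeMaps. This is a stand-in for the semantics of getFromBatchAndDB and newIteratorWithBase, not the RocksDB API. The batch shadows committed values (including deletes), and a range read merges both in key order. Unlike the real merged iterator, this sketch materializes the range eagerly, and its commit() is not crash-atomic.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** TreeMap stand-in for WriteBatchWithIndex reads; null value in the batch = delete. */
public class BatchOverlayReads {
    final NavigableMap<String, String> db = new TreeMap<>();     // committed data
    final NavigableMap<String, String> batch = new TreeMap<>();  // uncommitted writes

    void put(String key, String value) { batch.put(key, value); }
    void delete(String key) { batch.put(key, null); } // tombstone masks the committed value

    /** Analogue of getFromBatchAndDB: the batch wins, falling back to committed data. */
    String get(String key) {
        return batch.containsKey(key) ? batch.get(key) : db.get(key);
    }

    /** Analogue of newIteratorWithBase: key-ordered view of [from, to) with batch entries shadowing db. */
    NavigableMap<String, String> range(String from, String to) {
        NavigableMap<String, String> merged = new TreeMap<>(db.subMap(from, true, to, false));
        for (Map.Entry<String, String> e : batch.subMap(from, true, to, false).entrySet()) {
            if (e.getValue() == null) merged.remove(e.getKey());
            else merged.put(e.getKey(), e.getValue());
        }
        return merged;
    }

    /** Applies the whole batch to the committed data, then clears it. */
    void commit() {
        for (Map.Entry<String, String> e : batch.entrySet()) {
            if (e.getValue() == null) db.remove(e.getKey());
            else db.put(e.getKey(), e.getValue());
        }
        batch.clear();
    }
}
```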

...