Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Add legacy checkpointing for custom StateStores that don't know how to manage their own offsets.

...

Code Block
languagejava
firstline106
titleorg.apache.kafka.streams.processor.StateStore
linenumberstrue
    /**
     * Flush any cached data
     *
     * @deprecated since KIP-892. Use {@link #commit(Map)} instead.
     */
    @Deprecated
    default void flush() {}

    /**
     * Commit the current transaction to this StateStore with the given changelog offset.
     * <p>
     * This is a convenience method for {@link #commit(Map) commit(Collections.singletonMap(null, changelogOffset))}.
     * <p>
     * This method is used to commit records to a regular, logged StateStore.
     *
     * @see #commit(Map)
     * @param changelogOffset The offset of the changelog topic this commit corresponds to. The offset can be
     *                        {@code null} if this StateStore does not have a changelog.
     */
    @Evolving
    default void commit(final Long changelogOffset) {
        commit(Collections.singletonMap(null, changelogOffset));
    }

    /**
     * Commit the current transaction to this StateStore with the given offsets.
     * <p>
     * All records that were written to this store since the last {@code commit} will be written to the store
     * atomically, i.e. either all will be written, or none of them will.
     * <p>
     * After this method returns, calls to {@link #getCommittedOffset(TopicPartition)} will return the associated value
     * of {@code offsets}.
     * <p>
     * {@code offsets} will contain one of either:
     * <ul>
     *     <li>A {@code Map} of offsets for each input partition of this Global StateStore.</li>
     *     <li>A {@code Map} containing one mapping from {@code null} to the offset of the changelog partition for this
     *     logged StateStore.</li>
     *     <li>A {@code null}, if this StateStore is not {@link StoreBuilder#withLoggingDisabled() logged}.</li>
     * </ul>
     * <p>
     * If this store is {@link #persistent() persistent}, it is required that on-restart, the offsets returned by {@link
     * #getCommittedOffset(TopicPartition)} correspond with the records persisted in the StateStore.
     * 
     * @param offsets The offset(s) for the input topics this commit corresponds to. May be {@code null} if this
     *                StateStore has no input topics or changelog.
     */
    @Evolving
    default void commit(final Map<TopicPartition, Long> offsets) {
        flush();
    }

    /**
     * Returns the latest committed changelog offset that the materialized state in this store represents.
     * <p>
     * The state in the store is guaranteed to reflect exactly the state in the changelog up to this offset.
     * <p>
     * This may be {@code null}, if the store does not contain any metadata on its changelog offset. In this case, the
     * StateStore itself makes no guarantees about its contents.
     * <p>
     * Equivalent to calling {@code getCommittedOffset(null)}.
     * <p>
     * This method should <em>not</em> be called for global StateStores, as they are not backed by a changelog.
     *
     * @return The latest committed changelog offset that the state in this store represents, or {@code null}, if no
     *         changelog offset metadata is available and no guarantees can be made.
     */
    @Evolving
    default Long getCommittedOffset() {
        return getCommittedOffset(null);
    }

    /**
     * Returns the latest committed changelog offset that the materialized state in this store represents.
     * <p>
     * The state in the store is guaranteed to reflect exactly the state in the changelog up to this offset.
     * <p>
     * This may be {@code null}, if the store does not contain any metadata on its changelog offset. In this case, the
     * StateStore itself makes no guarantees about its contents.
     * <p>
     * If {@code topicPartition} is {@code null}, the returned offset will be the offset for the changelog partition of
     * this StateStore, if one exists.
     *
     * @return The latest committed offset that the state in this store represents, or {@code null}, if no
     *         changelog offset metadata is available and no guarantees can be made.
     */
    @Evolving
    default Long getCommittedOffset(final TopicPartition topicPartition) {
        return null;
    }


    /**
     * Return an approximate count of records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the number of records that would be committed by the next call to
     * {@link #commit(Map)}.
     * <p>
     * If this StateStore is unable to approximately count uncommitted records, it will return {@code -1}.
     * If this StateStore does not support atomic transactions, it will return {@code 0}, because records will always be
     * immediately written to a non-transactional store, so there will be none awaiting a {@link #commit(Map)}.
     *
     * @return The approximate number of records awaiting {@link #commit(Map)}, {@code -1} if the number of
     *         uncommitted records can't be counted, or {@code 0} if this StateStore does not support transactions.
     */
    @Evolving
    default long approximateNumUncommittedEntries() {
        return 0;
    }

   /**
     * Return an approximate count of memory used by records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the memory would be freed by the next call to {@link #commit(Map)}.
     * <p>
     * If this StateStore is unable to approximately count uncommitted memory usage, it will return {@code -1}.
     * If this StateStore does not support atomic transactions, it will return {@code 0}, because records will always be
     * immediately written to a non-transactional store, so there will be none awaiting a {@link #commit(Map)}.
     *
     * @return The approximate size of all records awaiting {@link #commit(Map)}, {@code -1} if the size of uncommitted
     *         records can't be counted, or {@code 0} if this StateStore does not support transactions.
     */
    @Evolving
    default long approximateNumUncommittedBytes() {
        return 0;
    }

    /**
     * Return if this implementation manages its own checkpointing during {@link #commit(Map)}.
     * <p>
     * If this method returns {@code true}, and {@link #persistent()} returns {@code true}, then {@link
     * #getCommittedOffset(TopicPartition)} MUST return the offsets provided to the most recent call to {@link
     * #commit(Map)}, even if the application has been restarted since the last {@link #commit(Map)}.
     * <p>
     * If this method returns {@code false}, this StateStore may be wrapped in a default checkpoint implementation
     * by the Kafka Streams engine.
     */
    @Evolving
    default boolean managesCheckpoints() {
        return false;
    }

Proposed Changes

There are two parts to this KIP:

...

Custom StateStore implementations will continue to operate, however, since Task checkpoints are no longer written, they will be expected to handle their own offsets. If they do not, they will be considered corrupt and wiped. It is up to users to upgrade custom implementations to ensure that this does not happenthat have not been updated to manage their own offsets will be automatically adapted (via CheckpointingStateStoreAdapter) to use the legacy (per-Task checkpoint file) checkpoint implementation. This is not expected to be optimal, but will ensure consistency for these stores. It is recommended that custom StateStore implementations be upgraded to manage their own offsets using the most optimal approach available to them.

Note: custom implementations that extend an internal implementation, like RocksDBStore, will automatically assume the checkpointing behaviour of that implementation, and should automatically function as expected.

...