Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
firstline106
titleorg.apache.kafka.streams.processor.StateStore
linenumberstrue
    /**
     * Flush any cached data
     *
     * @deprecated since KIP-NEXT892. Use {@link #commit(Map)} instead.
     */
    @Deprecated
    default void flush() {}

    /**
     * Commit the current transaction to this StateStore with the given changelog offset.
     * <p>
     * This is a convenience method for {@link #commit(Map) commit(Collections.singletonMap(null, changelogOffset))}.
     * <p>
     * This method is used to commit records to a regular, logged StateStore.
     *
     * @see #commit(Map)
     * @param changelogOffset The offset of the changelog topic this commit corresponds to. The offset can be
     *                        {@code null} if this StateStore does not have a changelog.
     */
    @Evolving
    default void commit(final Long changelogOffset) {
        commit(Collections.singletonMap(null, changelogOffset));
    }

    /**
     * Commit the current transaction to this StateStore with the given offsets.
     * <p>
     * All records that were written to this store since the last {@code commit} will be written to the store
     * atomically, i.e. either all will be written, or none of them will.
     * <p>
     * After this method returns, calls to {@link #getCommittedOffset(TopicPartition)} will return the associated value
     * of {@code offsets}.
     * <p>
     * {@code offsets} will contain one of either:
     * <ul>
     *     <li>A {@code Map} of offsets for each input partition of this Global StateStore.</li>
     *     <li>A {@code Map} containing one mapping from {@code null} to the offset of the changelog partition for this
     *     logged StateStore.</li>
     *     <li>A {@code null}, if this StateStore is not {@link StoreBuilder#withLoggingDisabled() logged}.</li>
     * </ul>
     * <p>
     * If this store is {@link #persistent() persistent}, it is required that on-restart, the offsets returned by {@link
     * #getCommittedOffset(TopicPartition)} correspond with the records persisted in the StateStore.
     * 
     * @param offsets The offset(s) for the input topics this commit corresponds to. May be {@code null} if this
     *                StateStore has no input topics or changelog.
     */
    @Evolving
    default void commit(final Map<TopicPartition, Long> offsets) {
        flush();
    }

    /**
     * Returns the latest committed changelog offset that the materialized state in this store represents.
     * <p>
     * The state in the store is guaranteed to reflect exactly the state in the changelog up to this offset.
     * <p>
     * This may be {@code null}, if the store does not contain any metadata on its changelog offset. In this case, the
     * StateStore itself makes no guarantees about its contents.
     * <p>
     * Equivalent to calling {@code getCommittedOffset(null)}.
     * <p>
     * This method should <em>not</em> be called for global StateStores, as they are not backed by a changelog.
     *
     * @return The latest committed changelog offset that the state in this store represents, or {@code null}, if no
     *         changelog offset metadata is available and no guarantees can be made.
     */
    @Evolving
    default Long getCommittedOffset() {
        return getCommittedOffset(null);
    }

    /**
     * Returns the latest committed changelog offset that the materialized state in this store represents.
     * <p>
     * The state in the store is guaranteed to reflect exactly the state in the changelog up to this offset.
     * <p>
     * This may be {@code null}, if the store does not contain any metadata on its changelog offset. In this case, the
     * StateStore itself makes no guarantees about its contents.
     * <p>
     * If {@code topicPartition} is {@code null}, the returned offset will be the offset for the changelog partition of
     * this StateStore, if one exists.
     *
     * @return The latest committed offset that the state in this store represents, or {@code null}, if no
     *         changelog offset metadata is available and no guarantees can be made.
     */
    @Evolving
    default Long getCommittedOffset(final TopicPartition topicPartition) {
        return null;
    }

...