Comment: Rename getCommittedOffset -> committedOffset

...

Code Block (Java): org.apache.kafka.streams.processor.StateStore, starting at line 106
   
    /**
     * Flush any cached data
     * 
     * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
     *             instead.
     */
    @Deprecated
    default void flush() {
        // no-op
    }

    /**
     * Commit all written records to this StateStore.
     * <p>
     * This method MUST NOT be called by users from {@link org.apache.kafka.streams.processor.api.Processor processors},
     * as doing so may violate the consistency guarantees provided by this store, and expected by Kafka Streams.
     * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() 
     * ProcessorContext#commit()} to request a Task commit.
     * <p>
     * When called, every record written since the last call to {@link #commit(Map)}, or since this store was {@link
     * #init(StateStoreContext, StateStore) opened}, will be made available to readers using the {@link
     * org.apache.kafka.common.IsolationLevel#READ_COMMITTED READ_COMMITTED} {@link
     * org.apache.kafka.common.IsolationLevel IsolationLevel}.
     * <p>
     * If {@link #persistent()} returns {@code true}, after this method returns, all records written since the last call
     * to {@link #commit(Map)} are guaranteed to be persisted to disk, and available to read, even if this {@link
     * StateStore} is {@link #close() closed} and subsequently {@link #init(StateStoreContext, StateStore) re-opened}.
     * <p>
     * If {@link #managesOffsets()} <em>also</em> returns {@code true}, the given {@code changelogOffsets} will be
     * guaranteed to be persisted to disk along with the written records.
     * <p>
     * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,
     * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided
     * partitions <em>MUST</em> be persisted to disk.
     * <p>
     * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
     * records they represent.
     * 
     * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
     */
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }

    /**
     * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}.
     * <p>
     * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
     * offset that corresponds to the changelog record most recently written to this store, for the given {@code
     * partition}.
     * <p>
     * This method provides readers using the {@link org.apache.kafka.common.IsolationLevel#READ_COMMITTED} {@link
     * org.apache.kafka.common.IsolationLevel} a means to determine the point in the changelog that this StateStore
     * currently represents.
     * 
     * @param partition The partition to get the committed offset for.
     * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
     *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
     *         returns {@code false}.
     */
    default Long committedOffset(final TopicPartition partition) {
        return null;
    }

    /**
     * Determines if this StateStore manages its own offsets.
     * <p>
     * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
     * {@link #committedOffset(TopicPartition)}, even if the store is {@link #close() closed} and later re-opened.
     * <p>
     * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
     * #committedOffset(TopicPartition)} will be expected to always return {@code null}.
     * <p>
     * This method is provided to enable custom StateStores to opt-in to managing their own offsets. This is highly
     * recommended, if possible, to ensure that custom StateStores provide the consistency guarantees that Kafka Streams
     * expects when operating under the {@code exactly-once} {@code processing.mode}.
     * 
     * @return Whether this StateStore manages its own offsets.
     */
    default boolean managesOffsets() {
        return false;
    }

    /**
     * Return an approximate count of memory used by records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the memory that would be freed by the next call to {@link
     * #commit(Map)}.
     * <p>
     * If no records have been written to this store since {@link #init(StateStoreContext, StateStore) opening}, or
     * since the last {@link #commit(Map)}; or if this store does not support atomic transactions, it will return {@code
     * 0}, as no records are currently being buffered.
     *
     * @return The approximate size of all records awaiting {@link #commit(Map)}; or {@code 0} if this store does not
     *         support transactions, or has not been written to since {@link #init(StateStoreContext, StateStore)} or
     *         last {@link #commit(Map)}.
     */
    @Evolving
    default long approximateNumUncommittedBytes() {
        return 0;
    }
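
The commit contract documented above can be illustrated with a small, self-contained sketch. It is not an implementation of the real StateStore interface: the class name OffsetManagingStoreSketch is hypothetical, partitions are plain strings standing in for TopicPartition, and "disk" is simply a second in-memory map. It only shows the intended visibility rules, under those assumptions: staged writes are invisible to committed reads until commit(Map) is called, and offsets become visible together with the records they describe.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical, simplified model of the commit contract. Strings stand in for
 * TopicPartition, and the "committed" maps stand in for durable storage.
 */
final class OffsetManagingStoreSketch {
    private final Map<String, String> committedRecords = new HashMap<>();
    private final Map<String, String> uncommittedRecords = new HashMap<>();
    private final Map<String, Long> committedOffsets = new HashMap<>();

    /** Stages a write; it is not visible to committed reads yet. */
    void put(final String key, final String value) {
        Objects.requireNonNull(key, "key");
        Objects.requireNonNull(value, "value");
        uncommittedRecords.put(key, value);
    }

    /** READ_COMMITTED-style read: only sees records published by commit(). */
    String getCommitted(final String key) {
        return committedRecords.get(key);
    }

    /** Publishes all staged records together with the offsets describing them. */
    void commit(final Map<String, Long> changelogOffsets) {
        committedRecords.putAll(uncommittedRecords);
        committedOffsets.putAll(changelogOffsets);
        uncommittedRecords.clear();
    }

    /** Returns the last committed offset, or null if none was ever committed. */
    Long committedOffset(final String partition) {
        return committedOffsets.get(partition);
    }

    /** This sketch always tracks the offsets handed to commit(). */
    boolean managesOffsets() {
        return true;
    }

    /** Rough estimate only: counts characters of staged keys and values. */
    long approximateNumUncommittedBytes() {
        long size = 0;
        for (final Map.Entry<String, String> entry : uncommittedRecords.entrySet()) {
            size += entry.getKey().length() + entry.getValue().length();
        }
        return size;
    }
}
```

A real persistent store would additionally need to write the records and offsets to disk atomically, which this in-memory model does not attempt.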

...

The existing .checkpoint files will be retained for any StateStore that does not set managesOffsets() to true, and to ensure managed offsets are available when the store is closed. Existing offsets will be automatically migrated into StateStores that manage their own offsets, if and only if there is no offset returned by StateStore#committedOffset.
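
The migration condition can be sketched as a small selection step. This is an illustration only: the class and method names are hypothetical, partitions are represented as plain strings rather than TopicPartition, and the store is modelled as a lookup function standing in for StateStore#committedOffset. How the selected offsets are then handed to the store is not shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper illustrating the "migrate only if the store has no offset" rule. */
final class CheckpointMigrationSketch {

    /**
     * Selects the checkpointed offsets that would need migrating: every partition
     * for which the store itself reports no committed offset (null).
     *
     * @param checkpointFileOffsets offsets read from the existing .checkpoint file
     * @param storeCommittedOffset  stand-in for StateStore#committedOffset
     */
    static Map<String, Long> offsetsToMigrate(
            final Map<String, Long> checkpointFileOffsets,
            final Function<String, Long> storeCommittedOffset) {
        final Map<String, Long> toMigrate = new HashMap<>();
        for (final Map.Entry<String, Long> entry : checkpointFileOffsets.entrySet()) {
            // The store's own offset wins; only fill in partitions it knows nothing about.
            if (storeCommittedOffset.apply(entry.getKey()) == null) {
                toMigrate.put(entry.getKey(), entry.getValue());
            }
        }
        return toMigrate;
    }
}
```

Partitions for which the store already reports an offset are left untouched, which matches the statement that the store's offsets are authoritative once it manages them.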

Required interface changes:

  • Add methods void commit(Map<TopicPartition, Long> changelogOffsets), boolean managesOffsets() and Long committedOffset(TopicPartition) to StateStore.
  • Deprecate method flush() on StateStore.

...

  1. During StateStore initialization, in order to synchronize the offsets in .checkpoint with the offsets returned by StateStore#committedOffset(TopicPartition), which are the source of truth for stores that manage their own offsets.
  2. When the StateStore is closed, in order to ensure that the offsets used for Task assignment reflect the state persisted to disk.
  3. At the end of every Task commit, if-and-only-if at least one StateStore in the Task is persistent and does not manage its own offsets. This ensures that stores that don't manage their offsets continue to have their offsets persisted to disk whenever the StateStore data itself is committed.
    • Avoiding writing .checkpoint when every persistent store manages its own offsets ensures we don't pay a significant performance penalty when the commit interval is short, as it is by default under EOS.
    • Since all persistent StateStores provided by Kafka Streams will manage their own offsets, the common case is that the .checkpoint file will not be updated on commit(Map).
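
The condition in item 3 can be expressed as a simple predicate over the stores in a Task. The sketch below is illustrative only: CheckpointPolicySketch, StoreFlags and the helper methods are hypothetical names, and StoreFlags exposes just the two flags the rule depends on rather than the full StateStore interface.

```java
import java.util.List;

/** Hypothetical illustration of when .checkpoint is written at the end of a Task commit. */
final class CheckpointPolicySketch {

    /** Minimal stand-in for a store: only the two flags the rule depends on. */
    interface StoreFlags {
        boolean persistent();
        boolean managesOffsets();
    }

    /**
     * Returns true if at least one store is persistent and does not manage its
     * own offsets, meaning its offsets still have to be written to .checkpoint.
     */
    static boolean shouldWriteCheckpointOnCommit(final List<? extends StoreFlags> stores) {
        for (final StoreFlags store : stores) {
            if (store.persistent() && !store.managesOffsets()) {
                return true;
            }
        }
        return false;
    }

    /** Convenience factory for building example stores. */
    static StoreFlags flags(final boolean persistent, final boolean managesOffsets) {
        return new StoreFlags() {
            @Override
            public boolean persistent() {
                return persistent;
            }

            @Override
            public boolean managesOffsets() {
                return managesOffsets;
            }
        };
    }
}
```

For a Task whose persistent stores all manage their own offsets, the predicate is false and the .checkpoint write is skipped on commit, which is the common case described above.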

...