Versions Compared

Comment: Make StateStore#flush() no-op by default and expand on JavaDoc for StateStore#flush() and StateStore#commit()

...

Code Block (Java): org.apache.kafka.streams.processor.StateStore
   
    /**
     * Flush any cached data
     *
     * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit() ProcessorContext#commit()}
     *             instead.
     */
    @Deprecated
    default void flush() {
        // no-op
    }

    /**
     * Commit all written records to this StateStore.
     * <p>
     * This method MUST NOT be called by users from {@link org.apache.kafka.streams.processor.api.Processor processors},
     * as doing so may violate the consistency guarantees provided by this store, and expected by Kafka Streams.
     * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit()
     * ProcessorContext#commit()} to request a Task commit.
     * <p>
     * When called, every record written since the last call to {@link #commit(Map)}, or since this store was {@link
     * #init(StateStoreContext, StateStore) opened}, will be made available to readers using the {@link
     * org.apache.kafka.common.IsolationLevel#READ_COMMITTED READ_COMMITTED} {@link
     * org.apache.kafka.common.IsolationLevel IsolationLevel}.
     * <p>
     * If {@link #persistent()} returns {@code true}, after this method returns, all records written since the last call
     * to {@link #commit(Map)} are guaranteed to be persisted to disk, and available to read, even if this {@link
     * StateStore} is {@link #close() closed} and subsequently {@link #init(StateStoreContext, StateStore) re-opened}.
     * <p>
     * If {@link #managesOffsets()} <em>also</em> returns {@code true}, the given {@code changelogOffsets} will be
     * guaranteed to be persisted to disk along with the written records.
     * <p>
     * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,
     * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided
     * partitions <em>MUST</em> be persisted to disk.
     * <p>
     * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
     * records they represent.
     * 
     * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
     */
    default void commit(final Map<TopicPartition, Long> changelogOffsets) {
        flush();
    }

    /**
     * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}.
     * <p>
     * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
     * offset that corresponds to the changelog record most recently written to this store, for the given {@code
     * partition}.
     * <p>
     * This method provides readers using the {@link org.apache.kafka.common.IsolationLevel#READ_COMMITTED} {@link
     * org.apache.kafka.common.IsolationLevel} a means to determine the point in the changelog that this StateStore
     * currently represents.
     * 
     * @param partition The partition to get the committed offset for.
     * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
     *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
     *         returns {@code false}.
     */
    default Long getCommittedOffset(final TopicPartition partition) {
        return null;
    }

    /**
     * Determines if this StateStore manages its own offsets.
     * <p>
     * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
     * {@link #getCommittedOffset(TopicPartition)}, even if the store is {@link #close() closed} and later re-opened.
     * <p>
     * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
     * #getCommittedOffset(TopicPartition)} will be expected to always return {@code null}.
     * <p>
     * This method is provided to enable custom StateStores to opt-in to managing their own offsets. This is highly
     * recommended, if possible, to ensure that custom StateStores provide the consistency guarantees that Kafka Streams
     * expects when operating under an exactly-once {@code processing.guarantee}.
     * 
     * @return Whether this StateStore manages its own offsets.
     */
    default boolean managesOffsets() {
        return false;
    }

    /**
     * Return an approximate count of memory used by records not yet committed to this StateStore.
     * <p>
     * This method will return an approximation of the memory that would be freed by the next call to {@link #commit(Map)}.
     * <p>
     * If this StateStore is unable to approximately count uncommitted memory usage, it will return {@code -1}.
     * If this StateStore does not support atomic transactions, it will return {@code 0}, because records will always be
     * immediately written to a non-transactional store, so there will be none awaiting a {@link #commit(Map)}.
     *
     * @return The approximate size of all records awaiting {@link #commit(Map)}, {@code -1} if the size of uncommitted
     *         records can't be counted, or {@code 0} if this StateStore does not support transactions.
     */
    @Evolving
    default long approximateNumUncommittedBytes() {
        return 0;
    }

...
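
The contract described in the JavaDoc above can be illustrated with a minimal sketch. It is not the Kafka Streams implementation: `SketchStore`, `BufferedSketchStore`, and `CommitSketchDemo` are hypothetical names, partitions are plain strings rather than `TopicPartition`, and "disk" is modelled as an in-memory map. The sketch only shows the intended interplay: writes stay invisible to committed reads until `commit`, offsets are stored in the same step as the records, and the uncommitted byte count drops to zero afterwards.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the methods discussed above.
// This is NOT the real org.apache.kafka.streams.processor.StateStore interface.
interface SketchStore {
    void put(String key, String value);
    String get(String key); // models a READ_COMMITTED read
    void commit(Map<String, Long> changelogOffsets);
    Long getCommittedOffset(String partition);
    boolean managesOffsets();
    long approximateNumUncommittedBytes();
}

// Buffers writes until commit(); the maps below stand in for durable storage (assumption).
final class BufferedSketchStore implements SketchStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, Long> committedOffsets = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();
    private long uncommittedBytes = 0L;

    @Override
    public synchronized void put(String key, String value) {
        String previous = uncommitted.put(key, value);
        if (previous != null) {
            // Overwriting a buffered record: drop the old record's size first.
            uncommittedBytes -= sizeOf(key, previous);
        }
        uncommittedBytes += sizeOf(key, value);
    }

    @Override
    public synchronized String get(String key) {
        // Only committed records are visible to this read path.
        return committed.get(key);
    }

    @Override
    public synchronized void commit(Map<String, Long> changelogOffsets) {
        // Records and offsets are applied in one synchronized step, so a reader
        // never observes one without the other.
        committed.putAll(uncommitted);
        committedOffsets.putAll(changelogOffsets);
        uncommitted.clear();
        uncommittedBytes = 0L;
    }

    @Override
    public synchronized Long getCommittedOffset(String partition) {
        // null when nothing has been committed for this partition.
        return committedOffsets.get(partition);
    }

    @Override
    public boolean managesOffsets() {
        return true;
    }

    @Override
    public synchronized long approximateNumUncommittedBytes() {
        return uncommittedBytes;
    }

    // Rough size estimate: character counts, not actual encoded bytes.
    private static long sizeOf(String key, String value) {
        return (long) key.length() + value.length();
    }
}

final class CommitSketchDemo {
    public static void main(String[] args) {
        BufferedSketchStore store = new BufferedSketchStore();
        store.put("k", "v1");
        System.out.println(store.get("k"));                        // not yet committed
        System.out.println(store.approximateNumUncommittedBytes());
        store.commit(Map.of("changelog-0", 42L));
        System.out.println(store.get("k"));                        // visible after commit
        System.out.println(store.getCommittedOffset("changelog-0"));
    }
}
```

A real persistent store would replace the in-memory maps with a single atomic write (for example, one write batch containing both records and offsets), which is what the "SHOULD" clause on atomicity asks of implementations.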