...
```java
/**
 * Flush any cached data
 *
 * @deprecated Use {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit()
 *             ProcessorContext#commit()} instead.
 */
@Deprecated
default void flush() {
    // no-op
}

/**
 * Commit all written records to this StateStore.
 * <p>
 * This method MUST NOT be called by users from {@link org.apache.kafka.streams.processor.api.Processor processors},
 * as doing so may violate the consistency guarantees provided by this store, and expected by Kafka Streams.
 * Instead, users should call {@link org.apache.kafka.streams.processor.api.ProcessingContext#commit()
 * ProcessorContext#commit()} to request a Task commit.
 * <p>
 * When called, every write written since the last call to {@link #commit(Map)}, or since this store was {@link
 * #init(StateStoreContext, StateStore) opened} will be made available to readers using the {@link
 * org.apache.kafka.common.IsolationLevel#READ_COMMITTED READ_COMMITTED} {@link
 * org.apache.kafka.common.IsolationLevel IsolationLevel}.
 * <p>
 * If {@link #persistent()} returns {@code true}, after this method returns, all records written since the last call
 * to {@link #commit(Map)} are guaranteed to be persisted to disk, and available to read, even if this {@link
 * StateStore} is {@link #close() closed} and subsequently {@link #init(StateStoreContext, StateStore) re-opened}.
 * <p>
 * If {@link #managesOffsets()} <em>also</em> returns {@code true}, the given {@code changelogOffsets} will be
 * guaranteed to be persisted to disk along with the written records.
 * <p>
 * {@code changelogOffsets} will usually contain a single partition, in the case of a regular StateStore. However,
 * they may contain multiple partitions in the case of a Global StateStore with multiple partitions. All provided
 * partitions <em>MUST</em> be persisted to disk.
 * <p>
 * Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are committed to disk atomically with the
 * records they represent.
 *
 * @param changelogOffsets The changelog offset(s) corresponding to the most recently written records.
 */
default void commit(final Map<TopicPartition, Long> changelogOffsets) {
    flush();
}

/**
 * Returns the most recently {@link #commit(Map) committed} offset for the given {@link TopicPartition}.
 * <p>
 * If {@link #managesOffsets()} and {@link #persistent()} both return {@code true}, this method will return the
 * offset that corresponds to the changelog record most recently written to this store, for the given {@code
 * partition}.
 * <p>
 * This method provides readers using the {@link org.apache.kafka.common.IsolationLevel#READ_COMMITTED} {@link
 * org.apache.kafka.common.IsolationLevel} a means to determine the point in the changelog that this StateStore
 * currently represents.
 *
 * @param partition The partition to get the committed offset for.
 * @return The last {@link #commit(Map) committed} offset for the {@code partition}; or {@code null} if no offset
 *         has been committed for the partition, or if either {@link #persistent()} or {@link #managesOffsets()}
 *         return {@code false}.
 */
default Long committedOffset(final TopicPartition partition) {
    return null;
}

/**
 * Determines if this StateStore manages its own offsets.
 * <p>
 * If this method returns {@code true}, then offsets provided to {@link #commit(Map)} will be retrievable using
 * {@link #committedOffset(TopicPartition)}, even if the store is {@link #close() closed} and later re-opened.
 * <p>
 * If this method returns {@code false}, offsets provided to {@link #commit(Map)} will be ignored, and {@link
 * #committedOffset(TopicPartition)} will be expected to always return {@code null}.
 * <p>
 * This method is provided to enable custom StateStores to opt-in to managing their own offsets. This is highly
 * recommended, if possible, to ensure that custom StateStores provide the consistency guarantees that Kafka Streams
 * expects when operating under the {@code exactly-once} {@code processing.mode}.
 *
 * @return Whether this StateStore manages its own offsets.
 */
default boolean managesOffsets() {
    return false;
}

/**
 * Return an approximate count of memory used by records not yet committed to this StateStore.
 * <p>
 * This method will return an approximation of the memory that would be freed by the next call to {@link
 * #commit(Map)}.
 * <p>
 * If no records have been written to this store since {@link #init(StateStoreContext, StateStore) opening}, or
 * since the last {@link #commit(Map)}; or if this store does not support atomic transactions, it will return
 * {@code 0}, as no records are currently being buffered.
 *
 * @return The approximate size of all records awaiting {@link #commit(Map)}; or {@code 0} if this store does not
 *         support transactions, or has not been written to since {@link #init(StateStoreContext, StateStore)} or
 *         last {@link #commit(Map)}.
 */
@Evolving
default long approximateNumUncommittedBytes() {
    return 0;
}
```
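To make the opt-in contract above concrete, here is a minimal, self-contained sketch of a store that manages its own offsets. This is not the real `StateStore` interface: the `TopicPartition` stand-in and the class name are simplified for illustration, and "persistence" is modeled with in-memory maps.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for org.apache.kafka.common.TopicPartition.
record TopicPartition(String topic, int partition) {}

// Sketch of a custom store that opts in to managing its own offsets:
// commit() atomically applies buffered writes together with their changelog offsets,
// and committedOffset() reads the offsets back.
class OffsetManagedStore {
    private final Map<String, String> committed = new HashMap<>();   // "persisted" records
    private final Map<String, String> uncommitted = new HashMap<>(); // buffered writes
    private final Map<TopicPartition, Long> offsets = new HashMap<>();

    boolean managesOffsets() {
        return true; // offsets passed to commit() are retained by the store itself
    }

    void put(String key, String value) {
        uncommitted.put(key, value); // buffered until the next commit()
    }

    void commit(Map<TopicPartition, Long> changelogOffsets) {
        // Apply buffered records and their offsets together, mirroring the
        // requirement that both are committed atomically.
        committed.putAll(uncommitted);
        uncommitted.clear();
        offsets.putAll(changelogOffsets);
    }

    Long committedOffset(TopicPartition partition) {
        return offsets.get(partition); // null if nothing committed for this partition
    }
}
```

Note that, as in the interface above, `committedOffset` returns `null` until the first `commit`, which is the signal Kafka Streams would use to fall back to other offset sources.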
...
The existing `.checkpoint` files will be retained for any StateStore that does not set `managesOffsets()` to `true`, and to ensure managed offsets are available when the store is closed. Existing offsets will be automatically migrated into StateStores that manage their own offsets, iff there is no offset returned by `StateStore#committedOffset`.
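The migration rule above can be sketched as a small decision function. This is a hypothetical helper, not actual Kafka Streams code; the method and parameter names are invented for illustration.

```java
// Hypothetical sketch of the migration rule: offsets from the legacy .checkpoint
// file seed a store only when the store itself reports no committed offset.
class CheckpointMigration {
    // Returns the offset to use for a partition when the store is initialized.
    static Long offsetToUse(Long storeCommittedOffset, Long legacyCheckpointOffset) {
        // The store's own offset is the source of truth when present.
        if (storeCommittedOffset != null) {
            return storeCommittedOffset;
        }
        // Otherwise fall back to (and migrate from) the legacy .checkpoint entry,
        // which may itself be null if no checkpoint exists.
        return legacyCheckpointOffset;
    }
}
```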
Required interface changes:
- Add methods `void commit(Map<TopicPartition, Long> changelogOffsets)`, `boolean managesOffsets()` and `Long committedOffset(TopicPartition)` to `StateStore`.
- Deprecate method `flush()` on `StateStore`.
...
- During StateStore initialization, in order to synchronize the offsets in `.checkpoint` with the offsets returned by `StateStore#committedOffset(TopicPartition)`, which are the source of truth for stores that manage their own offsets.
- When the StateStore is closed, in order to ensure that the offsets used for Task assignment reflect the state persisted to disk.
- At the end of every Task commit, if-and-only-if at least one StateStore in the Task is persistent and does not manage its own offsets. This ensures that stores that don't manage their offsets continue to have their offsets persisted to disk whenever the StateStore data itself is committed.
- Avoiding writing `.checkpoint` when every persistent store manages its own offsets ensures we don't pay a significant performance penalty when the commit interval is short, as it is by default under EOS.
- Since all persistent StateStores provided by Kafka Streams will manage their own offsets, the common case is that the `.checkpoint` file will not be updated on `commit(Map)`.
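The "write `.checkpoint` only when needed" rule at Task commit reduces to a single predicate over the Task's stores. The following is an illustrative sketch, not the actual implementation; the `Store` record is a stand-in for the real StateStore interface.

```java
import java.util.List;

// Hypothetical sketch of the Task-commit rule above: the .checkpoint file is
// written iff at least one persistent store does NOT manage its own offsets.
class CheckpointPolicy {
    // Stand-in exposing only the two properties the rule depends on.
    record Store(boolean persistent, boolean managesOffsets) {}

    static boolean shouldWriteCheckpoint(List<Store> taskStores) {
        return taskStores.stream()
                         .anyMatch(s -> s.persistent() && !s.managesOffsets());
    }
}
```

Under this rule, a Task whose persistent stores all manage their own offsets (the common case, once the built-in stores are migrated) skips the `.checkpoint` write on every commit.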
...