Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In order for state stores to provide stronger consistency in the future across replicated state stores (e.g., RYW Read-Your-Writes consistency) they need to be able to collect record it's evident for state stores to collect metadata (e.g., offset information) of all the records that have been written to the store.

Today, we already make record metadata available in the ProcessContext (::recordMetadata()), and it is present in the abstract class that implements both ProcessorContext and StateStoreContext, but the method is not currently exposed through the StateStoreContext interface, so it is not available in the state stores.State stores access processor context via the StateStoreContext interface. Today, the AbstractProcessorContext which implements/extends the StateStore interface already makes record metadata available via recordMetadata(). Unfortunately that method is not part of the StateStoreContext interface, thus, state stores cannot access it. 

Public Interfaces

In this KIP we propose to expose add recordMetada() via the StateStoreContext. The following interface will change:

...

Code Block
public interface StateStoreContext {
    ...

    /**
     * Return the metadata of the current topic/partition/offset if available.
     * This metadata in a StateStore is defined as the metadata of the last record
     * that is currently been processed by the StreamTask that holds the store.
     * <p>
     * Note that the metadata is not defined during all store interactions, for
     * example, while the StreamTask is running a punctuation.
     */
      Optional<RecordMetadata> recordMetadata();

    ...
}

...