...

Code Block
language: java
title: DSLStoreProvider.java
public interface StoreTypeSpec {

    KeyValueBytesStoreSupplier keyValueStore(final KeyValueSupplierParameters params);

    WindowBytesStoreSupplier windowStore(final WindowSupplierParameters params);

    SessionBytesStoreSupplier sessionStore(final SessionSupplierParameters params);
}

// the below are all "struct"-like classes with the following fields
// (shorthand notation listing the fields, not literal Java syntax)
class KeyValueSupplierParameters(String name);
class WindowSupplierParameters(String name, Duration retentionPeriod, Duration windowSize, boolean retainDuplicates, EmitStrategy emitStrategy);
class SessionSupplierParameters(String name, Duration retentionPeriod, EmitStrategy emitStrategy);


Info

Note on Evolving API: a concern raised on KIP-591 is that an interface like this increases the API surface area, so that introducing new store implementations would cause custom state store implementations to fail with compile-time errors. Introducing the *Parameters classes prevents this, because new options can be added as fields on the parameter classes without changing the method signatures of the interface. The one exception is the addition of an entirely new store type.

If an entirely new state store type (beyond KV/Windowed/Session) is added, I think it is valid for the new store type to have a default implementation that throws new UnsupportedOperationException(), as it is unlikely that users who specify a custom state store as the default will want a different store (e.g. ROCKS_DB) created without their knowledge. Adding a new store type also seems unlikely, since these three have been the only store types for many years.
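
To make the evolution argument concrete, here is a minimal, self-contained sketch. It does not use the real Kafka classes: KvSupplier, WindowSupplier, KvParams, WindowParams, SketchStoreTypeSpec, CustomSpec and the method hypotheticalNewStore are all stand-ins invented for illustration. It shows a custom spec that keeps compiling when a new store type method is added later, provided that method has a throwing default.

```java
import java.time.Duration;

// Hypothetical stand-ins for Kafka's store supplier types, so the sketch compiles on its own.
interface KvSupplier { String describe(); }
interface WindowSupplier { String describe(); }

// "Struct"-like parameter classes. A new option would be added here as a field,
// leaving the method signatures of the spec interface untouched.
final class KvParams {
    final String name;

    KvParams(final String name) {
        this.name = name;
    }
}

final class WindowParams {
    final String name;
    final Duration retentionPeriod;
    final Duration windowSize;

    WindowParams(final String name, final Duration retentionPeriod, final Duration windowSize) {
        this.name = name;
        this.retentionPeriod = retentionPeriod;
        this.windowSize = windowSize;
    }
}

interface SketchStoreTypeSpec {
    KvSupplier keyValueStore(KvParams params);

    WindowSupplier windowStore(WindowParams params);

    // Hypothetical store type added in a later release. The default fails loudly
    // instead of silently creating a store the user did not ask for.
    default KvSupplier hypotheticalNewStore(final KvParams params) {
        throw new UnsupportedOperationException(
            getClass().getSimpleName() + " does not support this store type");
    }
}

// A custom spec written before hypotheticalNewStore existed. It still compiles.
final class CustomSpec implements SketchStoreTypeSpec {
    @Override
    public KvSupplier keyValueStore(final KvParams params) {
        return () -> "custom-kv:" + params.name;
    }

    @Override
    public WindowSupplier windowStore(final WindowParams params) {
        return () -> "custom-window:" + params.name + ":" + params.windowSize.toMillis() + "ms";
    }
}
```

With this shape, a user of CustomSpec only sees a failure if the topology actually requests the new store type, and the failure is an explicit exception at that point.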

OOTB Store Type Specs

We will provide default implementations of the above interface in the following classes:

...

Code Block
language: java
title: StreamJoined.java
public class StreamJoined<K, V1, V2> {

    /**
     * Creates a StreamJoined instance with the given {@link StoreTypeSpec}. The store type spec
     * will be used to get all the state stores in this operation that do not otherwise have an
     * explicitly configured {@link org.apache.kafka.streams.state.StoreSupplier}.
     *
     * @param storeTypeSpec       the store type specification that will be used for all unconfigured state stores
     * @param <K>                 the key type
     * @param <V1>                this value type
     * @param <V2>                other value type
     * @return                    {@link StreamJoined} instance
     */
    public static <K, V1, V2> StreamJoined<K, V1, V2> with(final StoreTypeSpec storeTypeSpec);

    /**
     * Configure with the provided {@link StoreTypeSpec} for all state stores that are not
     * configured with a {@link org.apache.kafka.streams.state.StoreSupplier} already.
     *
     * @param storeTypeSpec  the store type specification to use for all unconfigured state stores
     * @return               a new {@link StreamJoined} configured with this store type spec
     */
    public StreamJoined<K, V1, V2> withStoreTypeSpec(final StoreTypeSpec storeTypeSpec);
}
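
The Javadoc above describes a precedence rule: an explicitly configured StoreSupplier wins, and the StoreTypeSpec is only consulted for stores that have none. The sketch below illustrates that rule in isolation. The class StoreResolution and its resolve method are hypothetical names made up for this example and say nothing about how the actual implementation is structured.

```java
import java.util.function.Supplier;

// Hypothetical helper illustrating the fallback rule only.
final class StoreResolution {

    private StoreResolution() { }

    // Returns the explicitly configured supplier when one was set (non-null);
    // otherwise asks the store type spec to create one. The spec is not
    // invoked at all when an explicit supplier is present.
    static <S> S resolve(final S explicitSupplier, final Supplier<S> fromStoreTypeSpec) {
        return explicitSupplier != null ? explicitSupplier : fromStoreTypeSpec.get();
    }
}
```

In a join, this would apply independently to each store in the operation, so a user could set one supplier explicitly and leave the rest to the spec.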

...