...
Code Block | ||||
---|---|---|---|---|
| ||||
public interface StoreTypeSpec { KeyValueBytesStoreSupplier keyValueStore(final KeyValueSupplierParameters params); WindowBytesStoreSupplier windowStore(final WindowSupplierParameters params); SessionBytesStoreSupplier sessionStore(final SessionSupplierParameters params); } /* the below are all "struct"-like classes with the following fields */ class KeyValueSupplierParameters(String name); class WindowSupplierParameters(String name, final Duration retentionPeriod, final Duration windowSize, final boolean retainDuplicates, EmitStrategy emitStrategy); class SessionSupplierParameters(String name, final Duration retentionPeriod, EmitStrategy emitStrategy); |
Info |
---|
Note on Evolving API: a concern raised on KIP-591 about having such an interface is that the increased API surface area would mean that introducing new store implementations would cause custom state store implementations to throw compile-time errors. Introducing the *Parameters classes will prevent such issues unless an entirely new store type (beyond KV/Window/Session) is added. In that case, I think it is valid to have new store types have a default implementation that throws |
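
The evolution argument above can be illustrated with a minimal, self-contained sketch. It does not use the real Kafka Streams types: `StoreSupplierStub`, `SpecSketch`, `KeyValueParams`, `FutureStoreParams`, `futureStore` and `CustomSpec` are all hypothetical stand-ins, and the choice of `UnsupportedOperationException` is an assumption, since the note does not name the exception type. The sketch shows a custom spec written before a new store type existed; it still compiles after the interface gains a method, and fails only at runtime if the new store type is actually requested.

```java
public class EvolvingSpecSketch {

    // Hypothetical stand-in for a store supplier; not the Kafka Streams interface.
    interface StoreSupplierStub {
        String describe();
    }

    // "Struct"-like parameter class: fields can be added later
    // without changing the signature of keyValueStore(...).
    static final class KeyValueParams {
        final String name;

        KeyValueParams(final String name) {
            this.name = name;
        }
    }

    // Hypothetical parameters for a store type added in a later release.
    static final class FutureStoreParams {
        final String name;

        FutureStoreParams(final String name) {
            this.name = name;
        }
    }

    // Hypothetical stand-in for the spec interface.
    interface SpecSketch {
        StoreSupplierStub keyValueStore(KeyValueParams params);

        // Added after custom implementations already exist. The default body keeps
        // them compiling. The exception type is an assumption made for this sketch.
        default StoreSupplierStub futureStore(final FutureStoreParams params) {
            throw new UnsupportedOperationException(
                "futureStore is not supported by " + getClass().getSimpleName());
        }
    }

    // A custom spec written before futureStore(...) was introduced.
    static final class CustomSpec implements SpecSketch {
        @Override
        public StoreSupplierStub keyValueStore(final KeyValueParams params) {
            return () -> "custom-kv:" + params.name;
        }
    }

    public static void main(final String[] args) {
        final SpecSketch spec = new CustomSpec();
        System.out.println(spec.keyValueStore(new KeyValueParams("counts")).describe());
        try {
            spec.futureStore(new FutureStoreParams("history"));
        } catch (final UnsupportedOperationException e) {
            System.out.println("unsupported: " + e.getMessage());
        }
    }
}
```

The trade-off is that an unsupported store type surfaces as a runtime error rather than a compile-time one, so a custom implementation only learns about the gap when a topology actually asks for the new store type.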
OOTB Store Type Specs
We will provide default implementations of the above interface in the following classes:
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class StreamJoined { /** * Creates a StreamJoined instance with the given {@link StoreTypeSpec}. The store type spec * will be used to get all the state stores in this operation that do not otherwise have an * explicitly configured {@link org.apache.kafka.streams.state.StoreSupplier}. * * @param storeTypeSpec the store type specification that will be used for all unconfigured state stores * @param <K> the key type * @param <V1> this value type * @param <V2> other value type * @return {@link StreamJoined} instance */ public static <K, V1, V2> StreamJoined<K, V1, V2> with(final StoreTypeSpec storeTypeSpec); /** * Configure with the provided {@link StoreTypeSpec} for all state stores that are not * configured with a {@link org.apache.kafka.streams.state.StoreSupplier} already. * * @param storeTypeSpec the store type specification to use for all unconfigured state stores * @return a new {@link StreamJoined} configured with this store type spec */ public StreamJoined<K, V1, V2> withStoreTypeSpec(final StoreTypeSpec storeTypeSpec); } |
...