...
Current state: "Under Discussion"
Discussion thread: here
JIRA: TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The recommended way to create a StateStoreSupplier is shown in the following example:
Code Block
StateStoreSupplier countStore = Stores.create("Counts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .build();
However, StateStoreSupplier is a generic interface that takes the StateStore type as a parameter:
Code Block
public interface StateStoreSupplier<T extends StateStore>
...
As StateStoreSupplier is passed to the count/reduce/aggregate etc. methods on KGroupedStream or KGroupedTable, the compiler cannot detect whether a supplier for the wrong kind of store is provided.
...
The StateStoreSupplier argument stands out as a raw type. Making it type-parameterised will help detect, at compile time, errors such as when someone refactors their app to use a different type of aggregation (e.g. TimeWindowed vs SessionWindowed) and forgets to change the StateStoreSupplier passed in.
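The difference between a raw and a type-parameterised supplier argument can be illustrated with a minimal, self-contained sketch. The interfaces below are simplified stand-ins that only borrow the names StateStore, KeyValueStore, SessionStore and StateStoreSupplier; they are not the real Kafka Streams types. The countRaw and countTyped methods are hypothetical helpers invented for this illustration.

```java
// Entry point is declared first so the file can be launched with `java TypedSupplierSketch.java`.
class TypedSupplierSketch {

    // Raw parameter: a supplier of any store type is accepted by the compiler.
    @SuppressWarnings("rawtypes")
    static String countRaw(final StateStoreSupplier storeSupplier) {
        final StateStore store = storeSupplier.get();
        // The mismatch is only discovered here, when the store is actually used.
        if (!(store instanceof KeyValueStore)) {
            throw new IllegalArgumentException(
                "expected a KeyValueStore, got store '" + store.name() + "'");
        }
        return store.name();
    }

    // Parameterised argument: the signature states which kind of store is required.
    static <K> String countTyped(final StateStoreSupplier<KeyValueStore<K, Long>> storeSupplier) {
        return storeSupplier.get().name();
    }

    public static void main(final String[] args) {
        // Each stand-in interface has one abstract method, so nested lambdas are enough here.
        final StateStoreSupplier<KeyValueStore<String, Long>> keyValue = () -> () -> "kv-counts";
        final StateStoreSupplier<SessionStore<String, Long>> session = () -> () -> "session-counts";

        System.out.println(countTyped(keyValue));
        // countTyped(session);  // rejected by the compiler: incompatible types

        try {
            countRaw(session);   // compiles, then fails at runtime
        } catch (final IllegalArgumentException e) {
            System.out.println("runtime failure: " + e.getMessage());
        }
    }
}

// Simplified stand-ins (assumptions for this sketch, not the Kafka Streams interfaces).
interface StateStore { String name(); }
interface KeyValueStore<K, V> extends StateStore { }
interface SessionStore<K, V> extends StateStore { }
interface StateStoreSupplier<T extends StateStore> { T get(); }
```

With the raw parameter, passing the session supplier is accepted and the error only appears when the code runs. With the parameterised argument, the same mistake is a compile error.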
...
KGroupedStream
KTable<K, Long> count(final StateStoreSupplier<KeyValueStore<K, Long>> storeSupplier);

<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
    final StateStoreSupplier<WindowStore<K, Long>> storeSupplier);

KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
    final StateStoreSupplier<SessionStore<K, Long>> storeSupplier);

KTable<K, V> reduce(final Reducer<V> reducer,
    final StateStoreSupplier<KeyValueStore<K, V>> storeSupplier);

<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
    final Windows<W> windows,
    final StateStoreSupplier<WindowStore<K, V>> storeSupplier);

KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
    final SessionWindows sessionWindows,
    final StateStoreSupplier<SessionStore<K, V>> storeSupplier);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
    final Aggregator<? super K, ? super V, VR> aggregator,
    final StateStoreSupplier<KeyValueStore<K, VR>> storeSupplier);

<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
    final Aggregator<? super K, ? super V, VR> aggregator,
    final Windows<W> windows,
    final StateStoreSupplier<WindowStore<K, VR>> storeSupplier);

<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
    final Aggregator<? super K, ? super V, T> aggregator,
    final Merger<? super K, T> sessionMerger,
    final SessionWindows sessionWindows,
    final Serde<T> aggValueSerde,
    final StateStoreSupplier<SessionStore<K, T>> storeSupplier);
- KGroupedTable
KTable<K, Long> count(final StateStoreSupplier<KeyValueStore<K, Long>> storeSupplier);

KTable<K, V> reduce(final Reducer<V> adder,
    final Reducer<V> subtractor,
    final StateStoreSupplier<KeyValueStore<K, V>> storeSupplier);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
    final Aggregator<? super K, ? super V, VR> adder,
    final Aggregator<? super K, ? super V, VR> subtractor,
    final StateStoreSupplier<KeyValueStore<K, VR>> storeSupplier);
- Stores
- 2 new public interfaces:
Code Block
public interface PersistentWindowFactory<K, V> {

    /**
     * Caching should be enabled on the created store.
     * @return the factory to create a persistent window store
     */
    PersistentWindowFactory<K, V> enableCaching();

    /**
     * Indicates that a changelog should not be created for the window store
     * @return the factory to create a persistent window store
     */
    PersistentWindowFactory<K, V> disableLogging();

    /**
     * Indicates that a changelog should be created for the store. The changelog will be created
     * with the provided cleanupPolicy and configs.
     *
     * Note: Any unrecognized configs will be ignored.
     * @param config any configs that should be applied to the changelog
     * @return the factory to create a persistent window store
     */
    PersistentWindowFactory<K, V> enableLogging(final Map<String, String> config);

    /**
     * Return the instance of StateStoreSupplier of new window store.
     * @return the window store supplier; never null
     */
    StateStoreSupplier<WindowStore<K, V>> build();
}
Code Block
public interface PersistentSessionFactory<K, V> {

    /**
     * Indicates that a changelog should be created for the store. The changelog will be created
     * with the provided cleanupPolicy and configs.
     *
     * Note: Any unrecognized configs will be ignored.
     * @param config any configs that should be applied to the changelog
     * @return the factory to create a persistent session store
     */
    PersistentSessionFactory<K, V> enableLogging(final Map<String, String> config);

    /**
     * Indicates that a changelog should not be created for the session store
     * @return the factory to create a persistent session store
     */
    PersistentSessionFactory<K, V> disableLogging();

    /**
     * Caching should be enabled on the created store.
     * @return the factory to create a persistent session store
     */
    PersistentSessionFactory<K, V> enableCaching();

    /**
     * Return the instance of StateStoreSupplier of new session store.
     * @return the session store supplier; never null
     */
    StateStoreSupplier<SessionStore<K, V>> build();
}
- changes to PersistentKeyValueFactory:
PersistentWindowFactory<K, V> windowed(final long windowSize, long retentionPeriod, int numSegments, boolean retainDuplicates);
PersistentSessionFactory<K, V> sessionWindowed(final long retentionPeriod);
StateStoreSupplier<KeyValueStore<K, V>> build();
- changes to InMemoryKeyValueFactory:
StateStoreSupplier<KeyValueStore<K, V>> build();
- 2 new public interfaces:
...
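The factory changes listed for Stores can be sketched in isolation: calling windowed(...) moves from the key-value factory to a window factory, so the type returned by build() follows the kind of store that was requested. The code below is a rough, self-contained illustration under stated assumptions. All interfaces are cut-down stand-ins that reuse the proposal's names. The persistent(name, keyType, valueType) entry point is hypothetical and merely replaces the Stores.create(...) chain. The store names returned by name() are invented for the demonstration.

```java
// Entry point is declared first so the file can be launched with `java TypedFactorySketch.java`.
class TypedFactorySketch {

    // Hypothetical entry point; the Class arguments exist only so K and V can be inferred.
    static <K, V> PersistentKeyValueFactory<K, V> persistent(final String name,
                                                             final Class<K> keyType,
                                                             final Class<V> valueType) {
        return new PersistentKeyValueFactory<K, V>() {
            @Override
            public PersistentWindowFactory<K, V> windowed(final long windowSize,
                                                          final long retentionPeriod,
                                                          final int numSegments,
                                                          final boolean retainDuplicates) {
                // Switching factories changes what build() is able to return.
                final StateStoreSupplier<WindowStore<K, V>> supplier = () -> () -> name + " (window)";
                return () -> supplier;
            }

            @Override
            public StateStoreSupplier<KeyValueStore<K, V>> build() {
                return () -> () -> name + " (key-value)";
            }
        };
    }

    public static void main(final String[] args) {
        final StateStoreSupplier<WindowStore<String, Long>> windowed =
            persistent("Counts", String.class, Long.class)
                .windowed(60_000L, 3_600_000L, 3, false)
                .build();

        final StateStoreSupplier<KeyValueStore<String, Long>> keyValue =
            persistent("Counts", String.class, Long.class).build();

        System.out.println(windowed.get().name());
        System.out.println(keyValue.get().name());

        // Assigning the windowed chain to a KeyValueStore supplier does not compile:
        // final StateStoreSupplier<KeyValueStore<String, Long>> wrong =
        //     persistent("Counts", String.class, Long.class).windowed(60_000L, 3_600_000L, 3, false).build();
    }
}

// Cut-down stand-ins (assumptions for this sketch, not the Kafka Streams interfaces).
interface StateStore { String name(); }
interface KeyValueStore<K, V> extends StateStore { }
interface WindowStore<K, V> extends StateStore { }
interface StateStoreSupplier<T extends StateStore> { T get(); }

interface PersistentWindowFactory<K, V> {
    StateStoreSupplier<WindowStore<K, V>> build();
}

interface PersistentKeyValueFactory<K, V> {
    PersistentWindowFactory<K, V> windowed(long windowSize, long retentionPeriod,
                                           int numSegments, boolean retainDuplicates);
    StateStoreSupplier<KeyValueStore<K, V>> build();
}
```

Because each factory's build() declares a distinct supplier type, a supplier built for one kind of store cannot be handed to an operation that declares another kind.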