Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Public Interfaces

  • KGroupedStream -deprecate the following methods:

    • KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);

    • <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
                                                                                                     final StateStoreSupplier<WindowStore> storeSupplier);

    • KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
                                                                  final StateStoreSupplier<SessionStore> storeSupplier);

    • KTable<K, V> reduce(final Reducer<V> reducer,
                                         final StateStoreSupplier<KeyValueStore> storeSupplier);

    • <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                                                                  final Windows<W> windows,
                                                                                                  final StateStoreSupplier<WindowStore> storeSupplier);

    • KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                              final SessionWindows sessionWindows,

                                                              final StateStoreSupplier<SessionStore> storeSupplier);

    • <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                                           final Aggregator<? super K, ? super V, VR> aggregator,

                                                           final StateStoreSupplier<KeyValueStore> storeSupplier);

    • <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                                                                                 final Aggregator<? super K, ? super V, VR> aggregator,
                                                                                                                 final Windows<W> windows,
                                                                                                                 final StateStoreSupplier<WindowStore> storeSupplier);

    • <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,

                                                                          final Aggregator<? super K, ? super V, T> aggregator,

                                                                          final Merger<? super K, T> sessionMerger,

                                                                          final SessionWindows sessionWindows,

                                                                          final Serde<T> aggValueSerde,

                                                                          final StateStoreSupplier<SessionStore> storeSupplier);

  • KGroupedStream - add the following replacement methods:

    • KTable<K, Long> count(final TypedStateStoreSupplier<KeyValueStore<K, Long>> storeSupplier);

    • <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
                                                                                                     final TypedStateStoreSupplier<WindowStore<K, Long>> storeSupplier);

    • KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows,
                                                                  final TypedStateStoreSupplier<SessionStore<K, Long>> storeSupplier);

    • KTable<K, V> reduce(final Reducer<V> reducer,
                                         final TypedStateStoreSupplier<KeyValueStore<K ,V>> storeSupplier);

    • <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                                                                  final Windows<W> windows,
                                                                                                  final TypedStateStoreSupplier<WindowStore<K, V>> storeSupplier);

    • KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                              final SessionWindows sessionWindows,

                                                              final TypedStateStoreSupplier<SessionStore<K, V>> storeSupplier);

    • <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                                           final Aggregator<? super K, ? super V, VR> aggregator,

                                                           final TypedStateStoreSupplier<KeyValueStore<K, VR>> storeSupplier);

    • <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                                                                                 final Aggregator<? super K, ? super V, VR> aggregator,
                                                                                                                 final Windows<W> windows,
                                                                                                                 final TypedStateStoreSupplier<WindowStore<K, VR>> storeSupplier);

    • <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,

                                                                          final Aggregator<? super K, ? super V, T> aggregator,

                                                                          final Merger<? super K, T> sessionMerger,

                                                                          final SessionWindows sessionWindows,

                                                                          final Serde<T> aggValueSerde,

                                                                          final TypedStateStoreSupplier<SessionStore<K, T>> storeSupplier);

  • KGroupedTable - deprecate the following methods:
    • KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);

    • KTable<K, V> reduce(final Reducer<V> adder,
                                         final Reducer<V> subtractor,
                                         final StateStoreSupplier<KeyValueStore> storeSupplier);

    • <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                                           final Aggregator<? super K, ? super V, VR> adder,
                                                           final Aggregator<? super K, ? super V, VR> subtractor,
                                                           final StateStoreSupplier<KeyValueStore> storeSupplier);

  • KGroupedTable - add the following replacement methods:KGroupedTable
    • KTable<K, Long> count(final TypedStateStoreSupplier<KeyValueStore<K, Long>> storeSupplier); 

    • KTable<K, V> reduce(final Reducer<V> adder,
                                         final Reducer<V> subtractor,
                                         final TypedStateStoreSupplier<KeyValueStore<K, V>> storeSupplier);

       

    • <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                                           final Aggregator<? super K, ? super V, VR> adder,
                                                           final Aggregator<? super K, ? super V, VR> subtractor,
                                                           final TypedStateStoreSupplier<KeyValueStore<K, VR>> storeSupplier);

  • Stores
    • 2 new public interfaces:
      • Code Block
        public interface PersistentWindowFactory<K, V> {
        
        
        /**
        * Caching should be enabled on the created store.
        * @return the factory to create a persistent window store
        */
        PersistentWindowFactory<K, V> enableCaching();
        
        /**
        * Indicates that a changelog should not be created for the key-value store
        * @return the factory to create a persistent window store
        */
        PersistentWindowFactory<K, V> disableLogging();
        
        /**
        * Indicates that a changelog should be created for the store. The changelog will be created
        * with the provided cleanupPolicy and configs.
        *
        * Note: Any unrecognized configs will be ignored.
        * @param config any configs that should be applied to the changelog
        * @return the factory to create a persistent window store
        */
        PersistentWindowFactory<K, V> enableLogging(final Map<String, String> config);
        
        
        /**
        * Return the instance of StateStoreSupplier of new window store.
        * @return the key-value store; never null
        */
        StateStoreSupplier<WindowStore<K, V>> build();
        }
      • Code Block
        public interface PersistentSessionFactory<K, V> {
        /**
        * Indicates that a changelog should be created for the store. The changelog will be created
        * with the provided cleanupPolicy and configs.
        *
        * Note: Any unrecognized configs will be ignored.
        * @param config any configs that should be applied to the changelog
        * @return the factory to create a persistent key-value store
        */
        PersistentSessionFactory<K, V> enableLogging(final Map<String, String> config);
        /**
        * Indicates that a changelog should not be created for the key-value store
        * @return the factory to create a persistent session store
        */
        PersistentSessionFactory<K, V> disableLogging();
        /**
        * Caching should be enabled on the created store.
        * @return the factory to create a persistent session store
        */
        PersistentSessionFactory<K, V> enableCaching();
        /**
        * Return the instance of StateStoreSupplier of new session store.
        * @return the key-value store; never null
        */
        StateStoreSupplier<SessionStore<K, V>> build();
         
        }
      • changes to PersistentKeyValueFactory :
        • PersistentWindowFactory<K, V> windowed(final long windowSize, long retentionPeriod, int numSegments, boolean retainDuplicates);

        • PersistentSessionFactory<K, V> sessionWindowed(final long retentionPeriod);

        • StateStoreSupplier<KeyValueStore<K, V>> build();

      • changes to InMemoryKeyValueFactory:
        • StateStoreSupplier<KeyValueStore<K, V>> build();

...