Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
public interface RichValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichValueJoiner<K, V1, V2, VR> {
    VR apply(final K key, final V1 value1, final V2 value2, final RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichReducer<K, V> {
    V apply(final K key, final V value1, final V value2, final RecordContext recordContext);
}

public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}
 
public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

 
public interface RichMerger<K, V> {
	V apply(final K aggKey, final V aggOne, final V aggTwo, final RecordContext recordContext);
}

 
 
public interface RichValueTransformer<K, V, VR> {
	void init(final ProcessorContext context);

	VR transform(final V value);

	@Deprecated
	VR punctuate(final K key, final long timestamp);

	void close();
}


 
public interface RichValueTransformerSupplier<K, V, VR> {

    RichValueTransformer<K, V, VR> get();
}


 

...

Code Block
languagejava
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate,
                       final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
 

<VR> KTable<K, VR> mapValues(final RichValueMapper<? super K, ? super V, ? extends VR> mapper);
<VR> KTable<K, VR> mapValues(final RichValueMapper<? super K, ? super V, ? extends VR> mapper,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
 

<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                       final Serialized<KR, VR> serialized);
 

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner,
                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner,
                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

 

 

 

 

 

 

 

KGroupedStream

 

 

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V>RichReducer<K, V> reducer);


KTable<K, V> reduce(final RichReducer<K, RichReducer<V>V> reducer,
                    final String queryableStoreNameMaterialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);


<VR> KTable<K, V>VR> reduceaggregate(final RichReducer<V>Initializer<VR> reducerinitializer,
                     final  StateStoreSupplier<KeyValueStore> storeSupplier);


<W extends Window> KTable<Windowed<K>, V> reduce(final RichAggregator<? RichReducer<V>super reducerK,
 ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final RichInitializer<K, VR> initializer,
    final Windows<W> windows,
                       final Aggregator<? super K, ? super V, VR> aggregator,
                    final String queryableStoreName);


<W extends Window> KTable<Windowed<K>, V> reduce(  final RichReducer<V>Materialized<K, reducerVR,
 KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final RichInitializer<K, VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
   final Windows<W> windows);


<W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);



<VR> KTable<K, VR> aggregate(final RichInitializer<K, VR> initializer,
                     final Windows<W> windows,
      final Aggregator<? super K, ? super V, VR> aggregator);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super finalV, StateStoreSupplier<WindowStore>VR> storeSupplieraggregator);


KTable<Windowed<K><VR> KTable<K, V>VR> reduceaggregate(final RichReducer<V> reducerRichInitializer<K, VR> initializer,
                             final finalRichAggregator<? SessionWindowssuper sessionWindowsK,
 ? super V,                           final String queryableStoreName);


 
KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                              final SessionWindows sessionWindows);
 
KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                              final SessionWindows sessionWindows,
                              final StateStoreSupplier<SessionStore> storeSupplier);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);
 
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);



<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
             VR> aggregator);

 

 

 

SessionWindowedKStream

 

There are 3 rich interfaces in aggregate() methods. So converting all possible combinations to their rich counterparts can cause a lot of overloads. So, I propose to overload one method with all rich interfaces. 

 

Code Block
languagejava
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                final StateStoreSupplier<KeyValueStore> storeSupplier);

<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
            final RichAggregator<? super K, ? super V, T> aggregator,
                                     final RichAggregator<RichMerger<? super K, ? super VT> sessionMerger);
<VR> KTable<Windowed<K>, VR> aggregatoraggregate(final RichInitializer<VR> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator,
          final    Windows<W> windows,
                        final RichMerger<? super K, VR> sessionMerger,
                            final Serde<VR> aggValueSerde,
         final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);



KTable<Windowed<K>, V> reduce(final RichReducer<K, V> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<K, V> reducer,
                              final Materialized<K, V, final String queryableStoreNameSessionStore<Bytes, byte[]>> materializedAs);

<W

 

 

 

 

TimeWindowedKStream

  

Code Block
languagejava
 
<VR> KTable<Windowed<K> extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
 aggregate(final Initializer<VR> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator);
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
           final RichAggregator<? super K, ? super V, VR> aggregator,
                    final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Windows<W> windows,
        final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);

KTable<Windowed<K>, V> reduce(final RichReducer<K, V> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<K, V> reducer,
                              final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);






 

 

 

 

KGroupedTable

  

Code Block
languagejava
 
KTable<K, V> reduce(final Reducer<V> adder final StateStoreSupplier<WindowStore> storeSupplier);


<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                    final RichReducer<K, V> subtractor,
               final  RichAggregator<? super K, ?final superMaterialized<K, V, T> aggregatorKeyValueStore<Bytes,
               byte[]>> materialized);
KTable<K, V> reduce(final RichReducer<K, V> adder,
                      final Merger<? super K, T> sessionMergerReducer<V> subtractor,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> reduce(final RichReducer<K, V> adder,
       final SessionWindows sessionWindows,
           final RichReducer<K,     V> subtractor,
                    final Serde<T>Materialized<K, aggValueSerdeV,
 KeyValueStore<Bytes,               byte[]>> materialized);



KTable<K, V> reduce(final Reducer<V> adder,
                    final finalRichReducer<K, StringV> queryableStoreNamesubtractor);

<T> KTable<Windowed<K>KTable<K, T>V> aggregatereduce(final RichReducer<K, Initializer<T>V> initializeradder,
                    final Reducer<V> subtractor);
KTable<K, V> reduce(final RichReducer<K, V> adder,
          final    RichAggregator<? super K, ? super V, T>final aggregatorRichReducer<K,
 V> subtractor);



<VR> KTable<K, VR> aggregate(final Initializer<VR>  initializer,
                             final Merger<Aggregator<? super K, T> sessionMerger,
    ? super V, VR> adder,
                             final RichAggregator<? SessionWindowssuper sessionWindowsK,
 ? super V, VR> subtractor,
                             final Materialized<K, VR, final Serde<T> aggValueSerdeKeyValueStore<Bytes, byte[]>> materialized);
 
<T><VR> KTable<Windowed<K>KTable<K, T>VR> aggregate(final Initializer<T>Initializer<VR> initializer,
                                     final RichAggregator<? super K, ? super V, T>VR> aggregatoradder,
                             final Aggregator<?       final Merger<super K, ? super KV, T>VR> sessionMergersubtractor,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR>  aggregate(final SessionWindowsInitializer<VR> sessionWindowsinitializer,
                             final RichAggregator<? super K, ? super V, VR> final Serde<T> aggValueSerdeadder,
                             final RichAggregator<? super K, ?    final StateStoreSupplier<SessionStore> storeSupplier);

 

  • KGroupedTable

 

 

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V> addersuper V, VR> subtractor,
                    final Reducer<V> subtractor,
       final Materialized<K, VR, KeyValueStore<Bytes,          final String queryableStoreName);byte[]>> materialized);




<VR> KTable<K, V>VR> reduceaggregate(final Reducer<V>Initializer<VR> adderinitializer,
                             final Aggregator<? RichReducer<V>super subtractorK,
 ? super V, VR> adder,
               final String queryableStoreName);


KTable<K, V> reduce(final RichReducer<V> adder,
        final RichAggregator<? super K, ? super V, VR> subtractor);
<VR> KTable<K, VR>  aggregate(final RichReducer<V>Initializer<VR> subtractorinitializer,
                    final String queryableStoreName);


 
KTable<K, V> reduce(final RichReducer<V> adder,
   final RichAggregator<? super K, ? super V, VR> adder,
         final Reducer<V> subtractor);


KTable<K, V> reduce(final Reducer<V> adder,
              final Aggregator<? super K, ? super finalV, RichReducer<V>VR> subtractor);


<VR> KTable<K, V>VR> reduceaggregate(final RichReducer<V>Initializer<VR> adderinitializer,
                     final   RichReducer<V> subtractor);


KTable<K, V> reduce(final RichReducer<V> adder,
final RichAggregator<? super K, ? super V, VR> adder,
            final Reducer<V> subtractor,
               final RichAggregator<? super K, ? super finalV, StateStoreSupplier<KeyValueStore>VR> storeSuppliersubtractor);



<VR> KTable<K, V>VR> reduceaggregate(final Reducer<V>Initializer<VR> adderinitializer,
                    final RichReducer<V> subtractor,
       final Aggregator<? super K, ? super V, VR> adder,
        final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> reduce(final RichReducer<V> adder,
               final RichAggregator<? super K, ? final RichReducer<V>super V, VR> subtractor,
                             final StateStoreSupplier<KeyValueStore>Serde<VR> storeSupplieraggValueSerde);


 
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<Aggregator<? super K, ? super V, VR> subtractor,
                             final StringSerde<VR> queryableStoreNameaggValueSerde);


<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor);


<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adderSerde<VR> aggValueSerde);



<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<Aggregator<? super K, ? super V, VR> subtractoradder,
                             final Serde<VR> aggValueSerde RichAggregator<? super K, ? super V, VR> subtractor,
                             final StringStateStoreSupplier<KeyValueStore> queryableStoreNamestoreSupplier);


<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<Aggregator<? super K, ? super V, VR> subtractor,
                             final Serde<VR>StateStoreSupplier<KeyValueStore> aggValueSerdestoreSupplier);


<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);


 

 

 

 

 

 

Proposed changes

 

 

  • Make record context open to public

...

Thus, the record context is not available in ProcessorContext. We As a result, we make the following changes to make it "public"

...