Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Discussion thread: here

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Rich functions are one of the essential parts of stream processing. There are several use cases where users cannot express their business logic with the current non-rich methods, especially when init(Some params) and close() methods are needed.

Public Interfaces

 

  • KStream.java

 

Code Block
languagejava
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);


KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);


<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);


<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);


<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR> mapper);


<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);


<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>> processor);


void foreach(final RichForeachAction<? super K, ? super V> action);


KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);


KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates);


<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector);


<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
                                   final Serde<KR> keySerde,
                                   final Serde<V> valSerde);


<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows);


<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows,
                             final Serde<K> keySerde,
                             final Serde<V> thisValueSerde,
                             final Serde<VO> otherValueSerde);


<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows);


<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows,
                                 final Serde<K> keySerde,
                                 final Serde<V> thisValSerde,
                                 final Serde<VO> otherValueSerde);


<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows);


<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows,
                                  final Serde<K> keySerde,
                                  final Serde<V> thisValueSerde,
                                  final Serde<VO> otherValueSerde);


<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner);


<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                             final Serde<K> keySerde,
                             final Serde<V> valSerde);


<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner);


<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                 final Serde<K> keySerde,
                                 final Serde<V> valSerde);


<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final RichValueJoiner<? super V, ? super GV, ? extends RV> joiner);


<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final RichValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner);

 

  • KTable.java

 

 

 

 

Code Block
languagejava
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate);


KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final String queryableStoreName);


KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate);


KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final String queryableStoreName);


<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper);


<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);


<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper,
                             final Serde<VR> valueSerde,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);


void foreach(final RichForeachAction<? super K, ? super V> action);


<KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);


<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);


<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                       final Serde<KR> keySerde,
                                       final Serde<VR> valueSerde);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final Serde<VR> joinSerde,
                            final String queryableStoreName);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final StateStoreSupplier<KeyValueStore> storeSupplier);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final Serde<VR> joinSerde,
                                final String queryableStoreName);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final StateStoreSupplier<KeyValueStore> storeSupplier);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final Serde<VR> joinSerde,
                                 final String queryableStoreName);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final StateStoreSupplier<KeyValueStore> storeSupplier);

 

 

  • KGroupedStream

 

 

 

 

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4125
           
Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4218
Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4726
 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP incorporates KIP-149 and provides a hybrid solution for rich functions in Streams and for accessing read-only keys within the ValueJoiner, ValueTransformer and ValueMapper interfaces.

 

Rich functions are one of the essential parts of stream processing. There are several use cases where users cannot express their business logic with the current non-rich methods. For example:

  • having access to RecordContext within an operator
  • having access to a read-only key for ValueJoiner, ValueTransformer, ValueMapper interfaces



Rich Interfaces

 

Code Block
languagejava
 
/**
 * Rich initializer: produces a value of type {@code V} for a given key.
 * Note that {@code apply} is given only the key, not a {@code RecordContext}.
 *
 * @param <V> type of the value returned by {@code apply}
 * @param <K> type of the key passed to {@code apply}
 */
public interface RichInitializer<V, K> {
    V apply(K key);
}

/**
 * Rich value mapper: maps a value to a result of type {@code VR} while also
 * receiving the record's key (intended as read-only) and its {@code RecordContext}.
 *
 * @param <V>  type of the input value
 * @param <VR> type of the mapped result
 * @param <K>  type of the key
 */
public interface RichValueMapper<V, VR, K> {
    VR apply(final V value, final K key, final RecordContext recordContext);
}

/**
 * Rich value joiner: combines two values into a result of type {@code VR} while
 * also receiving the key (intended as read-only) and the {@code RecordContext}.
 * Note that the key type is the LAST type parameter.
 *
 * @param <V1> type of the first value
 * @param <V2> type of the second value
 * @param <VR> type of the join result
 * @param <K>  type of the key
 */
public interface RichValueJoiner<V1, V2, VR, K> {
    VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
}

/**
 * Rich key-value mapper: maps a key and a value to a result of type {@code VR},
 * additionally receiving the {@code RecordContext}.
 *
 * @param <K>  type of the key
 * @param <V>  type of the value
 * @param <VR> type of the mapped result
 */
public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

/**
 * Rich reducer: takes two values of type {@code V} and returns a value of the
 * same type, additionally receiving the key and the {@code RecordContext}.
 *
 * @param <V> type of both input values and of the result
 * @param <K> type of the key
 */
public interface RichReducer<V, K> {
    V apply(final V value1, final V value2, final K key, final RecordContext recordContext);
}

/**
 * Rich aggregator: given a key, a value and an aggregate of type {@code VA},
 * returns an aggregate of type {@code VA}; also receives the {@code RecordContext}.
 *
 * @param <K>  type of the key
 * @param <V>  type of the value
 * @param <VA> type of the aggregate (both input and result)
 */
public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}
 
/**
 * Rich foreach action: performs an action on a key-value pair with access to
 * the {@code RecordContext}; produces no result.
 *
 * @param <K> type of the key
 * @param <V> type of the value
 */
public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}

/**
 * Rich predicate: evaluates a boolean condition on a key-value pair with
 * access to the {@code RecordContext}.
 *
 * @param <K> type of the key
 * @param <V> type of the value
 */
public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

 
/**
 * Rich merger: takes a key and two values of type {@code V} (named as
 * aggregates) and returns a single value of type {@code V}; also receives the
 * {@code RecordContext}.
 * NOTE(review): presumably used to merge session-window aggregates — confirm
 * against the aggregate() overloads that accept it.
 *
 * @param <K> type of the aggregate key
 * @param <V> type of the aggregates being merged and of the result
 */
public interface RichMerger<K, V> {
	V apply(final K aggKey, final V aggOne, final V aggTwo, final RecordContext recordContext);
}

 
 
/**
 * Rich value transformer: transforms a value into a result of type {@code VR}
 * with access to the record's key (intended as read-only).
 * Unlike the single-method rich interfaces, context is supplied through
 * {@code init} as a {@code ProcessorContext} rather than per call.
 *
 * @param <V>  type of the input value
 * @param <VR> type of the transformed result
 * @param <K>  type of the key
 */
public interface RichValueTransformer<V, VR, K> {
	// Receives the ProcessorContext; presumably called once before any transform() — TODO confirm.
	void init(final ProcessorContext context);

	// Transforms the given value; the key is passed alongside it.
	VR transform(final V value, final K key);

	// Presumably releases resources when the transformer is no longer used — TODO confirm.
	void close();
}


 
/**
 * Supplier of {@code RichValueTransformer} instances.
 *
 * @param <V>  input value type of the supplied transformer
 * @param <VR> result type of the supplied transformer
 * @param <K>  key type of the supplied transformer
 */
public interface RichValueTransformerSupplier<V, VR, K> {

    // Returns a RichValueTransformer with the same type parameters as this supplier.
    RichValueTransformer<V, VR, K> get();
}


 


 

Public Interfaces


KStream

 

Code Block
languagejava
KStream<K, V> filter(RichPredicate<
Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V> reducer);


KTable<K, V> reduce(final RichReducer<V> reducer,
                    final String queryableStoreName);


KTable<K, V> reduce(final RichReducer<V> reducer,
                    final StateStoreSupplier<KeyValueStore> storeSupplier);


<W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                                                 final Windows<W> windows,
                                                 final String queryableStoreName);


<W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                                                 final Windows<W> windows);


<W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                                                 final Windows<W> windows,
                                                 final StateStoreSupplier<WindowStore> storeSupplier);


KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                              final SessionWindows sessionWindows,
                              final String queryableStoreName);


 
KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                              final SessionWindows sessionWindows);
 
KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,
                              final SessionWindows sessionWindows,
                              final StateStoreSupplier<SessionStore> storeSupplier);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);
 
<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);



<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);



<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);


<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final Aggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde,
                                                         final String queryableStoreName);

<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde,
                                                         final String queryableStoreName);


<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde,
                                                         final String queryableStoreName);
 
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final Aggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final StateStoreSupplier<WindowStore> storeSupplier);

<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final StateStoreSupplier<WindowStore> storeSupplier);
 
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
   V> predicate);
KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);
<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR, ? super K> mapper);
<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
 
<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>, ? super K> mapper);



void foreach(final RichForeachAction<? super K, ? super V> action);
KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);
KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates);
 
<VR> KStream<K, VR> transformValues(final RichValueTransformerSupplier<? super V, ? extends VR, ? super K> valueTransformerSupplier,
                                    final String... stateStoreNames);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector);
<KR> KGroupedStream<KR, V>   groupBy(final RichAggregator<RichKeyValueMapper<? super K, ? super V, VR>KR> aggregatorselector,
                                   final Serialized<KR, V> serialized);
 

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
            final Windows<W> windows,
               final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                             final StateStoreSupplier<WindowStore>JoinWindows storeSupplierwindows);



<T> KTable<Windowed<K><VO, VR> KStream<K, T>VR> aggregatejoin(final RichInitializer<T> initializerKStream<K,
         VO> otherStream,
                             final Aggregator<RichValueJoiner<? super KV, ? super VVO, ? T>extends aggregatorVR,
 ? super     K> joiner,
                              final Merger<? super K, T> sessionMergerJoinWindows windows,
                             final Joined<K, V, VO> joined);
 

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, SessionWindowsVO> sessionWindowsotherStream,
                                 final RichValueJoiner<? super V, ? super VO, final? Serde<T>extends aggValueSerdeVR,
 ? super K> joiner,
                                 final StringJoinWindows queryableStoreNamewindows);

<T> KTable<Windowed<K><VO, VR> KStream<K, T>VR> aggregateleftJoin(final KStream<K, Initializer<T>VO> initializerotherStream,
                                     final RichAggregator<RichValueJoiner<? super KV, ? super VVO, ? T>extends aggregatorVR,
 ? super K> joiner,
                                 final Merger<? super K, T> sessionMerger,
 JoinWindows windows,
                                 final Joined<K, V, VO> joined);
 

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, SessionWindowsVO> sessionWindowsotherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super finalK> Serde<T> aggValueSerdejoiner,
                                     final StringJoinWindows queryableStoreNamewindows);

<T> KTable<Windowed<K><VO, VR> KStream<K, T>VR> aggregateouterJoin(final KStream<K, RichInitializer<T>VO> initializerotherStream,
                                  final RichValueJoiner<? super finalV, RichAggregator<? super KVO, ? superextends VVR, ? super T>K> aggregatorjoiner,
                                  final JoinWindows windows,
          final Merger<? super K, T> sessionMerger,
                   final Joined<K, V, VO> joined);
 

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
       final SessionWindows sessionWindows,
                    final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner);
<VT, VR> KStream<K, VR> join(final KTable<K, Serde<T>VT> aggValueSerdetable,
                             final RichValueJoiner<? super K, ? super V, ? final String queryableStoreName);


<T> KTable<Windowed<K>super VT, T>? aggregate(finalextends RichInitializer<T>VR> initializerjoiner,
                             final Joined<K, V, VT> joined);
 

<VT, VR> KStream<K, VR> leftJoin(final Aggregator<?KTable<K, superVT> Ktable,
 ? super V, T> aggregator,
                            final RichValueJoiner<? super K, ? super V, ? super finalVT, Merger<? super K, T> sessionMerger,
? extends VR> joiner);
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super K, ? super V, ? super VT, ? extends finalVR> SessionWindows sessionWindowsjoiner,
                                 final Joined<K, V, VT> final Serde<T> aggValueSerdejoined);

<T> KTable<Windowed<K> 
<GK, GV, RV> KStream<K, T>RV> aggregatejoin(final GlobalKTable<GK, Initializer<T>GV> initializerglobalKTable,
                                     final RichAggregator<RichKeyValueMapper<? super K, ? super V, ? extends T>GK> aggregatorkeyValueMapper,
                                 final RichValueJoiner<? super K, ? super finalV, Merger<? super K, T> sessionMerger,
       GV, ? extends RV> joiner);

<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                              final SessionWindows sessionWindows,
     final RichKeyValueMapper<? super K, ? super V, ? extends GK>   keyValueMapper,
                    final Serde<T> aggValueSerde);
 
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
           final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> valueJoiner);

 





KTable

 

Code Block
languagejava
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V>   filter(final RichAggregator<RichPredicate<? super K, ? super V, T> aggregatorV> predicate,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V>  filterNot(final Merger<RichPredicate<? super K, ? super T>V> sessionMergerpredicate,
                                     final SessionWindows sessionWindows,
                          final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR, ? super K> mapper);
<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR, ? super K> mapper,
           final Serde<T> aggValueSerde);


<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
 

<KR, VR> KGroupedTable<KR, VR>    groupBy(final RichAggregator<RichKeyValueMapper<? super K, ? super V, T> aggregatorKeyValue<KR,
 VR>> selector);
<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR,          VR>> selector,
              final Merger<? super K, T> sessionMerger,
                     final Serialized<KR, VR> serialized);
 

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
      final SessionWindows sessionWindows,
                    final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> join(final KTable<K, Serde<T>VO> aggValueSerdeother,
                            final RichValueJoiner<? super V, ? super VO, ?  final StateStoreSupplier<SessionStore> storeSupplier);

<T> KTable<Windowed<K>extends VR, T>? aggregate(finalsuper RichInitializer<T>K> initializerjoiner,
                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 

<VO, VR> KTable<K, VR> leftJoin(final Aggregator<?KTable<K, superVO> Kother,
 ? super V, T> aggregator,
                           final RichValueJoiner<? super V, ? super VO, ? extends  finalVR, Merger<? super K, T> sessionMerger,
 K> joiner);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super K, ? super V, ? super VO, ? finalextends SessionWindowsVR> sessionWindowsjoiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, Serde<T>VO> aggValueSerdeother,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? finalsuper StateStoreSupplier<SessionStore>K> storeSupplierjoiner);

<T> KTable<Windowed<K><VO, VR> KTable<K, T>VR> aggregateouterJoin(final KTable<K, RichInitializer<T>VO> initializerother,
                                 final RichValueJoiner<? super  finalV, RichAggregator<? super KVO, ? superextends VVR, ? super T>K> aggregatorjoiner,
                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

 

 

 

 

 

 

 

KGroupedStream

 

 

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V, K> reducer);

KTable<K, V> reduce(final RichReducer<V, K> reducer,
 final Merger<? super K, T> sessionMerger,
                         final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR,   final SessionWindows sessionWindowsK> initializer,
                             final RichAggregator<? super K, ? super V, VR> final Serde<T> aggValueSerdeaggregator,
                             final Materialized<K, VR,      final StateStoreSupplier<SessionStore> storeSupplier);


 

 

  • KGroupedTable

 

 

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V> adder,
KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? Reducer<V>super subtractorK,
 ? super V, VR> aggregator);

 

 

 

SessionWindowedKStream

 

The aggregate() methods take three rich interfaces. Converting every possible combination of plain and rich arguments to its rich counterpart would create a large number of overloads, so I propose adding a single overload per method in which all of the arguments are rich interfaces.

 

Code Block
languagejava
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T, Windowed<K>> initializer,
                final String queryableStoreName);


KTable<K, V> reduce(final Reducer<V> adder,
                    final RichReducer<V> subtractor,
                    final String queryableStoreName);


KTable<K, V> reduce(final RichReducer<V> adder,
                          final RichAggregator<? RichReducer<V>super subtractorK,
 ? super V, T> aggregator,
               final String queryableStoreName);


 
KTable<K, V> reduce(final RichReducer<V> adder,
                final RichMerger<? super K, finalT> Reducer<V> subtractorsessionMerger);


KTable<K<VR> KTable<Windowed<K>, V>VR> reduceaggregate(final RichInitializer<VR, Reducer<V>Windowed<K>> adderinitializer,
                    final RichReducer<V> subtractor);


KTable<K, V> reduce(final  RichReducer<V> adder,
                    final RichAggregator<? RichReducer<V> subtractor);


KTable<K, V> reduce(final RichReducer<V> addersuper K, ? super V, VR> aggregator,
                    final Reducer<V> subtractor,
                 final RichMerger<? super finalK, StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> reduce(final Reducer<V> adder,
VR> sessionMerger,
                        final RichReducer<V> subtractor,
             final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);

KTable<Windowed<K>, V> reduce(final RichReducer<V, StateStoreSupplier<KeyValueStore>K> storeSupplierreducer);


KTable<KKTable<Windowed<K>, V> reduce(final RichReducer<V, RichReducer<V>K> adderreducer,
                    final RichReducer<V> subtractor,
        final Materialized<K, V, SessionStore<Bytes,         final StateStoreSupplier<KeyValueStore> storeSupplierbyte[]>> materializedAs);


 
<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
,

 

 

 

 

TimeWindowedKStream

  

Code Block
languagejava
 
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
                                       final RichAggregator<? super K, ? super V, VR> subtractor,aggregator);
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR,                      K> initializer,
   final String queryableStoreName);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
 aggregator,
                                       final RichAggregator<? super K, ? super V, VR> subtractor);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
 Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);

KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer,
                              final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);






 

 

 

 

KGroupedTable

  

Code Block
languagejava
 
KTable<K, V>  reduce(final RichAggregator<? super KRichReducer<V, ? super V, VR>K> adder,
                             final RichAggregator<? super KRichReducer<V, ? super V, VR>K> subtractor,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

KTable<K, V> reduce(final RichReducer<V, finalK> Serde<VR> aggValueSerdeadder,
                             final String queryableStoreName);
final RichReducer<V, K> subtractor);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Serde<VR> aggValueSerde);

 Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);


 

 

 

 

 

 

 

Proposed changes

 

 

  • Limiting the ProcessorContext - RecordContext interface

...

  • Move RecordContext from  .processor.internals  to  .processor 

 

  • Make record context open to public

Currently, the record context is set through InternalProcessorContext (in StreamTask.updateProcessorContext()):
Code Block
languagejava
public// interfacethe RecordContextbelow {
code snippet already exists, String applicationId();
    TaskId taskId();
    StreamsMetrics metrics();
    String topic();
    int partition();
	void commit();
    long offset();this is just for background. 
private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
    long timestamp();
    Map<String, Object> appConfigs();
    Map<String, Object> appConfigsWithPrefix(String prefix);
}
 
 
public interface ProcessorContext extends RecordContext {
   // all methods but the ones in RecordContext
}

 

processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
    processorContext.setCurrentNode(currNode);
}

 

 

 

Sample processor should look like this:

Whenever we need to convert a ProcessorContext to a RecordContext, we simply cast:

Code Block
languagejava
private class KStreamMapProcessorKStreamKTableJoinProcessor<K1, extendsK2, AbstractProcessor<KV1, V>V2, {
R> extends AbstractProcessor<K1, V1> @Override{

    public void init(ProcessorContext processorContext) {...
    private    super.init(processorContext);
    RecordContext recordContext;    richMapper.init((RecordContext) processorContext);  				// this line HEREis WEadded MAKEin Athis CASTKIP

    }...

    @Override
    	public void process(final KK1 key, final VV1 value) {

    	recordContext = new RecordContext() {               // recordContext initialization is added in this KIP

    		@Override
   V1 newValue		public =long mapper.apply(key, value);
offset() {
        		return context().recordContext().forwardoffset(key, newValue);
    		}

    		@Override
    		public voidlong closetimestamp() {
         super.close		return context().recordContext().timestamp();
    		}

    mapper.close();		@Override
    }
}

  • Rich Interfaces

 

 

Code Block
languagejava
public interface RichValueMapper<V, VR>		public String topic() {
    VR apply(final V value, final RecordContext recordContext		return context().recordContext().topic();
    		}

public interface RichValueJoiner<V1, V2, VR> {
 		@Override
    		public VRint apply(final V1 value1, final V2 value2, final RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichReducer<V> {partition() {
        		return context().recordContext().partition();
    		}
      };

	
 
    Vif apply(finalkey V!= value1,null final&& Vvalue value2, final RecordContext recordContext);
}


public interface RichInitializer<VA> {
!= null) {
       VA apply(final RecordContext recordContext);
}


public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}
 
public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}


public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

V2 value2 = valueGetter.get(keyMapper.apply(key, value));
        if (leftJoin || value2 != null) {
            context().forward(key, joiner.apply(value, value2, recordContext));    
        }
    }
}


}

 

Rejected Alternatives

Not yet.

...