Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4125
           
Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-39074218
Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4726
 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP is combined with KIP-149 and provides a hybrid solution: rich functions in Streams, plus access to read-only keys within the ValueJoiner, ValueTransformer and ValueMapper interfaces.

 

Rich functions are one of the essential parts of stream processing. There are several use cases where users cannot express their business logic with the current un-rich methods, especially when init(Some params) and close() methods are needed.

Another motivation of this KIP is to introduce an on-demand commit() feature.

Public Interfaces

The following methods will be added to related classes.

 

...

. For example:

  • having access to RecordContext within an operator
  • having access to a read-only key for ValueJoiner, ValueTransformer, ValueMapper interfaces



Rich Interfaces

 

Code Block
languagejava
 
/**
 * Produces an initial value for a given key.
 *
 * @param <V> type of the value that is produced
 * @param <K> type of the key the value is produced for
 */
public interface RichInitializer<V, K> {

    /**
     * Creates the initial value for {@code key}.
     *
     * @param key the key the initial value is created for
     * @return the initial value
     */
    V apply(final K key);
}

/**
 * Maps a record value to a new value, with access to the record's key and its
 * {@code RecordContext}.
 *
 * @param <V>  type of the input value
 * @param <VR> type of the mapped value
 * @param <K>  type of the key
 */
public interface RichValueMapper<V, VR, K> {

    /**
     * Computes the mapped value for one record.
     *
     * @param value         the input value
     * @param key           the key of the record
     * @param recordContext the record context passed along with the record
     * @return the mapped value
     */
    VR apply(
        final V value,
        final K key,
        final RecordContext recordContext);
}

/**
 * Joins two values into one result, with access to the key and the
 * {@code RecordContext}.
 *
 * @param <V1> type of the first value
 * @param <V2> type of the second value
 * @param <VR> type of the join result
 * @param <K>  type of the key
 */
public interface RichValueJoiner<V1, V2, VR, K> {

    /**
     * Computes the join result for a pair of values.
     *
     * @param value1        the first value
     * @param value2        the second value
     * @param key           the key
     * @param recordContext the record context passed along with the record
     * @return the joined value
     */
    VR apply(
        final V1 value1,
        final V2 value2,
        final K key,
        final RecordContext recordContext);
}

/**
 * Maps a key-value pair to a result, with access to the {@code RecordContext}.
 *
 * @param <K>  type of the key
 * @param <V>  type of the value
 * @param <VR> type of the mapping result
 */
public interface RichKeyValueMapper<K, V, VR> {

    /**
     * Computes the mapping result for one key-value pair.
     *
     * @param key           the key of the record
     * @param value         the value of the record
     * @param recordContext the record context passed along with the record
     * @return the mapping result
     */
    VR apply(
        final K key,
        final V value,
        final RecordContext recordContext);
}

/**
 * Combines two values of the same type into one, with access to the key and the
 * {@code RecordContext}.
 *
 * @param <V> type of the values being combined
 * @param <K> type of the key
 */
public interface RichReducer<V, K> {

    /**
     * Combines {@code value1} and {@code value2}.
     *
     * @param value1        the first value
     * @param value2        the second value
     * @param key           the key
     * @param recordContext the record context passed along with the record
     * @return the combined value
     */
    V apply(
        final V value1,
        final V value2,
        final K key,
        final RecordContext recordContext);
}

/**
 * Computes a new aggregate from a key-value pair and the current aggregate, with
 * access to the {@code RecordContext}.
 *
 * @param <K>  type of the key
 * @param <V>  type of the value
 * @param <VA> type of the aggregate
 */
public interface RichAggregator<K, V, VA> {

    /**
     * Computes the new aggregate.
     *
     * @param key           the key of the record
     * @param value         the value of the record
     * @param aggregate     the current aggregate
     * @param recordContext the record context passed along with the record
     * @return the new aggregate
     */
    VA apply(
        final K key,
        final V value,
        final VA aggregate,
        final RecordContext recordContext);
}
 
/**
 * Performs an action on a key-value pair, with access to the {@code RecordContext}.
 * The action returns nothing.
 *
 * @param <K> type of the key
 * @param <V> type of the value
 */
public interface RichForeachAction<K, V> {

    /**
     * Performs the action for one key-value pair.
     *
     * @param key           the key of the record
     * @param value         the value of the record
     * @param recordContext the record context passed along with the record
     */
    void apply(
        final K key,
        final V value,
        final RecordContext recordContext);
}

/**
 * Evaluates a boolean condition on a key-value pair, with access to the
 * {@code RecordContext}.
 *
 * @param <K> type of the key
 * @param <V> type of the value
 */
public interface RichPredicate<K, V> {

    /**
     * Tests one key-value pair.
     *
     * @param key           the key of the record
     * @param value         the value of the record
     * @param recordContext the record context passed along with the record
     * @return the outcome of the test
     */
    boolean test(
        final K key,
        final V value,
        final RecordContext recordContext);
}

 
/**
 * Merges two aggregates that belong to the same key, with access to the
 * {@code RecordContext}.
 *
 * @param <K> type of the aggregation key
 * @param <V> type of the aggregates
 */
public interface RichMerger<K, V> {

    /**
     * Merges {@code aggOne} and {@code aggTwo} into a single aggregate.
     *
     * @param aggKey        the key both aggregates belong to
     * @param aggOne        the first aggregate
     * @param aggTwo        the second aggregate
     * @param recordContext the record context passed along with the record
     * @return the merged aggregate
     */
    V apply(
        final K aggKey,
        final V aggOne,
        final V aggTwo,
        final RecordContext recordContext);
}

 
 
/**
 * Value transformer with {@code init}/{@code close} lifecycle hooks that also
 * receives the record's key when transforming a value.
 *
 * @param <V>  type of the input value
 * @param <VR> type of the transformed value
 * @param <K>  type of the key
 */
public interface RichValueTransformer<V, VR, K> {

    /**
     * Hands the transformer its processor context.
     *
     * @param context the processor context
     */
    void init(final ProcessorContext context);

    /**
     * Transforms one value.
     *
     * @param value the input value
     * @param key   the key of the record
     * @return the transformed value
     */
    VR transform(
        final V value,
        final K key);

    /**
     * Counterpart of {@link #init(ProcessorContext)}.
     * NOTE(review): presumably called once the transformer is no longer needed — confirm.
     */
    void close();
}


 
/**
 * Supplies {@link RichValueTransformer} instances.
 *
 * @param <V>  type of the input value
 * @param <VR> type of the transformed value
 * @param <K>  type of the key
 */
public interface RichValueTransformerSupplier<V, VR, K> {
    /**
     * @return a {@link RichValueTransformer}
     */
    RichValueTransformer<V, VR, K> get();
}


 


 

Public Interfaces


KStream

 

Code Block
languagejava
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);
KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);
<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
<VR> KStream<K, VR> mapValues(RichValueMapper<KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);


KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);


<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);


<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);


<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR> mapper);


<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);


<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>> processor);


void foreach(final RichForeachAction<? super K, ? super V> action);


KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);


KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates);


<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector);


<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
                                   final Serde<KR> keySerde,
                                   final Serde<V> valSerde);


<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? superextends VOVR, ? extendssuper VR> joiner,
                             final JoinWindows windows);


<VO, VR>K> mapper);
<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
 
<VR> KStream<K, VR> joinflatMapValues(final KStream<K,RichValueMapper<? VO>super otherStreamV,
 ? extends Iterable<? extends VR>, ? super K> mapper);



void foreach(final RichForeachAction<? super K, ? super V> action);
KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);
KStream<K, V>[]  branch(final RichValueJoiner<RichPredicate<? super VK, ? super VO, ? extendsV>... predicates);
 
<VR> KStream<K, VR> joiner,
          transformValues(final RichValueTransformerSupplier<? super V, ? extends VR, ? super K> valueTransformerSupplier,
                   final JoinWindows windows,
               final String... stateStoreNames);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR>  final Serde<K> keySerde,
          selector);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
                   final Serde<V> thisValueSerde,
              final Serialized<KR,              final Serde<VO> otherValueSerdeV> serialized);
 

<VO, VR> KStream<K, VR> leftJoinjoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joinerVR,
 ? super K> joiner,
                             final JoinWindows windows);


<VO, VR> KStream<K, VR> leftJoinjoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joinerVR,
 ? super K> joiner,
                             final JoinWindows windows,
                                 final Serde<K>Joined<K, keySerdeV,
                                 final Serde<V> thisValSerde,
                                 final Serde<VO> otherValueSerdeVO> joined);
 

<VO, VR> KStream<K, VR> outerJoinleftJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR> VR, ? super K> joiner,
                                  final JoinWindows windows);


<VO, VR> KStream<K, VR> outerJoinleftJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR>VR, ? super K> joiner,
                                  final JoinWindows windows,
                                 final Joined<K, V, VO> joined);
 

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, Serde<K>VO> keySerdeotherStream,
                                  final Serde<V> thisValueSerde RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                  final Serde<VO>JoinWindows otherValueSerdewindows);


<VT<VO, VR> KStream<K, VR> joinouterJoin(final KTable<KKStream<K, VT>VO> tableotherStream,
                                  final RichValueJoiner<? super V, ? super VTVO, ? extends VR> joiner);


<VTVR, VR>? KStream<K,super VR> join(final KTable<K, VT> table,K> joiner,
                             final RichValueJoiner<? super V, ? superfinal VT, ? extends VR> joinerJoinWindows windows,
                             final Serde<K> keySerde,
   final Joined<K, V,                        final Serde<V> valSerdeVO> joined);
 

<VT, VR> KStream<K, VR> leftJoinjoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super VK, ? super V, ? super VT, ? extends VR> joiner);


<VT, VR> KStream<K, VR> leftJoinjoin(final KTable<K, VT> table,
                             final RichValueJoiner<? super  finalK, RichValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                 final Serde<K>Joined<K, keySerdeV,
                                 final Serde<V> valSerdeVT> joined);
 

<GK<VT, GV, RV>VR> KStream<K, RV>VR> joinleftJoin(final GlobalKTable<GKKTable<K, GV>VT> globalKTabletable,
                                 final RichKeyValueMapper<RichValueJoiner<? super K, ? super V, ? extendssuper GK> keyValueMapperVT,
 ? extends VR> joiner);
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super K, ? super V, ? super GVVT, ? extends RV> joinerVR> joiner,
                                 final Joined<K, V, VT> joined);

 
<GK, GV, RV> KStream<K, RV> leftJoinjoin(final GlobalKTable<GK, GV> globalKTable,
                                     final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final RichValueJoiner<? super K, final RichValueJoiner<? super V, ? super GV, ? extends RV> valueJoinerjoiner);

 

 

  • KTable.java

 

Code Block
languagejava
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate);


KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final String queryableStoreName);


KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate);


KTable<K, V> filterNot(final RichPredicate<<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final RichKeyValueMapper<? super K, ? super V>V, predicate,? finalextends GK> keyValueMapper,
                                     final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> valueJoiner);

 





KTable

 

Code Block
languagejava
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate,
StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final String queryableStoreName);


<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper);


<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);


<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper,
                             final Serde<VR>Materialized<K, valueSerdeV,
 KeyValueStore<Bytes,                            final StateStoreSupplier<KeyValueStore> storeSupplier);


void foreach(final RichForeachAction<byte[]>> materialized);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> actionpredicate);


<KR> KStream<KRKTable<K, V> toStreamfilterNot(final RichKeyValueMapper<RichPredicate<? super K, ? super V> Vpredicate,
 ? extends KR> mapper);


<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super           final Materialized<K, V, KeyValue<KRKeyValueStore<Bytes, VR>> selectorbyte[]>> materialized);


<KR, VR> KGroupedTable<KR<VR> KTable<K, VR> groupBymapValues(final RichKeyValueMapper<RichValueMapper<? super KV, ? superextends VVR, ? KeyValue<KR,super VR>> selector,
             K> mapper);
<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR, ? super K> mapper,
                          final Serde<KR> keySerde,
 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
 

<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR,         final Serde<VR> valueSerdeVR>> selector);


<VO<KR, VR> KTable<KKGroupedTable<KR, VR> joingroupBy(final KTable<K RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VO>VR>> otherselector,
                            final RichValueJoiner<? super V, ? super VO, ? extends      final Serialized<KR, VR> joinerserialized);
 

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> joiner join(final KTable<K, VO> other,
                            final Serde<VR> joinSerde RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                            final String queryableStoreName Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 

<VO, VR> KTable<K, VR> joinleftJoin(final KTable<K, VO> other,
                            final RichValueJoiner<?    final RichValueJoiner<? super V, ? super VO, ? extends VR> joinerVR,
 ? super                          final StateStoreSupplier<KeyValueStore> storeSupplier);
K> joiner);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<ValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
     Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                           final Serde<VR> joinSerde,
    final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super                 final String queryableStoreName);
K> joiner);
<VO, VR> KTable<K, VR> leftJoinouterJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> VR, ? super K> joiner,
                                 final StateStoreSupplier<KeyValueStore> storeSupplier);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
     Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

 

 

 

 

 

 

 

KGroupedStream

 

 

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V, K> reducer);

KTable<K, V> reduce(final RichReducer<V, K> reducer,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final RichValueJoiner<?RichInitializer<VR, superK> Vinitializer,
 ? super VO, ? extends VR> joiner);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<RichAggregator<? super VK, ? super VOV, ? extends VR> joineraggregator,
                             final Materialized<K, VR,  final Serde<VR> joinSerde,
    KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator);

 

 

 

SessionWindowedKStream

 

The aggregate() methods take three rich interfaces, so converting every possible combination to its rich counterpart would cause a large number of overloads. I therefore propose adding a single overload that uses the rich variant of all three interfaces.

 

Code Block
languagejava
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T, Windowed<K>> initializer,
    String queryableStoreName);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<RichAggregator<? super VK, ? super VOV, ? extends VR> joinerT> aggregator,
                                 final StateStoreSupplier<KeyValueStore> storeSupplier);

 

 

 

  • KGroupedStream

 

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V> reducerfinal RichMerger<? super K, T> sessionMerger);


KTable<K<VR> KTable<Windowed<K>, V>VR> reduceaggregate(final RichInitializer<VR, RichReducer<V>Windowed<K>> reducerinitializer,
                    final String queryableStoreName);


KTable<K, V> reduce(final RichReducer<V> reducer,
             final RichAggregator<? super K, ? super V, final StateStoreSupplier<KeyValueStore> storeSupplier);


<W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer,VR> aggregator,
                                       final RichMerger<? super K,       final Windows<W> windowsVR> sessionMerger,
                                       final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);

KTable<Windowed<K>, V> reduce(final RichReducer<V, K> final String queryableStoreNamereducer);


<W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V, RichReducer<V>K> reducer,
                              final Materialized<K, V, SessionStore<Bytes,                final Windows<W> windowsbyte[]>> materializedAs);


<W extends Window> ,

 

 

 

 

TimeWindowedKStream

  

Code Block
languagejava
 
<VR> KTable<Windowed<K>, V>VR> reduceaggregate(final RichInitializer<VR, RichReducer<V>K> reducerinitializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator);
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, Windows<W>K> windowsinitializer,
                                       final RichAggregator<? super K, ?      final StateStoreSupplier<WindowStore> storeSupplier);


KTable<Windowed<K>, V> reduce(final RichReducer<V> reducersuper V, VR> aggregator,
                              final SessionWindows sessionWindows,
       final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);

KTable<Windowed<K>, V> reduce(final RichReducer<V,               final String queryableStoreName);


 K> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<V, RichReducer<V>K> reducer,
                              final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);






 

 

 

 

KGroupedTable

  

Code Block
languagejava
 
KTable<KSessionWindows sessionWindows);
 
KTable<Windowed<K>, V> reduce(final RichReducer<V, RichReducer<V>K> reduceradder,
                    final RichReducer<V,         final SessionWindows sessionWindowsK> subtractor,
                    final Materialized<K, V, KeyValueStore<Bytes,       final StateStoreSupplier<SessionStore> storeSupplierbyte[]>> materialized);

<VR> KTable<K, VR>V> aggregatereduce(final RichReducer<V, RichInitializer<VR>K> initializeradder,
                    final RichReducer<V, K> subtractor);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final Aggregator<RichAggregator<? super K, ? super V, VR> aggregatoradder,
                             final Serde<VR> aggValueSerde RichAggregator<? super K, ? super V, VR> subtractor,
                             final String queryableStoreName);
 Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final Initializer<VR>RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregatoradder,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);
 
<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);



<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde);



<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);


<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final Aggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde,
                                                         final String queryableStoreName);

<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde,
                                                         final String queryableStoreName);


<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final Serde<VR> aggValueSerde,
                                                         final String queryableStoreName);
 
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final Aggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final StateStoreSupplier<WindowStore> storeSupplier);

<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final StateStoreSupplier<WindowStore> storeSupplier);
 
<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer,
                                                         final RichAggregator<? super K, ? super V, VR> aggregator,
                                                         final Windows<W> windows,
                                                         final StateStoreSupplier<WindowStore> storeSupplier);



<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final String queryableStoreName);

<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final String queryableStoreName);

<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final String queryableStoreName);


<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde);

<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde);
 
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde);


<T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final StateStoreSupplier<SessionStore> storeSupplier);

<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final Aggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final StateStoreSupplier<SessionStore> storeSupplier);

<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final Merger<? super K, T> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<T> aggValueSerde,
                                     final StateStoreSupplier<SessionStore> storeSupplier);


 

 

  • KGroupedTable

 

 

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V> adder,
                    final Reducer<V> subtractor,
                    final String queryableStoreName);


KTable<K, V> reduce(final Reducer<V> adder,
                    final RichReducer<V> subtractor,
                    final String queryableStoreName);


KTable<K, V> reduce(final RichReducer<V> adder,
                    final RichReducer<V> subtractor,
                    final String queryableStoreName);


 
KTable<K, V> reduce(final RichReducer<V> adder,
                    final Reducer<V> subtractor);


KTable<K, V> reduce(final Reducer<V> adder,
                    final RichReducer<V> subtractor);


KTable<K, V> reduce(final RichReducer<V> adder,
                    final RichReducer<V> subtractor);


KTable<K, V> reduce(final RichReducer<V> adder,
                    final Reducer<V> subtractor,
                    final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> reduce(final Reducer<V> adder,
                    final RichReducer<V> subtractor,
                    final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> reduce(final RichReducer<V> adder,
                    final RichReducer<V> subtractor,
                    final StateStoreSupplier<KeyValueStore> storeSupplier);


 
<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final String queryableStoreName);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Serde<VR> aggValueSerde,
                             final String queryableStoreName);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Serde<VR> aggValueSerde);


<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);


 

 

 

 

 

 

Proposed changes

 

 

  • Make record context open to public

Currently we set the record context through InternalProcessorContext (StreamTask.updateProcessorContext()):

 

 

Code Block
languagejava
 private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
    processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
    processorContext.setCurrentNode(currNode);
}
 

 

However, the record context is not available in ProcessorContext, so we make the following changes to make it "public":

 

Code Block
languagejava
 
public interface ProcessorContext {
  ...
  all existing methods
  ...
 
  // New accessor proposed here: exposes the record context through the public
  // ProcessorContext interface.
  RecordContext recordContext();
}
 
 
public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
  ...
  /**
   * Exposes the record context held by this processor context.
   *
   * @return the record context
   */
  @Override
  public RecordContext recordContext() {
    // Read the stored field. Calling recordContext() here would re-enter this
    // method without end and fail with a StackOverflowError.
    // NOTE(review): assumes the recordContext field is accessible from this class — confirm.
    return this.recordContext;
  }
}
 

 

 

  • ExtendedRecordContext

We already have a RecordContext class with the desired set of methods. However, it does not have a commit() method, because committing belongs to ProcessorContext. In our ExtendedRecordContext class, we implement commit() by delegating to the ProcessorContext object.

RichAggregator<? super K, ? super V, VR> subtractor);

 

 

 

 

 

 

Proposed changes

 

 

  • Move RecordContext from  .processor.internals  to  .processor 

 

  • Make record context open to public

Currently we set record context through InternalProcessorContext (StreamTask.updateProcessorContext()) :
Code Block
languagejava
// the below code snippet already exists, this is just for background. 
/**
 * Prepares the processor context for the next record: installs a record context
 * built from the record's timestamp, offset, partition and topic, then marks
 * {@code currNode} as the current node.
 *
 * @param record   the record about to be processed
 * @param currNode the node to set as current
 */
private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
    final ProcessorRecordContext freshContext = new ProcessorRecordContext(
        record.timestamp,
        record.offset(),
        record.partition(),
        record.topic());
    processorContext.setRecordContext(freshContext);
    processorContext.setCurrentNode(currNode);
}

 

 

 

A sample processor should look like this:

 

Code Block
languagejava
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {

    ...
    private RecordContext recordContext;       // this line is added in this KIP

    ...

    @Override
	public void process(final K1 key, final V1 value) {

    	recordContext = new RecordContext() {               // recordContext initialization is added in this KIP

    		@Override
    		public long offset(
Code Block
languagejava
/**
 * A {@code RecordContext} that additionally offers a {@code commit()} operation.
 */
public interface ExtendedRecordContext extends RecordContext {

    /**
     * Requests a commit.
     * NOTE(review): the surrounding text says this is meant to delegate to
     * ProcessorContext's commit — confirm against the implementation.
     */
    void commit();
}

 
 
 
 
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {

    ...
    private ExtendedRecordContext extendedRecordContext;

    ...

    @Override
	public void process(final K1 key, final V1 value) {

    	extendedRecordContext = new ExtendedRecordContext() {
    		@Override
    		public void commit() {
        		context().commit();
    		}

    		@Override
    		public long offset() {
        		return context().recordContext().offset();
    		}

    		@Override
    		public long timestamp() {
        		return context().recordContext().timestamp();
    		}

    		@Override
    		public String topic() {
        		return context().recordContext().topic();
    		}

    		@Override
    		public int partition() {
        		return context().recordContext().partition();
    		}
      };

	
 
    if (key != null && value != null) {
        final V2 value2 = valueGetter.get(keyMapper.apply(key, value)		return context().recordContext().offset();
    		}

    if (leftJoin || value2 != null		@Override
    		public long timestamp() {
          		return  context().forward(key, joiner.apply(value, value2, extendedRecordContext)recordContext().timestamp();
    		}

    }		@Override
    }
}


}

 

  • Rich Interfaces

 

 

 

Code Block
languagejava
public interface RichValueMapper<V, VR> 		public String topic() {
    VR apply(final V value, final ExtendedRecordContext recordContext		return context().recordContext().topic();
    		}

public interface RichValueJoiner<V1, V2, VR> {
 		@Override
    		public VRint apply(final V1 value1, final V2 value2, final ExtendedRecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {partition() {
        		return context().recordContext().partition();
    		}
      };

	
 
    VRif apply(finalkey K!= key, final Vnull && value, final ExtendedRecordContext recordContext);
}

public interface RichReducer<V>!= null) {
    V apply(final V value1, final VV2 value2, final ExtendedRecordContext recordContext);
}

public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final ExtendedRecordContext recordContext);
}
 
public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final ExtendedRecordContext recordContext);
}


public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final ExtendedRecordContext recordContext);
}

= valueGetter.get(keyMapper.apply(key, value));
        if (leftJoin || value2 != null) {
            context().forward(key, joiner.apply(value, value2, recordContext));    
        }
    }
}


}

 

Rejected Alternatives

Not yet.

...