
JIRA: KAFKA-4125, KAFKA-3907, KAFKA-4218, KAFKA-4726

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This KIP incorporates KIP-149 and provides a hybrid solution for rich functions in Streams and for accessing read-only keys within the ValueJoiner, ValueTransformer, and ValueMapper interfaces.

 

Rich functions are an essential part of stream processing. There are several use cases where users cannot express their business logic with the current non-rich methods. For example:

...

  • having access to RecordContext within an operator
  • having access to a read-only key for the ValueJoiner, ValueTransformer, and ValueMapper interfaces



Rich Interfaces

 

Code Block
languagejava
 
public interface RichInitializer<V, K> {
    V apply(final K key);
}

public interface RichValueMapper<V, VR, K> {
    VR apply(final V value, final K key, final RecordContext recordContext);
}

public interface RichValueJoiner<V1, V2, VR, K> {
    VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichReducer<V, K> {
    V apply(final V value1, final V value2, final K key, final RecordContext recordContext);
}

public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}
 
public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

 
public interface RichMerger<K, V> {
    V apply(final K aggKey, final V aggOne, final V aggTwo, final RecordContext recordContext);
}

 
 
public interface RichValueTransformer<V, VR, K> {
    void init(final ProcessorContext context);

    VR transform(final V value, final K key);

    void close();
}


 
public interface RichValueTransformerSupplier<V, VR, K> {

    RichValueTransformer<V, VR, K> get();
}
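
To illustrate how one of the interfaces above might be used, here is a minimal, self-contained sketch. The RecordContext and RichValueMapper declarations below are local stand-ins that mirror the signatures in this KIP (they are not the Kafka classes), and fixedContext and TAG_WITH_ORIGIN are hypothetical names introduced only for this example.

```java
public class RichValueMapperSketch {

    // Local stand-in mirroring the accessors used in this KIP; not the Kafka type.
    interface RecordContext {
        long offset();
        long timestamp();
        String topic();
        int partition();
    }

    // Same shape as the RichValueMapper proposed above.
    interface RichValueMapper<V, VR, K> {
        VR apply(V value, K key, RecordContext recordContext);
    }

    // Hypothetical helper: a RecordContext with fixed values, for demonstration only.
    static RecordContext fixedContext(final String topic, final int partition,
                                      final long offset, final long timestamp) {
        return new RecordContext() {
            @Override public long offset() { return offset; }
            @Override public long timestamp() { return timestamp; }
            @Override public String topic() { return topic; }
            @Override public int partition() { return partition; }
        };
    }

    // A mapper that tags each value with its read-only key and the record's origin.
    static final RichValueMapper<String, String, Integer> TAG_WITH_ORIGIN =
        (value, key, ctx) -> key + ":" + value + "@" + ctx.topic() + "-" + ctx.partition() + "/" + ctx.offset();

    public static void main(final String[] args) {
        final RecordContext ctx = fixedContext("orders", 2, 42L, 1000L);
        System.out.println(TAG_WITH_ORIGIN.apply("book", 7, ctx));   // prints 7:book@orders-2/42
    }
}
```

The mapper only reads the key, so the key stays effectively read-only from the user's point of view, which is the property the rich value interfaces are meant to preserve.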


 


 

Public Interfaces


KStream

 

Code Block
languagejava
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);


KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);


<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);


<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);


<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR, ? super K> mapper);


<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
 

<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>, ? super K> mapper);



void foreach(final RichForeachAction<? super K, ? super V> action);


KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);


KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates);
 

<VR> KStream<K, VR> transformValues(final RichValueTransformerSupplier<? super V, ? extends VR, ? super K> valueTransformerSupplier,
                                    final String... stateStoreNames);

<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
                                   final Serialized<KR, V> serialized);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                             final JoinWindows windows);
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                             final JoinWindows windows,
                             final Joined<K, V, VO> joined);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                 final JoinWindows windows);
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                 final JoinWindows windows,
                                 final Joined<K, V, VO> joined);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                  final JoinWindows windows);
<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                  final JoinWindows windows,
                                  final Joined<K, V, VO> joined);

// The type arguments below follow the RichValueJoiner<V1, V2, VR, K> declaration from the Rich Interfaces section.
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR, ? super K> joiner);
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR, ? super K> joiner,
                             final Joined<K, V, VT> joined);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR, ? super K> joiner);
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR, ? super K> joiner,
                                 final Joined<K, V, VT> joined);

<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final RichValueJoiner<? super V, ? super GV, ? extends RV, ? super K> joiner);
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final RichValueJoiner<? super V, ? super GV, ? extends RV, ? super K> valueJoiner);

 





KTable

 

Code Block
languagejava
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate,
                       final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR, ? super K> mapper);
<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR, ? super K> mapper,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);

<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                       final Serialized<KR, VR> serialized);

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

 

 

 

 

 

 

 

KGroupedStream

 

 

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V, K> reducer);

KTable<K, V> reduce(final RichReducer<V, K> reducer,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator);

 

 

 

SessionWindowedKStream

 

The aggregate() methods take three rich interfaces (initializer, aggregator, and merger). Converting every possible combination of plain and rich arguments to its rich counterpart would produce a large number of overloads, so I propose adding one overload per method that takes all rich interfaces.

 

Code Block
languagejava
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T, Windowed<K>> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final RichMerger<? super K, T> sessionMerger);
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, Windowed<K>> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator,
                                       final RichMerger<? super K, VR> sessionMerger,
                                       final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);

KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer,
                              final Materialized<K, V, SessionStore<Bytes, byte[]>> materializedAs);
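
To make the overload concern concrete, the following small sketch enumerates every plain/rich choice for the three functional parameters of the session aggregate() method. The class and method names are hypothetical and exist only for this illustration; the snippet simply counts combinations and is not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.List;

public class OverloadCombinations {

    // Enumerates every plain/rich choice for the given functional parameters.
    static List<String> combinations(final String... params) {
        final List<String> result = new ArrayList<>();
        for (int mask = 0; mask < (1 << params.length); mask++) {
            final StringBuilder sb = new StringBuilder("aggregate(");
            for (int i = 0; i < params.length; i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                // bit i set means the i-th parameter uses its rich counterpart
                sb.append((mask & (1 << i)) != 0 ? "Rich" + params[i] : params[i]);
            }
            result.add(sb.append(")").toString());
        }
        return result;
    }

    public static void main(final String[] args) {
        final List<String> all = combinations("Initializer", "Aggregator", "Merger");
        all.forEach(System.out::println);
        System.out.println(all.size() + " combinations per parameter list");   // 8 for three parameters
    }
}
```

With three functional parameters there are 2^3 = 8 combinations for each existing parameter list, seven of which would be new overloads; offering only the all-rich variant adds a single overload instead.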

 

 

 

 

TimeWindowedKStream

  

Code Block
languagejava
 
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator);
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator,
                                       final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);

KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer,
                              final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);






 

 

 

 

KGroupedTable

  

Code Block
languagejava
 
KTable<K, V> reduce(final RichReducer<V, K> adder,
                    final RichReducer<V, K> subtractor,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

KTable<K, V> reduce(final RichReducer<V, K> adder,
                    final RichReducer<V, K> subtractor);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor);



 



 

 

 

 

 

 

Proposed changes

 

 

  • Make record context open to public

Currently we set the record context through InternalProcessorContext (StreamTask.updateProcessorContext()):
Code Block
languagejava
// the below code snippet already exists, this is just for background.
private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
    processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
    processorContext.setCurrentNode(currNode);
}
 

 

Thus, the record context is not available in ProcessorContext. We make the following changes to make it "public":

 

Code Block
languagejava
 
public interface ProcessorContext {
  ...
 
  RecordContext recordContext();
}
 
 
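public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier {
  ...
  @Override
  public RecordContext recordContext() {
    // Return the stored context (assumed here to be the field set via setRecordContext());
    // calling this.recordContext() instead would recurse without end.
    return this.recordContext;
  }
}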
 

 

 

  • Add commit() to RecordContext

We already have a RecordContext class with the desired set of methods. However, it does not have a commit() method. In this KIP we add a commit() method to the RecordContext interface.
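
As a rough illustration of the resulting shape, the sketch below declares a stand-in RecordContext with the four accessors used in this KIP's sample processor plus the added commit() method. It is an assumption-laden sketch rather than the Kafka source: SimpleRecordContext and commitRequested() are hypothetical names, and a real implementation would delegate commit() to the processor context instead of setting a flag.

```java
public class RecordContextSketch {

    // Stand-in for RecordContext as described in this section; not the Kafka source.
    interface RecordContext {
        long offset();
        long timestamp();
        String topic();
        int partition();
        void commit();   // the method this KIP proposes to add
    }

    // Hypothetical implementation that only remembers whether a commit was requested.
    static final class SimpleRecordContext implements RecordContext {
        private final String topic;
        private final int partition;
        private final long offset;
        private final long timestamp;
        private boolean commitRequested = false;

        SimpleRecordContext(final String topic, final int partition,
                            final long offset, final long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }

        @Override public long offset() { return offset; }
        @Override public long timestamp() { return timestamp; }
        @Override public String topic() { return topic; }
        @Override public int partition() { return partition; }
        @Override public void commit() { commitRequested = true; }

        boolean commitRequested() { return commitRequested; }
    }

    public static void main(final String[] args) {
        final SimpleRecordContext ctx = new SimpleRecordContext("orders", 0, 15L, 1000L);
        // A rich function could request a commit at offsets it treats as checkpoints.
        if (ctx.offset() % 5 == 0) {
            ctx.commit();
        }
        System.out.println("commit requested: " + ctx.commitRequested());
    }
}
```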

...


  • Move RecordContext from .processor.internals to .processor

 


...

 

 

 

A sample processor should look like this:

...

Code Block
languagejava
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {

    ...
    private RecordContext recordContext;       // this line is added in this KIP

    ...

    @Override
    public void process(final K1 key, final V1 value) {

        // recordContext initialization is added in this KIP
        recordContext = new RecordContext() {
            @Override
            public void commit() {
                context().commit();
            }

            @Override
            public long offset() {
                return context().recordContext().offset();
            }

            @Override
            public long timestamp() {
                return context().recordContext().timestamp();
            }

            @Override
            public String topic() {
                return context().recordContext().topic();
            }

            @Override
            public int partition() {
                return context().recordContext().partition();
            }
        };

        if (key != null && value != null) {
            final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
            if (leftJoin || value2 != null) {
                // pass the read-only key as well, matching RichValueJoiner.apply(value1, value2, key, recordContext)
                context().forward(key, joiner.apply(value, value2, key, recordContext));
            }
        }
    }
}
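
The join logic of the sample processor can be exercised outside of Kafka with a small simulation. The sketch below is self-contained and uses stand-ins throughout: the reduced RecordContext, the Map that plays the role of the table, and the names process, contextOf, and JOINER are all assumptions made for this example, not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamTableJoinSketch {

    // Reduced stand-in for RecordContext; only what the example joiner reads.
    interface RecordContext {
        String topic();
        long offset();
    }

    // Same shape as the RichValueJoiner proposed in this KIP.
    interface RichValueJoiner<V1, V2, VR, K> {
        VR apply(V1 value1, V2 value2, K key, RecordContext recordContext);
    }

    // Hypothetical helper that builds a fixed RecordContext.
    static RecordContext contextOf(final String topic, final long offset) {
        return new RecordContext() {
            @Override public String topic() { return topic; }
            @Override public long offset() { return offset; }
        };
    }

    // Mirrors the checks of the sample processor: skip null keys/values,
    // and emit unmatched records only when leftJoin is true.
    static <K, V1, V2, VR> List<VR> process(final K key, final V1 value, final Map<K, V2> table,
                                            final boolean leftJoin,
                                            final RichValueJoiner<V1, V2, VR, K> joiner,
                                            final RecordContext ctx) {
        final List<VR> forwarded = new ArrayList<>();
        if (key != null && value != null) {
            final V2 value2 = table.get(key);
            if (leftJoin || value2 != null) {
                forwarded.add(joiner.apply(value, value2, key, ctx));
            }
        }
        return forwarded;
    }

    // Example joiner that uses the key and the record's origin in its result.
    static final RichValueJoiner<String, String, String, String> JOINER =
        (click, tier, user, ctx) -> user + ":" + click + ":" + tier + "@" + ctx.topic() + "/" + ctx.offset();

    public static void main(final String[] args) {
        final Map<String, String> table = new HashMap<>();
        table.put("user-1", "gold");
        final RecordContext ctx = contextOf("clicks", 3L);

        System.out.println(process("user-1", "home", table, false, JOINER, ctx));  // one joined result
        System.out.println(process("user-2", "cart", table, false, JOINER, ctx));  // inner join: nothing forwarded
        System.out.println(process("user-2", "cart", table, true, JOINER, ctx));   // left join: forwarded with a null table value
    }
}
```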

...