Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current state: "Under Discussion"

Discussion thread: TBD here

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Rich functions are one of the essential parts of stream processing. There are several use cases where users cannot express their business logic with the current non-rich methods, especially when init(Some params) and close() methods are needed.

Public Interfaces

 

 

  • KStream.java

 

Code Block
languagejava
KStream<K, V> filter(Predicate<? super K, ? super V> predicate, final RecordContext recordContext);
 
KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate, final RecordContext recordContext);

<KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper, final RecordContext recordContext);

<VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, final RecordContext recordContext);

<KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper, final RecordContext recordContext);

<VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> processor);<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows, final RecordContext recordContext);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows,
                             final Serde<K> keySerde,
                             final Serde<V> thisValueSerde,
                             final Serde<VO> otherValueSerde, 
                             final RecordContext recordContext) 



<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows, 
								 final RecordContext recordContext);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows,
                                 final Serde<K> keySerde,
                                 final Serde<V> thisValSerde,
                                 final Serde<VO> otherValueSerde, 
								 final RecordContext recordContext);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows, 
								  final RecordContext recordContext);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows,
                                  final Serde<K> keySerde,
                                  final Serde<V> thisValueSerde,
                                  final Serde<VO> otherValueSerde, 
								  final RecordContext recordContext);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final RecordContext recordContext);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                             final Serde<K> keySerde,
                             final Serde<V> valSerde, 
							 final RecordContext recordContext);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final RecordContext recordContext);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                 final Serde<K> keySerde,
                                 final Serde<V> valSerde,
							     final RecordContext recordContext);

<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final ValueJoiner<? super V, ? super GV, ? extends RV> joiner,
								 final RecordContext recordContext);

<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner, 
									 final RecordContext recordContext);

 

    • KTable.java

 

 

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4125
           
Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4218
Jira
serverASF JIRA
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4726
 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP is combined with KIP-149 and provides a hybrid solution for rich functions in Streams and for accessing read-only keys within the ValueJoiner, ValueTransformer and ValueMapper interfaces.

 

Rich functions are one of the essential parts of stream processing. There are several use cases where users cannot express their business logic with the current non-rich methods. For example:

  • having access to RecordContext within an operator
  • having access to a read-only key for ValueJoiner, ValueTransformer, ValueMapper interfaces



Rich Interfaces

 

Code Block
languagejava
 
/**
 * Rich initializer: produces a value of type {@code V} for a given key.
 * It is passed as the {@code initializer} argument of the {@code aggregate()} overloads
 * proposed in this document.
 *
 * @param <V> type of the value returned by {@link #apply(Object)}
 * @param <K> type of the key passed to {@link #apply(Object)}
 */
public interface RichInitializer<V, K> {
    /**
     * @param key the key the value is produced for
     * @return a value of type {@code V}
     */
    V apply(K key);
}

/**
 * Rich value mapper: computes a result from a value, and is additionally handed the
 * record's key and a {@code RecordContext}. Per the motivation of this document the key is
 * meant to be read-only.
 *
 * @param <V>  type of the input value
 * @param <VR> type of the result
 * @param <K>  type of the key
 */
public interface RichValueMapper<V, VR, K> {
    /**
     * @param value         the input value
     * @param key           the key supplied alongside the value
     * @param recordContext the context supplied alongside the value
     * @return the mapped result
     */
    VR apply(final V value, final K key, final RecordContext recordContext);
}

/**
 * Rich value joiner: computes a result from two values, and is additionally handed the
 * key and a {@code RecordContext}. Per the motivation of this document the key is meant to
 * be read-only.
 *
 * @param <V1> type of the first value
 * @param <V2> type of the second value
 * @param <VR> type of the result
 * @param <K>  type of the key
 */
public interface RichValueJoiner<V1, V2, VR, K> {
    /**
     * @param value1        the first value
     * @param value2        the second value
     * @param key           the key supplied alongside the two values
     * @param recordContext the context supplied alongside the two values
     * @return the joined result
     */
    VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
}

/**
 * Rich key-value mapper: computes a result from a key and a value, and is additionally
 * handed a {@code RecordContext}.
 *
 * @param <K>  type of the key
 * @param <V>  type of the value
 * @param <VR> type of the result
 */
public interface RichKeyValueMapper<K, V, VR> {
    /**
     * @param key           the input key
     * @param value         the input value
     * @param recordContext the context supplied alongside the key and value
     * @return the mapped result
     */
    VR apply(final K key, final V value, final RecordContext recordContext);
}

/**
 * Rich reducer: combines two values of type {@code V} into a single {@code V}, and is
 * additionally handed the key and a {@code RecordContext}.
 *
 * @param <V> type of both inputs and of the result
 * @param <K> type of the key
 */
public interface RichReducer<V, K> {
    /**
     * @param value1        the first value
     * @param value2        the second value
     * @param key           the key supplied alongside the two values
     * @param recordContext the context supplied alongside the two values
     * @return the combined value
     */
    V apply(final V value1, final V value2, final K key, final RecordContext recordContext);
}

/**
 * Rich aggregator: computes an aggregate of type {@code VA} from a key, a value and an
 * existing aggregate, and is additionally handed a {@code RecordContext}.
 *
 * @param <K>  type of the key
 * @param <V>  type of the value
 * @param <VA> type of the aggregate, both as input and as result
 */
public interface RichAggregator<K, V, VA> {
    /**
     * @param key           the key
     * @param value         the value
     * @param aggregate     the aggregate passed in
     * @param recordContext the context supplied alongside the key and value
     * @return the resulting aggregate
     */
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}
 
/**
 * Rich foreach action: receives a key, a value and a {@code RecordContext} and returns
 * nothing.
 *
 * @param <K> type of the key
 * @param <V> type of the value
 */
public interface RichForeachAction<K, V> {
    /**
     * @param key           the key
     * @param value         the value
     * @param recordContext the context supplied alongside the key and value
     */
    void apply(final K key, final V value, final RecordContext recordContext);
}

/**
 * Rich predicate: evaluates a boolean condition on a key and a value, and is additionally
 * handed a {@code RecordContext}.
 *
 * @param <K> type of the key
 * @param <V> type of the value
 */
public interface RichPredicate<K, V> {
    /**
     * @param key           the key
     * @param value         the value
     * @param recordContext the context supplied alongside the key and value
     * @return the outcome of the test
     */
    boolean test(final K key, final V value, final RecordContext recordContext);
}

 
/**
 * Rich merger: takes a key and two values of type {@code V} and returns a single
 * {@code V}, and is additionally handed a {@code RecordContext}.
 * NOTE(review): the parameter names suggest the two values are aggregates being merged -
 * confirm against the caller.
 *
 * @param <K> type of the key
 * @param <V> type of both inputs and of the result
 */
public interface RichMerger<K, V> {
	/**
	 * @param aggKey        the key
	 * @param aggOne        the first value
	 * @param aggTwo        the second value
	 * @param recordContext the context supplied alongside the key and values
	 * @return the merged value
	 */
	V apply(final K aggKey, final V aggOne, final V aggTwo, final RecordContext recordContext);
}

 
 
/**
 * Rich value transformer: transforms a value into a result while also being handed the
 * key. Unlike the other rich interfaces in this section, {@code transform} takes no
 * {@code RecordContext}; instead {@code init} receives a {@code ProcessorContext}.
 * NOTE(review): presumably {@code init} is called before any {@code transform} and
 * {@code close} after the last one - the call order is not specified here.
 *
 * @param <V>  type of the input value
 * @param <VR> type of the result
 * @param <K>  type of the key
 */
public interface RichValueTransformer<V, VR, K> {
	/**
	 * @param context the processor context handed to this transformer
	 */
	void init(final ProcessorContext context);

	/**
	 * @param value the input value
	 * @param key   the key supplied alongside the value
	 * @return the transformed result
	 */
	VR transform(final V value, final K key);

	/** Counterpart of {@link #init}; takes no arguments and returns nothing. */
	void close();
}


 
/**
 * Supplier of {@link RichValueTransformer} instances; this is the type accepted by the
 * {@code transformValues} overload proposed in this document.
 *
 * @param <V>  input value type of the supplied transformer
 * @param <VR> result type of the supplied transformer
 * @param <K>  key type of the supplied transformer
 */
public interface RichValueTransformerSupplier<V, VR, K> {

    /**
     * @return a transformer; whether each call must return a new instance is not stated here
     */
    RichValueTransformer<V, VR, K> get();
}


 


 

Public Interfaces


KStream

 

Code Block
languagejava
// KStream overloads that take the Rich* interfaces declared in the "Rich Interfaces" section.

// Overloads taking a RichPredicate or RichKeyValueMapper (key, value, RecordContext).
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);
KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);
<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
// RichValueMapper's third type argument is the key type, hence "? super K" in last position.
<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR, ? super K> mapper);
<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
 
<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>, ? super K> mapper);



// Overloads taking a RichForeachAction, or one or more RichPredicates (branch is varargs).
void foreach(final RichForeachAction<? super K, ? super V> action);
KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);
KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates);
 
<VR> KStream<K, VR> transformValues(final RichValueTransformerSupplier<? super V, ? extends VR, ? super K> valueTransformerSupplier,
Code Block
languagejava
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);


KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);


KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);


KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);


<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);


<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);


<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
                             final Serde<VR> valueSerde,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final Serde<VR> joinSerde,
                            final String queryableStoreName);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final StateStoreSupplier<KeyValueStore> storeSupplier);


<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);


<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final Serde<VR> joinSerde,
               final String... stateStoreNames);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V,      final String queryableStoreName);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
  KR> selector);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
                              final ValueJoiner<? super V, ? superfinal VOSerialized<KR, ? extendsV> serialized);
 

<VO, VR> joinerKStream<K,
 VR> join(final KStream<K, VO> otherStream,
                           final StateStoreSupplier<KeyValueStore> storeSupplier);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K,RichValueJoiner<? VO>super otherV,
 ? super VO, ? extends VR, ? super K> joiner,
                       final ValueJoiner<? super V, ? super VO,final ? extends VR> joinerJoinWindows windows);

<VO, VR> KTable<KKStream<K, VR> outerJoinjoin(final KTable<KKStream<K, VO> otherotherStream,
                                 final ValueJoiner<RichValueJoiner<? super V, ? super VO, ? extends VR> joinerVR,
 ? super K> joiner,
                             final Serde<VR>JoinWindows joinSerdewindows,
                             final Joined<K, V, VO> final String queryableStoreNamejoined);
 

<VO, VR> KTable<KKStream<K, VR> outerJoinleftJoin(final KTable<KKStream<K, VO> otherotherStream,
                                 final ValueJoiner<RichValueJoiner<? super V, ? super VO, ? extends VR> VR, ? super K> joiner,
                                 final StateStoreSupplier<KeyValueStore>JoinWindows storeSupplier);

 

    • KGroupedStream.java

 

We take a subset of the features of ProcessorContext and put them into the RecordContext interface.

Code Block
languagejava
KTable<K, V> reduce(final Reducer<V> reducer);


KTable<K, V> reduce(final Reducer<V> reducer,
windows);
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                        final String queryableStoreName);


KTable<K, V> reduce(final Reducer<V> reducer,
   final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
    final StateStoreSupplier<KeyValueStore> storeSupplier);


<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                    final JoinWindows windows,
                           final Windows<W> windows,
    final Joined<K, V, VO> joined);
 

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final String queryableStoreName);


<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
       RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                          final Windows<W>JoinWindows windows);


<W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
          <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? finalsuper Windows<W> windowsVO,
 ? extends VR, ? super K> joiner,
                                  final JoinWindows windows,
      final StateStoreSupplier<WindowStore> storeSupplier);

KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                      final Joined<K, V, VO> joined);
 

<VT, VR> KStream<K, VR> join(final SessionWindows sessionWindowsKTable<K, VT> table,
                             final finalRichValueJoiner<? String queryableStoreName);
KTable<Windowed<K>super K, V>? reduce(final Reducer<V> reducer,
              super V, ? super VT, ? extends VR> joiner);
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                final SessionWindows sessionWindows);

KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                  final RichValueJoiner<? super K, ? super V, ? super VT, ? extends finalVR> SessionWindows sessionWindowsjoiner,
                             final final StateStoreSupplier<SessionStore> storeSupplierJoined<K, V, VT> joined);
 
<VR> KTable<K
<VT, VR> KStream<K, VR> aggregateleftJoin(final Initializer<VR> initializerKTable<K, VT> table,
                                 final Aggregator<RichValueJoiner<? super K, ? super V, VR> aggregator, ? super VT, ? extends VR> joiner);
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                             final Serde<VR> aggValueSerde,
  final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner,
              final String queryableStoreName);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
            final Joined<K, V, VT> joined);
 
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK,       final Aggregator<? super K, ? super V, VR> aggregator,
GV> globalKTable,
                                 final RichKeyValueMapper<? super finalK, Serde<VR> aggValueSerde);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer? super V, ? extends GK> keyValueMapper,
                                 final Aggregator<RichValueJoiner<? super K, ? super V, ? VR>super aggregatorGV,
 ? extends RV> joiner);

<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                 final StateStoreSupplier<KeyValueStore> storeSupplier);

<W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
          final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final Aggregator<RichValueJoiner<? super K, ? super V, VR>? super aggregatorGV,
 ? extends RV> valueJoiner);

 





KTable

 

Code Block
languagejava
KTable<K, V> filter(final RichPredicate<? super K, ? super                 V> predicate);
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> filterNot(final RichPredicate<? finalsuper Windows<W> windowsK,
 ? super V> predicate);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate,
                       final Materialized<K, V,                   final Serde<VR> aggValueSerde,
            KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR, ? super K> mapper);
<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR, ? super K> mapper,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? finalextends StringKR> queryableStoreNamemapper);

<W extends Window 

<KR, VR> KTable<Windowed<K>KGroupedTable<KR, VR> aggregategroupBy(final Initializer<VR> initializer,
                  RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                       final Aggregator<? super K, ? super V, VR> aggregator Serialized<KR, VR> serialized);
 

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
         final Windows<W> windows,
                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                            final Serde<VR> aggValueSerdeMaterialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 

<W extends Window<VO, VR> KTable<Windowed<K>KTable<K, VR> aggregateleftJoin(final Initializer<VR> initializerKTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
             final Aggregator<? super K, ? super V, VR> aggregator,
           final ValueJoiner<? super K, ? super V, ? super VO, ? extends VR>  joiner,
                                final Windows<W>Materialized<K, windowsVR,
 KeyValueStore<Bytes, byte[]>> materialized);
 

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super   final StateStoreSupplier<WindowStore> storeSupplierK> joiner);

<T> KTable<Windowed<K><VO, VR> KTable<K, T>VR> aggregateouterJoin(final KTable<K, Initializer<T>VO> initializerother,
                                     final Aggregator<RichValueJoiner<? super KV, ? super VVO, T>? extends aggregatorVR,
 ? super K> joiner,
                                 final Merger<? super K, T> sessionMerger,
       Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

 

 

 

 

 

 

 

KGroupedStream

 

 

Code Block
languagejava
KTable<K, V> reduce(final RichReducer<V, K> reducer);

KTable<K, V> reduce(final RichReducer<V, K> reducer,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR, finalK> SessionWindows sessionWindowsinitializer,
                             final RichAggregator<? super K, ? super V, VR> final Serde<T> aggValueSerdeaggregator,
                             final Materialized<K, VR,      final String queryableStoreNameKeyValueStore<Bytes, byte[]>> materialized);

<T><VR> KTable<Windowed<K>KTable<K, T>VR> aggregate(final RichInitializer<VR, Initializer<T>K> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator);

 

 

 

SessionWindowedKStream

 

The aggregate() methods take three rich interfaces. Converting every possible combination to its rich counterpart would produce a large number of overloads, so I propose adding a single overload that takes all of the rich interfaces.

 

Code Block
languagejava
<T> KTable<Windowed<K>,      final Aggregator<? super K, ? super V, T> aggregatorT> aggregate(final RichInitializer<T, Windowed<K>> initializer,
                                     final Merger<? super K, T> sessionMerger,
                               final RichAggregator<? super K, ? super finalV, SessionWindowsT> sessionWindowsaggregator,
                                     final Serde<T> aggValueSerdeRichMerger<? super K, T> sessionMerger);

<T><VR> KTable<Windowed<K>, T>VR> aggregate(final Initializer<T>RichInitializer<VR, Windowed<K>> initializer,
                                       final Aggregator<RichAggregator<? super K, ? super V, T>VR> aggregator,
                                       final Merger<RichMerger<? super K, T>VR> sessionMerger,
                                       final SessionWindowsMaterialized<K, sessionWindows,
                                     final Serde<T> aggValueSerdeVR, SessionStore<Bytes, byte[]>> materialized);

KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer,
                              final Materialized<K, V, SessionStore<Bytes,    final StateStoreSupplier<SessionStore> storeSupplier);
  • Limiting the ProcessorContext - RecordContext interface

byte[]>> materializedAs);


,

 

 

 

 

TimeWindowedKStream

  We take a subset of the features of ProcessorContext and put them into the RecordContext interface.

Code Block
languagejava
public interface RecordContext {
 
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
      String applicationId();
               TaskId taskId();
    StreamsMetrics metrics();
    String topic();
    int partition();
  final RichAggregator<? long offset();
    long timestamp(super K, ? super V, VR> aggregator);
<VR> KTable<Windowed<K>, VR> aggregate(final Map<StringRichInitializer<VR, Object> appConfigs();K> initializer,
    Map<String, Object> appConfigsWithPrefix(String prefix);
}
 
 
public interface ProcessorContext extends RecordContext {
   // all methods but the ones in RecordContext
}

 

Whenever we need to convert a ProcessorContext to a RecordContext, we just cast:

Code Block
languagejava
private class KStreamMapProcessor extends AbstractProcessor<K, V> {
    @Override
    public void init(ProcessorContext processorContext) {
final RichAggregator<? super K, ? super V,  super.init(processorContext);VR> aggregator,
        richMapper.init((RecordContext) processorContext);  				// here make a cast
    }

    @Override
    public void process(final K key, final V value) {
    final Materialized<K, VR,  V1 newValue = mapper.apply(key, value);
        context().forward(key, newValue);
    }

    @Override
    public void close() {
WindowStore<Bytes, byte[]>> materialized);

KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer,
                           super.close();
   final Materialized<K, V, WindowStore<Bytes,  mapper.close();
    }
}byte[]>> materialized);






  • Rich Interfaces

If the interface is value-only (like ValueJoiner or ValueMapper), its rich interface extends its WithKey counterpart (e.g. RichValueMapper extends ValueMapperWithKey).

 

 

If the interface is key-value (like KeyValueMapper), its rich interface extends the original interface itself (e.g. RichKeyValueMapper extends KeyValueMapper).

 

KGroupedTable

  

Code Block
languagejava
public interface RichFunction {
    void init(RecordContext recordContext);

 
KTable<K, V> reduce(final RichReducer<V, K> adder,
         void close();
}
 
public interface ValueMapperWithKey<K, V, VR> {

    VR apply(final K key, final V value);
}

 
public interface RichValueMapper<K, V, VR>  extends ValueMapperWithKey<K, V, VR>, RichFunction {

}
 
public interface RichKeyValueMapper<K, V, VR>  extends KeyValueMapper<K, V, VR>, RichFunction {

}
 

 

The same semantics apply to other interfaces as well. 

 

So we don't need to add any overloaded methods to the public APIs. Internally we make two changes:

  1. Change the constructor type of all related Processors to accept rich interfaces
  2. Create converters from non-rich to rich interfaces

 

Code Block
languagejava
class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {

    private final RichValueMapper<K, V, V1> mapper;

    public KStreamMapValues(RichValueMapper<K, V, V1> mapper) {
RichReducer<V, K> subtractor,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

KTable<K, V> reduce(final RichReducer<V, K> adder,
                    final RichReducer<V, K> subtractor);

<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
                             final RichAggregator<? this.mapper = mapper;
    }

super K, ? super V, VR> adder,
     @Override
    public Processor<K, V> get() {
        return new KStreamMapProcessor();
    }

  final RichAggregator<? privatesuper classK, KStreamMapProcessor? extendssuper AbstractProcessor<KV, V>VR> {subtractor,
         @Override
        public void init(ProcessorContext processorContext) {
        final Materialized<K, VR,  super.init(processorContextKeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
       mapper.init((RecordContext) processorContext);
        }

        @Override
     final   public void process(final K key, final V value) {
   RichAggregator<? super K, ? super V, VR> adder,
         V1 newValue = mapper.apply(key, value);
            context().forward(key, newValue);
    final RichAggregator<? super K, }

? super V, VR> subtractor);

 

 

 

 

 

 

Proposed changes

 

 

  • Move RecordContext from  .processor.internals  to  .processor 

 

  • Make the record context publicly accessible

Currently we set the record context through InternalProcessorContext (in StreamTask.updateProcessorContext()):
Code Block
languagejava
// the below code @Override
snippet already exists, this is just for background. 
private public void closeupdateProcessorContext()final {
StampedRecord record, final ProcessorNode currNode) {
    processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp,  super.close();
            mapper.close();
        }record.offset(), record.partition(), record.topic()));
    }processorContext.setCurrentNode(currNode);
}

 

...

 

 

A sample processor should look like this:

 

Code Block
languagejava
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {

    ...
    private RecordContext recordContext;       // this line is added in this KIP

    ...

    @Override
	public void process(final K1 key, final V1 value) {

    	recordContext = new RecordContext() {      
 
 
 
static <K, T1, T2, R> RichValueJoiner<K, T1, T2, R> convertToRichValueJoiner(final ValueJoinerWithKey<K, T1, T2, R> valueJoinerWithKey) {
    Objects.requireNonNull(valueJoinerWithKey, "valueJoiner can't be null");
    if (valueJoinerWithKey instanceof RichValueJoiner) {
        return (RichValueJoiner<K, T1, T2, R>) valueJoinerWithKey;
    } else {
        return new RichValueJoiner<K, T1, T2, R>() {
         // recordContext initialization @Override
is added in this KIP

    		@Override
    		public voidlong initoffset() {}

            @Override		return context().recordContext().offset();
    		}

    		@Override
    		public voidlong closetimestamp() {}

        		return context().recordContext().timestamp();
    		}

     		@Override
            		public RString apply(K key, T1 value1, T2 value2topic() {
                return valueJoinerWithKey.apply(key, value1, value2		return context().recordContext().topic();
            		}

    		@Override
    };
    }
}

static <K, T1, T2, R> ValueJoinerWithKey<K, T1, T2, R> convertToValueJoinerWithKey(final ValueJoiner<T1, T2, R> valueJoiner) {
    Objects.requireNonNull(valueJoiner, "valueJoiner can't be null");
    return new ValueJoinerWithKey<K, T1, T2, R>(		public int partition() {
        		return context().recordContext().partition();
    		}
      };

	
 
    if (key != null && value != null) {
        @Override
final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
   public  R apply(K key, T1if value1, T2(leftJoin || value2 != null) {
            return valueJoinercontext().forward(key, joiner.apply(value1value, value2, recordContext));    
        }
    };
}




}

 

Rejected Alternatives

Not yet.