Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Rich functions are one of the essential parts of stream processing. There are several use-cases where users cannot express their business logic with current un-rich methods especially when init(Some params), close() methods are needed.

Public Interfaces

We assume to build this KIP on top of KIP-149, meaning, we build rich functions on top of "withKey" interfaces. So, there is no public interface change apart from the ones shown in KIP-149.

The reason is that, once withKey interfaces are part of public API, we will not need any overloaded methods for rich functions. 

Moreover, we separated onlyValue (ValueMapper) and withKey (ValueMapperWithKey) interfaces (no inheritance relation) to enable lambdas. Therefore, while introducing the rich functios, we have to chose one of them to extend.

It is better to select the more general interface (withKey in this case) for rich functions.  


Proposed Changes

There 2 main issues to consider while introducing rich functions: 1. Limiting the ProcessorContext for init(Some param) method and introducing rich functions in a backwards compatible way.

Code Block
languagejava
KStream<K, V> filter(Predicate<? super K, ? super V> predicate, final RecordContext recordContext);
 
KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate, final RecordContext recordContext);

<KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper, final RecordContext recordContext);

<VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, final RecordContext recordContext);

<KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper, final RecordContext recordContext);

<VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> processor);<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows, final RecordContext recordContext);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows,
                             final Serde<K> keySerde,
                             final Serde<V> thisValueSerde,
                             final Serde<VO> otherValueSerde, 
                             final RecordContext recordContext) 



<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows, 
								 final RecordContext recordContext);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows,
                                 final Serde<K> keySerde,
                                 final Serde<V> thisValSerde,
                                 final Serde<VO> otherValueSerde, 
								 final RecordContext recordContext);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows, 
								  final RecordContext recordContext);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows,
                                  final Serde<K> keySerde,
                                  final Serde<V> thisValueSerde,
                                  final Serde<VO> otherValueSerde, 
								  final RecordContext recordContext);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final RecordContext recordContext);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                             final Serde<K> keySerde,
                             final Serde<V> valSerde, 
							 final RecordContext recordContext);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final RecordContext recordContext);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                 final Serde<K> keySerde,
                                 final Serde<V> valSerde,
							     final RecordContext recordContext);

<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final ValueJoiner<? super V, ? super GV, ? extends RV> joiner,
								 final RecordContext recordContext);

<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner, 
									 final RecordContext recordContext);





















  • Limiting the ProcessorContext - RecordContext interface

...

Code Block
languagejava
private class KStreamMapProcessor extends AbstractProcessor<K, V> {
    @Override
    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
        richMapper.init((RecordContext) processorContext);  				// here make a cast
    }

    @Override
    public void process(final K key, final V value) {
        V1 newValue = mapper.apply(key, value);
        context().forward(key, newValue);
    }

    @Override
    public void close() {
        super.close();
        mapper.close();
    }
}

...