Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Rich functions are one of the essential parts of stream processing. There are several use-cases where users cannot express their business logic with current un-rich methods especially when init(Some params), close() methods are needed.

Public Interfaces

 

 

  • KStream.java

 

We create a subset of features from ProcessorContext and put into RecordContext interface

Code Block
languagejava
public interface RecordContext {
    String applicationId();
    TaskId taskId();
    StreamsMetrics metrics();
    String topic();
    int partition();
    long offset();
    long timestamp();
    Map<String, Object> appConfigs();
    Map<String, Object> appConfigsWithPrefix(String prefix);
}
 
 
public interface ProcessorContext extends RecordContext {
   // all methods but the ones in RecordContext
}KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);


KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);


<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);


<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);


<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR> mapper);


<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);


<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>> processor);


void foreach(final RichForeachAction<? super K, ? super V> action);


KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);


KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates);


<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector);


<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
                                   final Serde<KR> keySerde,
                                   final Serde<V> valSerde);


<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows);


<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows,
                             final Serde<K> keySerde,
                             final Serde<V> thisValueSerde,
                             final Serde<VO> otherValueSerde);


<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows);


<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows,
                                 final Serde<K> keySerde,
                                 final Serde<V> thisValSerde,
                                 final Serde<VO> otherValueSerde);


<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows);


<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows,
                                  final Serde<K> keySerde,
                                  final Serde<V> thisValueSerde,
                                  final Serde<VO> otherValueSerde);


<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner);


<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                             final Serde<K> keySerde,
                             final Serde<V> valSerde);


<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner);


<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                 final Serde<K> keySerde,
                                 final Serde<V> valSerde);


<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final RichValueJoiner<? super V, ? super GV, ? extends RV> joiner);


<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final RichValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner);

  • Limiting the ProcessorContext - RecordContext interface

...

Code Block
languagejava
private class KStreamMapProcessor extends AbstractProcessor<K, V> {
    @Override
    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
        richMapper.init((RecordContext) processorContext);  				// hereHERE WE makeMAKE aA castCAST
    }

    @Override
    public void process(final K key, final V value) {
        V1 newValue = mapper.apply(key, value);
        context().forward(key, newValue);
    }

    @Override
    public void close() {
        super.close();
        mapper.close();
    }
}

...

Code Block
languagejava
public interface RichValueMapper<V, VR> {
    VR apply(final V value, final RecordContext recordContext);
}

public interface RichValueJoiner<V1, V2, VR> {
    VR apply(final V1 value1, final V2 value2, final RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichReducer<V> {
    V apply(final V value1, final V value2, final RecordContext recordContext);
}


public interface RichInitializer<VA> {
    VA apply(final RecordContext recordContext);
}


public interface Aggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}
 
public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}


public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

...