Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


Rich functions are one of the essential parts of stream processing. There are several use-cases where users cannot express their business logic with current un-rich methods especially when init(Some params), close() methods are needed.

Public Interfaces





We create a subset of features from ProcessorContext and put into RecordContext interface

Code Block
public interface RecordContext {
    String applicationId();
    TaskId taskId();
    StreamsMetrics metrics();
    String topic();
    int partition();
    long offset();
    long timestamp();
    Map<String, Object> appConfigs();
    Map<String, Object> appConfigsWithPrefix(String prefix);
public interface ProcessorContext extends RecordContext {
   // all methods but the ones in RecordContext
}KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);

KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);

<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);

<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);

<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR> mapper);

<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);

<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>> processor);

void foreach(final RichForeachAction<? super K, ? super V> action);

KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);

KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates);

<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector);

<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
                                   final Serde<KR> keySerde,
                                   final Serde<V> valSerde);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows,
                             final Serde<K> keySerde,
                             final Serde<V> thisValueSerde,
                             final Serde<VO> otherValueSerde);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows,
                                 final Serde<K> keySerde,
                                 final Serde<V> thisValSerde,
                                 final Serde<VO> otherValueSerde);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows,
                                  final Serde<K> keySerde,
                                  final Serde<V> thisValueSerde,
                                  final Serde<VO> otherValueSerde);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                             final Serde<K> keySerde,
                             final Serde<V> valSerde);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                 final Serde<K> keySerde,
                                 final Serde<V> valSerde);

<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final RichValueJoiner<? super V, ? super GV, ? extends RV> joiner);

<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final RichValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner);

  • Limiting the ProcessorContext - RecordContext interface


Code Block
private class KStreamMapProcessor extends AbstractProcessor<K, V> {
    public void init(ProcessorContext processorContext) {
        richMapper.init((RecordContext) processorContext);  				// hereHERE WE makeMAKE aA castCAST

    public void process(final K key, final V value) {
        V1 newValue = mapper.apply(key, value);
        context().forward(key, newValue);

    public void close() {


Code Block
public interface RichValueMapper<V, VR> {
    VR apply(final V value, final RecordContext recordContext);

public interface RichValueJoiner<V1, V2, VR> {
    VR apply(final V1 value1, final V2 value2, final RecordContext recordContext);

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);

public interface RichReducer<V> {
    V apply(final V value1, final V value2, final RecordContext recordContext);

public interface RichInitializer<VA> {
    VA apply(final RecordContext recordContext);

public interface Aggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);

public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
