You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 6 Next »

Status

Current state: "Under Discussion"

Discussion thread: TBD

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Rich functions are one of the essential parts of stream processing. There are several use-cases where users cannot express their business logic with current un-rich methods especially when init(Some params), close() methods are needed.

Public Interfaces

 

 

  • KStream.java

 

KStream<K, V> filter(Predicate<? super K, ? super V> predicate, final RecordContext recordContext);
 
KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate, final RecordContext recordContext);

<KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper, final RecordContext recordContext);

<VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, final RecordContext recordContext);

<KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper, final RecordContext recordContext);

<VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> processor);<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows, final RecordContext recordContext);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                             final JoinWindows windows,
                             final Serde<K> keySerde,
                             final Serde<V> thisValueSerde,
                             final Serde<VO> otherValueSerde, 
                             final RecordContext recordContext) 



<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows, 
								 final RecordContext recordContext);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final JoinWindows windows,
                                 final Serde<K> keySerde,
                                 final Serde<V> thisValSerde,
                                 final Serde<VO> otherValueSerde, 
								 final RecordContext recordContext);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows, 
								  final RecordContext recordContext);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                  final JoinWindows windows,
                                  final Serde<K> keySerde,
                                  final Serde<V> thisValueSerde,
                                  final Serde<VO> otherValueSerde, 
								  final RecordContext recordContext);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final RecordContext recordContext);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                             final Serde<K> keySerde,
                             final Serde<V> valSerde, 
							 final RecordContext recordContext);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final RecordContext recordContext);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                 final Serde<K> keySerde,
                                 final Serde<V> valSerde,
							     final RecordContext recordContext);

<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final ValueJoiner<? super V, ? super GV, ? extends RV> joiner,
								 final RecordContext recordContext);

<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner, 
									 final RecordContext recordContext);

 

    • KTable.java

 

KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);


KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);


KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);


KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);


KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);


<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);


<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);


<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
                             final Serde<VR> valueSerde,
                             final StateStoreSupplier<KeyValueStore> storeSupplier);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final Serde<VR> joinSerde,
                            final String queryableStoreName);


<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final StateStoreSupplier<KeyValueStore> storeSupplier);


<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);


<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final Serde<VR> joinSerde,
                                final String queryableStoreName);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final StateStoreSupplier<KeyValueStore> storeSupplier);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final Serde<VR> joinSerde,
                                 final String queryableStoreName);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final StateStoreSupplier<KeyValueStore> storeSupplier);
  • Limiting the ProcessorContext - RecordContext interface

We create a subset of features from ProcessorContext and put into RecordContext interface

public interface RecordContext {
    String applicationId();
    TaskId taskId();
    StreamsMetrics metrics();
    String topic();
    int partition();
    long offset();
    long timestamp();
    Map<String, Object> appConfigs();
    Map<String, Object> appConfigsWithPrefix(String prefix);
}
 
 
public interface ProcessorContext extends RecordContext {
   // all methods but the ones in RecordContext
}

 

Once we need a conversion from ProcessorContext and RecordContext, we just cast: 

private class KStreamMapProcessor extends AbstractProcessor<K, V> {
    @Override
    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
        richMapper.init((RecordContext) processorContext);  				// here make a cast
    }

    @Override
    public void process(final K key, final V value) {
        V1 newValue = mapper.apply(key, value);
        context().forward(key, newValue);
    }

    @Override
    public void close() {
        super.close();
        mapper.close();
    }
}

  • Rich Interfaces

If the interface is value-only (like ValueJoiner, ValueMapper) we extend its rich interface from its withKey'ed counterpart. 

If the interface is key-value (KeyValueMapper) we extend its rich interface from itself. 

 

public interface RichFunction {
    void init(RecordContext recordContext);

    void close();
}
 
public interface ValueMapperWithKey<K, V, VR> {

    VR apply(final K key, final V value);
}

 
public interface RichValueMapper<K, V, VR>  extends ValueMapperWithKey<K, V, VR>, RichFunction {

}
 
public interface RichKeyValueMapper<K, V, VR>  extends KeyValueMapper<K, V, VR>, RichFunction {

}
 

 

The same semantics apply to other interfaces as well. 

 

So we don't need to add any overloaded methods for public APIs. Internally we perform 2 changes:

  1. Change the constructor type of all related Processors to accept rich interfaces
  2. Create converters from non-rich to rich interfaces

 

class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {

    private final RichValueMapper<K, V, V1> mapper;

    public KStreamMapValues(RichValueMapper<K, V, V1> mapper) {
        this.mapper = mapper;
    }

    @Override
    public Processor<K, V> get() {
        return new KStreamMapProcessor();
    }

    private class KStreamMapProcessor extends AbstractProcessor<K, V> {
        @Override
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            mapper.init((RecordContext) processorContext);
        }

        @Override
        public void process(final K key, final V value) {
            V1 newValue = mapper.apply(key, value);
            context().forward(key, newValue);
        }

        @Override
        public void close() {
            super.close();
            mapper.close();
        }
    }
}
 
 
 
 
 
static <K, T1, T2, R> RichValueJoiner<K, T1, T2, R> convertToRichValueJoiner(final ValueJoinerWithKey<K, T1, T2, R> valueJoinerWithKey) {
    Objects.requireNonNull(valueJoinerWithKey, "valueJoiner can't be null");
    if (valueJoinerWithKey instanceof RichValueJoiner) {
        return (RichValueJoiner<K, T1, T2, R>) valueJoinerWithKey;
    } else {
        return new RichValueJoiner<K, T1, T2, R>() {
            @Override
            public void init() {}

            @Override
            public void close() {}

            @Override
            public R apply(K key, T1 value1, T2 value2) {
                return valueJoinerWithKey.apply(key, value1, value2);
            }
        };
    }
}

static <K, T1, T2, R> ValueJoinerWithKey<K, T1, T2, R> convertToValueJoinerWithKey(final ValueJoiner<T1, T2, R> valueJoiner) {
    Objects.requireNonNull(valueJoiner, "valueJoiner can't be null");
    return new ValueJoinerWithKey<K, T1, T2, R>() {
        @Override
        public R apply(K key, T1 value1, T2 value2) {
            return valueJoiner.apply(value1, value2);
        }
    };
}




Rejected Alternatives

Not yet.

  • No labels