Status
Current state: "Under Discussion"
Discussion thread: TBD
JIRA: TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Rich functions are one of the essential parts of stream processing. There are several use-cases where users cannot express their business logic with current un-rich methods especially when init(Some params)
, close()
methods are needed.
Public Interfaces
KStream.java
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate); KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate); <KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper); <KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper); <VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR> mapper); <KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper); <VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>> processor); void foreach(final RichForeachAction<? super K, ? super V> action); KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action); KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates); <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector); <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector, final Serde<KR> keySerde, final Serde<V> valSerde); <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows); <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Serde<K> keySerde, final Serde<V> thisValueSerde, final Serde<VO> otherValueSerde); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Serde<K> keySerde, final Serde<V> thisValSerde, final Serde<VO> otherValueSerde); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Serde<K> keySerde, final Serde<V> thisValueSerde, final Serde<VO> otherValueSerde); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Serde<K> keySerde, final Serde<V> valSerde); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Serde<K> keySerde, final Serde<V> valSerde); <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable, final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, final RichValueJoiner<? super V, ? super GV, ? extends RV> joiner); <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable, final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, final RichValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner);
KTable.java
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate); KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final String queryableStoreName); KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier); KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate); KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier); KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final String queryableStoreName); <VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper); <VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName); <VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final StateStoreSupplier<KeyValueStore> storeSupplier); void foreach(final RichForeachAction<? super K, ? super V> action); <KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper); <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector); <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, final Serde<KR> keySerde, final Serde<VR> valueSerde); <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner); <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Serde<VR> joinSerde, final String queryableStoreName); <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final StateStoreSupplier<KeyValueStore> storeSupplier); <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner); <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Serde<VR> joinSerde, final String queryableStoreName); <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final StateStoreSupplier<KeyValueStore> storeSupplier); <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner); <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Serde<VR> joinSerde, final String queryableStoreName); <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final StateStoreSupplier<KeyValueStore> storeSupplier);
KGroupedStream
KTable<K, V> reduce(final RichReducer<V> reducer); KTable<K, V> reduce(final RichReducer<V> reducer, final String queryableStoreName); KTable<K, V> reduce(final RichReducer<V> reducer, final StateStoreSupplier<KeyValueStore> storeSupplier); <W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final Windows<W> windows, final String queryableStoreName); <W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final Windows<W> windows); <W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows, final String queryableStoreName); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final Serde<VR> aggValueSerde, final String queryableStoreName); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final Serde<VR> aggValueSerde, final String queryableStoreName); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final Serde<VR> aggValueSerde, final String queryableStoreName); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final String queryableStoreName); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final String queryableStoreName); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final String queryableStoreName); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final StateStoreSupplier<SessionStore> storeSupplier); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final StateStoreSupplier<SessionStore> storeSupplier); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final StateStoreSupplier<SessionStore> storeSupplier);
Limiting the ProcessorContext - RecordContext interface
We create a subset of features from ProcessorContext
and put into RecordContext
interface
public interface RecordContext { String applicationId(); TaskId taskId(); StreamsMetrics metrics(); String topic(); int partition(); void commit(); long offset(); long timestamp(); Map<String, Object> appConfigs(); Map<String, Object> appConfigsWithPrefix(String prefix); } public interface ProcessorContext extends RecordContext { // all methods but the ones in RecordContext }
Once we need a conversion from ProcessorContext
and RecordContext, we just cast:
private class KStreamMapProcessor extends AbstractProcessor<K, V> { @Override public void init(ProcessorContext processorContext) { super.init(processorContext); richMapper.init((RecordContext) processorContext); // HERE WE MAKE A CAST } @Override public void process(final K key, final V value) { V1 newValue = mapper.apply(key, value); context().forward(key, newValue); } @Override public void close() { super.close(); mapper.close(); } }
Rich Interfaces
public interface RichValueMapper<V, VR> { VR apply(final V value, final RecordContext recordContext); } public interface RichValueJoiner<V1, V2, VR> { VR apply(final V1 value1, final V2 value2, final RecordContext recordContext); } public interface RichKeyValueMapper<K, V, VR> { VR apply(final K key, final V value, final RecordContext recordContext); } public interface RichReducer<V> { V apply(final V value1, final V value2, final RecordContext recordContext); } public interface RichInitializer<VA> { VA apply(final RecordContext recordContext); } public interface Aggregator<K, V, VA> { VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext); } public interface RichForeachAction<K, V> { void apply(final K key, final V value, final RecordContext recordContext); } public interface RichPredicate<K, V> { boolean test(final K key, final V value, final RecordContext recordContext); }
The same semantics apply to other interfaces as well.
So we don't need to add any overloaded methods for public APIs. Internally we perform 2 changes:
- Change the constructor type of all related Processors to accept rich interfaces
- Create converters from non-rich to rich interfaces
class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> { private final RichValueMapper<K, V, V1> mapper; public KStreamMapValues(RichValueMapper<K, V, V1> mapper) { this.mapper = mapper; } @Override public Processor<K, V> get() { return new KStreamMapProcessor(); } private class KStreamMapProcessor extends AbstractProcessor<K, V> { @Override public void init(ProcessorContext processorContext) { super.init(processorContext); mapper.init((RecordContext) processorContext); } @Override public void process(final K key, final V value) { V1 newValue = mapper.apply(key, value); context().forward(key, newValue); } @Override public void close() { super.close(); mapper.close(); } } } static <K, T1, T2, R> RichValueJoiner<K, T1, T2, R> convertToRichValueJoiner(final ValueJoinerWithKey<K, T1, T2, R> valueJoinerWithKey) { Objects.requireNonNull(valueJoinerWithKey, "valueJoiner can't be null"); if (valueJoinerWithKey instanceof RichValueJoiner) { return (RichValueJoiner<K, T1, T2, R>) valueJoinerWithKey; } else { return new RichValueJoiner<K, T1, T2, R>() { @Override public void init() {} @Override public void close() {} @Override public R apply(K key, T1 value1, T2 value2) { return valueJoinerWithKey.apply(key, value1, value2); } }; } } static <K, T1, T2, R> ValueJoinerWithKey<K, T1, T2, R> convertToValueJoinerWithKey(final ValueJoiner<T1, T2, R> valueJoiner) { Objects.requireNonNull(valueJoiner, "valueJoiner can't be null"); return new ValueJoinerWithKey<K, T1, T2, R>() { @Override public R apply(K key, T1 value1, T2 value2) { return valueJoiner.apply(value1, value2); } }; }
Rejected Alternatives
Not yet.