THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Rich functions are one of the essential parts of stream processing. There are several use-cases where users cannot express their business logic with the current un-rich methods, especially when init(Some params) and close() methods are needed.
Public Interfaces
Proposed Changes
KStream.java
There are 2 main issues to consider while introducing rich functions: 1. limiting the ProcessorContext for the init(Some param) method, and 2. introducing rich functions in a backwards-compatible way.
Code Block | ||
---|---|---|
| ||
// Proposed KStream overloads: each takes the user-supplied function followed by a RecordContext.
// filter / filterNot take a Predicate over (key, value) and return a KStream with the same key and value types.
KStream<K, V> filter(Predicate<? super K, ? super V> predicate, final RecordContext recordContext);
KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate, final RecordContext recordContext);
// map: the KeyValueMapper produces one KeyValue<KR, VR> per call, so both key and value types may change.
<KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper, final RecordContext recordContext);
// mapValues: the ValueMapper receives only the value; the key type K is unchanged in the result.
<VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, final RecordContext recordContext);
// flatMap: the KeyValueMapper produces an Iterable of KeyValue<KR, VR> per call.
<KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper, final RecordContext recordContext);
// flatMapValues: the ValueMapper produces an Iterable of new values; the key type K is unchanged.
// NOTE(review): this is the only KStream signature in this listing without a trailing
// RecordContext parameter — confirm whether the omission is intentional.
<VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> processor);
// Windowed KStream-KStream join overload: ValueJoiner and JoinWindows, followed by a RecordContext.
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final RecordContext recordContext);
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final Serde<K> keySerde,
final Serde<V> thisValueSerde,
final Serde<VO> otherValueSerde,
final RecordContext recordContext)
// Windowed KStream-KStream leftJoin overload: ValueJoiner and JoinWindows, followed by a RecordContext.
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final RecordContext recordContext);
// Same leftJoin with explicit Serdes for the key and both value types.
// NOTE(review): this overload names its parameter thisValSerde while the join/outerJoin
// overloads in this listing use thisValueSerde — confirm the naming is intended.
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final Serde<K> keySerde,
final Serde<V> thisValSerde,
final Serde<VO> otherValueSerde,
final RecordContext recordContext);
// Windowed KStream-KStream outerJoin overload: ValueJoiner and JoinWindows, followed by a RecordContext.
<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final RecordContext recordContext);
// Same outerJoin with explicit Serdes for the key and both value types.
<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final Serde<K> keySerde,
final Serde<V> thisValueSerde,
final Serde<VO> otherValueSerde,
final RecordContext recordContext);
// KStream-KTable join overload: takes the table and a ValueJoiner (no JoinWindows), followed by a RecordContext.
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final RecordContext recordContext);
// Same join with explicit Serdes for the key and this stream's value.
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
final Serde<K> keySerde,
final Serde<V> valSerde,
final RecordContext recordContext);
// KStream-KTable leftJoin overload: table and ValueJoiner, followed by a RecordContext.
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final RecordContext recordContext);
// Same leftJoin with explicit Serdes for the key and this stream's value.
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
final Serde<K> keySerde,
final Serde<V> valSerde,
final RecordContext recordContext);
// KStream-GlobalKTable join overload: the KeyValueMapper derives the table key (GK) from the
// stream's (key, value); the ValueJoiner combines the stream value with the table value (GV).
// A RecordContext is the trailing parameter.
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
final ValueJoiner<? super V, ? super GV, ? extends RV> joiner,
final RecordContext recordContext);
// KStream-GlobalKTable leftJoin overload with the same parameter shape.
// NOTE(review): the joiner parameter is named valueJoiner here but joiner in the join overload.
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
final KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner,
final RecordContext recordContext);
|
KTable.java
Code Block | ||
---|---|---|
| ||
// KTable filter overloads: predicate only, with a queryable store name, or with a StateStoreSupplier.
// NOTE(review): unlike the KStream listing on this page, none of the KTable signatures take a
// RecordContext parameter — confirm whether this listing is still to be updated.
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
// KTable filterNot overloads with the same three parameter shapes.
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName);
// KTable mapValues overloads: mapper only, or mapper plus a value Serde together with either
// a queryable store name or a StateStoreSupplier. The key type K is unchanged in the result.
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
final Serde<VR> valueSerde,
final StateStoreSupplier<KeyValueStore> storeSupplier);
// KTable-KTable join overloads. Each of join / leftJoin / outerJoin comes in three shapes:
//   1. other table + ValueJoiner
//   2. other table + ValueJoiner + Serde for the joined value + queryable store name
//   3. other table + ValueJoiner + StateStoreSupplier
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Serde<VR> joinSerde,
final String queryableStoreName);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final StateStoreSupplier<KeyValueStore> storeSupplier);
// leftJoin: same three shapes as join.
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Serde<VR> joinSerde,
final String queryableStoreName);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final StateStoreSupplier<KeyValueStore> storeSupplier);
// outerJoin: same three shapes as join.
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Serde<VR> joinSerde,
final String queryableStoreName);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final StateStoreSupplier<KeyValueStore> storeSupplier); |
Limiting the ProcessorContext - RecordContext interface
...