...
JIRA:
Jira | ||||||
---|---|---|---|---|---|---|
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Rich functions are one of the essential parts of stream processing. There are several use-cases where users cannot express their business logic with current un-rich methods especially when init(Some params)
, close()
methods are needed.
Public Interfaces
KStream.java
Code Block | ||
---|---|---|
| ||
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);
KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);
<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR> mapper);
<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>> processor);
void foreach(final RichForeachAction<? super K, ? super V> action);
KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);
KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
final Serde<KR> keySerde,
final Serde<V> valSerde);
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows);
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final Serde<K> keySerde,
final Serde<V> thisValueSerde,
final Serde<VO> otherValueSerde);
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows);
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final Serde<K> keySerde,
final Serde<V> thisValSerde,
final Serde<VO> otherValueSerde);
<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows);
<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final JoinWindows windows,
final Serde<K> keySerde,
final Serde<V> thisValueSerde,
final Serde<VO> otherValueSerde);
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner);
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner,
final Serde<K> keySerde,
final Serde<V> valSerde);
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner);
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner,
final Serde<K> keySerde,
final Serde<V> valSerde);
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
final RichValueJoiner<? super V, ? super GV, ? extends RV> joiner);
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
final RichValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner);
|
KTable.java
Code Block | ||
---|---|---|
| ||
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final String queryableStoreName);
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final String queryableStoreName);
<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper);
<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName);
<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapper,
final Serde<VR> valueSerde,
final StateStoreSupplier<KeyValueStore> storeSupplier);
void foreach(final RichForeachAction<? super K, ? super V> action);
<KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
final Serde<KR> keySerde,
final Serde<VR> valueSerde);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Serde<VR> joinSerde,
final String queryableStoreName);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final StateStoreSupplier<KeyValueStore> storeSupplier);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Serde<VR> joinSerde,
final String queryableStoreName);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final StateStoreSupplier<KeyValueStore> storeSupplier);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final Serde<VR> joinSerde,
final String queryableStoreName);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner,
final StateStoreSupplier<KeyValueStore> storeSupplier);
|
KGroupedStream
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP combines KIP-149 and provides a hybrid solution to rich functions in Streams and accessing read-only keys within ValueJoiner, ValueTransformer, ValueMapper interfaces.
Rich functions are one of the essential parts of stream processing. There are several use-cases where users cannot express their business logic with current un-rich methods. For example:
- having access to RecordContext within an operator
- having access to a read-only key for ValueJoiner, ValueTransformer, ValueMapper interfaces
Rich Interfaces
Code Block | ||
---|---|---|
| ||
public interface RichInitializer<V, K> {
V apply(K key);
}
public interface RichValueMapper<V, VR, K> {
VR apply(final V value, final K key, final RecordContext recordContext);
}
public interface RichValueJoiner<V1, V2, VR, K> {
VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
}
public interface RichKeyValueMapper<K, V, VR> {
VR apply(final K key, final V value, final RecordContext recordContext);
}
public interface RichReducer<V, K> {
V apply(final V value1, final V value2, final K key, final RecordContext recordContext);
}
public interface RichAggregator<K, V, VA> {
VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}
public interface RichForeachAction<K, V> {
void apply(final K key, final V value, final RecordContext recordContext);
}
public interface RichPredicate<K, V> {
boolean test(final K key, final V value, final RecordContext recordContext);
}
public interface RichMerger<K, V> {
V apply(final K aggKey, final V aggOne, final V aggTwo, final RecordContext recordContext);
}
public interface RichValueTransformer<V, VR, K> {
void init(final ProcessorContext context);
VR transform(final V value, final K key);
void close();
}
public interface RichValueTransformerSupplier<V, VR, K> {
RichValueTransformer<V, VR, K> get();
}
|
Public Interfaces
KStream
Code Block | ||
---|---|---|
| ||
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);
KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);
<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR, ? super K> mapper);
<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper< | ||
Code Block | ||
| ||
KTable<K, V> reduce(final RichReducer<V> reducer); KTable<K, V> reduce(final RichReducer<V> reducer, final String queryableStoreName); KTable<K, V> reduce(final RichReducer<V> reducer, final StateStoreSupplier<KeyValueStore> storeSupplier); <W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final Windows<W> windows, final String queryableStoreName); <W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final Windows<W> windows); <W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows, final String queryableStoreName); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final Serde<VR> aggValueSerde, final String queryableStoreName); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final Serde<VR> aggValueSerde, final String queryableStoreName); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final Serde<VR> aggValueSerde, final String queryableStoreName); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR>? aggregator, extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper); <VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>, ? super K> mapper); void foreach(final RichForeachAction<? super K, ? super V> action); KStream<K, V> peek(final RichForeachAction<? super K, ? super final Windows<W> windows, V> action); KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates); <VR> KStream<K, VR> transformValues(final RichValueTransformerSupplier<? super V, ? extends VR, ? super K> valueTransformerSupplier, final String... stateStoreNames); <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, final StateStoreSupplier<WindowStore> storeSupplierKR> selector); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector, final Serialized<KR, V> serialized); <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final RichAggregator<? super K, ? super V, VR> aggregator, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner, final JoinWindows windows); <VO, VR> KStream<K, VR> join(final KStream<K, Windows<W>VO> windowsotherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner, final StateStoreSupplier<WindowStore> storeSupplier); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final JoinWindows windows, final Aggregator<? super K, ? superJoined<K, V, T> aggregator, VO> joined); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final Merger<? super K, T> sessionMerger, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner, final SessionWindows sessionWindows, final JoinWindows windows); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final Serde<T> aggValueSerde, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner, final String queryableStoreName); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final JoinWindows windows, final RichAggregator<? super K, ? super V, T> aggregator, final Joined<K, V, VO> joined); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final Merger<? super K, T> sessionMerger, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner, final SessionWindows sessionWindows, final JoinWindows windows); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final Serde<T> aggValueSerde, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? finalsuper String queryableStoreName); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, K> joiner, final JoinWindows windows, final RichAggregator<? super K, ? super V, T> aggregator, final Joined<K, V, VO> joined); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final Merger<? super K, T> sessionMerger, final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner); <VT, VR> KStream<K, VR> join(final finalKTable<K, SessionWindowsVT> sessionWindowstable, final RichValueJoiner<? super K, ? super V, ? finalsuper Serde<T> aggValueSerdeVT, ? extends VR> joiner, final Joined<K, V, VT> final String queryableStoreNamejoined); <T> KTable<Windowed<K><VT, VR> KStream<K, T>VR> aggregateleftJoin(final KTable<K, RichInitializer<T>VT> initializertable, final Aggregator<RichValueJoiner<? super K, ? super V, ? T>super aggregatorVT, ? extends VR> joiner); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, final Merger<? super K, T> sessionMerger, final RichValueJoiner<? super K, ? super V, ? super final SessionWindows sessionWindowsVT, ? extends VR> joiner, final Joined<K, V, VT> final Serde<T> aggValueSerdejoined); <T> KTable<Windowed<K> <GK, GV, RV> KStream<K, T>RV> aggregatejoin(final GlobalKTable<GK, Initializer<T>GV> initializerglobalKTable, final RichAggregator<RichKeyValueMapper<? super K, ? super V, ? extends T>GK> aggregatorkeyValueMapper, final Merger<RichValueJoiner<? super K, ? T>super sessionMergerV, ? final SessionWindows sessionWindowssuper GV, ? extends RV> joiner); <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable, final Serde<T> aggValueSerde); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, final RichAggregator<RichValueJoiner<? super K, ? super V, ? T>super aggregatorGV, ? extends final Merger<RV> valueJoiner); |
KTable
Code Block | ||
---|---|---|
| ||
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate); KTable<K, V> filter(final RichPredicate<? super K, ? super T>V> sessionMergerpredicate, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate); KTable<K, V> filterNot(final SessionWindows sessionWindows, RichPredicate<? super K, ? super V> predicate, final Materialized<K, V, final Serde<T> aggValueSerdeKeyValueStore<Bytes, byte[]>> materialized); <T><VR> KTable<Windowed<K>KTable<K, T>VR> aggregatemapValues(final RichValueMapper<? Initializer<T>super initializerV, ? extends VR, ? super K> mapper); <VR> KTable<K, VR> mapValues(final RichValueMapper<? final RichAggregator<? super Ksuper V, ? extends VR, ? super V, T>K> aggregatormapper, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <KR> KStream<KR, V> toStream(final Merger<RichKeyValueMapper<? super K, ? T>super sessionMergerV, ? extends KR> mapper); <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector); <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, final SessionWindows sessionWindows, final Serialized<KR, VR> serialized); <VO, VR> KTable<K, VR> join(final KTable<K, Serde<T>VO> aggValueSerdeother, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? finalsuper StateStoreSupplier<SessionStore>K> storeSupplierjoiner); <T> KTable<Windowed<K><VO, VR> KTable<K, T>VR> aggregatejoin(final KTable<K, RichInitializer<T>VO> initializerother, final RichValueJoiner<? super V, ? final Aggregator<super VO, ? superextends KVR, ? super V, T> aggregatorK> joiner, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VO, VR> KTable<K, VR> leftJoin(final Merger<?KTable<K, superVO> K, T> sessionMerger, other, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner); <VO, VR> KTable<K, VR> leftJoin(final KTable<K, SessionWindowsVO> sessionWindowsother, final ValueJoiner<? super K, ? super V, ? super VO, ? finalextends Serde<T>VR> aggValueSerdejoiner, final Materialized<K, VR, KeyValueStore<Bytes, final StateStoreSupplier<SessionStore> storeSupplierbyte[]>> materialized); <T> KTable<Windowed<K><VO, VR> KTable<K, T>VR> aggregateouterJoin(final KTable<K, RichInitializer<T>VO> initializerother, final RichValueJoiner<? super V, ? super finalVO, RichAggregator<? superextends KVR, ? super V K> joiner); <VO, T>VR> aggregatorKTable<K, VR> outerJoin(final KTable<K, VO> other, final Merger<RichValueJoiner<? super KV, ? T>super sessionMergerVO, ? extends VR, ? super K> joiner, final SessionWindows sessionWindows, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); |
KGroupedStream
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V, K> reducer); KTable<K, V> reduce(final RichReducer<V, K> reducer, final Materialized<K, Serde<T>V, aggValueSerdeKeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer, final RichAggregator<? StateStoreSupplier<SessionStore> storeSupplier); |
KGroupedTable
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V> addersuper K, ? super V, VR> aggregator, final Reducer<V> subtractor, final Materialized<K, final String queryableStoreNameVR, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, V>VR> reduceaggregate(final RichInitializer<VR, Reducer<V>K> adderinitializer, final RichReducer<V> subtractor, final RichAggregator<? super K, ? super V, VR> aggregator); |
SessionWindowedKStream
There are 3 rich interfaces in aggregate() methods. So converting all possible combinations to their rich counterparts can cause a lot of overloads. So, I propose to overload one method with all rich interfaces.
Code Block | ||
---|---|---|
| ||
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T, Windowed<K>> initializer, final String queryableStoreName); KTable<K, V> reduce(final RichReducer<V> adder, final RichReducer<V> subtractor, final String queryableStoreName); KTable<K, V> reduce(final RichReducer<V> adder, final Reducer<V> subtractor); KTable<K, V> reduce(final RichAggregator<? Reducer<V>super adderK, ? super V, T> aggregator, final RichReducer<V> subtractor); KTable<K, V> reduce(final RichReducer<V> adder, final RichMerger<? super K, finalT> RichReducer<V> subtractorsessionMerger); KTable<K<VR> KTable<Windowed<K>, V>VR> reduceaggregate(final RichInitializer<VR, RichReducer<V>Windowed<K>> adderinitializer, final Reducer<V> subtractor, final StateStoreSupplier<KeyValueStore> storeSupplier); KTable<K, V> reduce(final RichAggregator<? Reducer<V>super adder, K, ? super V, VR> aggregator, final RichReducer<V> subtractor, final RichMerger<? StateStoreSupplier<KeyValueStore> storeSupplier); KTable<Ksuper K, V> reduce(final RichReducer<V> adderVR> sessionMerger, final RichReducer<V> subtractor, final Materialized<K, VR, final StateStoreSupplier<KeyValueStore> storeSupplierSessionStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializerKTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer); KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer, final RichAggregator<? super K, ? super V, VR> adder, Materialized<K, V, SessionStore<Bytes, byte[]>> materializedAs); , |
TimeWindowedKStream
Code Block | ||
---|---|---|
| ||
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer, final RichAggregator<? super K, ? super V, VR> subtractor, final RichAggregator<? super K, ? super finalV, StringVR> queryableStoreNameaggregator); <VR> KTable<KKTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, RichInitializer<VR>K> initializer, final RichAggregator<? super K, ? super V, VR> adderaggregator, final RichAggregator<? super K, ? super V, VR> subtractor); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized); KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer); KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer, final RichAggregator<? super K, ? superMaterialized<K, V, VR> adderWindowStore<Bytes, byte[]>> materialized); |
KGroupedTable
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V, K> adder, final RichAggregator<? super KRichReducer<V, ? super V, VR>K> subtractor, final Serde<VR>Materialized<K, aggValueSerdeV, KeyValueStore<Bytes, byte[]>> materialized); KTable<K, V> reduce(final RichReducer<V, K> adder, final finalRichReducer<V, StringK> queryableStoreNamesubtractor); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor, final Serde<VR> aggValueSerde); Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor, final StateStoreSupplier<KeyValueStore> storeSupplier); |
Proposed changes
Limiting the ProcessorContext - RecordContext interface
...
Move
RecordContext
from.
processor.internals
to.processor
Make record context open to public
StreamTask.updateProcessorContext()
) :Code Block | ||
---|---|---|
| ||
public interface RecordContext { String applicationId(); TaskId taskId(); StreamsMetrics metrics(); String topic(); int partition(); void commit(); long offset(); long timestamp(); Map<String, Object> appConfigs(// the below code snippet already exists, this is just for background. private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) { processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic())); Map<String, Object> appConfigsWithPrefix(String prefixprocessorContext.setCurrentNode(currNode); } |
Sample processor should look like this:
Once we need a conversion from ProcessorContext
and RecordContext, we just cast:
Code Block | ||
---|---|---|
| ||
private class KStreamMapProcessorKStreamKTableJoinProcessor<K1, extendsK2, AbstractProcessor<KV1, V>V2, { R> extends AbstractProcessor<K1, V1> @Override{ public void init(ProcessorContext processorContext) {... private super.init(processorContext); RecordContext recordContext; richMapper.init((RecordContext) processorContext); // this line HEREis WEadded MAKEin Athis CASTKIP }... @Override public void process(final KK1 key, final VV1 value) { recordContext = new RecordContext() { // recordContext initialization is added in this KIP @Override V1 newValue public =long mapper.apply(key, value); offset() { return context().recordContext().forwardoffset(key, newValue); } @Override public voidlong closetimestamp() { super.close return context().recordContext().timestamp(); } mapper.close(); @Override } } |
Rich Interfaces
Code Block | ||
---|---|---|
| ||
public interface RichValueMapper<V, VR> public String topic() { VR apply(final V value, final RecordContext recordContext return context().recordContext().topic(); } public interface RichValueJoiner<V1, V2, VR> { @Override public VRint apply(final V1 value1, final V2 value2, final RecordContext recordContext); } public interface RichKeyValueMapper<K, V, VR> { VR apply(final K key, final V value, final RecordContext recordContext); } public interface RichReducer<V> {partition() { return context().recordContext().partition(); } }; Vif apply(finalkey V!= value1,null final&& Vvalue value2, final RecordContext recordContext); } public interface RichInitializer<VA> { != null) { VA apply(final RecordContext recordContext); } public interface RichAggregator<K, V, VA> { VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext); } public interface RichForeachAction<K, V> { void apply(final K key, final V value, final RecordContext recordContext); } public interface RichPredicate<K, V> { boolean test(final K key, final V value, final RecordContext recordContext); } V2 value2 = valueGetter.get(keyMapper.apply(key, value)); if (leftJoin || value2 != null) { context().forward(key, joiner.apply(value, value2, recordContext)); } } } } |
Rejected Alternatives
Not yet.
...