...
Current state: "Under Discussion"
Discussion thread: TBD
JIRA: TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Rich functions are one of the essential parts of stream processing. There are several use cases where users cannot express their business logic with the current non-rich methods, especially when init(...) and close() methods are needed.
Public Interfaces
KStream.java
Jira | ||||||
---|---|---|---|---|---|---|
|
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP is combined with KIP-149. It provides a hybrid solution both for rich functions in Streams and for accessing read-only keys within the ValueJoiner, ValueTransformer, and ValueMapper interfaces.
Rich functions are one of the essential parts of stream processing. There are several use cases where users cannot express their business logic with the current non-rich methods. For example:
- having access to RecordContext within an operator
- having access to a read-only key for ValueJoiner, ValueTransformer, ValueMapper interfaces
Rich Interfaces
Code Block | ||
---|---|---|
| ||
/**
 * Rich initializer: computes the initial aggregate value and receives the read-only key
 * of the aggregate being initialized.
 *
 * <p>NOTE(review): unlike the other {@code Rich*} interfaces in this proposal, this one does not
 * receive a {@code RecordContext}. Also, the KGroupedTable section of the proposal uses a single
 * type argument ({@code RichInitializer<VR>}). Confirm which signature is intended.
 *
 * @param <V> type of the initial aggregate value
 * @param <K> type of the key
 */
@FunctionalInterface
public interface RichInitializer<V, K> {

    /**
     * Returns the initial aggregate value for the given key.
     *
     * @param key the read-only key of the aggregate being initialized
     * @return the initial aggregate value
     */
    V apply(K key);
}
/**
 * Rich value mapper: maps a value to a new value, with read-only access to the record's key
 * and to the {@code RecordContext} (e.g. topic, partition, offset, timestamp) of the record
 * being processed.
 *
 * @param <V>  type of the input value
 * @param <VR> type of the mapped value
 * @param <K>  type of the read-only key
 */
@FunctionalInterface
public interface RichValueMapper<V, VR, K> {

    /**
     * Maps the given value to a new value.
     *
     * @param value         the value to map
     * @param key           the read-only key of the record
     * @param recordContext the context of the record being processed
     * @return the mapped value
     */
    VR apply(final V value, final K key, final RecordContext recordContext);
}
/**
 * Rich value joiner: combines two values that share a key into a joined value, with
 * read-only access to that key and to the {@code RecordContext} of the record being processed.
 *
 * @param <V1> type of the first value
 * @param <V2> type of the second value
 * @param <VR> type of the joined value
 * @param <K>  type of the read-only key
 */
@FunctionalInterface
public interface RichValueJoiner<V1, V2, VR, K> {

    /**
     * Joins two values into one.
     *
     * @param value1        the first value
     * @param value2        the second value
     * @param key           the read-only key the two values are joined on
     * @param recordContext the context of the record being processed
     * @return the joined value
     */
    VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
}
/**
 * Rich key-value mapper: maps a key/value pair to a result, with access to the
 * {@code RecordContext} of the record being processed.
 *
 * <p>In this proposal the result is used, for example, as the new key ({@code selectKey}),
 * as a new {@code KeyValue} ({@code map}) or as a grouping key ({@code groupBy}).
 *
 * @param <K>  type of the input key
 * @param <V>  type of the input value
 * @param <VR> type of the mapped result
 */
@FunctionalInterface
public interface RichKeyValueMapper<K, V, VR> {

    /**
     * Maps the given key/value pair to a result.
     *
     * @param key           the record key
     * @param value         the record value
     * @param recordContext the context of the record being processed
     * @return the mapped result
     */
    VR apply(final K key, final V value, final RecordContext recordContext);
}
/**
 * Rich reducer: combines two values of the same type into one, with read-only access to the
 * key and to the {@code RecordContext} of the record being processed.
 *
 * <p>NOTE(review): presumably {@code value1} is the current aggregate and {@code value2} the new
 * record value, as in the non-rich reducer. Confirm and document the order.
 *
 * @param <V> type of the values being reduced
 * @param <K> type of the read-only key
 */
@FunctionalInterface
public interface RichReducer<V, K> {

    /**
     * Reduces two values into one.
     *
     * @param value1        the first value
     * @param value2        the second value
     * @param key           the read-only key of the values being reduced
     * @param recordContext the context of the record being processed
     * @return the reduced value
     */
    V apply(final V value1, final V value2, final K key, final RecordContext recordContext);
}
/**
 * Rich aggregator: computes a new aggregate from a record and the current aggregate, with
 * access to the {@code RecordContext} of the record being processed.
 *
 * @param <K>  type of the record key
 * @param <V>  type of the record value
 * @param <VA> type of the aggregate
 */
@FunctionalInterface
public interface RichAggregator<K, V, VA> {

    /**
     * Computes the new aggregate for the given record.
     *
     * @param key           the record key
     * @param value         the record value
     * @param aggregate     the current aggregate
     * @param recordContext the context of the record being processed
     * @return the new aggregate
     */
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}
/**
 * Rich per-record action: performs an action on a key/value pair, with access to the
 * {@code RecordContext} of the record being processed.
 *
 * <p>In this proposal it is used by {@code foreach} and {@code peek}.
 *
 * @param <K> type of the record key
 * @param <V> type of the record value
 */
@FunctionalInterface
public interface RichForeachAction<K, V> {

    /**
     * Performs the action on the given record.
     *
     * @param key           the record key
     * @param value         the record value
     * @param recordContext the context of the record being processed
     */
    void apply(final K key, final V value, final RecordContext recordContext);
}
/**
 * Rich predicate: tests a key/value pair, with access to the {@code RecordContext} of the
 * record being processed.
 *
 * <p>In this proposal it is used by {@code filter}, {@code filterNot} and {@code branch}.
 *
 * @param <K> type of the record key
 * @param <V> type of the record value
 */
@FunctionalInterface
public interface RichPredicate<K, V> {

    /**
     * Evaluates this predicate on the given record.
     *
     * @param key           the record key
     * @param value         the record value
     * @param recordContext the context of the record being processed
     * @return {@code true} if the record satisfies the predicate, {@code false} otherwise
     */
    boolean test(final K key, final V value, final RecordContext recordContext);
}
/**
 * Rich merger: merges two aggregates that share a key into one, with access to the
 * {@code RecordContext} of the record being processed.
 *
 * <p>In this proposal it is used as the {@code sessionMerger} of session-windowed aggregations.
 *
 * @param <K> type of the aggregate key
 * @param <V> type of the aggregates being merged
 */
@FunctionalInterface
public interface RichMerger<K, V> {

    /**
     * Merges two aggregates.
     *
     * @param aggKey        the key of the aggregates
     * @param aggOne        the first aggregate
     * @param aggTwo        the second aggregate
     * @param recordContext the context of the record being processed
     * @return the merged aggregate
     */
    V apply(final K aggKey, final V aggOne, final V aggTwo, final RecordContext recordContext);
}
/**
 * Stateful value transformer with read-only access to the record key.
 *
 * <p>It exposes explicit {@code init} / {@code close} hooks around per-record
 * {@code transform} calls. NOTE(review): presumably {@code init} is called once before any
 * {@code transform} call and {@code close} once when the owning processor closes. Confirm.
 *
 * @param <V>  type of the input value
 * @param <VR> type of the transformed value
 * @param <K>  type of the read-only key
 */
public interface RichValueTransformer<V, VR, K> {

    /**
     * Initializes this transformer.
     *
     * @param context the processor context to use
     */
    void init(final ProcessorContext context);

    /**
     * Transforms one value.
     *
     * @param value the value to transform
     * @param key   the read-only key of the record
     * @return the transformed value
     */
    VR transform(final V value, final K key);

    /**
     * Releases any resources held by this transformer.
     */
    void close();
}
/**
 * Supplier of {@link RichValueTransformer} instances.
 *
 * <p>In this proposal it is passed to {@code transformValues}.
 * NOTE(review): presumably each call should return a new instance. Confirm and document.
 *
 * @param <V>  type of the input value
 * @param <VR> type of the transformed value
 * @param <K>  type of the read-only key
 */
@FunctionalInterface
public interface RichValueTransformerSupplier<V, VR, K> {

    /**
     * Returns a {@link RichValueTransformer}.
     *
     * @return a value transformer
     */
    RichValueTransformer<V, VR, K> get();
}
|
Public Interfaces
KStream
Code Block | ||
---|---|---|
| ||
KStream<K, V> filter(RichPredicate< | ||
Code Block | ||
| ||
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate); KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate); <KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper); <KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper); <VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR> mapper); <KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper); <VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>> processor); void foreach(final RichForeachAction<? super K, ? super V> action); KStream<K, V> peek(final RichForeachAction<? super K, ? super V> actionpredicate); KStream<K, V>[] branchfilterNot(final RichPredicate<? super K, ? super V>... predicatespredicate); <KR> KGroupedStream<KRKStream<KR, V> groupByselectKey(final RichKeyValueMapper<? super K, ? super V, ? extends KR> selectormapper); <KR> KGroupedStream<KR<KR, VR> KStream<KR, V>VR> groupBymap(final RichKeyValueMapper<? super K, ? super V, KR>? selector, extends KeyValue<? extends KR, ? extends final Serde<KR> keySerde, final Serde<V> valSerde); <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final RichValueJoiner<VR>> mapper); <VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR, ? super K> mapper); <KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper); <VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>, ? super K> mapper); void foreach(final RichForeachAction<? super K, ? super V> action); KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action); KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... 
predicates); <VR> KStream<K, VR> transformValues(final RichValueTransformerSupplier<? super V, ? superextends VOVR, ? extendssuper VR>K> joinervalueTransformerSupplier, final JoinWindows windows); <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final RichValueJoiner<? super VString... stateStoreNames); <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector); <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super VOV, ? extends VR> joinerKR> selector, final JoinWindows windows, final Serialized<KR, V> serialized); <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final Serde<K> keySerde, final Serde<V> thisValueSerdefinal RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner, final Serde<VO>JoinWindows otherValueSerdewindows); <VO, VR> KStream<K, VR> leftJoinjoin(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR> joinerVR, ? super final JoinWindows windows); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, K> joiner, final JoinWindows windows, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Joined<K, V, VO> joined); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final JoinWindows windows, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner, final Serde<K> keySerde, final JoinWindows windows); <VO, VR> KStream<K, VR> leftJoin(final Serde<V> thisValSerdeKStream<K, VO> otherStream, final Serde<VO> otherValueSerde); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner, final RichValueJoiner<? super V, ? super VO, ? extendsfinal VR>JoinWindows joinerwindows, final Joined<K, finalV, JoinWindowsVO> windowsjoined); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR> VR, ? 
super K> joiner, final JoinWindows windows); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? finalsuper Serde<K>K> keySerdejoiner, final Serde<V>JoinWindows thisValueSerdewindows, final Serde<VO> otherValueSerde Joined<K, V, VO> joined); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final RichValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Serde<K> keySerde K, ? super V, ? super VT, ? extends VR> joiner, final Serde<V> valSerde Joined<K, V, VT> joined); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, final RichValueJoiner<? super VK, ? super VT, ? extends VR> joiner, final Serde<K> keySerdeV, ? super VT, ? extends VR> joiner, final Serde<V> valSerde Joined<K, V, VT> joined); <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable, final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> joiner); <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable, final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper, final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> valueJoiner); |
KTable
...
Code Block | ||
---|---|---|
| ||
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate); KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final String queryableStoreName); KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier); final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate); KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier); KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final String queryableStoreName); <VR> KTable<K, VR> mapValues(final RichValueMapper<?Materialized<K, super V, ? extends VR> mapperKeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapperVR, final? Serde<VR>super valueSerde, final String queryableStoreNameK> mapper); <VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR> mapperVR, ? super final Serde<VR> valueSerdeK> mapper, final StateStoreSupplier<KeyValueStore> storeSupplier); void foreach(final RichForeachAction<? super K, ? super V> action final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper); <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector); <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, final Serde<KR> keySerdeSerialized<KR, final Serde<VR> valueSerdeVR> serialized); <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? extendssuper VR>K> joiner); <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? 
extends VR> VR, ? super K> joiner, final Serde<VR> joinSerde Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super finalK> String queryableStoreNamejoiner); <VO, VR> KTable<K, VR> joinleftJoin(final KTable<K, VO> other, final RichValueJoiner< ValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner, final StateStoreSupplier<KeyValueStore> storeSupplier);Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VO, VR> KTable<K, VR> leftJoinouterJoin(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR>VR, ? super K> joiner); <VO, VR> KTable<K, VR> leftJoinouterJoin(final KTable<K, VO> other, final RichValueJoiner<? super V, ? super VO, ? extends VR>VR, ? super K> joiner, final Materialized<K, Serde<VR>VR, joinSerde, KeyValueStore<Bytes, byte[]>> materialized); |
KGroupedStream
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V, K> reducer); KTable<K, V> reduce(final RichReducer<V, K> reducer, final final String queryableStoreNameMaterialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); <VO, VR><VR> KTable<K, VR> leftJoinaggregate(final KTable<KRichInitializer<VR, VO>K> otherinitializer, final RichValueJoiner<RichAggregator<? super VK, ? super VOV, ? extends VR> joineraggregator, final Materialized<K, VR, final StateStoreSupplier<KeyValueStore> storeSupplierKeyValueStore<Bytes, byte[]>> materialized); <VO,<VR> VR> KTable<K, VR> outerJoinaggregate(final KTable<KRichInitializer<VR, VO>K> otherinitializer, final RichValueJoiner<RichAggregator<? super VK, ? super VO, ? extends V, VR> aggregator); |
SessionWindowedKStream
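The aggregate() methods take three functional arguments: an initializer, an aggregator, and a session merger. Providing an overload for every combination of rich and non-rich arguments would cause an explosion of overloads. I therefore propose adding a single overload in which all three arguments are rich interfaces.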
Code Block | ||
---|---|---|
| ||
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T, Windowed<K>> initializer, joiner); <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final RichValueJoiner<RichAggregator<? super VK, ? super VOV, ? extends VR> joinerT> aggregator, final Serde<VR> joinSerde, final RichMerger<? super K, T> sessionMerger); <VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, Windowed<K>> initializer, final String queryableStoreName); <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final RichAggregator<? super K, ? super V, VR> aggregator, final RichValueJoiner<? super V, ? super VO, ? extends VR> joiner, final RichMerger<? super K, VR> sessionMerger, final StateStoreSupplier<KeyValueStore> storeSupplier); |
Limiting the ProcessorContext - RecordContext interface
We take a subset of the features of ProcessorContext and put them into the RecordContext interface.
Code Block | ||
---|---|---|
| ||
public interface RecordContext { String applicationId(); TaskId taskId(); StreamsMetrics metrics(); String topic(); int partition(); void commit(); long offset();final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized); KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer); KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer, long timestamp(); Map<String, Object> appConfigs(); Map<String, Object> appConfigsWithPrefix(String prefix); } public interface ProcessorContext extends RecordContext { // all methodsfinal butMaterialized<K, theV, ones in RecordContext } |
When we need to convert a ProcessorContext to a RecordContext, we simply cast:
SessionStore<Bytes, byte[]>> materializedAs);
, |
TimeWindowedKStream
Code Block | ||
---|---|---|
| ||
private class KStreamMapProcessor extends AbstractProcessor<K, V> { <VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer, @Override public void init(ProcessorContext processorContext) { super.init(processorContext); richMapper.init((RecordContext) processorContext); // HERE WE MAKE A CAST final RichAggregator<? super K, } ? super V, VR> @Overrideaggregator); <VR> KTable<Windowed<K>, public void processVR> aggregate(final K key, final V value) { RichInitializer<VR, K> initializer, V1 newValue = mapper.apply(key, value); context().forward(key, newValue); } @Override public void close() { super.close();final RichAggregator<? super K, ? super V, VR> aggregator, mapper.close(); } } |
Rich Interfaces
Code Block | ||
---|---|---|
| ||
public interface RichValueMapper<V, VR> { VR apply(final V value, final RecordContext recordContext); } public interface RichValueJoiner<V1, V2, VR> { VR apply(final V1 value1, final V2 value2, final RecordContext recordContextMaterialized<K, VR, WindowStore<Bytes, byte[]>> materialized); } publicKTable<Windowed<K>, interface RichKeyValueMapper<K, VV> reduce(final RichReducer<V, VR>K> {reducer); KTable<Windowed<K>, V> VR applyreduce(final K keyRichReducer<V, finalK> V valuereducer, final RecordContext recordContext); } public interface RichReducer<V> { V apply(final V value1, final V value2, final RecordContext recordContext); } public interface RichInitializer<VA> { VA apply(final RecordContext recordContext); } public interface Aggregator<K, V, VA> { VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext); } public interface RichForeachAction<K, V> { void apply(final K key, final V value, final RecordContext recordContext); } public interface RichPredicate<K, V> { boolean test(final K key, final V value, final RecordContext recordContext); } |
The same semantics apply to the other interfaces as well, so we do not need to add any overloaded methods to the public APIs. Internally, we make two changes:
- Change the constructor parameter types of all related Processors to accept rich interfaces
- Create converters from non-rich to rich interfaces
final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
|
KGroupedTable
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V, K> adder,
final RichReducer<V, K> subtractor,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> reduce(final RichReducer<V, K> adder,
final RichReducer<V, K> subtractor);
<VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer,
final RichAggregator<? super K, ? super V, VR> adder,
| ||
Code Block | ||
| ||
class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> { private final RichValueMapper<K, V, V1> mapper; public KStreamMapValues(RichValueMapper<K, V, V1> mapper) { this.mapper = mapper; } @Override public Processor<K, V> get() { return new KStreamMapProcessor(); } final RichAggregator<? privatesuper classK, KStreamMapProcessor? extendssuper AbstractProcessor<KV, V>VR> {subtractor, @Override public void init(ProcessorContext processorContext) { final Materialized<K, VR, KeyValueStore<Bytes, super.init(processorContextbyte[]>> materialized); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, mapper.init((RecordContext) processorContext); } @Override final RichAggregator<? super K, public? void process(final K key, final V value) { super V, VR> adder, V1 newValue = mapper.apply(key, value); final RichAggregator<? super K, ? super V, VR> subtractor); |
Proposed changes
Move RecordContext from .processor.internals to .processor.
Make the record context open to the public (
StreamTask.updateProcessorContext()
) :Code Block | ||
---|---|---|
| ||
// the below code snippet already exists, this is just for background. private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode().forward(key, newValue); } @Override public void close() { processorContext.setRecordContext(new super.close(ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic())); mapper.close(processorContext.setCurrentNode(currNode); } |
A sample processor should look like this:
Code Block | ||
---|---|---|
| ||
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> { ... } } } static <K, T1, T2, R> RichValueJoiner<K, T1, T2, R> convertToRichValueJoiner(final ValueJoinerWithKey<K, T1, T2, R> valueJoinerWithKey) { Objects.requireNonNull(valueJoinerWithKey, "valueJoiner can't be null"); ifprivate (valueJoinerWithKeyRecordContext instanceofrecordContext; RichValueJoiner) { return (RichValueJoiner<K, T1, T2, R>) valueJoinerWithKey; } else { // this line is added in this KIP ... @Override public void returnprocess(final newK1 RichValueJoiner<Kkey, T1,final T2,V1 R>(value) { recordContext = new RecordContext() { @Override // publicrecordContext voidinitialization init() {} is added in this KIP @Override public long offset() { public void close() { return context().recordContext().offset(); } @Override public long timestamp() @Override{ return context().recordContext().timestamp(); public R apply(K key, T1 value1, T2 value2) { } @Override public String topic() { return valueJoinerWithKey.apply(key, value1, value2context().recordContext().topic(); } }; @Override } } static <K, T1, T2, R> ValueJoinerWithKey<K, T1, T2, R> convertToValueJoinerWithKey(final ValueJoiner<T1, T2, R> valueJoiner) { Objects.requireNonNull(valueJoiner, "valueJoiner can't be null"); return new ValueJoinerWithKey<K, T1, T2, R>() { public int partition() { return context().recordContext().partition(); } }; if (key != null && value != null) { final @Override V2 value2 = valueGetter.get(keyMapper.apply(key, value)); public R apply(K key, T1if value1, T2(leftJoin || value2 != null) { return valueJoinercontext().forward(key, joiner.apply(value1value, value2, recordContext)); } }; } } |
Rejected Alternatives
Not yet.