...
Current state: "Under Discussion"
Discussion thread: TBD here
JIRA: TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Rich functions are one of the essential parts of stream processing. There are several use-cases where users cannot express their business logic with current un-rich methods especially when init(Some params)
, close()
methods are needed.
Public Interfaces
KStream.java
Jira | ||||||
---|---|---|---|---|---|---|
|
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
Jira | ||||||||
---|---|---|---|---|---|---|---|---|
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP combines KIP-149 and provides a hybrid solution to rich functions in Streams and accessing read-only keys within ValueJoiner, ValueTransformer, ValueMapper interfaces.
Rich functions are one of the essential parts of stream processing. There are several use-cases where users cannot express their business logic with current un-rich methods. For example:
- having access to RecordContext within an operator
- having access to a read-only key for ValueJoiner, ValueTransformer, ValueMapper interfaces
Rich Interfaces
Code Block | ||
---|---|---|
| ||
public interface RichInitializer<V, K> {
V apply(K key);
}
public interface RichValueMapper<V, VR, K> {
VR apply(final V value, final K key, final RecordContext recordContext);
}
public interface RichValueJoiner<V1, V2, VR, K> {
VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
}
public interface RichKeyValueMapper<K, V, VR> {
VR apply(final K key, final V value, final RecordContext recordContext);
}
public interface RichReducer<V, K> {
V apply(final V value1, final V value2, final K key, final RecordContext recordContext);
}
public interface RichAggregator<K, V, VA> {
VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}
public interface RichForeachAction<K, V> {
void apply(final K key, final V value, final RecordContext recordContext);
}
public interface RichPredicate<K, V> {
boolean test(final K key, final V value, final RecordContext recordContext);
}
public interface RichMerger<K, V> {
V apply(final K aggKey, final V aggOne, final V aggTwo, final RecordContext recordContext);
}
public interface RichValueTransformer<V, VR, K> {
void init(final ProcessorContext context);
VR transform(final V value, final K key);
void close();
}
public interface RichValueTransformerSupplier<V, VR, K> {
RichValueTransformer<V, VR, K> get();
}
|
Public Interfaces
KStream
Code Block | ||
---|---|---|
| ||
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);
KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);
<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR, ? super K> mapper);
<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>, ? super K> mapper);
void foreach(final RichForeachAction< | ||
Code Block | ||
| ||
KStream<K, V> filter(Predicate<? super K, ? super V> predicate, final RecordContext recordContext); KStream<K, V> filterNot(Predicate<? super K, ? super V> predicate, final RecordContext recordContext); <KR, VR> KStream<KR, VR> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper, final RecordContext recordContext); <VR> KStream<K, VR> mapValues(ValueMapper<? super V, ? extends VR> mapper, final RecordContext recordContext); <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super VV> action); KStream<K, ?V> extendspeek(final Iterable<RichForeachAction<? extendssuper K, KeyValue<? extends KR, ? extends VR>>> mapper, final RecordContext recordContext); super V> action); KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates); <VR> KStream<K, VR> flatMapValuestransformValues(final ValueMapper<RichValueTransformerSupplier<? super V, ? extends VR, Iterable<? extendssuper VR>> processor);<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, K> valueTransformerSupplier, final String... stateStoreNames); <KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector); <KR> KGroupedStream<KR, V> groupBy(final ValueJoiner<RichKeyValueMapper<? super VK, ? super VOV, KR> ?selector, extends VR> joiner, final JoinWindows windows, final RecordContext recordContextSerialized<KR, V> serialized); <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final ValueJoiner<RichValueJoiner<? super V, ? super VO, ? extends VR> VR, ? super K> joiner, final JoinWindows windows); <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final Serde<K> keySerde, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? finalsuper Serde<V>K> thisValueSerdejoiner, final Serde<VO>JoinWindows otherValueSerdewindows, final RecordContext recordContext) Joined<K, V, VO> joined); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final ValueJoiner<RichValueJoiner<? super V, ? super VO, ? extends VR> VR, ? super K> joiner, final JoinWindows windows, final RecordContext recordContext windows); <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final ValueJoiner<RichValueJoiner<? super V, ? super VO, ? extends VR> VR, ? super K> joiner, final JoinWindows windows, final Serde<K> keySerde Joined<K, V, VO> joined); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final Serde<V> thisValSerde, RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner, final Serde<VO> otherValueSerde, final RecordContextJoinWindows recordContextwindows); <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final ValueJoiner<RichValueJoiner<? super V, ? super VO, ? extends VR>VR, ? super K> joiner, final JoinWindows windows, final RecordContext recordContext); <VO final Joined<K, V, VO> joined); <VT, VR> KStream<K, VR> outerJoinjoin(final KStream<KKTable<K, VO>VT> otherStreamtable, final RichValueJoiner<? super K, final ValueJoiner<? super V, ? super VOVT, ? extends VR> joiner); <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final JoinWindows windows, final RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner, final Serde<K> keySerde, final Joined<K, V, VT> joined); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, final Serde<V> thisValueSerde, final RichValueJoiner<? super K, ? super V, final? Serde<VO> otherValueSerde, final RecordContext recordContext); super VT, ? extends VR> joiner); <VT, VR> KStream<K, VR> joinleftJoin(final KTable<K, VT> table, final ValueJoiner<RichValueJoiner<? super K, ? super V, ? super VT, ? extends VR> joiner, final RecordContext recordContext Joined<K, V, VT> joined); <VT<GK, GV, VR>RV> KStream<K, VR>RV> join(final KTable<KGlobalKTable<GK, VT>GV> tableglobalKTable, final ValueJoiner<RichKeyValueMapper<? super VK, ? super VTV, ? extends VR>GK> joinerkeyValueMapper, final Serde<K> keySerde, final RichValueJoiner<? super K, ? super V, ? super GV, ? final Serde<V> valSerde, final RecordContext recordContextextends RV> joiner); <VT<GK, GV, VR>RV> KStream<K, VR>RV> leftJoin(final KTable<KGlobalKTable<GK, VT>GV> tableglobalKTable, final ValueJoiner<RichKeyValueMapper<? super VK, ? super VTV, ? extends VR>GK> joiner, final RecordContext recordContext); <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, keyValueMapper, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final RichValueJoiner<? super K, ? super V, ? super GV, ? extends RV> valueJoiner); |
KTable
Code Block | ||
---|---|---|
| ||
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate); KTable<K, V> filter(final RichPredicate<? Serde<K>super keySerdeK, ? super V> predicate, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); KTable<K, V> filterNot(final RichPredicate<? finalsuper Serde<V> valSerde, final RecordContext recordContext); <GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable, K, ? super V> predicate); KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> mapValues(final RichValueMapper<? super final KeyValueMapper<V, ? extends VR, ? super K, K> mapper); <VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends GK> keyValueMapperVR, ? super K> mapper, final ValueJoiner<? super VMaterialized<K, ? super GVVR, ? extends RV> joiner, final RecordContext recordContextKeyValueStore<Bytes, byte[]>> materialized); <GK,<KR> GVKStream<KR, RV> KStream<K, RV> leftJoinV> toStream(final GlobalKTable<GK,RichKeyValueMapper<? GV>super globalKTableK, final KeyValueMapper<? super V, ? extends KR> mapper); <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector); <KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, ?KeyValue<KR, extendsVR>> GK> keyValueMapperselector, final ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner, final Serialized<KR, RecordContextVR> recordContextserialized); |
KTable.java
Code Block | ||
---|---|---|
| ||
<VO, VR> KTable<K, V>VR> filterjoin(final Predicate<?KTable<K, superVO> K, ? super V> predicate); KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final String queryableStoreName); KTable<K, V> filter(final Predicate<? super Kother, final RichValueJoiner<? super V, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier); KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate); KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final StateStoreSupplier<KeyValueStore> storeSupplier); KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final String queryableStoreName); <VR> KTable<K, VR> mapValues(final ValueMapper<VO, ? extends VR, ? super K> joiner); <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final RichValueJoiner<? super V, ? extendssuper VR> mapper); <VR> KTable<K, VR> mapValues(final ValueMapper<VO, ? extends VR, ? super V, ? extends VR> mapper, final Serde<VR> valueSerde, final String queryableStoreName); <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, K> joiner, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final Serde<VR> valueSerde, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? finalsuper StateStoreSupplier<KeyValueStore>K> storeSupplierjoiner); <VO, VR> KTable<K, VR> joinleftJoin(final KTable<K, VO> other, final ValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner); <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR>final joinerMaterialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final Serde<VR> joinSerde, final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super final String queryableStoreNameK> joiner); <VO, VR> KTable<K, VR> joinouterJoin(final KTable<K, VO> other, final ValueJoiner<RichValueJoiner<? super V, ? super VO, ? extends VR> VR, ? super K> joiner, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); |
KGroupedStream
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V, final StateStoreSupplier<KeyValueStore> storeSupplierK> reducer); <VO, VR> KTable<K, VR>V> leftJoinreduce(final KTable<KRichReducer<V, VO>K> otherreducer, final ValueJoiner<? superMaterialized<K, V, ? super VOKeyValueStore<Bytes, ? extends VR> joinerbyte[]>> materialized); <VO, VR><VR> KTable<K, VR> leftJoinaggregate(final KTable<KRichInitializer<VR, VO>K> otherinitializer, final ValueJoiner<RichAggregator<? super VK, ? super VOV, ? extends VR> joineraggregator, final Materialized<K, final Serde<VR> joinSerde, VR, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer, final String queryableStoreName); <VO, VR> KTable<KRichAggregator<? super K, ? super V, VR> aggregator); |
SessionWindowedKStream
There are 3 rich interfaces in aggregate() methods. So converting all possible combinations to their rich counterparts can cause a lot of overloads. So, I propose to overload one method with all rich interfaces.
Code Block | ||
---|---|---|
| ||
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T, Windowed<K>> initializer, leftJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final StateStoreSupplier<KeyValueStore> storeSupplier); <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, RichAggregator<? super K, ? super V, T> aggregator, final ValueJoiner<? super V,final RichMerger<? super VOK, ? extends VR> joinerT> sessionMerger); <VO, VR> KTable<K<VR> KTable<Windowed<K>, VR> outerJoinaggregate(final KTable<KRichInitializer<VR, VO>Windowed<K>> otherinitializer, final ValueJoiner<? super V, ? super VO,final RichAggregator<? extendssuper VR> joiner, K, ? super V, VR> aggregator, final Serde<VR> joinSerde, final RichMerger<? super K, VR> sessionMerger, final String queryableStoreName); <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized); KTable<Windowed<K>, V> reduce(final ValueJoiner<?RichReducer<V, super VK> reducer); KTable<Windowed<K>, ?V> superreduce(final VORichReducer<V, ? extends VR> joinerK> reducer, final Materialized<K, V, final StateStoreSupplier<KeyValueStore> storeSupplier); |
Limiting the ProcessorContext - RecordContext interface
We create a subset of features from ProcessorContext
and put into RecordContext
interface
SessionStore<Bytes, byte[]>> materializedAs);
, |
TimeWindowedKStream
Code Block | ||
---|---|---|
| ||
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer, | ||
Code Block | ||
| ||
public interface RecordContext { String applicationId(); TaskId taskId(); StreamsMetrics metrics(); String topic(); int partition(); long offset(); final long timestamp(); Map<String, Object> appConfigs(RichAggregator<? super K, ? super V, VR> aggregator); <VR> KTable<Windowed<K>, VR> aggregate(final Map<StringRichInitializer<VR, K> Object> appConfigsWithPrefix(String prefix); } public interface ProcessorContext extends RecordContext { // all methods but the ones in RecordContext } |
Once we need a conversion from ProcessorContext
and RecordContext, we just cast:
Code Block | ||
---|---|---|
| ||
private class KStreamMapProcessor extends AbstractProcessor<K, V> { @Override public void init(ProcessorContext processorContext) { super.init(processorContext);initializer, final RichAggregator<? super K, ? super V, VR> aggregator, richMapper.init((RecordContext) processorContext); // here make a cast } @Override public void process(final K key, final V value) { final Materialized<K, VR, V1 newValue = mapper.apply(key, value); context().forward(key, newValue); } @Override public void close() { WindowStore<Bytes, byte[]>> materialized); KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer); KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer, super.close(); final Materialized<K, V, WindowStore<Bytes, mapper.close(byte[]>> materialized); } } |
Rich Interfaces
If the interface is value-only (like ValueJoiner, ValueMapper
) we extend its rich interface from its withKey'ed counterpart.
If the interface is key-value (KeyValueMapper) we extend its rich interface from itself.
KGroupedTable
Code Block | ||
---|---|---|
| ||
public interface RichFunction { KTable<K, V> reduce(final RichReducer<V, K> adder, void init(RecordContext recordContext); void close(); } public interface ValueMapperWithKey<K, V, VR> { final RichReducer<V, K> subtractor, VR apply(final KMaterialized<K, keyV, final V valueKeyValueStore<Bytes, byte[]>> materialized); } public interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction { } public interface RichKeyValueMapper<K, V, VR> extends KeyValueMapper<K, V, VR>, RichFunction { } |
The same semantics apply to other interfaces as well.
So we don't need to add any overloaded methods for public APIs. Internally we perform 2 changes:
- Change the constructor type of all related Processors to accept rich interfaces
- Create converters from non-rich to rich interfaces
Code Block | ||
---|---|---|
| ||
class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> { private final RichValueMapper<K, V, V1> mapper; public KStreamMapValues(RichValueMapper<K, V, V1> mapper) { this.mapper = mapper; } @Override public Processor<K, V> get() { KTable<K, V> reduce(final RichReducer<V, K> adder, final RichReducer<V, K> subtractor); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, returnfinal new KStreamMapProcessor(); } private class KStreamMapProcessor extends AbstractProcessor<K, V> { RichAggregator<? super K, ? super V, VR> subtractor, @Override public void init(ProcessorContext processorContext) { final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> aggregate(final super.init(processorContext);RichInitializer<VR> initializer, mapper.init((RecordContext) processorContext); } @Override final RichAggregator<? super K, ? super V, VR> publicadder, void process(final K key, final V value) { V1 newValue = mapper.apply(key, value); final RichAggregator<? super K, ? super V, VR> subtractor); |
Proposed changes
Move
RecordContext
from.
processor.internals
to.processor
Make record context open to public
StreamTask.updateProcessorContext()
) :Code Block | ||
---|---|---|
| ||
// the below code snippet already exists, this is just for background. private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode().forward(key, newValue); } @Override public void close() { processorContext.setRecordContext(new super.close(); mapper.close(ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic())); processorContext.setCurrentNode(currNode); } |
Sample processor should look like this:
Code Block | ||
---|---|---|
| ||
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> { ... private RecordContext recordContext; // this line is added in this KIP ... @Override public void process(final K1 key, final V1 value) { recordContext = new RecordContext() { } } } static <K, T1, T2, R> RichValueJoiner<K, T1, T2, R> convertToRichValueJoiner(final ValueJoinerWithKey<K, T1, T2, R> valueJoinerWithKey) { Objects.requireNonNull(valueJoinerWithKey, "valueJoiner can't be null"); if (valueJoinerWithKey instanceof RichValueJoiner) { return (RichValueJoiner<K, T1, T2, R>) valueJoinerWithKey; } else { return new RichValueJoiner<K, T1, T2, R>() { @Override public void init() {}// recordContext initialization is added in this KIP @Override public long offset() @Override{ return context().recordContext().offset(); public void close() { } @Override public long timestamp() @Override{ return context().recordContext().timestamp(); public R apply(K key, T1 value1, T2 value2) { } @Override public String topic() { return valueJoinerWithKey.apply(key, value1, value2context().recordContext().topic(); } @Override }; } } static <K, T1, T2, R> ValueJoinerWithKey<K, T1, T2, R> convertToValueJoinerWithKey(final ValueJoiner<T1, T2, R> valueJoiner) { Objects.requireNonNull(valueJoiner, "valueJoiner can't be null"); return new ValueJoinerWithKey<K, T1, T2, R>( public int partition() { return context().recordContext().partition(); } }; if (key != null && value != null) { @Override final V2 value2 = valueGetter.get(keyMapper.apply(key, value)); public R apply(K key, T1if value1, T2(leftJoin || value2 != null) { return valueJoinercontext().forward(key, joiner.apply(value1value, value2, recordContext)); } }; } } |
Rejected Alternatives
Not yet.