...
Current state: "Under Discussion"
Discussion thread: TBD here
JIRA: TBD
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Rich functions are an essential part of stream processing. In several use cases, users cannot express their business logic with the current non-rich methods, especially when an init() method (taking some parameters) and a close() method are needed.
Public Interfaces
We assume this KIP builds on top of KIP-149; that is, rich functions are built on top of the "withKey" interfaces. Therefore, there are no public interface changes apart from those described in KIP-149.
The reason is that once the withKey interfaces are part of the public API, we do not need any overloaded methods for rich functions.
Moreover, we separated the value-only interfaces (ValueMapper) and the withKey interfaces (ValueMapperWithKey), with no inheritance relation between them, to enable lambdas. Therefore, when introducing rich functions, we have to choose one of them to extend.
It is better to select the more general interface (withKey in this case) for rich functions.
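A minimal sketch of why the withKey interface is the more general base. ValueMapper, ValueMapperWithKey, RichFunction and RichValueMapper below are simplified stand-ins (assumptions, not the library's definitions; init() takes no context here), and Mappers.withKey(...) and KeyPrefixMapper are hypothetical helpers. Any value-only mapper can be lifted to a withKey mapper by ignoring the key, but not the other way around; a rich mapper has several abstract methods, so it is written as a class, yet it remains usable wherever a withKey mapper is expected.

```java
import java.util.Objects;

class WithKeyDemo {
    public static void main(String[] args) {
        // A value-only lambda lifted to the more general withKey form; the key is ignored.
        ValueMapper<Integer, Integer> times2 = v -> v * 2;
        ValueMapperWithKey<String, Integer, Integer> doubled = Mappers.withKey(times2);
        // A withKey lambda that reads the (read-only) key.
        ValueMapperWithKey<String, Integer, Integer> plusKeyLength = (k, v) -> v + k.length();
        System.out.println(doubled.apply("key", 21));       // 42
        System.out.println(plusKeyLength.apply("key", 21)); // 24

        // A rich mapper is a class, but it can still be passed where a withKey mapper is expected.
        KeyPrefixMapper rich = new KeyPrefixMapper();
        rich.init();
        ValueMapperWithKey<String, String, String> asWithKey = rich;
        System.out.println(asWithKey.apply("a", "b"));     // k:a=b
        rich.close();
    }
}

// Simplified stand-ins for the Streams interfaces (assumptions).
interface ValueMapper<V, VR> {
    VR apply(V value);
}

interface ValueMapperWithKey<K, V, VR> {
    VR apply(K key, V value);
}

// Hypothetical lifecycle interface; init() takes no context to keep the sketch small.
interface RichFunction {
    void init();
    void close();
}

// Three abstract methods in total, so RichValueMapper is not a lambda target.
interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction {
}

final class KeyPrefixMapper implements RichValueMapper<String, String, String> {
    private String prefix;

    @Override public void init() { prefix = "k:"; }   // set up state once
    @Override public String apply(String key, String value) { return prefix + key + "=" + value; }
    @Override public void close() { prefix = null; }  // release state
}

final class Mappers {
    private Mappers() {}

    // Hypothetical adapter: every value-only mapper can be expressed as a withKey mapper.
    static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        return (key, value) -> mapper.apply(value);
    }
}
```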
Proposed Changes
There are two main issues to consider when introducing rich functions: 1. limiting the ProcessorContext passed to the init() method, and 2. introducing rich functions in a backwards-compatible way.
Limiting the ProcessorContext - RecordContext interface
We move a subset of the features of ProcessorContext into a new RecordContext interface.
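A minimal sketch of the narrowing idea under the proposed hierarchy, with heavily simplified stand-ins: RecordContext here exposes only topic(), partition(), offset() and timestamp() (the proposal lists more methods), ProcessorContext adds a forward() method, and SimpleProcessorContext and LoggingFunction are hypothetical. Because ProcessorContext extends RecordContext, passing a ProcessorContext where a RecordContext is expected is a widening conversion that Java performs implicitly.

```java
class NarrowingDemo {
    public static void main(String[] args) {
        ProcessorContext context = new SimpleProcessorContext("orders", 3, 42L, 1_000L);
        LoggingFunction function = new LoggingFunction();
        // Widening conversion: a ProcessorContext is accepted where a RecordContext is expected.
        function.init(context);
        System.out.println(function.describe()); // orders-3@42
    }
}

// Simplified stand-in (assumption): only per-record metadata.
interface RecordContext {
    String topic();
    int partition();
    long offset();
    long timestamp();
}

// Simplified stand-in (assumption): the full context adds operations user functions should not call.
interface ProcessorContext extends RecordContext {
    void forward(Object key, Object value);
}

// Hypothetical fixed-value context, used only to drive this sketch.
final class SimpleProcessorContext implements ProcessorContext {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;

    SimpleProcessorContext(String topic, int partition, long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
    @Override public long offset() { return offset; }
    @Override public long timestamp() { return timestamp; }

    @Override
    public void forward(Object key, Object value) {
        throw new UnsupportedOperationException("not used in this sketch");
    }
}

// Hypothetical user function: it only holds the narrowed view, so forward() is not
// reachable through its field type without an explicit downcast.
final class LoggingFunction {
    private RecordContext recordContext;

    void init(final RecordContext recordContext) {
        this.recordContext = recordContext;
    }

    String describe() {
        return recordContext.topic() + "-" + recordContext.partition() + "@" + recordContext.offset();
    }
}
```

Note that this limits what is visible through the type; it does not stop a deliberate downcast back to ProcessorContext.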
Motivation
This KIP combines KIP-149 with rich functions and provides a hybrid solution for rich functions in Streams and for accessing read-only keys within the ValueJoiner, ValueTransformer, and ValueMapper interfaces.
Rich functions are an essential part of stream processing. In several use cases, users cannot express their business logic with the current non-rich methods. For example:
- having access to RecordContext within an operator
- having access to a read-only key for ValueJoiner, ValueTransformer, ValueMapper interfaces
Rich Interfaces
```java
public interface RecordContext {
    String applicationId();
    TaskId taskId();
    StreamsMetrics metrics();
    String topic();
    int partition();
    long offset();
    long timestamp();
    Map<String, Object> appConfigs();
    Map<String, Object> appConfigsWithPrefix(String prefix);
}

public interface ProcessorContext extends RecordContext {
    // all methods but the ones in RecordContext
}
```

```java
public interface RichInitializer<V, K> {
    V apply(K key);
}

public interface RichValueMapper<V, VR, K> {
    VR apply(final V value, final K key, final RecordContext recordContext);
}

public interface RichValueJoiner<V1, V2, VR, K> {
    VR apply(final V1 value1, final V2 value2, final K key, final RecordContext recordContext);
}

public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichReducer<V, K> {
    V apply(final V value1, final V value2, final K key, final RecordContext recordContext);
}

public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}

public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}

public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

public interface RichMerger<K, V> {
    V apply(final K aggKey, final V aggOne, final V aggTwo, final RecordContext recordContext);
}

public interface RichValueTransformer<V, VR, K> {
    void init(final ProcessorContext context);
    VR transform(final V value, final K key);
    void close();
}

public interface RichValueTransformerSupplier<V, VR, K> {
    RichValueTransformer<V, VR, K> get();
}
```

When we need to convert a ProcessorContext into a RecordContext, we simply cast it:

```java
private class KStreamMapProcessor extends AbstractProcessor<K, V> {
    @Override
    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
        mapper.init((RecordContext) processorContext); // cast to the narrower RecordContext
    }

    @Override
    public void process(final K key, final V value) {
        V1 newValue = mapper.apply(key, value);
        context().forward(key, newValue);
    }

    @Override
    public void close() {
        super.close();
        mapper.close();
    }
}
```

Rich Interfaces
If the interface is value-only (like ValueJoiner or ValueMapper), we extend its rich interface from its withKey counterpart.
If the interface is key-value (like KeyValueMapper), we extend its rich interface from the interface itself.

```java
public interface RichFunction {
    void init(RecordContext recordContext);
    void close();
}

public interface ValueMapperWithKey<K, V, VR> {
    /**
     * Map the given value to a new value.
     *
     * @param key   the read-only key
     * @param value the value to be mapped
     * @return the new value
     */
    VR apply(final K key, final V value);
}

public interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction {
}

public interface RichKeyValueMapper<K, V, VR> extends KeyValueMapper<K, V, VR>, RichFunction {
}
```
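In the variant where interfaces such as RichPredicate and RichValueJoiner receive the RecordContext and the read-only key as extra apply()/test() parameters (instead of through init()), user code stays lambda-friendly. A minimal sketch against simplified copies of those two interfaces; RecordContext is reduced to topic() and partition(), and FixedRecordContext is a hypothetical helper, both assumptions for brevity.

```java
class PerCallContextDemo {
    public static void main(String[] args) {
        RecordContext context = new FixedRecordContext("clicks", 0);
        // Single abstract method, so a lambda works: keep only records from partition 0.
        RichPredicate<String, Integer> fromPartitionZero = (key, value, rc) -> rc.partition() == 0;
        // The joiner sees both values plus the read-only key and the record's context.
        RichValueJoiner<Integer, String, String, String> label =
                (clicks, region, key, rc) -> key + "@" + rc.topic() + ": " + clicks + " in " + region;
        System.out.println(fromPartitionZero.test("user-1", 5, context)); // true
        System.out.println(label.apply(5, "EU", "user-1", context));      // user-1@clicks: 5 in EU
    }
}

// Simplified stand-in (assumption): the proposed RecordContext exposes more metadata.
interface RecordContext {
    String topic();
    int partition();
}

// Same shape as the per-call rich interfaces in the proposal.
interface RichPredicate<K, V> {
    boolean test(K key, V value, RecordContext recordContext);
}

interface RichValueJoiner<V1, V2, VR, K> {
    VR apply(V1 value1, V2 value2, K key, RecordContext recordContext);
}

// Hypothetical fixed-value context, used only to drive this sketch.
final class FixedRecordContext implements RecordContext {
    private final String topic;
    private final int partition;

    FixedRecordContext(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override public String topic() { return topic; }
    @Override public int partition() { return partition; }
}
```

Compared with the RichFunction variant, this keeps single-method interfaces usable as lambdas, at the cost of a wider method signature.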
The same approach applies to the other interfaces as well.
As a result, we don't need to add any overloaded methods to the public APIs. Internally, we make two changes:
- Change the constructor parameter types of all related processors to accept rich interfaces
- Create converters from non-rich to rich interfaces
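The two internal changes can be sketched together with simplified stand-ins rather than the real Streams classes. Converters.toRich(...) is a hypothetical converter that wraps a non-rich ValueMapperWithKey into a RichValueMapper with no-op init()/close(), returning the argument unchanged if it is already rich (the same pattern convertToRichValueJoiner follows in this proposal). MapValuesProcessor is a hypothetical processor whose constructor accepts only the rich interface and drives its lifecycle. RecordContext is reduced to topic(), and a list stands in for context().forward().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class ConverterDemo {
    public static void main(String[] args) {
        // A plain withKey lambda is converted, so the processor only ever deals with rich functions.
        ValueMapperWithKey<String, Integer, Integer> plusOne = (key, value) -> value + 1;
        MapValuesProcessor<String, Integer, Integer> processor =
                new MapValuesProcessor<>(Converters.toRich(plusOne));
        processor.init(() -> "input-topic");
        processor.process("a", 1);
        processor.process("b", 2);
        processor.close();
        System.out.println(processor.forwarded()); // [a=2, b=3]
    }
}

// Simplified stand-ins (assumptions): RecordContext is reduced to topic().
interface RecordContext {
    String topic();
}

interface RichFunction {
    void init(RecordContext recordContext);
    void close();
}

interface ValueMapperWithKey<K, V, VR> {
    VR apply(K key, V value);
}

interface RichValueMapper<K, V, VR> extends ValueMapperWithKey<K, V, VR>, RichFunction {
}

final class Converters {
    private Converters() {}

    // Hypothetical converter: rich mappers are returned as-is, others get no-op lifecycle methods.
    static <K, V, VR> RichValueMapper<K, V, VR> toRich(final ValueMapperWithKey<K, V, VR> mapper) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        if (mapper instanceof RichValueMapper) {
            return (RichValueMapper<K, V, VR>) mapper;
        }
        return new RichValueMapper<K, V, VR>() {
            @Override public void init(final RecordContext recordContext) { } // nothing to set up
            @Override public void close() { }                                 // nothing to release
            @Override public VR apply(final K key, final V value) { return mapper.apply(key, value); }
        };
    }
}

// Hypothetical processor: accepts only the rich interface and drives init/apply/close.
final class MapValuesProcessor<K, V, VR> {
    private final RichValueMapper<K, V, VR> mapper;
    private final List<String> forwarded = new ArrayList<>();

    MapValuesProcessor(final RichValueMapper<K, V, VR> mapper) {
        this.mapper = Objects.requireNonNull(mapper, "mapper can't be null");
    }

    void init(final RecordContext recordContext) { mapper.init(recordContext); }

    void process(final K key, final V value) {
        final VR newValue = mapper.apply(key, value);
        forwarded.add(key + "=" + newValue); // stands in for context().forward(key, newValue)
    }

    void close() { mapper.close(); }

    List<String> forwarded() { return forwarded; }
}
```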
Public Interfaces
KStream
```java
KStream<K, V> filter(RichPredicate<? super K, ? super V> predicate);
KStream<K, V> filterNot(RichPredicate<? super K, ? super V> predicate);
<KR> KStream<KR, V> selectKey(RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
<KR, VR> KStream<KR, VR> map(RichKeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper);
<VR> KStream<K, VR> mapValues(RichValueMapper<? super V, ? extends VR, ? super K> mapper);
<KR, VR> KStream<KR, VR> flatMap(final RichKeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper);
<VR> KStream<K, VR> flatMapValues(final RichValueMapper<? super V, ? extends Iterable<? extends VR>, ? super K> mapper);
void foreach(final RichForeachAction<? super K, ? super V> action);
KStream<K, V> peek(final RichForeachAction<? super K, ? super V> action);
KStream<K, V>[] branch(final RichPredicate<? super K, ? super V>... predicates);
<VR> KStream<K, VR> transformValues(final RichValueTransformerSupplier<? super V, ? extends VR, ? super K> valueTransformerSupplier,
                                    final String... stateStoreNames);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector);
<KR> KGroupedStream<KR, V> groupBy(final RichKeyValueMapper<? super K, ? super V, KR> selector,
                                   final Serialized<KR, V> serialized);
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                             final JoinWindows windows);
<VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                             final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                             final JoinWindows windows,
                             final Joined<K, V, VO> joined);
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                 final JoinWindows windows);
<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                 final JoinWindows windows,
                                 final Joined<K, V, VO> joined);
<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                  final JoinWindows windows);
<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                  final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                  final JoinWindows windows,
                                  final Joined<K, V, VO> joined);
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR, ? super K> joiner);
<VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                             final RichValueJoiner<? super V, ? super VT, ? extends VR, ? super K> joiner,
                             final Joined<K, V, VT> joined);
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR, ? super K> joiner);
<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
                                 final RichValueJoiner<? super V, ? super VT, ? extends VR, ? super K> joiner,
                                 final Joined<K, V, VT> joined);
<GK, GV, RV> KStream<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                 final RichValueJoiner<? super V, ? super GV, ? extends RV, ? super K> joiner);
<GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final RichKeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
                                     final RichValueJoiner<? super V, ? super GV, ? extends RV, ? super K> valueJoiner);
```
KTable
```java
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate);
KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate,
                       final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR, ? super K> mapper);
<VR> KTable<K, VR> mapValues(final RichValueMapper<? super V, ? extends VR, ? super K> mapper,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);
<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                       final Serialized<KR, VR> serialized);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner);
<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super V, ? super VO, ? extends VR, ? super K> joiner,
                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
```
KGroupedStream
```java
KTable<K, V> reduce(final RichReducer<V, K> reducer);
KTable<K, V> reduce(final RichReducer<V, K> reducer,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> aggregator);
```
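Because RichInitializer receives the key, the initial aggregate can depend on it (the existing Initializer takes no arguments). A minimal in-memory model of the aggregate semantics, assuming simplified RichInitializer/RichAggregator stand-ins with the RecordContext parameter dropped; Aggregation.aggregateByKey(...) is a hypothetical helper with no state store that processes records in order. It shows that the initializer runs once per key, the first time that key is seen.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AggregateDemo {
    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("eu", 5));
        records.add(new SimpleEntry<>("us", 2));
        records.add(new SimpleEntry<>("eu", 1));
        // The initial aggregate depends on the key: "eu" starts at 100, every other key at 0.
        RichInitializer<Integer, String> initializer = key -> key.equals("eu") ? 100 : 0;
        RichAggregator<String, Integer, Integer> sum = (key, value, aggregate) -> aggregate + value;
        System.out.println(Aggregation.aggregateByKey(records, initializer, sum)); // {eu=106, us=2}
    }
}

// Simplified stand-ins (assumption): the RecordContext parameter is dropped for brevity.
interface RichInitializer<V, K> {
    V apply(K key);
}

interface RichAggregator<K, V, VA> {
    VA apply(K key, V value, VA aggregate);
}

final class Aggregation {
    private Aggregation() {}

    // Hypothetical in-memory model of aggregate(): one map entry per key, updated in record order.
    static <K, V, VA> Map<K, VA> aggregateByKey(final List<Map.Entry<K, V>> records,
                                                 final RichInitializer<VA, K> initializer,
                                                 final RichAggregator<K, V, VA> aggregator) {
        final Map<K, VA> table = new LinkedHashMap<>();
        for (final Map.Entry<K, V> record : records) {
            final K key = record.getKey();
            // The initializer runs only the first time a key is seen.
            final VA current = table.containsKey(key) ? table.get(key) : initializer.apply(key);
            table.put(key, aggregator.apply(key, record.getValue(), current));
        }
        return table;
    }
}
```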
SessionWindowedKStream
The aggregate() methods take three functional parameters. Converting every possible combination to its rich counterpart would create a large number of overloads. Therefore, I propose adding a single overload per method in which all three parameters are rich interfaces.
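The overload count behind this choice can be made explicit. Each aggregate() signature takes three functions (initializer, aggregator, merger) and exists in two forms (with and without Materialized). If each function could independently be rich or non-rich, excluding the existing all-non-rich form:

```latex
\underbrace{\left(2^{3} - 1\right)}_{\text{mixed or all-rich combinations}}
\times \underbrace{2}_{\text{with/without Materialized}} = 14
\qquad\text{versus}\qquad
1 \times 2 = 2 \ \text{for a single all-rich overload per signature.}
```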
```java
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T, Windowed<K>> initializer,
                                     final RichAggregator<? super K, ? super V, T> aggregator,
                                     final RichMerger<? super K, T> sessionMerger);
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, Windowed<K>> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator,
                                       final RichMerger<? super K, VR> sessionMerger,
                                       final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer,
                              final Materialized<K, V, SessionStore<Bytes, byte[]>> materializedAs);
```
TimeWindowedKStream
```java
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator);
<VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR, K> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator,
                                       final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer);
KTable<Windowed<K>, V> reduce(final RichReducer<V, K> reducer,
                              final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
```
KGroupedTable
```java
KTable<K, V> reduce(final RichReducer<V, K> adder,
                    final RichReducer<V, K> subtractor,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
KTable<K, V> reduce(final RichReducer<V, K> adder,
                    final RichReducer<V, K> subtractor);
<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR> KTable<K, VR> aggregate(final RichInitializer<VR, K> initializer,
                             final RichAggregator<? super K, ? super V, VR> adder,
                             final RichAggregator<? super K, ? super V, VR> subtractor);
```

Internally, the related processors take rich interfaces in their constructors, and converters turn non-rich functions into rich ones:

```java
class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> {
    private final RichValueMapper<K, V, V1> mapper;

    public KStreamMapValues(RichValueMapper<K, V, V1> mapper) {
        this.mapper = mapper;
    }

    @Override
    public Processor<K, V> get() {
        return new KStreamMapProcessor();
    }

    private class KStreamMapProcessor extends AbstractProcessor<K, V> {
        @Override
        public void init(ProcessorContext processorContext) {
            super.init(processorContext);
            mapper.init((RecordContext) processorContext);
        }

        @Override
        public void process(final K key, final V value) {
            V1 newValue = mapper.apply(key, value);
            context().forward(key, newValue);
        }

        @Override
        public void close() {
            super.close();
            mapper.close();
        }
    }
}

static <K, T1, T2, R> RichValueJoiner<K, T1, T2, R> convertToRichValueJoiner(final ValueJoinerWithKey<K, T1, T2, R> valueJoinerWithKey) {
    Objects.requireNonNull(valueJoinerWithKey, "valueJoiner can't be null");
    if (valueJoinerWithKey instanceof RichValueJoiner) {
        return (RichValueJoiner<K, T1, T2, R>) valueJoinerWithKey;
    } else {
        return new RichValueJoiner<K, T1, T2, R>() {
            @Override
            public void init(final RecordContext recordContext) {} // a non-rich joiner has nothing to initialize

            @Override
            public void close() {}

            @Override
            public R apply(K key, T1 value1, T2 value2) {
                return valueJoinerWithKey.apply(key, value1, value2);
            }
        };
    }
}

static <K, T1, T2, R> ValueJoinerWithKey<K, T1, T2, R> convertToValueJoinerWithKey(final ValueJoiner<T1, T2, R> valueJoiner) {
    Objects.requireNonNull(valueJoiner, "valueJoiner can't be null");
    return new ValueJoinerWithKey<K, T1, T2, R>() {
        @Override
        public R apply(K key, T1 value1, T2 value2) {
            return valueJoiner.apply(value1, value2); // the key is ignored
        }
    };
}
```

Proposed changes
- Move RecordContext from .processor.internals to .processor.
- Make the record context open to the public.
For background, the record context is set from the current StampedRecord in StreamTask.updateProcessorContext():

```java
// the below code snippet already exists, this is just for background.
private void updateProcessorContext(final StampedRecord record, final ProcessorNode currNode) {
    processorContext.setRecordContext(new ProcessorRecordContext(record.timestamp, record.offset(), record.partition(), record.topic()));
    processorContext.setCurrentNode(currNode);
}
```

A sample processor should look like this:

```java
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
    ...
    private RecordContext recordContext; // this line is added in this KIP
    ...

    @Override
    public void process(final K1 key, final V1 value) {
        // recordContext initialization is added in this KIP
        recordContext = new RecordContext() {
            @Override
            public long offset() {
                return context().recordContext().offset();
            }

            @Override
            public long timestamp() {
                return context().recordContext().timestamp();
            }

            @Override
            public String topic() {
                return context().recordContext().topic();
            }

            @Override
            public int partition() {
                return context().recordContext().partition();
            }
        };

        if (key != null && value != null) {
            final V2 value2 = valueGetter.get(keyMapper.apply(key, value));
            if (leftJoin || value2 != null) {
                // matches RichValueJoiner.apply(value1, value2, key, recordContext); the key is read-only
                context().forward(key, joiner.apply(value, value2, key, recordContext));
            }
        }
    }
}
```
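The join rule of the sample processor can be exercised outside Streams with a minimal sketch. All types here are simplified assumptions, not the real valueGetter/context() machinery: a Map replaces the KTable value getter, the table is keyed directly by the stream key (the keyMapper step is omitted), a list replaces context().forward(), and RecordContext is reduced to topic() and offset(). StreamTableJoin and FixedRecordContext are hypothetical. It shows the inner-join versus left-join rule (forward when leftJoin is true or a table value exists) and the joiner receiving the read-only key and the per-record context.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class JoinDemo {
    public static void main(String[] args) {
        Map<String, String> regions = new HashMap<>();
        regions.put("user-1", "EU");
        RichValueJoiner<Integer, String, String, String> joiner =
                (clicks, region, key, rc) -> key + "/" + region + "/" + clicks + "@" + rc.offset();

        StreamTableJoin<String, Integer, String, String> inner = new StreamTableJoin<>(regions, joiner, false);
        inner.process("user-1", 5, new FixedRecordContext("clicks", 0L));
        inner.process("user-2", 7, new FixedRecordContext("clicks", 1L)); // no table match: dropped
        System.out.println(inner.forwarded()); // [user-1/EU/5@0]

        StreamTableJoin<String, Integer, String, String> left = new StreamTableJoin<>(regions, joiner, true);
        left.process("user-2", 7, new FixedRecordContext("clicks", 1L)); // left join: kept with null
        System.out.println(left.forwarded()); // [user-2/null/7@1]
    }
}

// Simplified stand-ins (assumptions).
interface RecordContext {
    String topic();
    long offset();
}

interface RichValueJoiner<V1, V2, VR, K> {
    VR apply(V1 value1, V2 value2, K key, RecordContext recordContext);
}

final class FixedRecordContext implements RecordContext {
    private final String topic;
    private final long offset;

    FixedRecordContext(String topic, long offset) {
        this.topic = topic;
        this.offset = offset;
    }

    @Override public String topic() { return topic; }
    @Override public long offset() { return offset; }
}

// Hypothetical stand-in for KStreamKTableJoinProcessor's process() logic.
final class StreamTableJoin<K, V1, V2, R> {
    private final Map<K, V2> table;
    private final RichValueJoiner<V1, V2, R, K> joiner;
    private final boolean leftJoin;
    private final List<R> forwarded = new ArrayList<>();

    StreamTableJoin(Map<K, V2> table, RichValueJoiner<V1, V2, R, K> joiner, boolean leftJoin) {
        this.table = table;
        this.joiner = joiner;
        this.leftJoin = leftJoin;
    }

    void process(final K key, final V1 value, final RecordContext recordContext) {
        // Records with a null key or value are skipped, as in the sample processor.
        if (key != null && value != null) {
            final V2 value2 = table.get(key);
            // Inner join needs a table match; left join forwards even when value2 is null.
            if (leftJoin || value2 != null) {
                forwarded.add(joiner.apply(value, value2, key, recordContext));
            }
        }
    }

    List<R> forwarded() { return forwarded; }
}
```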
Rejected Alternatives
Not yet.