Code Block | ||
| ||
KTable<K, V> reduce(final RichReducer<V> reducer); KTable<K, V> reduce(final RichReducer<V> reducer, final String queryableStoreName); KTable<K, V> reduce(final RichReducer<V> reducer, final StateStoreSupplier<KeyValueStore> storeSupplier); <W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final Windows<W> windows, final String queryableStoreName); <W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final Windows<W> windows); <W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows, final String queryableStoreName); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final Serde<VR> aggValueSerde, final String queryableStoreName); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final Serde<VR> aggValueSerde, final String queryableStoreName); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final Serde<VR> aggValueSerde, final String queryableStoreName); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final String queryableStoreName); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final String queryableStoreName); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final String queryableStoreName); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final StateStoreSupplier<SessionStore> storeSupplier); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final StateStoreSupplier<SessionStore> storeSupplier); <T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final StateStoreSupplier<SessionStore> storeSupplier); |
We create a subset of features from ProcessorContext
and put into RecordContext
Code Block | ||
| ||
public interface RecordContext {KTable<K, V> reduce(final RichReducer<V> adder, String applicationId(); TaskId taskId(); StreamsMetrics metrics(); final String topic();Reducer<V> subtractor, int partition(); void commit(); long offset(); long timestamp(); Map<String,final Object>String appConfigs(queryableStoreName); KTable<K, V> reduce(final Reducer<V> Map<Stringadder, Object> appConfigsWithPrefix(String prefix); } public interface ProcessorContext extends RecordContext { // all methods but the ones in RecordContext } |
Once we need a conversion from ProcessorContext
and RecordContext, we just cast:
Code Block | ||
| ||
private class KStreamMapProcessorfinal extendsRichReducer<V> AbstractProcessor<Ksubtractor, V> { @Override public void init(ProcessorContext processorContext) { final String super.init(processorContextqueryableStoreName); KTable<K, V> reduce(final RichReducer<V> adder, richMapper.init((RecordContext) processorContext); // HERE WE MAKE A CAST } final RichReducer<V> @Overridesubtractor, public void process(final K key, final V value) { V1final newValue = mapper.apply(key, value); context().forward(key, newValue);String queryableStoreName); KTable<K, V> reduce(final RichReducer<V> adder, } @Override public void close() { final Reducer<V> super.close(subtractor); KTable<K, V> reduce(final Reducer<V> adder, mapper.close(); } } |
Rich Interfaces
Code Block | ||
| ||
public interface RichValueMapper<V, VR> { VR apply(final V value, final RecordContextRichReducer<V> recordContextsubtractor); } public interface RichValueJoiner<V1, V2, VR> { KTable<K, V> reduce(final RichReducer<V> adder, VR apply(final V1 value1, final V2 value2, final RecordContextRichReducer<V> recordContextsubtractor); } public interface RichKeyValueMapper<KKTable<K, V, VR> {V> reduce(final RichReducer<V> adder, VR apply(final K key, final V value, final RecordContext recordContext); } public interface RichReducer<V> { V apply(final VReducer<V> value1subtractor, final V value2, final RecordContext recordContext); } public interface RichInitializer<VA> { VA apply(final RecordContextStateStoreSupplier<KeyValueStore> recordContextstoreSupplier); } public interface Aggregator<K, V, VA> { KTable<K, V> reduce(final Reducer<V> adder, VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext); } public interface RichForeachAction<K, V> { RichReducer<V> subtractor, void apply(final K key, final V value, final RecordContextStateStoreSupplier<KeyValueStore> recordContextstoreSupplier); } public interface RichPredicate<K, V> { KTable<K, V> reduce(final RichReducer<V> adder, boolean test(final KRichReducer<V> key, final V value,subtractor, final RecordContextStateStoreSupplier<KeyValueStore> recordContextstoreSupplier); } |
The same semantics apply to other interfaces as well.
So we don't need to add any overloaded methods for public APIs. Internally we perform 2 changes:
- Change the constructor type of all related Processors to accept rich interfaces
- Create converters from non-rich to rich interfaces
Code Block | ||
| ||
class KStreamMapValues<K, V, V1> implements ProcessorSupplier<K, V> { private final RichValueMapper<K, V, V1> mapper; public KStreamMapValues(RichValueMapper<K, V, V1> mapper) { <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, this.mapper = mapper; } final RichAggregator<? super @Override K, ? super V, publicVR> Processor<Ksubtractor, V> get() { return new KStreamMapProcessor(); } private class KStreamMapProcessor extends AbstractProcessor<K, V> { final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final RichInitializer<VR> @Overrideinitializer, public void init(ProcessorContext processorContext) { super.init(processorContext); final RichAggregator<? super K, ? super mapper.init((RecordContext) processorContext); V, VR> adder, } @Override public void process(final RichAggregator<? super K, key? super V, finalVR> Vsubtractor, value) { V1 newValue = mapper.apply(key, value); context().forward(key, newValue); final StateStoreSupplier<KeyValueStore> storeSupplier); |
Proposed changes
Limiting the ProcessorContext - RecordContext interface
We create a subset of features from ProcessorContext
and put into RecordContext
Code Block | ||
| ||
public interface RecordContext { String } applicationId(); TaskId taskId(); @Override StreamsMetrics metrics(); String topic(); public int partition(); void closecommit() {; long offset(); long super.closetimestamp(); Map<String, Object> appConfigs(); Map<String, Object> mapper.close(appConfigsWithPrefix(String prefix); } public interface ProcessorContext extends RecordContext { // all methods but the ones in RecordContext } |
Once we need a conversion from ProcessorContext
and RecordContext, we just cast:
Code Block | ||
| ||
private class KStreamMapProcessor extends AbstractProcessor<K, V> { @Override public void init(ProcessorContext processorContext} } } static <K, T1, T2, R> RichValueJoiner<K, T1, T2, R> convertToRichValueJoiner(final ValueJoinerWithKey<K, T1, T2, R> valueJoinerWithKey) { Objects.requireNonNull(valueJoinerWithKey, "valueJoiner can't be null" super.init(processorContext); if (valueJoinerWithKey instanceof RichValueJoiner) { return (RichValueJoiner<K, T1, T2, R>) valueJoinerWithKey; richMapper.init((RecordContext) processorContext); // HERE WE MAKE A CAST } else { @Override public void returnprocess(final newK RichValueJoiner<Kkey, T1,final T2,V R>(value) { V1 newValue = @Overridemapper.apply(key, value); context().forward(key, newValue); public void init() {} @Override public void close() { @Override super.close(); public void mapper.close() {; } } @Override |
Rich Interfaces
Code Block | ||
| ||
public interface RichValueMapper<V, VR> { VR apply(final V value, final RecordContext recordContext); } public R apply(K keyinterface RichValueJoiner<V1, T1 value1V2, T2 value2)VR> { VR apply(final V1 value1, final V2 value2, final return valueJoinerWithKey.apply(key, value1, value2); RecordContext recordContext); } public interface RichKeyValueMapper<K, V, VR> { VR apply(final } }; }K key, final V value, final RecordContext recordContext); } staticpublic <K,interface T1, T2, R> ValueJoinerWithKey<K, T1, T2, R> convertToValueJoinerWithKey(final ValueJoiner<T1, T2, R> valueJoiner)RichReducer<V> { V apply(final V value1, final V value2, final RecordContext recordContext); } public interface RichInitializer<VA> { Objects.requireNonNull(valueJoiner, "valueJoiner can't be null"); return new ValueJoinerWithKey<K, T1, T2, R>()VA apply(final RecordContext recordContext); } public interface RichAggregator<K, V, VA> { VA apply(final K key, @Override final V value, final VA aggregate, final RecordContext recordContext); } public R apply(K keyinterface RichForeachAction<K, T1V> value1,{ T2 value2) { void apply(final K key, final V value, final RecordContext recordContext); } public interface return valueJoiner.apply(value1, value2);RichPredicate<K, V> { boolean test(final K key, final } V value, final RecordContext }recordContext); } |
Rejected Alternatives
Not yet.