THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||
---|---|---|
| ||
/**
 * Value-mapping function whose {@code apply} is passed the key, the value and a
 * {@code RecordContext}, and returns the mapped value.
 *
 * @param <K>  key type
 * @param <V>  input value type
 * @param <VR> result value type
 */
public interface RichValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

/**
 * Join function whose {@code apply} is passed the key, one value from each side
 * and a {@code RecordContext}, and returns the joined value.
 *
 * @param <K>  key type
 * @param <V1> first value type
 * @param <V2> second value type
 * @param <VR> result value type
 */
public interface RichValueJoiner<K, V1, V2, VR> {
    VR apply(final K key, final V1 value1, final V2 value2, final RecordContext recordContext);
}

/**
 * Key-value mapping function whose {@code apply} is passed the key, the value and a
 * {@code RecordContext}.
 *
 * @param <K>  key type
 * @param <V>  value type
 * @param <VR> result type
 */
public interface RichKeyValueMapper<K, V, VR> {
    VR apply(final K key, final V value, final RecordContext recordContext);
}

/**
 * Reduce function whose {@code apply} is passed the key, two values of the same type
 * and a {@code RecordContext}, and returns a value of that type.
 *
 * @param <K> key type
 * @param <V> value type
 */
public interface RichReducer<K, V> {
    V apply(final K key, final V value1, final V value2, final RecordContext recordContext);
}

/**
 * Aggregation function whose {@code apply} is passed the key, a value, the current
 * aggregate and a {@code RecordContext}, and returns the new aggregate.
 *
 * @param <K>  key type
 * @param <V>  input value type
 * @param <VA> aggregate type
 */
public interface RichAggregator<K, V, VA> {
    VA apply(final K key, final V value, final VA aggregate, final RecordContext recordContext);
}

/**
 * Action whose {@code apply} is passed the key, the value and a {@code RecordContext}
 * and returns nothing.
 *
 * @param <K> key type
 * @param <V> value type
 */
public interface RichForeachAction<K, V> {
    void apply(final K key, final V value, final RecordContext recordContext);
}

/**
 * Predicate whose {@code test} is passed the key, the value and a {@code RecordContext}.
 *
 * @param <K> key type
 * @param <V> value type
 */
public interface RichPredicate<K, V> {
    boolean test(final K key, final V value, final RecordContext recordContext);
}

/**
 * Merge function whose {@code apply} is passed the aggregation key, two aggregates of the
 * same type and a {@code RecordContext}, and returns an aggregate of that type.
 *
 * @param <K> key type
 * @param <V> aggregate type
 */
public interface RichMerger<K, V> {
    V apply(final K aggKey, final V aggOne, final V aggTwo, final RecordContext recordContext);
}

/**
 * Value transformer with {@code init}/{@code transform}/{@code close} methods;
 * {@code init} is handed a {@code ProcessorContext}. {@code punctuate} is declared
 * but marked {@code @Deprecated}.
 *
 * @param <K>  key type (used by {@code punctuate})
 * @param <V>  input value type
 * @param <VR> result value type
 */
public interface RichValueTransformer<K, V, VR> {
    void init(final ProcessorContext context);

    VR transform(final V value);

    @Deprecated
    VR punctuate(final K key, final long timestamp);

    void close();
}

/**
 * Supplier of {@link RichValueTransformer} instances.
 *
 * @param <K>  key type
 * @param <V>  input value type
 * @param <VR> result value type
 */
public interface RichValueTransformerSupplier<K, V, VR> {
    RichValueTransformer<K, V, VR> get();
}
|
...
Code Block | ||
---|---|---|
| ||
// Overloads that accept the Rich* function interfaces.
// NOTE(review): the section heading for this block is not visible here; the
// signatures (filter/mapValues/toStream/groupBy/join returning KTable) suggest
// these are the KTable additions - confirm.

// --- filter / filterNot ---

KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate);

KTable<K, V> filter(final RichPredicate<? super K, ? super V> predicate,
                    final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate);

KTable<K, V> filterNot(final RichPredicate<? super K, ? super V> predicate,
                       final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

// --- mapValues ---

<VR> KTable<K, VR> mapValues(final RichValueMapper<? super K, ? super V, ? extends VR> mapper);

<VR> KTable<K, VR> mapValues(final RichValueMapper<? super K, ? super V, ? extends VR> mapper,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

// --- toStream / groupBy ---

<KR> KStream<KR, V> toStream(final RichKeyValueMapper<? super K, ? super V, ? extends KR> mapper);

<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);

<KR, VR> KGroupedTable<KR, VR> groupBy(final RichKeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                       final Serialized<KR, VR> serialized);

// --- joins: every overload takes the four-type-parameter RichValueJoiner ---

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner);

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final RichValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner,
                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final RichValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final RichValueJoiner<? super K, ? super V, ? super VO, ? extends VR> joiner,
                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
|
KGroupedStream
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V>RichReducer<K, V> reducer); KTable<K, V> reduce(final RichReducer<K, RichReducer<V>V> reducer, final String queryableStoreNameMaterialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, V>VR> reduceaggregate(final RichReducer<V>Initializer<VR> reducerinitializer, final StateStoreSupplier<KeyValueStore> storeSupplier); <W extends Window> KTable<Windowed<K>, V> reduce(final RichAggregator<? RichReducer<V>super reducerK, ? super V, VR> aggregator, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> aggregate(final RichInitializer<K, VR> initializer, final Windows<W> windows, final Aggregator<? super K, ? super V, VR> aggregator, final String queryableStoreName); <W extends Window> KTable<Windowed<K>, V> reduce( final RichReducer<V>Materialized<K, reducerVR, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> aggregate(final RichInitializer<K, VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows); <W extends Window> KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> aggregate(final RichInitializer<K, VR> initializer, final Windows<W> windows, final Aggregator<? super K, ? super V, VR> aggregator); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super finalV, StateStoreSupplier<WindowStore>VR> storeSupplieraggregator); KTable<Windowed<K><VR> KTable<K, V>VR> reduceaggregate(final RichReducer<V> reducerRichInitializer<K, VR> initializer, final finalRichAggregator<? SessionWindowssuper sessionWindowsK, ? 
super V, final String queryableStoreName); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows); KTable<Windowed<K>, V> reduce(final RichReducer<V> reducer, final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final String queryableStoreName); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, VR> aggregator); |
SessionWindowedKStream
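The aggregate() methods take three function arguments that each have a rich counterpart (initializer, aggregator and merger). Adding an overload for every combination of rich and non-rich arguments would produce a large number of overloads, so I propose adding a single overload in which all of them are rich interfaces.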
Code Block | ||
---|---|---|
| ||
<T> KTable<Windowed<K>, T> aggregate(final RichInitializer<T> initializer, final StateStoreSupplier<KeyValueStore> storeSupplier); <W extends Window, VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, T> aggregator, final RichAggregator<RichMerger<? super K, ? super VT> sessionMerger); <VR> KTable<Windowed<K>, VR> aggregatoraggregate(final RichInitializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> aggregator, final Windows<W> windows, final RichMerger<? super K, VR> sessionMerger, final Serde<VR> aggValueSerde, final Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized); KTable<Windowed<K>, V> reduce(final RichReducer<K, V> reducer); KTable<Windowed<K>, V> reduce(final RichReducer<K, V> reducer, final Materialized<K, V, final String queryableStoreNameSessionStore<Bytes, byte[]>> materializedAs); <W |
TimeWindowedKStream
Code Block | ||
---|---|---|
| ||
// Windowed aggregate/reduce overloads that accept the Rich* function interfaces.
// Results are keyed by Windowed<K>; the materialized variants use a WindowStore.

<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator);

<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                       final RichAggregator<? super K, ? super V, VR> aggregator,
                                       final Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized);

KTable<Windowed<K>, V> reduce(final RichReducer<K, V> reducer);

KTable<Windowed<K>, V> reduce(final RichReducer<K, V> reducer,
                              final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
|
KGroupedTable
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final Reducer<V> adder final StateStoreSupplier<WindowStore> storeSupplier); <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer, final RichReducer<K, V> subtractor, final RichAggregator<? super K, ?final superMaterialized<K, V, T> aggregatorKeyValueStore<Bytes, byte[]>> materialized); KTable<K, V> reduce(final RichReducer<K, V> adder, final Merger<? super K, T> sessionMergerReducer<V> subtractor, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); KTable<K, V> reduce(final RichReducer<K, V> adder, final SessionWindows sessionWindows, final RichReducer<K, V> subtractor, final Serde<T>Materialized<K, aggValueSerdeV, KeyValueStore<Bytes, byte[]>> materialized); KTable<K, V> reduce(final Reducer<V> adder, final finalRichReducer<K, StringV> queryableStoreNamesubtractor); <T> KTable<Windowed<K>KTable<K, T>V> aggregatereduce(final RichReducer<K, Initializer<T>V> initializeradder, final Reducer<V> subtractor); KTable<K, V> reduce(final RichReducer<K, V> adder, final RichAggregator<? super K, ? super V, T>final aggregatorRichReducer<K, V> subtractor); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Merger<Aggregator<? super K, T> sessionMerger, ? super V, VR> adder, final RichAggregator<? SessionWindowssuper sessionWindowsK, ? super V, VR> subtractor, final Materialized<K, VR, final Serde<T> aggValueSerdeKeyValueStore<Bytes, byte[]>> materialized); <T><VR> KTable<Windowed<K>KTable<K, T>VR> aggregate(final Initializer<T>Initializer<VR> initializer, final RichAggregator<? super K, ? super V, T>VR> aggregatoradder, final Aggregator<? final Merger<super K, ? super KV, T>VR> sessionMergersubtractor, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> aggregate(final SessionWindowsInitializer<VR> sessionWindowsinitializer, final RichAggregator<? super K, ? super V, VR> final Serde<T> aggValueSerdeadder, final RichAggregator<? super K, ? 
final StateStoreSupplier<SessionStore> storeSupplier); |
KGroupedTable
Code Block | ||
---|---|---|
| ||
KTable<K, V> reduce(final RichReducer<V> addersuper V, VR> subtractor, final Reducer<V> subtractor, final Materialized<K, VR, KeyValueStore<Bytes, final String queryableStoreName);byte[]>> materialized); <VR> KTable<K, V>VR> reduceaggregate(final Reducer<V>Initializer<VR> adderinitializer, final Aggregator<? RichReducer<V>super subtractorK, ? super V, VR> adder, final String queryableStoreName); KTable<K, V> reduce(final RichReducer<V> adder, final RichAggregator<? super K, ? super V, VR> subtractor); <VR> KTable<K, VR> aggregate(final RichReducer<V>Initializer<VR> subtractorinitializer, final String queryableStoreName); KTable<K, V> reduce(final RichReducer<V> adder, final RichAggregator<? super K, ? super V, VR> adder, final Reducer<V> subtractor); KTable<K, V> reduce(final Reducer<V> adder, final Aggregator<? super K, ? super finalV, RichReducer<V>VR> subtractor); <VR> KTable<K, V>VR> reduceaggregate(final RichReducer<V>Initializer<VR> adderinitializer, final RichReducer<V> subtractor); KTable<K, V> reduce(final RichReducer<V> adder, final RichAggregator<? super K, ? super V, VR> adder, final Reducer<V> subtractor, final RichAggregator<? super K, ? super finalV, StateStoreSupplier<KeyValueStore>VR> storeSuppliersubtractor); <VR> KTable<K, V>VR> reduceaggregate(final Reducer<V>Initializer<VR> adderinitializer, final RichReducer<V> subtractor, final Aggregator<? super K, ? super V, VR> adder, final StateStoreSupplier<KeyValueStore> storeSupplier); KTable<K, V> reduce(final RichReducer<V> adder, final RichAggregator<? super K, ? final RichReducer<V>super V, VR> subtractor, final StateStoreSupplier<KeyValueStore>Serde<VR> storeSupplieraggValueSerde); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final RichAggregator<Aggregator<? super K, ? 
super V, VR> subtractor, final StringSerde<VR> queryableStoreNameaggValueSerde); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adderSerde<VR> aggValueSerde); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<Aggregator<? super K, ? super V, VR> subtractoradder, final Serde<VR> aggValueSerde RichAggregator<? super K, ? super V, VR> subtractor, final StringStateStoreSupplier<KeyValueStore> queryableStoreNamestoreSupplier); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final RichAggregator<Aggregator<? super K, ? super V, VR> subtractor, final Serde<VR>StateStoreSupplier<KeyValueStore> aggValueSerdestoreSupplier); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final RichAggregator<? super K, ? super V, VR> adder, final RichAggregator<? super K, ? super V, VR> subtractor, final StateStoreSupplier<KeyValueStore> storeSupplier); |
Proposed changes
Make record context open to public
...
Thus, the record context is not available in ProcessorContext. As a result, we make the following changes to make it "public"
...