...
```java
/**
 * {@code CogroupedKStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
 * It is an intermediate representation of one or more {@link KStream}s in order to apply one or more aggregation
 * operations on the original {@link KStream} records.
 * <p>
 * It is an intermediate representation after a grouping of {@link KStream}s, before the aggregations are applied to
 * the new partitions resulting in a {@link KTable}.
 * <p>
 * A {@code CogroupedKStream} must be obtained from a {@link KGroupedStream} via
 * {@link KGroupedStream#cogroup(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde, String) cogroup(...)}.
 *
 * @param <K>    Type of keys
 * @param <Vout> Type of aggregate values
 */
public interface CogroupedKStream<K, Vout> {

    <Vin> CogroupedKStream<K, Vout> cogroup(final KGroupedStream<K, Vin> groupedStream,
                                           final Aggregator<? super K, ? super Vin, Vout> aggregator);

    KTable<K, Vout> aggregate(final Initializer<Vout> initializer,
                              final Materialized<K, Vout, KeyValueStore<Bytes, byte[]>> materialized);

    KTable<K, Vout> aggregate(final Initializer<Vout> initializer);

    KTable<K, Vout> aggregate(final Initializer<Vout> initializer,
                              final Named named,
                              final Materialized<K, Vout, KeyValueStore<Bytes, byte[]>> materialized);

    KTable<K, Vout> aggregate(final Initializer<Vout> initializer,
                              final Named named);

    <W extends Window> TimeWindowedCogroupedKStream<K, Vout> windowedBy(final Windows<W> windows);

    SessionWindowedCogroupedKStream<K, Vout> windowedBy(final SessionWindows sessionWindows);
}
```
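The following is a minimal, dependency-free sketch of the behavior this interface describes, not an implementation of it. It assumes that each cogrouped input contributes through its own aggregator while all inputs share one initializer and one per-key aggregate. `CogroupSemanticsSketch`, `Agg`, and `process` are hypothetical names, and plain `long` values stand in for the generic `Vin`/`Vout` types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, dependency-free model of cogrouped aggregation semantics.
// It is NOT the Kafka Streams API: it only mirrors the idea that several
// grouped inputs share one initializer and one aggregate per key.
class CogroupSemanticsSketch {

    // Same shape as an Aggregator: (key, value, aggregate) -> new aggregate.
    @FunctionalInterface
    interface Agg { long apply(String key, long value, long aggregate); }

    private final Supplier<Long> initializer;
    private final List<Agg> aggregators = new ArrayList<>();
    // A single store shared by every input, keyed by the grouping key.
    final Map<String, Long> store = new HashMap<>();

    CogroupSemanticsSketch(Supplier<Long> initializer) {
        this.initializer = initializer;
    }

    // Registers one grouped input with its own aggregator; returns an input id.
    int cogroup(Agg aggregator) {
        aggregators.add(aggregator);
        return aggregators.size() - 1;
    }

    // Routes a record to the aggregator of the input it arrived on and returns
    // the updated aggregate (the value a result KTable update would carry).
    long process(int inputId, String key, long value) {
        // The initializer runs only the first time a key is seen by any input.
        long current = store.containsKey(key) ? store.get(key) : initializer.get();
        long updated = aggregators.get(inputId).apply(key, value, current);
        store.put(key, updated);
        return updated;
    }

    public static void main(String[] args) {
        CogroupSemanticsSketch cogrouped = new CogroupSemanticsSketch(() -> 0L);
        int purchases = cogrouped.cogroup((key, amount, total) -> total + amount);
        int refunds = cogrouped.cogroup((key, amount, total) -> total - amount);

        System.out.println(cogrouped.process(purchases, "alice", 10)); // prints 10
        System.out.println(cogrouped.process(refunds, "alice", 3));    // prints 7
        System.out.println(cogrouped.process(purchases, "bob", 5));    // prints 5
    }
}
```

With the proposed API, the same shape would come from chaining `cogroup(...)` calls and finishing with one of the `aggregate(...)` overloads; the sketch only models the resulting per-key state.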
```java
import org.apache.kafka.streams.state.SessionStore;

/**
 * {@code SessionWindowedCogroupedKStream} is an abstraction of a <i>windowed</i> record stream of {@link org.apache.kafka.streams.KeyValue} pairs.
 * It is an intermediate representation of a {@link CogroupedKStream} in order to apply a windowed aggregation operation on the original
 * {@link KGroupedStream} records.
 * <p>
 * It is an intermediate representation after a grouping, cogrouping and windowing of a {@link KStream} before an aggregation is applied to the
 * new (partitioned) windows resulting in a windowed {@link KTable}
 * (a <em>windowed</em> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
 * <p>
 * The specified {@code SessionWindows} define the inactivity gap: records with the same key that are separated by more than
 * the gap fall into different sessions, and sessions that come within the gap of each other are merged.
 * The result is written into a local {@link SessionStore} (which is basically an ever-updating
 * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
 * <p>
 * New events are added to sessions until their grace period ends (see {@link SessionWindows#grace(Duration)}).
 * <p>
 * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
 * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
 * A {@code SessionWindowedCogroupedKStream} must be obtained from a {@link CogroupedKStream} via
 * {@link CogroupedKStream#windowedBy(SessionWindows)}.
 *
 * @param <K> Type of keys
 * @param <V> Type of values
 * @see KStream
 * @see KGroupedStream
 * @see CogroupedKStream
 */
public interface SessionWindowedCogroupedKStream<K, V> {

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger,
                                     final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger,
                                     final Named named,
                                     final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger,
                                     final Named named);
}
```
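A rough plain-Java model of how a session-windowed cogroup could combine records, under stated assumptions: a record starts a new session at its timestamp, every existing session of the same key within the inactivity gap is folded in with the merger, and then the record's own input aggregator is applied. This ordering is an assumption made for the sketch; grace periods and late records are ignored, and `SessionCogroupSketch` and all of its members are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of session-windowed cogrouping; NOT the Kafka Streams code.
// Assumptions: grace periods, late records and retention are ignored, and all
// inputs share one initializer, one merger and one session store.
class SessionCogroupSketch {

    @FunctionalInterface
    interface Agg { long apply(String key, long value, long aggregate); }

    @FunctionalInterface
    interface Merger { long apply(String key, long left, long right); }

    // A session [start, end] (both inclusive) and its current aggregate.
    static final class Session {
        final long start;
        final long end;
        final long value;

        Session(long start, long end, long value) {
            this.start = start;
            this.end = end;
            this.value = value;
        }
    }

    private final long inactivityGap;
    private final Supplier<Long> initializer;
    private final Merger merger;
    private final List<Agg> aggregators = new ArrayList<>();
    final Map<String, List<Session>> sessionsByKey = new HashMap<>();

    SessionCogroupSketch(long inactivityGap, Supplier<Long> initializer, Merger merger) {
        this.inactivityGap = inactivityGap;
        this.initializer = initializer;
        this.merger = merger;
    }

    int cogroup(Agg aggregator) {
        aggregators.add(aggregator);
        return aggregators.size() - 1;
    }

    // Starts a session at ts, merges every session of the same key that lies
    // within the gap, then applies the aggregator of the record's input.
    void process(int inputId, String key, long ts, long value) {
        List<Session> sessions = sessionsByKey.computeIfAbsent(key, k -> new ArrayList<>());
        long start = ts;
        long end = ts;
        long agg = initializer.get();
        List<Session> overlapping = new ArrayList<>();
        for (Session s : sessions) {
            if (s.end >= ts - inactivityGap && s.start <= ts + inactivityGap) {
                overlapping.add(s);
            }
        }
        for (Session s : overlapping) {
            agg = merger.apply(key, agg, s.value);
            start = Math.min(start, s.start);
            end = Math.max(end, s.end);
        }
        sessions.removeAll(overlapping);
        agg = aggregators.get(inputId).apply(key, value, agg);
        sessions.add(new Session(start, end, agg));
    }

    public static void main(String[] args) {
        SessionCogroupSketch sketch =
                new SessionCogroupSketch(5, () -> 0L, (key, left, right) -> left + right);
        int clicks = sketch.cogroup((key, value, agg) -> agg + value);
        int views = sketch.cogroup((key, value, agg) -> agg + value);
        sketch.process(clicks, "user", 0, 1);
        sketch.process(views, "user", 3, 2);   // within the gap: same session
        sketch.process(clicks, "user", 20, 4); // beyond the gap: new session
        // prints "[0, 3] -> 3" then "[20, 20] -> 4"
        for (Session s : sketch.sessionsByKey.get("user")) {
            System.out.println("[" + s.start + ", " + s.end + "] -> " + s.value);
        }
    }
}
```

A record that lands between two sessions within the gap of both bridges them into a single session, which is why the merger is needed in addition to the per-input aggregators.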
```java
/**
 * {@code TimeWindowedCogroupedKStream} is an abstraction of a <i>windowed</i> record stream of {@link org.apache.kafka.streams.KeyValue} pairs.
 * It is an intermediate representation of a {@link CogroupedKStream} in order to apply a windowed aggregation operation on the original
 * {@link KGroupedStream} records.
 * <p>
 * It is an intermediate representation after a grouping, cogrouping and windowing of a {@link KStream} before an aggregation is applied to the
 * new (partitioned) windows resulting in a windowed {@link KTable}
 * (a <em>windowed</em> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
 * <p>
 * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (cf.
 * {@link TimeWindows}) or they define landmark windows (cf. {@link UnlimitedWindows}).
 * The result is written into a local {@link org.apache.kafka.streams.state.WindowStore} (which is basically an ever-updating
 * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
 * <p>
 * New events are added to windows until their grace period ends (see {@link TimeWindows#grace(Duration)}).
 * <p>
 * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
 * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
 * A {@code TimeWindowedCogroupedKStream} must be obtained from a {@link CogroupedKStream} via
 * {@link CogroupedKStream#windowedBy(Windows)}.
 *
 * @param <K> Type of keys
 * @param <V> Type of values
 * @see KStream
 * @see KGroupedStream
 * @see CogroupedKStream
 */
public interface TimeWindowedCogroupedKStream<K, V> {

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Named named,
                                     final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Named named);
}
```
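The window assignment behind a time-windowed cogroup can be sketched in plain Java. The sketch assumes epoch-aligned windows `[start, start + size)` that advance by a fixed step (a step equal to the size gives tumbling windows) and ignores grace periods. `TimeWindowCogroupSketch`, `windowStartsFor`, and the `key@windowStart` string key are hypothetical stand-ins for `Windowed<K>` keys in a window store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical model of time-windowed cogrouping; NOT the Kafka Streams code.
// Assumptions: windows are [start, start + size), aligned to the epoch;
// grace periods, retention and late records are ignored.
class TimeWindowCogroupSketch {

    @FunctionalInterface
    interface Agg { long apply(String key, long value, long aggregate); }

    private final long sizeMs;
    private final long advanceMs; // advanceMs == sizeMs gives tumbling windows
    private final Supplier<Long> initializer;
    private final List<Agg> aggregators = new ArrayList<>();
    // "key@windowStart" stands in for a Windowed<K> store key.
    final Map<String, Long> store = new TreeMap<>();

    TimeWindowCogroupSketch(long sizeMs, long advanceMs, Supplier<Long> initializer) {
        if (advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException("need 0 < advance <= size");
        }
        this.sizeMs = sizeMs;
        this.advanceMs = advanceMs;
        this.initializer = initializer;
    }

    int cogroup(Agg aggregator) {
        aggregators.add(aggregator);
        return aggregators.size() - 1;
    }

    // Start times of every window that contains timestamp ts.
    List<Long> windowStartsFor(long ts) {
        List<Long> starts = new ArrayList<>();
        long start = Math.max(0, ts - sizeMs + advanceMs) / advanceMs * advanceMs;
        while (start <= ts) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    // Applies the input's aggregator to every window the record falls into.
    void process(int inputId, String key, long ts, long value) {
        for (long start : windowStartsFor(ts)) {
            String windowedKey = key + "@" + start;
            long current = store.containsKey(windowedKey) ? store.get(windowedKey) : initializer.get();
            store.put(windowedKey, aggregators.get(inputId).apply(key, value, current));
        }
    }

    public static void main(String[] args) {
        // 10 ms windows advancing by 5 ms: each record lands in two windows.
        TimeWindowCogroupSketch sketch = new TimeWindowCogroupSketch(10, 5, () -> 0L);
        int orders = sketch.cogroup((key, value, agg) -> agg + value);
        int returns = sketch.cogroup((key, value, agg) -> agg - value);
        sketch.process(orders, "k", 12, 3);
        sketch.process(returns, "k", 7, 1);
        System.out.println(sketch.store);
    }
}
```

Landmark windows (`UnlimitedWindows`) are not modeled here; they would correspond to a single window per key that never closes.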
Proposed Changes
- Construct the public interfaces above.
- Create an internal KCogroupedStreamImpl that keeps track of the KeyValueSerde, AggValueSerde, Initializer, and the (KGroupedStream, Aggregator) pairs.
- Create internal TimeWindowedKCogroupedStreamImpl and SessionWindowedKCogroupedStreamImpl classes that complete the windowed aggregations.
- Make a KStreamAggProcessorSupplier for each KGroupedStream. Additionally, ensure that all sources are co-partitioned and that all processors have access to the state store.
- Create a cogroupedStreamAggregateBuilder that takes the (groupedStream, aggregator) pairs and applies the aggregations.
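The last two steps above can be illustrated with a small plain-Java sketch. It assumes a simplified co-partitioning check that compares partition counts only, and it wires one aggregate processor per (grouped stream, aggregator) pair to a single shared store. `CogroupBuilderSketch`, `GroupedSource`, and `AggProcessor` are hypothetical and are not the proposed KStreamAggProcessorSupplier or cogroupedStreamAggregateBuilder.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of the builder step; all names are illustrative and do
// not exist in Kafka Streams. It checks partition counts, then creates one
// aggregate processor per (grouped stream, aggregator) pair on one store.
class CogroupBuilderSketch {

    @FunctionalInterface
    interface Agg { long apply(String key, long value, long aggregate); }

    // Stand-in for a grouped source: its topic and partition count.
    static final class GroupedSource {
        final String topic;
        final int partitions;

        GroupedSource(String topic, int partitions) {
            this.topic = topic;
            this.partitions = partitions;
        }
    }

    // Stand-in for a per-input processor; the store is shared, not copied.
    static final class AggProcessor {
        final String name;
        private final Agg aggregator;
        private final Supplier<Long> initializer;
        private final Map<String, Long> store;

        AggProcessor(String name, Agg aggregator, Supplier<Long> initializer, Map<String, Long> store) {
            this.name = name;
            this.aggregator = aggregator;
            this.initializer = initializer;
            this.store = store;
        }

        void process(String key, long value) {
            long current = store.containsKey(key) ? store.get(key) : initializer.get();
            store.put(key, aggregator.apply(key, value, current));
        }
    }

    static List<AggProcessor> build(Map<GroupedSource, Agg> pairs,
                                    Supplier<Long> initializer,
                                    Map<String, Long> sharedStore) {
        // Simplified co-partitioning check: only partition counts are compared;
        // real co-partitioning also requires the same partitioning strategy.
        Integer expected = null;
        for (GroupedSource source : pairs.keySet()) {
            if (expected == null) {
                expected = source.partitions;
            } else if (source.partitions != expected) {
                throw new IllegalStateException("sources are not co-partitioned: " + source.topic);
            }
        }
        List<AggProcessor> processors = new ArrayList<>();
        for (Map.Entry<GroupedSource, Agg> pair : pairs.entrySet()) {
            processors.add(new AggProcessor(pair.getKey().topic + "-cogroup-agg",
                    pair.getValue(), initializer, sharedStore));
        }
        return processors;
    }

    public static void main(String[] args) {
        Map<GroupedSource, Agg> pairs = new LinkedHashMap<>();
        pairs.put(new GroupedSource("purchases", 4), (key, value, agg) -> agg + value);
        pairs.put(new GroupedSource("refunds", 4), (key, value, agg) -> agg - value);
        Map<String, Long> store = new LinkedHashMap<>();
        List<AggProcessor> processors = build(pairs, () -> 0L, store);
        processors.get(0).process("alice", 10);
        processors.get(1).process("alice", 4);
        System.out.println(store); // prints {alice=6}
    }
}
```

Because every processor in the sketch holds a reference to the same map, an update from one input is visible to the next record arriving on any other input.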
...
Users must upgrade to the new version to use this functionality.
Rejected Alternatives
...
- Introducing a Named parameter in the cogroup method
Rationale for edits to the KIP
...