...

Code Block
KGroupedStream {
...
	<T> CogroupedKStream<K, T> cogroup(final Aggregator<? super K, ? super V, T> aggregator);

	<T> CogroupedKStream<K, T> cogroup(final Aggregator<? super K, ? super V, T> aggregator, final Materialized<K, T, KeyValueStore<Bytes, byte[]>> materialized);
}


Code Block
/**
* {@code CogroupedKStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
* It is an intermediate representation of one or more {@link KStream}s in order to apply one or more aggregation
* operations on the original {@link KStream} records.
* <p>
* It is an intermediate representation after a grouping of {@link KStream}s, before the aggregations are applied to
* the new partitions resulting in a {@link KTable}.
* <p>
* A {@code CogroupedKStream} must be obtained from a {@link KGroupedStream} via
* {@link KGroupedStream#cogroup(Aggregator) cogroup(...)}.
*
* @param <K> Type of keys
* @param <V> Type of aggregate values
*/
public interface CogroupedKStream<K, V> {
    <T> CogroupedKStream<K, V> cogroup(final KGroupedStream<K, T> groupedStream,
                                       final Aggregator<? super K, ? super T, V> aggregator);

    KTable<K, V> aggregate(final Initializer<V> initializer,
                           final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

    KTable<K, V> aggregate(final Initializer<V> initializer);

    <W extends Window> TimeWindowedKCogroupedStream<K, V> windowedBy(final Windows<W> windows);

    SessionWindowedKCogroupedStream<K, V> windowedBy(final SessionWindows sessionWindows);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger,
                                     final SessionWindows sessionWindows);
}
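
To make the intended semantics of the interface above concrete, here is a minimal in-memory sketch in plain Java. It does not use Kafka Streams: `CogroupSketch`, `Agg`, `Input`, and `demo` are hypothetical names introduced for illustration, and lists of entries stand in for grouped streams. The point it tries to show is that each input may carry a different value type `T`, while every aggregator folds into the same aggregate type `V` under a shared key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative model only: plain collections stand in for Kafka Streams types.
class CogroupSketch<K, V> {

    // Plays the role of Aggregator<K, T, V>: folds one input value into the aggregate.
    interface Agg<K, T, V> {
        V apply(K key, T value, V aggregate);
    }

    // One (grouped stream, aggregator) pair; the stream is modeled as a list of records.
    private static final class Input<K, T, V> {
        final List<Map.Entry<K, T>> records;
        final Agg<K, T, V> aggregator;

        Input(List<Map.Entry<K, T>> records, Agg<K, T, V> aggregator) {
            this.records = records;
            this.aggregator = aggregator;
        }

        void foldInto(Map<K, V> store, Supplier<V> initializer) {
            for (Map.Entry<K, T> record : records) {
                K key = record.getKey();
                // The initializer supplies the aggregate the first time a key is seen.
                V current = store.containsKey(key) ? store.get(key) : initializer.get();
                store.put(key, aggregator.apply(key, record.getValue(), current));
            }
        }
    }

    private final List<Input<K, ?, V>> inputs = new ArrayList<>();

    // Analogous to cogroup(groupedStream, aggregator): registers another input.
    <T> CogroupSketch<K, V> cogroup(List<Map.Entry<K, T>> records, Agg<K, T, V> aggregator) {
        inputs.add(new Input<>(records, aggregator));
        return this;
    }

    // Analogous to aggregate(initializer): all inputs update one per-key aggregate.
    // Simplification: inputs are drained one after another, not interleaved by time.
    Map<K, V> aggregate(Supplier<V> initializer) {
        Map<K, V> store = new LinkedHashMap<>();
        for (Input<K, ?, V> input : inputs) {
            input.foldInto(store, initializer);
        }
        return store;
    }

    // Hypothetical data: purchases (Long cents) and refunds (Integer cents) per customer.
    static Map<String, Long> demo() {
        List<Map.Entry<String, Long>> purchases =
            List.of(Map.entry("alice", 500L), Map.entry("bob", 200L), Map.entry("alice", 250L));
        List<Map.Entry<String, Integer>> refunds = List.of(Map.entry("alice", 100));
        return new CogroupSketch<String, Long>()
            .cogroup(purchases, (key, cents, total) -> total + cents)
            .cogroup(refunds, (key, cents, total) -> total - cents)
            .aggregate(() -> 0L);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {alice=650, bob=200}
    }
}
```

The wildcard in `List<Input<K, ?, V>>` is what lets inputs with unrelated value types share one aggregate, which mirrors the `<T>` type parameter on `cogroup` in the interface.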


Code Block
import org.apache.kafka.streams.state.SessionStore;

/**
 * {@code SessionWindowedKCogroupedStream} is an abstraction of a <i>windowed</i> record stream of {@link org.apache.kafka.streams.KeyValue} pairs.
 * It is an intermediate representation of a {@link CogroupedKStream} in order to apply a windowed aggregation operation on the original
 * {@link KGroupedStream} records.
 * <p>
 * It is an intermediate representation after a grouping, cogrouping and windowing of a {@link KStream} before an aggregation is applied to the
 * new (partitioned) windows resulting in a windowed {@link KTable}
 * (a <em>windowed</em> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
 * <p>
 * The specified {@code SessionWindows} define the gap between windows.
 * The result is written into a local {@link SessionStore} (which is basically an ever-updating
 * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
 *
 * New events are added to windows until their grace period ends (see {@link SessionWindows#grace(Duration)}).
 *
 * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
 * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
 *
 * A {@code SessionWindowedKCogroupedStream} must be obtained from a {@link CogroupedKStream} via
 * {@link CogroupedKStream#windowedBy(SessionWindows)}.
 *
 * @param <K> Type of keys
 * @param <V> Type of values
 * @see KStream
 * @see KGroupedStream
 * @see CogroupedKStream
 */

public interface SessionWindowedKCogroupedStream<K, V> {
    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger,
                                     final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger);

}
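
The role of the `sessionMerger` argument can be illustrated with a small plain-Java sketch. It is a simplified model, not Kafka Streams code: `SessionMergeSketch`, `Session`, `add`, and `demo` are hypothetical names, aggregates are plain `long` sums, and the inclusive gap check is an assumption about how sessions are matched. When a record arrives within the gap of one or more existing sessions, those sessions are merged into one and the record is then folded into the merged aggregate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongBinaryOperator;

// Illustrative model only: session windows for a single key, with long aggregates.
class SessionMergeSketch {

    // A session covering [start, end] together with its current aggregate.
    static final class Session {
        final long start;
        final long end;
        final long aggregate;

        Session(long start, long end, long aggregate) {
            this.start = start;
            this.end = end;
            this.aggregate = aggregate;
        }
    }

    // Adds one record and returns the new list of sessions for the key.
    static List<Session> add(List<Session> sessions, long timestamp, long value, long gapMs,
                             LongBinaryOperator aggregator, LongBinaryOperator merger) {
        long start = timestamp;
        long end = timestamp;
        long merged = 0L; // stands in for the initializer's result
        List<Session> result = new ArrayList<>();
        for (Session s : sessions) {
            // Assumption: a session is affected if it lies within the gap on either side.
            boolean withinGap = s.end >= timestamp - gapMs && s.start <= timestamp + gapMs;
            if (withinGap) {
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                merged = merger.applyAsLong(merged, s.aggregate); // the Merger's job
            } else {
                result.add(s);
            }
        }
        // The aggregator folds the new record into the merged aggregate.
        result.add(new Session(start, end, aggregator.applyAsLong(value, merged)));
        return result;
    }

    // Two separate sessions at t=0 and t=10 are bridged by a record at t=5 (gap 5).
    static List<Session> demo() {
        LongBinaryOperator sum = Long::sum;
        List<Session> sessions = new ArrayList<>();
        sessions = add(sessions, 0L, 1L, 5L, sum, sum);
        sessions = add(sessions, 10L, 2L, 5L, sum, sum);
        sessions = add(sessions, 5L, 4L, 5L, sum, sum);
        return sessions;
    }

    public static void main(String[] args) {
        for (Session s : demo()) {
            System.out.println("[" + s.start + ", " + s.end + "] -> " + s.aggregate);
        }
    }
}
```

In a cogroup, each input stream would bring its own aggregator, but a single merger suffices because all inputs share the aggregate type `V`.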


Code Block

/**
 * {@code TimeWindowedKCogroupedStream} is an abstraction of a <i>windowed</i> record stream of {@link org.apache.kafka.streams.KeyValue} pairs.
 * It is an intermediate representation of a {@link CogroupedKStream} in order to apply a windowed aggregation operation on the original
 * {@link KGroupedStream} records.
 * <p>
 * It is an intermediate representation after a grouping, cogrouping and windowing of a {@link KStream} before an aggregation is applied to the
 * new (partitioned) windows resulting in a windowed {@link KTable}
 * (a <em>windowed</em> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
 * <p>
 * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
 * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
 * The result is written into a local {@link org.apache.kafka.streams.state.WindowStore} (which is basically an ever-updating
 * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
 *
 * New events are added to windows until their grace period ends (see {@link TimeWindows#grace(Duration)}).
 *
 * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
 * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.
 *
 * A {@code TimeWindowedKCogroupedStream} must be obtained from a {@link CogroupedKStream} via
 * {@link CogroupedKStream#windowedBy(Windows)}.
 *
 * @param <K> Type of keys
 * @param <V> Type of values
 * @see KStream
 * @see KGroupedStream
 * @see CogroupedKStream
 */

public interface TimeWindowedKCogroupedStream<K, V> {

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer);

}
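
The Javadoc above says that events are added to windows until their grace period ends. A minimal plain-Java sketch of that rule for tumbling windows follows. It is a simplified model under stated assumptions rather than Kafka Streams code: `TumblingWindowSketch` and its members are hypothetical, timestamps are non-negative, stream time is taken to be the largest timestamp seen so far, and a window is treated as closed once its end is at or before stream time minus the grace period.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative model only: tumbling windows with a grace period and long sums.
class TumblingWindowSketch {
    private final long sizeMs;
    private final long graceMs;
    private long streamTime = Long.MIN_VALUE; // largest timestamp observed so far

    final Map<String, Long> store = new TreeMap<>(); // "key@windowStart" -> aggregate
    int dropped = 0;                                  // records that arrived too late

    TumblingWindowSketch(long sizeMs, long graceMs) {
        this.sizeMs = sizeMs;
        this.graceMs = graceMs;
    }

    // Returns true if the record was aggregated, false if its window was already closed.
    boolean add(String key, long timestamp, long value) {
        streamTime = Math.max(streamTime, timestamp);
        long windowStart = timestamp - (timestamp % sizeMs); // assumes timestamp >= 0
        long windowEnd = windowStart + sizeMs;               // exclusive end
        if (windowEnd <= streamTime - graceMs) {
            dropped++;
            return false;
        }
        store.merge(key + "@" + windowStart, value, Long::sum);
        return true;
    }

    // Window size 10 ms, grace 5 ms; the last record falls outside the grace period.
    static TumblingWindowSketch demo() {
        TumblingWindowSketch windows = new TumblingWindowSketch(10L, 5L);
        windows.add("a", 1L, 1L);   // window [0, 10)
        windows.add("a", 12L, 2L);  // window [10, 20)
        windows.add("a", 8L, 4L);   // out of order, but window [0, 10) is still open
        windows.add("a", 16L, 8L);  // window [10, 20), advances stream time to 16
        windows.add("a", 9L, 16L);  // window [0, 10) is closed by now, so this is dropped
        return windows;
    }

    public static void main(String[] args) {
        TumblingWindowSketch windows = demo();
        System.out.println(windows.store + ", dropped=" + windows.dropped);
    }
}
```

In a cogrouped setting, every input stream would write into the same windowed store, so the key-plus-window-start lookup shown here would be shared across all of them.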


Proposed Changes

  1. Construct the above Public Interfaces.
  2. Create an internal.KCogroupedStreamImpl that will keep track of the KeyValueSerde, AggValueSerde, Initializer, and Pairs of (KGroupedStream, Aggregator).
  3. Create an internal TimeWindowedKCogroupedStreamImpl and SessionWindowedKCogroupedStreamImpl that will complete the windowed aggregations.
  4. Model the aggregate method of internal.KCogroupedStream after the doAggregate method of KGroupedStream: force each KGroupedStream to repartitionIfRequired and add a KStreamAggProcessorSupplier for each KGroupedStream. Additionally, ensure that all sources are copartitioned and that processors have access to the state store, and add a KStreamCogroup processor.
  5. Create a cogroupedStreamAggregateBuilder that takes the (KGroupedStream, Aggregator) pairs and applies the aggregations.
  6. Create a KStreamCogroup that will pass through all outputs from the KStreamAggregate. KStreamCogroup must also be a KStreamAggProcessorSupplier; it will keep track of all of its parent KStreamAggProcessorSuppliers in case it needs to enableSendingOldValues, and it can have one of them create a KTableValueGetterSupplier if view is called.
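
The processor wiring described in the list above can be sketched in plain Java as follows. This is only a rough model of the idea, not the proposed implementation: `CogroupTopologySketch`, `AggregateNode`, `CogroupNode`, and `Change` are hypothetical stand-ins for the per-stream aggregate processors, the pass-through KStreamCogroup node, and the forwarded updates. It assumes that all aggregate processors share one state store and that enabling old values on the cogroup node simply propagates the flag to every parent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Illustrative model only: per-stream aggregate nodes feeding one pass-through node.
class CogroupTopologySketch {

    // A forwarded update: the new aggregate and, if enabled, the previous one.
    static final class Change {
        final String key;
        final Long newValue;
        final Long oldValue;

        Change(String key, Long newValue, Long oldValue) {
            this.key = key;
            this.newValue = newValue;
            this.oldValue = oldValue;
        }
    }

    // Stand-in for the pass-through node that tracks its parent aggregate nodes.
    static final class CogroupNode {
        final List<AggregateNode<?>> parents = new ArrayList<>();
        final List<Change> output = new ArrayList<>();

        void forward(Change change) {
            output.add(change); // pass through unchanged
        }

        void enableSendingOldValues() {
            for (AggregateNode<?> parent : parents) {
                parent.sendOldValues = true;
            }
        }
    }

    // Stand-in for one per-stream aggregate processor; all instances share `store`.
    static final class AggregateNode<T> {
        private final Map<String, Long> store;
        private final BiFunction<T, Long, Long> aggregator;
        private final CogroupNode downstream;
        boolean sendOldValues = false;

        AggregateNode(Map<String, Long> store, BiFunction<T, Long, Long> aggregator,
                      CogroupNode downstream) {
            this.store = store;
            this.aggregator = aggregator;
            this.downstream = downstream;
            downstream.parents.add(this);
        }

        void process(String key, T value) {
            Long old = store.get(key);
            Long updated = aggregator.apply(value, old == null ? 0L : old);
            store.put(key, updated);
            downstream.forward(new Change(key, updated, sendOldValues ? old : null));
        }
    }

    // Two inputs with different value types update the same key in the shared store.
    static List<Change> demo() {
        Map<String, Long> sharedStore = new HashMap<>();
        CogroupNode cogroup = new CogroupNode();
        AggregateNode<Integer> clicks =
            new AggregateNode<Integer>(sharedStore, (v, agg) -> agg + v, cogroup);
        AggregateNode<String> searches =
            new AggregateNode<String>(sharedStore, (v, agg) -> agg + v.length(), cogroup);
        cogroup.enableSendingOldValues();
        clicks.process("a", 3);
        searches.process("a", "xy");
        return cogroup.output;
    }

    public static void main(String[] args) {
        for (Change c : demo()) {
            System.out.println(c.key + ": " + c.oldValue + " -> " + c.newValue);
        }
    }
}
```

Keeping the parent list on the pass-through node is what allows a single downstream request for old values to reach every upstream aggregate processor.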

Compatibility, Deprecation, and Migration Plan

...