...

Code Block
/**
* {@code CogroupedKStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
* It is an intermediate representation of one or more {@link KStream}s in order to apply one or more aggregation
* operations on the original {@link KStream} records.
* <p>
* It is an intermediate representation after a grouping of {@link KStream}s, before the aggregations are applied to
* the new partitions resulting in a {@link KTable}.
* <p>
* A {@code CogroupedKStream} must be obtained from a {@link KGroupedStream} via 
* {@link KGroupedStream#cogroup(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde, String) cogroup(...)}.
*
* @param <K> Type of keys
* @param <Vout> Type of aggregate values
*/
public interface CogroupedKStream<K, Vout> {
    <Vin> CogroupedKStream<K, Vout> cogroup(final KGroupedStream<K, Vin> groupedStream,
                                       final Aggregator<? super K, ? super Vin, Vout> aggregator);

    KTable<K, Vout> aggregate(final Initializer<Vout> initializer,
                           final Materialized<K, Vout, KeyValueStore<Bytes, byte[]>> materialized);
    
    KTable<K, Vout> aggregate(final Initializer<Vout> initializer);

    KTable<K, Vout> aggregate(final Initializer<Vout> initializer,
                              final Named named,
                              final Materialized<K, Vout, KeyValueStore<Bytes, byte[]>> materialized);

    KTable<K, Vout> aggregate(final Initializer<Vout> initializer,
                              final Named named);

    <W extends Window> TimeWindowedCogroupedKStream<K, Vout> windowedBy(final Windows<W> windows);

    SessionWindowedCogroupedKStream<K, Vout> windowedBy(final SessionWindows sessionWindows);

}
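
The intended semantics of the interface above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: it does not use Kafka Streams, the names CogroupModel, Agg and Input are hypothetical, and each input is folded in turn rather than interleaved by arrival time as a running topology would do. It shows several inputs with different value types sharing one initializer and one per-key aggregate.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Simplified in-memory model of cogroup semantics. This is NOT the Kafka Streams API:
// CogroupModel, Agg and Input are hypothetical names that only mirror the interface shape.
final class CogroupModel<K, VOut> {

    // Plays the role of Aggregator<K, Vin, Vout>: folds one input value into the aggregate.
    interface Agg<K, VIn, VOut> {
        VOut apply(K key, VIn value, VOut aggregate);
    }

    // One (records, aggregator) pair, as registered by a cogroup(...) call.
    private static final class Input<K, VIn, VOut> {
        private final List<Map.Entry<K, VIn>> records;
        private final Agg<K, VIn, VOut> aggregator;

        Input(List<Map.Entry<K, VIn>> records, Agg<K, VIn, VOut> aggregator) {
            this.records = records;
            this.aggregator = aggregator;
        }

        // Folds every record of this input into the shared per-key store.
        void foldInto(Map<K, VOut> store, Supplier<VOut> initializer) {
            for (Map.Entry<K, VIn> entry : records) {
                K key = entry.getKey();
                VOut current = store.containsKey(key) ? store.get(key) : initializer.get();
                store.put(key, aggregator.apply(key, entry.getValue(), current));
            }
        }
    }

    private final List<Input<K, ?, VOut>> inputs = new ArrayList<>();

    <VIn> CogroupModel<K, VOut> cogroup(List<Map.Entry<K, VIn>> records, Agg<K, VIn, VOut> aggregator) {
        inputs.add(new Input<K, VIn, VOut>(records, aggregator));
        return this;
    }

    // Inputs are processed one after another here; a real topology interleaves records by arrival.
    Map<K, VOut> aggregate(Supplier<VOut> initializer) {
        Map<K, VOut> store = new LinkedHashMap<>();
        for (Input<K, ?, VOut> input : inputs) {
            input.foldInto(store, initializer);
        }
        return store;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> clicks =
                List.of(Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
        List<Map.Entry<String, String>> notes = List.of(Map.entry("a", "xyz"));
        Map<String, Integer> totals = new CogroupModel<String, Integer>()
                .cogroup(clicks, (key, value, agg) -> agg + value)
                .cogroup(notes, (key, value, agg) -> agg + value.length())
                .aggregate(() -> 0);
        System.out.println(totals); // prints {a=7, b=2}
    }
}
```

The design point the model tries to capture is that every input carries its own aggregator while the initializer and the resulting store are shared, so no intermediate per-stream tables need to be joined.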


Code Block
/**
 * {@code SessionWindowedCogroupedKStream} is an abstraction of a <i>windowed</i> record stream of {@link org.apache.kafka.streams.KeyValue} pairs.
 * It is an intermediate representation of a {@link KGroupedStream} in order to apply a windowed aggregation operation on the original
 * {@link KGroupedStream} records.
 * <p>
 * It is an intermediate representation after a grouping, cogrouping and windowing of a {@link KStream} before an aggregation is applied to the
 * new (partitioned) windows resulting in a windowed {@link KTable}
 * (a <emph>windowed</emph> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
 * <p>
 * The specified {@code SessionWindows} define the gap between windows.
 * The result is written into a local windowed {@link org.apache.kafka.streams.state.KeyValueStore} (which is basically an ever-updating
 * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
 *
 * New events are added to windows until their grace period ends (see {@link SessionWindows#grace(Duration)}).
 *
 * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
 * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.

 * A {@code SessionWindowedCogroupedKStream} must be obtained from a {@link CogroupedKStream} via {@link CogroupedKStream#windowedBy(SessionWindows)}.
 *
 * @param <K> Type of keys
 * @param <V> Type of values
 * @see KStream
 * @see KGroupedStream
 * @see CogroupedKStream
 */
import org.apache.kafka.streams.state.SessionStore;

public interface SessionWindowedCogroupedKStream<K, V> {
    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger,
                                     final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger,
                                     final Named named,
                                     final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger,
                                     final Named named);

}
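
Why the session-windowed variant needs a sessionMerger can be seen in a single-key model. The sketch below is an illustration only, not Kafka Streams code: SessionModel and Session are hypothetical names, aggregates are plain ints, and treating the gap bounds as inclusive is an assumption of the model. A late record that falls within the gap of two existing sessions causes them to be merged, and the merger combines their aggregates before the new record is applied.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.IntBinaryOperator;

// Simplified single-key model of session-window aggregation. NOT the Kafka Streams API:
// SessionModel and Session are hypothetical names used for illustration only.
final class SessionModel {

    static final class Session {
        final long start;
        final long end;
        final int aggregate;

        Session(long start, long end, int aggregate) {
            this.start = start;
            this.end = end;
            this.aggregate = aggregate;
        }
    }

    // Adds one record. Sessions within gapMs of the record's timestamp (bounds inclusive,
    // an assumption of this model) are merged into a single session via `merger`.
    static List<Session> add(List<Session> sessions, long timestamp, int value, long gapMs,
                             int initialValue, IntBinaryOperator aggregator, IntBinaryOperator merger) {
        Session merged = new Session(timestamp, timestamp, initialValue);
        List<Session> result = new ArrayList<>();
        for (Session s : sessions) {
            boolean withinGap = s.end >= timestamp - gapMs && s.start <= timestamp + gapMs;
            if (withinGap) {
                merged = new Session(Math.min(s.start, merged.start), Math.max(s.end, merged.end),
                        merger.applyAsInt(merged.aggregate, s.aggregate));
            } else {
                result.add(s);
            }
        }
        // The new record is aggregated only after all overlapping sessions are merged.
        result.add(new Session(merged.start, merged.end,
                aggregator.applyAsInt(merged.aggregate, value)));
        result.sort(Comparator.comparingLong(s -> s.start));
        return result;
    }

    public static void main(String[] args) {
        List<Session> sessions = new ArrayList<>();
        // Records at t=0 and t=10 form two sessions (gap 5); the record at t=5 bridges them.
        for (long ts : new long[] {0L, 10L, 5L}) {
            sessions = add(sessions, ts, 1, 5L, 0, Integer::sum, Integer::sum);
        }
        for (Session s : sessions) {
            System.out.println("[" + s.start + ", " + s.end + "] -> " + s.aggregate);
        }
    }
}
```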


Code Block

/**
 * {@code TimeWindowedCogroupedKStream} is an abstraction of a <i>windowed</i> record stream of {@link org.apache.kafka.streams.KeyValue} pairs.
 * It is an intermediate representation of a {@link KGroupedStream} in order to apply a windowed aggregation operation on the original
 * {@link KGroupedStream} records.
 * <p>
 * It is an intermediate representation after a grouping, cogrouping and windowing of a {@link KStream} before an aggregation is applied to the
 * new (partitioned) windows resulting in a windowed {@link KTable}
 * (a <emph>windowed</emph> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}).
 * <p>
 * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
 * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
 * The result is written into a local windowed {@link org.apache.kafka.streams.state.KeyValueStore} (which is basically an ever-updating
 * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
 *
 * New events are added to windows until their grace period ends (see {@link TimeWindows#grace(Duration)}).
 *
 * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
 * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.

 * A {@code TimeWindowedCogroupedKStream} must be obtained from a {@link CogroupedKStream} via {@link CogroupedKStream#windowedBy(Windows)}.
 *
 * @param <K> Type of keys
 * @param <V> Type of values
 * @see KStream
 * @see KGroupedStream
 * @see CogroupedKStream
 */

public interface TimeWindowedCogroupedKStream<K, V> {

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
									 final Named named,
                                     final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
									 final Named named);

}
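
The difference between tumbling and overlapping hopping windows mentioned in the Javadoc above can be made concrete with a small helper. It is a hypothetical sketch, not part of Kafka Streams: WindowStarts and windowStartsFor are names introduced here, and windows are assumed to be aligned to multiples of the advance and to cover the half-open interval [start, start + sizeMs). For a given record timestamp it lists the start of every window the record belongs to.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams: lists the start times of all
// windows of length sizeMs, advancing by advanceMs, that contain a timestamp.
final class WindowStarts {

    static List<Long> windowStartsFor(long timestamp, long sizeMs, long advanceMs) {
        if (timestamp < 0 || sizeMs <= 0 || advanceMs <= 0 || advanceMs > sizeMs) {
            throw new IllegalArgumentException("expected timestamp >= 0 and 0 < advanceMs <= sizeMs");
        }
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window [start, start + sizeMs) still covers the timestamp.
        long start = (Math.max(0L, timestamp - sizeMs + advanceMs) / advanceMs) * advanceMs;
        while (start <= timestamp) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Hopping: size 10, advance 5. A record at t=12 falls into two overlapping windows.
        System.out.println(windowStartsFor(12L, 10L, 5L)); // prints [5, 10]
        // Tumbling: advance equals size, so each record falls into exactly one window.
        System.out.println(windowStartsFor(12L, 10L, 10L)); // prints [10]
    }
}
```

In a windowed cogroup, each such window start combined with the record key would identify one aggregate, which is why the resulting table is keyed by Windowed<K>.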


Proposed Changes

  1. Construct the above Public Interfaces.
  2. Create an internal.KCogroupedStreamImpl that will keep track of the KeyValueSerde, AggValueSerde, Initializer, and pairs of (KGroupedStream, Aggregator).
  3. Create an internal TimeWindowedKCogroupedStreamImpl and SessionWindowedKCogroupedStreamImpl that will complete the windowed aggregations.
  4. Make the KStreamAggProcessorSupplier for each KGroupedStream. Additionally, ensure that all sources are copartitioned and that the processors have access to the state store.
  5. Create a cogroupedStreamAggregateBuilder that takes the (KGroupedStream, Aggregator) pairs and applies the aggregations.
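
The copartitioning requirement in step 4 can be sketched as a simple precondition. The code below is a hypothetical illustration and not the Kafka Streams implementation: CopartitionCheck and requireSamePartitionCount are invented names, and the model assumes the only condition checked is that every source topic has the same number of partitions.

```java
import java.util.Map;

// Hypothetical check, not the Kafka Streams implementation: verifies that all
// source topics feeding a cogroup have the same number of partitions.
final class CopartitionCheck {

    // Returns the common partition count, or throws if the topics disagree.
    static int requireSamePartitionCount(Map<String, Integer> partitionsByTopic) {
        if (partitionsByTopic.isEmpty()) {
            throw new IllegalArgumentException("no source topics given");
        }
        Integer expected = null;
        for (Map.Entry<String, Integer> entry : partitionsByTopic.entrySet()) {
            if (expected == null) {
                expected = entry.getValue();
            } else if (!expected.equals(entry.getValue())) {
                throw new IllegalStateException("topic " + entry.getKey() + " has "
                        + entry.getValue() + " partitions, expected " + expected);
            }
        }
        return expected;
    }

    public static void main(String[] args) {
        // Topic names are made up for the example.
        System.out.println(requireSamePartitionCount(Map.of("clicks", 4, "views", 4))); // prints 4
    }
}
```

The check matters because all inputs write to one shared state store per task, so records with the same key must arrive at the same task regardless of which input they came from.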

...

  • Users must upgrade to the new version if they want to use this functionality.

Rejected Alternatives

...

  • Introducing Named in the cogroup

Rationale for edits to the KIP

...