Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Public Interfaces

Code Block
KGroupedStream <K, V> {
...
	<T> CogroupedKStream<K, T> cogroup(final Aggregator<? super K, ? super V, T> aggregator);
}

...

Code Block
/**
* {@code CogroupedKStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
* It is an intermediate representation of one or more {@link KStream}s in order to apply one or more aggregation
* operations on the original {@link KStream} records.
* <p>
* It is an intermediate representation after a grouping of {@link KStream}s, before the aggregations are applied to
* the new partitions resulting in a {@link KTable}.
* <p>
* A {@code CogroupedKStream} must be obtained from a {@link KGroupedStream} via 
* {@link KGroupedStream#cogroup(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde, String) cogroup(...)}.
*
* @param <K> Type of keys
* @param <V> Type of aggregate values
*/
public interface CogroupedKStream<K, Vout> {
    <Vin> CogroupedKStream<K, Vout> cogroup(final KGroupedStream<K, Vin> groupedStream,
                                       final Aggregator<? super K, ? super Vin, Vout> aggregator);

    KTable<K, Vout> aggregate(final Initializer<Vout> initializer,
                           final Materialized<K, Vout, KeyValueStore<Bytes, byte[]>> materialized);
    
    KTable<K, Vout> aggregate(final Initializer<Vout> initializer);

    <W extends Window> TimeWindowedKCogroupedStream<KTimeWindowedCogroupedKStream<K, Vout> windowedBy(final Windows<W> windows);

    SessionWindowedKCogroupedStream<KSessionWindowedCogroupedKStream<K, Vout> windowedBy(final SessionWindows sessionWindows);

}

...

Code Block
/**
 * {@code SessionWindowedCogroupKStream} is an abstraction of a <i>windowed</i> record stream of {@link org.apache.kafka.streams.KeyValue} pairs.
 * It is an intermediate representation of a {@link KGroupedStream} in order to apply a windowed aggregation operation on the original
 * {@link KGroupedStream} records.
 * <p>
 * It is an intermediate representation after a grouping, cogrouping and windowing of a {@link KStream} before an aggregation is applied to the
 * new (partitioned) windows resulting in a windowed {@link KTable}
 * (a <emph>windowed</emph> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}.
 * <p>
 * The specified {@code SessionWindows} define the gap between windows.
 * The result is written into a local windowed {@link org.apache.kafka.streams.state.KeyValueStore} (which is basically an ever-updating
 * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
 *
 * New events are added to windows until their grace period ends (see {@link TimeWindows#grace(Duration)}).
 *
 * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
 * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.

 * A {@code WindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(Windows)} .
 *
 * @param <K> Type of keys
 * @param <V> Type of values
 * @see KStream
 * @see KGroupedStream
 * @see KCogroupedStreamCogroupedKStream
 */
import org.apache.kafka.streams.state.SessionStore;

public interface SessionWindowedKCogroupedStream<KSessionWindowedCogroupedKStream<K, V> {
    KTable<Windowed<K>, Vout> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger,
                                     final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger);

}

...

Code Block
/**
 * {@code TimeWindowedCogroupKStream} is an abstraction of a <i>windowed</i> record stream of {@link org.apache.kafka.streams.KeyValue} pairs.
 * It is an intermediate representation of a {@link KGroupedStream} in order to apply a windowed aggregation operation on the original
 * {@link KGroupedStream} records.
 * <p>
 * It is an intermediate representation after a grouping, cogrouping and windowing of a {@link KStream} before an aggregation is applied to the
 * new (partitioned) windows resulting in a windowed {@link KTable}
 * (a <emph>windowed</emph> {@code KTable} is a {@link KTable} with key type {@link Windowed Windowed<K>}.
 * <p>
 * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
 * {@link TimeWindows}) or they define landmark windows (c.f. {@link UnlimitedWindows}).
 * The result is written into a local windowed {@link org.apache.kafka.streams.state.KeyValueStore} (which is basically an ever-updating
 * materialized view) that can be queried using the name provided in the {@link Materialized} instance.
 *
 * New events are added to windows until their grace period ends (see {@link TimeWindows#grace(Duration)}).
 *
 * Furthermore, updates to the store are sent downstream into a windowed {@link KTable} changelog stream, where
 * "windowed" implies that the {@link KTable} key is a combined key of the original record key and a window ID.

 * A {@code WindowedKStream} must be obtained from a {@link KGroupedStream} via {@link KGroupedStream#windowedBy(Windows)} .
 *
 * @param <K> Type of keys
 * @param <T> Type of values
 * @see KStream
 * @see KGroupedStream
 * @see KCogroupedStreamCogroupedKStream
 */

public interface TimeWindowedKCogroupedStream<KTimeWindowedCogroupedKStream<K, V> {

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer);

}

...