
...

Example with Current API:

Code Block

KTable<K, V1> table1 = builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
KTable<K, V2> table2 = builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
KTable<K, V3> table3 = builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);

...

Code Block
/**
* {@code KCogroupedStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
* It is an intermediate representation of one or more {@link KStream}s after grouping, before one or more
* aggregation operations are applied to the new partitions, resulting in a {@link KTable}.
* <p>
* A {@code KCogroupedStream} must be obtained from a {@link KGroupedStream} via
* {@link KGroupedStream#cogroup(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde, String) cogroup(...)}.
*
* @param <K> Type of keys
* @param <RK> Type of key in the table, either K or Windowed&lt;K&gt;
* @param <V> Type of aggregate values
*/
public interface KCogroupedStream<K, RK, V> {
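    /**
     * Adds another grouped stream, together with the aggregator that folds its records into the shared aggregate.
     *
     * @param groupedStream the grouped stream to add to this cogroup
     * @param aggregator    the aggregator applied to records of {@code groupedStream}
     * @param <T>           Type of values in {@code groupedStream}
     * @return this KCogroupedStream so you can chain calls
     */
    <T> KCogroupedStream<K, RK, V> cogroup(KGroupedStream<K, T> groupedStream, Aggregator<? super K, ? super T, V> aggregator);

    KTable<RK, V> aggregate();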
}

Expected use:

Code Block

KTable<K, V> cogroupedTable = groupedStream1
    .cogroup(initializer, aggregator1, aggValueSerde, "aggValue")
    .cogroup(groupedStream2, aggregator2)
    .cogroup(groupedStream3, aggregator3)
    ...
    .cogroup(groupedStreamN, aggregatorN)
    .aggregate();
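
To make the intended semantics of the chained calls concrete, here is a minimal plain-Java sketch of the idea: several keyed inputs, each with its own aggregator, are folded into one shared aggregate per key. It is not Kafka Streams code. The names CogroupSketch, Cogrouped, Agg, rec and demo are hypothetical and exist only for this illustration, in-memory lists stand in for grouped streams, and a Map stands in for the resulting KTable. As a simplification, the inputs are processed one after another rather than interleaved as records arrive.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-Java illustration only; none of these types are Kafka Streams classes.
class CogroupSketch {

    /** Hypothetical stand-in for an aggregator: folds one record into the running aggregate. */
    interface Agg<K, T, V> {
        V apply(K key, T value, V aggregate);
    }

    /** Hypothetical in-memory analogue of the proposed KCogroupedStream. */
    static final class Cogrouped<K, V> {
        private final Supplier<V> initializer;
        private final List<Consumer<Map<K, V>>> steps = new ArrayList<>();

        Cogrouped(Supplier<V> initializer) {
            this.initializer = initializer;
        }

        /** Registers one input with its own aggregator; returns this so calls can be chained. */
        <T> Cogrouped<K, V> cogroup(List<Map.Entry<K, T>> records, Agg<K, T, V> aggregator) {
            steps.add(table -> {
                for (Map.Entry<K, T> r : records) {
                    // Start from the initializer's value the first time a key is seen.
                    V current = table.containsKey(r.getKey()) ? table.get(r.getKey()) : initializer.get();
                    table.put(r.getKey(), aggregator.apply(r.getKey(), r.getValue(), current));
                }
            });
            return this;
        }

        /** Folds every registered input into one shared table (simplification: one input at a time). */
        Map<K, V> aggregate() {
            Map<K, V> table = new LinkedHashMap<>();
            for (Consumer<Map<K, V>> step : steps) {
                step.accept(table);
            }
            return table;
        }
    }

    /** Hypothetical helper that builds one key-value record. */
    static <K, T> Map.Entry<K, T> rec(K key, T value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    static Map<String, Integer> demo() {
        // Two inputs with different value types that share one Integer aggregate per key.
        List<Map.Entry<String, Integer>> counts = Arrays.asList(rec("a", 1), rec("b", 2), rec("a", 3));
        List<Map.Entry<String, String>> labels = Arrays.asList(rec("a", "xx"), rec("c", "yyy"));
        return new Cogrouped<String, Integer>(() -> 0)
                .cogroup(counts, (key, value, agg) -> agg + value)
                .cogroup(labels, (key, value, agg) -> agg + value.length())
                .aggregate();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {a=6, b=2, c=3}
    }
}
```

The point of the sketch is that each input keeps its own value type and aggregator while all of them write into a single aggregate of type V, so no intermediate per-input tables or joiners are needed.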

...