THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Example with Current API:
Code Block |
---|
KTable<K, V1> table1 = builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
KTable<K, V2> table2 = builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
KTable<K, V3> table3 = builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree); |
...
Code Block |
---|
/** * {@code KCogroupedStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs. * It is an intermediate representation of one or more {@link KStream}s in order to apply one or more aggregation * operations on the original {@link KStream} records. * <p> * It is an intermediate representation after a grouping of {@link KStream}s, before the aggregations are applied to * the new partitions resulting in a {@link KTable}. * <p> * A {@code KCogroupedStream} must be obtained from a {@link KGroupedStream} via * {@link KGroupedStream#cogroup(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde, String) cogroup(...)}. * * @param <K> Type of keys * @param <RK> Type of key in the table, either K or Windowed<K> * @param <V> Type of aggregate values */ public interface KCogroupedStream<K, RK, V> { /** * @return this KCogroupedStream so you can chain calls */ <T> KCogroupedStream<K, RK, V> cogroup(KGroupedStream<K, T> groupedStream, Aggregator<? super K, ? super T, V> aggregator); KTable<RK, V> aggregate(); } |
Expected use:
Code Block |
---|
KTable<K, V> cogroupedTable = groupedStream1.cogroup(initializer, aggregator1, aggValueSerde, "aggValue").cogroup(groupedStream2, aggregator2).cogroup(groupedStream3, aggregator3) ... .cogroup(groupedStreamN, aggregatorN).aggregate(); |
...