
Example with Current API:

Code Block

KTable<K, V1> table1 = builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
KTable<K, V2> table2 = builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
KTable<K, V3> table3 = builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);


As you can see, this creates 3 StateStores and requires 3 initializers and 3 aggValueSerdes. It also puts pressure on the user to define what the intermediate values (V1, V2, V3) are going to be. They are left with a couple of choices: either make V1, V2, and V3 all the same as CG, so that the two joiners act more like mergers, or make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.


Example with Proposed API:

Code Block
KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
        .cogroup(grouped2, aggregator2)
        .cogroup(grouped3, aggregator3)
        .aggregate();

As you can see this creates 1 StateStore, requires 1 initializer, and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.


Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.
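To make the "one store, one initializer, one aggregator per stream" idea concrete, here is a minimal plain-Java sketch. It does not use Kafka Streams at all: `CogroupModel`, its nested `Agg` interface, and the `process` method are hypothetical names invented for illustration, and the single `HashMap` merely stands in for the shared state store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java model of the cogroup idea; this is NOT the Kafka Streams API.
final class CogroupModel<K, A> {

    /** Hypothetical stand-in for an aggregator of (key, value, aggregate). */
    interface Agg<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    private final Supplier<A> initializer;            // one initializer for all streams
    private final Map<K, A> store = new HashMap<>();  // one shared "state store"

    CogroupModel(Supplier<A> initializer) {
        this.initializer = initializer;
    }

    /** Folds one record into the shared aggregate using its stream's aggregator. */
    <V> A process(K key, V value, Agg<? super K, ? super V, A> aggregator) {
        A current = store.containsKey(key) ? store.get(key) : initializer.get();
        A updated = aggregator.apply(key, value, current);
        store.put(key, updated);
        return updated;
    }

    A get(K key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        CogroupModel<String, List<String>> model = new CogroupModel<>(ArrayList::new);
        // Two input "streams" with different value types share one aggregate type.
        Agg<String, Integer, List<String>> fromInts = (k, v, agg) -> { agg.add("int:" + v); return agg; };
        Agg<String, Boolean, List<String>> fromBools = (k, v, agg) -> { agg.add("bool:" + v); return agg; };
        model.process("a", 1, fromInts);
        model.process("a", true, fromBools);
        model.process("b", 2, fromInts);
        System.out.println(model.get("a")); // [int:1, bool:true]
        System.out.println(model.get("b")); // [int:2]
    }
}
```

The point of the sketch is that no intermediate value types or joiners appear anywhere: each input only needs a function from its own value type into the final aggregate.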


Concrete Example:

Code Block
public class Customer {
    List<Item> cart;
    List<Item> purchases;
    List<Item> wishList;
}

There are 3 streams: cart, purchases, and wish-list.

We would construct 3 aggregators in which we add the item to the appropriate list. One of these would look like:

Code Block
private static final Aggregator<Long, Item, Customer> CART_AGGREGATOR = new Aggregator<Long, Item, Customer>() {
    @Override
    public Customer apply(Long key, Item value, Customer aggregate) {
        // Add the incoming item to the list this stream feeds.
        aggregate.cart.add(value);
        return aggregate;
    }
};

Then we would create the topology:

Code Block
KGroupedStream<Long, Item> groupedCart = builder.stream("cart").groupByKey();
KGroupedStream<Long, Item> groupedPurchases = builder.stream("purchases").groupByKey();
KGroupedStream<Long, Item> groupedWishList = builder.stream("wish-list").groupByKey();
KTable<Long, Customer> customers = groupedCart.cogroup(Customer::new, CART_AGGREGATOR, aggValueSerde, "customerStore")
        .cogroup(groupedPurchases, PURCHASE_AGGREGATOR)
        .cogroup(groupedWishList, WISH_LIST_AGGREGATOR)
        .aggregate();


Now imagine the streams get the following values:


wish-list stream:
    1L, Item[no:11]
    2L, Item[no:12]
    2L, Item[no:13]
    2L, Item[no:14]
    2L, Item[no:15]


After all items have flowed through the topology, you could expect to see the following outputs in "customers":

1L, Customer[
                      cart:{Item[no:01], Item[no:03], Item[no:04]},
                      purchases:{Item[no:07], Item[no:08]},
                      wishList:{Item[no:11]}
      ]
2L, Customer[
                      cart:{Item[no:02], Item[no:05]},
                      purchases:{Item[no:06], Item[no:09], Item[no:10]},
                      wishList:{Item[no:12], Item[no:13], Item[no:14], Item[no:15]}
      ]


It is important to note that intermediate values would also be produced, unless the records are processed closely enough together that caching prevents this. (E.g., after the first item is processed from the "cart" stream, customer 1L would be output with only that first item in its cart and no items in its purchases or wish list.)
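The per-record updates described above can be illustrated with a small plain-Java simulation. This is only a sketch under stated assumptions: it models the no-caching case, `IntermediateUpdates` and its `process` method are hypothetical, items are reduced to plain item numbers, and the order in which the three records arrive is assumed for the sake of the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of per-record updates; NOT the Kafka Streams runtime.
final class IntermediateUpdates {

    /** Simplified Customer: items are modelled as plain item numbers. */
    static final class Customer {
        final List<Integer> cart = new ArrayList<>();
        final List<Integer> purchases = new ArrayList<>();
        final List<Integer> wishList = new ArrayList<>();

        @Override
        public String toString() {
            return "Customer[cart:" + cart + ", purchases:" + purchases + ", wishList:" + wishList + "]";
        }
    }

    private final Map<Long, Customer> store = new HashMap<>();
    final List<String> emitted = new ArrayList<>();

    /** Applies one record; stream is one of "cart", "purchases", "wish-list". */
    void process(String stream, long key, int itemNo) {
        Customer c = store.computeIfAbsent(key, k -> new Customer());
        switch (stream) {
            case "cart": c.cart.add(itemNo); break;
            case "purchases": c.purchases.add(itemNo); break;
            case "wish-list": c.wishList.add(itemNo); break;
            default: throw new IllegalArgumentException("unknown stream: " + stream);
        }
        // Without caching, every input record yields one updated aggregate.
        emitted.add(key + "L, " + c);
    }

    public static void main(String[] args) {
        IntermediateUpdates topology = new IntermediateUpdates();
        topology.process("cart", 1L, 1);
        topology.process("purchases", 1L, 7);
        topology.process("wish-list", 1L, 11);
        for (String update : topology.emitted) {
            System.out.println(update);
        }
        // First line printed: 1L, Customer[cart:[1], purchases:[], wishList:[]]
    }
}
```

Three input records produce three outputs for customer 1L, each one a superset of the previous; with caching enabled, some of the earlier outputs could be suppressed.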

Public Interfaces

Code Block
KGroupedStream { //Copy of aggregate method signatures.

    <T> KCogroupedStream<K, K, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Serde<T> aggValueSerde, final String storeName);

    <T> KCogroupedStream<K, K, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier);

    <T> KCogroupedStream<K, Windowed<K>, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final String storeName);

    <T> KCogroupedStream<K, Windowed<K>, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier);

    <W extends Window, T> KCogroupedStream<K, Windowed<K>, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Windows<W> windows, final Serde<T> aggValueSerde, final String storeName);

    <W extends Window, T> KCogroupedStream<K, Windowed<K>, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier);
}


Code Block
/**
 * {@code KCogroupedStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
 * It is an intermediate representation of one or more {@link KStream}s in order to apply one or more aggregation
 * operations on the original {@link KStream} records.
 * <p>
 * It is an intermediate representation after a grouping of {@link KStream}s, before the aggregations are applied to
 * the new partitions resulting in a {@link KTable}.
 * <p>
 * A {@code KCogroupedStream} must be obtained from a {@link KGroupedStream} via
 * {@link KGroupedStream#cogroup(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde, String) cogroup(...)}.
 *
 * @param <K> Type of keys
 * @param <RK> Type of key in the table, either K or Windowed&lt;K&gt;
 * @param <V> Type of aggregate values
 */
public interface KCogroupedStream<K, RK, V> {

    /**
     * @return this KCogroupedStream so you can chain calls
     */
    <T> KCogroupedStream<K, RK, V> cogroup(KGroupedStream<K, T> groupedStream, Aggregator<? super K, ? super T, V> aggregator);

    KTable<RK, V> aggregate();
}


Expected use:

Code Block

KTable<K, V> cogroupedTable = groupedStream1.cogroup(initializer, aggregator1, aggValueSerde, "aggValue").cogroup(groupedStream2, aggregator2).cogroup(groupedStream3, aggregator3) ...


 .cogroup(groupedStreamN, aggregatorN).aggregate();


Proposed Changes

  1. Construct the above Public Interfaces.
  2. Create an internal.KCogroupedStreamImpl that will keep track of the StateStoreSupplier, Initializer, Pairs of (KGroupedStream, Aggregator), and if needed Windows or SessionMerger and SessionWindows.
  3. Model the aggregate method of internal.KCogroupedStream after the doAggregate method of KGroupedStream by forcing the KGroupedStreams to repartitionIfRequired and adding the KStreamAggProcessorSupplier for each KGroupedStream. Additionally ensure all sources are copartitioned, processors have access to the state store, and add a KStreamCogroup processor.
  4. Create a KStreamCogroup that will pass through all outputs from the KStreamAggregates. KStreamCogroup must also be a KStreamAggProcessorSupplier; it will keep track of all of its parent KStreamAggProcessorSuppliers in case it needs to enableSendingOldValues, and it can have one of them create a KTableValueGetterSupplier if view is called.
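
The bookkeeping in steps 2 to 4 can be pictured with a very small plain-Java model. It is a hypothetical illustration only, not Kafka's internal code: `CogroupBuilderModel` is an invented class, grouped streams are reduced to their source names, and `aggregate()` just lists the processor nodes that would be added (one KStreamAggregate per input plus a single KStreamCogroup), all pointing at the same store.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the bookkeeping described above; not Kafka's internal code.
final class CogroupBuilderModel {
    private final String storeName;                          // the single shared store
    private final List<String> sources = new ArrayList<>();  // one entry per grouped stream

    CogroupBuilderModel(String storeName, String firstSource) {
        this.storeName = storeName;
        sources.add(firstSource);
    }

    /** Registers another grouped stream; returns this so calls can be chained. */
    CogroupBuilderModel cogroup(String source) {
        sources.add(source);
        return this;
    }

    /** Lists the processor nodes that aggregate() would add in this model. */
    List<String> aggregate() {
        List<String> nodes = new ArrayList<>();
        for (String source : sources) {
            nodes.add("KStreamAggregate(" + source + " -> " + storeName + ")");
        }
        // A single pass-through node sits downstream of all the aggregates.
        nodes.add("KStreamCogroup(" + storeName + ")");
        return nodes;
    }

    public static void main(String[] args) {
        List<String> nodes = new CogroupBuilderModel("customerStore", "cart")
                .cogroup("purchases")
                .cogroup("wish-list")
                .aggregate();
        for (String node : nodes) {
            System.out.println(node);
        }
    }
}
```

For the three-stream customer example this model yields four nodes, matching the "N KStreamAggregates and 1 KStreamCogroup" count stated earlier.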
