Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.


Concrete Example:

public class Customer {
    List<Item> cart;
    List<Item> purchases;
    List<Item> wishList;
}


There are 3 streams: cart, purchases, and wish-list.

We would construct 3 aggregators in which we add the item to the appropriate list. One of these would look like:

private static final Aggregator<Long, Item, Customer> CART_AGGREGATOR = new Aggregator<Long, Item, Customer>() {
    @Override
    public Customer apply(Long key, Item value, Customer aggregate) {
        // Add the incoming item to this customer's cart list.
        aggregate.cart.add(value);
        return aggregate;
    }
};

Then we would create the topology:

KGroupedStream<Long, Item> groupedCart = builder.stream("cart").groupByKey();
KGroupedStream<Long, Item> groupedPurchases = builder.stream("purchases").groupByKey();
KGroupedStream<Long, Item> groupedWishList = builder.stream("wish-list").groupByKey();
KTable<Long, Customer> customers = groupedCart.cogroup(Customer::new, CART_AGGREGATOR, aggValueSerde, "customerStore")
        .cogroup(groupedPurchases, PURCHASE_AGGREGATOR)
        .cogroup(groupedWishList, WISH_LIST_AGGREGATOR)

Now imagine the streams get the following values:

Stream "cart":

    1L, Item[no:01]
    2L, Item[no:02]
    1L, Item[no:03]
    1L, Item[no:04]
    2L, Item[no:05]

Stream "purchases":

    2L, Item[no:06]
    1L, Item[no:07]
    1L, Item[no:08]
    2L, Item[no:09]
    2L, Item[no:10]

Stream "wish-list":

    1L, Item[no:11]
    2L, Item[no:12]
    2L, Item[no:13]
    2L, Item[no:14]
    2L, Item[no:15]


You could expect to see the following outputs in "customers":

1L, Customer[
                      cart:{Item[no:01], Item[no:03], Item[no:04]},
                      purchases:{Item[no:07], Item[no:08]},
                      wishList:{Item[no:11]}
      ]
2L, Customer[
                      cart:{Item[no:02], Item[no:05]},
                      purchases:{Item[no:06], Item[no:09], Item[no:10]},
                      wishList:{Item[no:12], Item[no:13], Item[no:14], Item[no:15]}
      ]
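
To make the expected result easier to check, here is a minimal plain-Java sketch that folds the three example streams into one aggregate per key. It does not use Kafka Streams or the proposed API. The names CogroupSketch, Record, rec, aggregate and run are hypothetical, and items are reduced to their number strings. It replays the streams one after another rather than interleaved, which gives the same final state here because each list is fed by exactly one stream. A real KTable would also emit intermediate updates, which this sketch does not show.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class CogroupSketch {

    // Simplified stand-in for the Customer class above; items are plain strings here.
    static class Customer {
        final List<String> cart = new ArrayList<>();
        final List<String> purchases = new ArrayList<>();
        final List<String> wishList = new ArrayList<>();

        @Override
        public String toString() {
            return "Customer[cart:" + cart + ", purchases:" + purchases + ", wishList:" + wishList + "]";
        }
    }

    // One keyed record of an input stream: (customer id, item number).
    static class Record {
        final long key;
        final String item;

        Record(long key, String item) {
            this.key = key;
            this.item = item;
        }
    }

    static Record rec(long key, String item) {
        return new Record(key, item);
    }

    // Folds one stream into the shared store, creating the aggregate for a key on first use.
    static void aggregate(Map<Long, Customer> store, List<Record> stream,
                          BiConsumer<Customer, String> aggregator) {
        for (Record r : stream) {
            Customer customer = store.computeIfAbsent(r.key, k -> new Customer());
            aggregator.accept(customer, r.item);
        }
    }

    // Replays the example input and returns the final aggregate per customer id.
    static Map<Long, Customer> run() {
        List<Record> cart = Arrays.asList(
                rec(1, "01"), rec(2, "02"), rec(1, "03"), rec(1, "04"), rec(2, "05"));
        List<Record> purchases = Arrays.asList(
                rec(2, "06"), rec(1, "07"), rec(1, "08"), rec(2, "09"), rec(2, "10"));
        List<Record> wishList = Arrays.asList(
                rec(1, "11"), rec(2, "12"), rec(2, "13"), rec(2, "14"), rec(2, "15"));

        // A single store shared by all three aggregators, one aggregator per stream.
        Map<Long, Customer> store = new LinkedHashMap<>();
        aggregate(store, cart, (c, item) -> c.cart.add(item));
        aggregate(store, purchases, (c, item) -> c.purchases.add(item));
        aggregate(store, wishList, (c, item) -> c.wishList.add(item));
        return store;
    }

    public static void main(String[] args) {
        run().forEach((key, customer) -> System.out.println(key + "L, " + customer));
    }
}
```

The point of the sketch is that every aggregator writes into the same per-key aggregate, so no intermediate per-stream tables have to be joined afterwards.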

Public Interfaces

KGroupedStream { //Copy of aggregate method signatures.
<T> KCogroupedStream<K, K, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Serde<T> aggValueSerde, final String storeName);
