...
Topology wise for N incoming streams the new api will only every create N KStreamAggregates and 1 KStreamCogroup.
Concrete Example:
public class Customer {
List<Item> cart;
List<Item> purchases;
List<Item> wishList;
}
There are 3 streams: cart, purchases, and wish-list.
We would construct 3 aggregators in which we add the item to the appropriate list. One of these would look like:
private static final Aggregator<String, Item, Customer> CART_AGGREGATOR = new Aggregator<String, Item, Customer>() {
@Override
public Patient apply(String key, Item value, Customer aggregate) {
aggregate.cart.add(value);
return aggregate;
}
};
Then we would create the topology:
KGroupedStream<Long, Item> groupedCart = builder.stream("cart").groupByKey();
KGroupedStream<Long, Item> groupedPurchases = builder.stream("purchases").groupByKey();
KGroupedStream<Long, Item> groupedWishList = builder.stream("wish-list").groupByKey();
KTable<Long, Customer> customers = groupedCart.cogroup(Customer::new, CART_AGGREGATOR, aggValueSerde, "customerStore")
.cogroup(groupedPurchases, PURCHASE_AGGREGATOR)
.cogroup(groupedWishList, WISH_LIST_AGGREGATOR)
.aggregate();
customers.to("customers");
Now imagine the streams get the following values:
Stream "cart":
1L, Item[no:01]
2L, Item[no:02]
1L, Item[no:03]
1L, Item[no:04]
2L, Item[no:05]
Stream "purchases":
2L, Item[no:06]
1L, Item[no:07]
1L, Item[no:08]
2L, Item[no:09]
2L, Item[no:10]
Stream "wish-list":
1L, Item[no:11]
2L, Item[no:12]
2L, Item[no:13]
2L, Item[no:14]
2L, Item[no:15]
You could expect to see the following outputs in "customers":
1L, Customer[
cart:{Item[no:01], Item[no:03], Item[no:04]},
purchases:{Item[no:07], Item[no:08]},
wishList:{Item[no:11]}
]
2L, Customer[
cart:{Item[no:02], Item[no:05]},
purchases:{Item[no:06], Item[no:09], Item[no:10]},
wishList:{Item[no:12], Item[no:13], Item[no:14], Item[no:15]}
]
Public Interfaces
KGroupedStream { //Copy of aggregate method signatures.
...
<T> KCogroupedStream<K, K, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Serde<T> aggValueSerde, final String storeName);
...