THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
KGroupedStream<Long, Item> groupedCart = builder.stream("cart").groupByKey(); KGroupedStream<Long, Item> groupedPurchases = builder.stream("purchases").groupByKey(); KGroupedStream<Long, Item> groupedWishList = builder.stream("wish-list").groupByKey(); KTable<Long, Customer> customers = groupedCart.cogroup(Customer::new, CART_AGGREGATOR, materialized) .cogroup(groupedPurchases, PURCHASE_AGGREGATOR) .cogroup(groupedWishList, WISH_LIST_AGGREGATOR) .aggregate(initializer); customers.to("customers"); |
...