Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
KGroupedStream<Long, Item> groupedCart = builder.stream("cart").groupByKey();
KGroupedStream<Long, Item> groupedPurchases = builder.stream("purchases").groupByKey();
KGroupedStream<Long, Item> groupedWishList = builder.stream("wish-list").groupByKey();
KTable<Long, Customer> customers = groupedCart.cogroup(Customer::new, CART_AGGREGATOR, materialized)
        .cogroup(groupedPurchases, PURCHASE_AGGREGATOR)
        .cogroup(groupedWishList, WISH_LIST_AGGREGATOR)
        .aggregate(initializer);
customers.to("customers");

...