Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.

 

Concrete Example:

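public class Customer {
    // Lists are initialized so the aggregators can call add() on a new Customer.
    List<Item> cart = new ArrayList<>();
    List<Item> purchases = new ArrayList<>();
    List<Item> wishList = new ArrayList<>();
}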

There are 3 streams: cart, purchases, and wish-list.

We would construct 3 aggregators in which we add the item to the appropriate list. One of these would look like:

// The key type is Long to match the KGroupedStream<Long, Item> streams used in the topology.
private static final Aggregator<Long, Item, Customer> CART_AGGREGATOR = new Aggregator<Long, Item, Customer>() {
    @Override
    public Customer apply(Long key, Item value, Customer aggregate) {
        aggregate.cart.add(value);
        return aggregate;
    }
};

Then we would create the topology:

KGroupedStream<Long, Item> groupedCart = builder.<Long, Item>stream("cart").groupByKey();
KGroupedStream<Long, Item> groupedPurchases = builder.<Long, Item>stream("purchases").groupByKey();
KGroupedStream<Long, Item> groupedWishList = builder.<Long, Item>stream("wish-list").groupByKey();
KTable<Long, Customer> customers = groupedCart.cogroup(Customer::new, CART_AGGREGATOR, aggValueSerde, "customerStore")
        .cogroup(groupedPurchases, PURCHASE_AGGREGATOR)
        .cogroup(groupedWishList, WISH_LIST_AGGREGATOR)
        .aggregate();
customers.to("customers");

Now imagine the streams get the following values:

Stream "cart":

    1L, Item[no:01]
    2L, Item[no:02]
    1L, Item[no:03]
    1L, Item[no:04]
    2L, Item[no:05]

Stream "purchases":

    2L, Item[no:06]
    1L, Item[no:07]
    1L, Item[no:08]
    2L, Item[no:09]
    2L, Item[no:10]

Stream "wish-list":

    1L, Item[no:11]
    2L, Item[no:12]
    2L, Item[no:13]
    2L, Item[no:14]
    2L, Item[no:15]

 

Once all of these records have been processed, you could expect the latest value for each key in "customers" to be:

1L, Customer[
        cart:{Item[no:01], Item[no:03], Item[no:04]},
        purchases:{Item[no:07], Item[no:08]},
        wishList:{Item[no:11]}
    ]
2L, Customer[
        cart:{Item[no:02], Item[no:05]},
        purchases:{Item[no:06], Item[no:09], Item[no:10]},
        wishList:{Item[no:12], Item[no:13], Item[no:14], Item[no:15]}
    ]
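
The per-key result above can be checked with a small plain-Java simulation. This is only a sketch of the aggregation semantics, not the Kafka Streams implementation: it assumes a single in-memory map standing in for the shared state store, processes the three streams one after another instead of interleaved, and redeclares a minimal Aggregator interface so that it runs without the Kafka dependency. The names CogroupSketch, feed, nos and run are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class CogroupSketch {

    // Minimal stand-in for the document's Item type.
    static final class Item {
        final String no;
        Item(String no) { this.no = no; }
    }

    // Stand-in for Customer, with the lists initialized so add() is safe.
    static final class Customer {
        final List<Item> cart = new ArrayList<>();
        final List<Item> purchases = new ArrayList<>();
        final List<Item> wishList = new ArrayList<>();
    }

    // Same shape as the Kafka Streams Aggregator, redeclared here (assumption: no Kafka on the classpath).
    interface Aggregator<K, V, T> {
        T apply(K key, V value, T aggregate);
    }

    static final Aggregator<Long, Item, Customer> CART_AGGREGATOR =
            (key, value, aggregate) -> { aggregate.cart.add(value); return aggregate; };
    static final Aggregator<Long, Item, Customer> PURCHASE_AGGREGATOR =
            (key, value, aggregate) -> { aggregate.purchases.add(value); return aggregate; };
    static final Aggregator<Long, Item, Customer> WISH_LIST_AGGREGATOR =
            (key, value, aggregate) -> { aggregate.wishList.add(value); return aggregate; };

    // Hypothetical helper: folds one stream's records into the shared store,
    // creating the aggregate with the initializer the first time a key is seen.
    static void feed(Map<Long, Customer> store, Supplier<Customer> initializer,
                     Aggregator<Long, Item, Customer> aggregator, long[] keys, String[] nos) {
        for (int i = 0; i < keys.length; i++) {
            Customer current = store.get(keys[i]);
            if (current == null) {
                current = initializer.get();
            }
            store.put(keys[i], aggregator.apply(keys[i], new Item(nos[i]), current));
        }
    }

    // Replays the three example streams against one shared store.
    static Map<Long, Customer> run() {
        Map<Long, Customer> store = new LinkedHashMap<>();
        feed(store, Customer::new, CART_AGGREGATOR,
                new long[] {1, 2, 1, 1, 2}, new String[] {"01", "02", "03", "04", "05"});
        feed(store, Customer::new, PURCHASE_AGGREGATOR,
                new long[] {2, 1, 1, 2, 2}, new String[] {"06", "07", "08", "09", "10"});
        feed(store, Customer::new, WISH_LIST_AGGREGATOR,
                new long[] {1, 2, 2, 2, 2}, new String[] {"11", "12", "13", "14", "15"});
        return store;
    }

    // Extracts the item numbers so the lists are easy to print and compare.
    static List<String> nos(List<Item> items) {
        List<String> out = new ArrayList<>();
        for (Item item : items) {
            out.add(item.no);
        }
        return out;
    }

    public static void main(String[] args) {
        run().forEach((key, c) -> System.out.println(key + "L, Customer[cart:" + nos(c.cart)
                + ", purchases:" + nos(c.purchases) + ", wishList:" + nos(c.wishList) + "]"));
    }
}
```

Because every aggregator writes into the same Customer object for a given key, the order in which the streams are interleaved does not change the final contents of each list, only the order of the intermediate updates.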

Public Interfaces

KGroupedStream { //Copy of aggregate method signatures.
...
<T> KCogroupedStream<K, K, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Serde<T> aggValueSerde, final String storeName);

...