Comment: changed the storeName and serde to be in a Materialized object

...

  1. Reduce the number of gets from state stores. With multiple joins, a new value arriving on any of the streams sets off a chain reaction in which ValueGetters keep calling ValueGetters until every state store has been accessed.
  2. Slight performance increase. As described above, all ValueGetters are called, which in turn causes all ValueJoiners to be called, forcing a recalculation of the current joined value of all other streams.

 


Example with Current API:

Code Block
KTable<K, V1> table1 = builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1, materialized1);
KTable<K, V2> table2 = builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2, materialized2);
KTable<K, V3> table3 = builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3, materialized3);
KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);

...


As you can see, this creates 3 StateStores and requires 3 initializers and 3 aggValueSerdes (supplied through the Materialized parameters). It also puts pressure on the user to define the intermediate value types (V1, V2, V3). They are left with two choices: either make V1, V2, and V3 all the same as CG, so that the two joiners act more like mergers, or make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map and have the joiners build the final aggregate CG value from those. With this KIP, the user no longer has to think about this.

...

Topology-wise, for N incoming streams this creates N KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1 KTableKTableJoinMergers.


Example with Proposed API:

Code Block
KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
KTable<K, CG> cogrouped = grouped1.cogroup(aggregator1)
        .cogroup(grouped2, aggregator2)
        .cogroup(grouped3, aggregator3)
        .aggregate(initializer1, materialized1);

As you can see, this creates 1 StateStore and requires 1 initializer and 1 aggValueSerde (supplied through the Materialized parameter). The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream contributes to the final CG object.
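
The single-store behaviour described above can be imitated in plain Java with no Kafka dependency. The following is only a sketch under assumptions: `CogroupSketch`, `Agg`, `SharedStore`, `Totals`, and the sample records are hypothetical names and data chosen for illustration, not part of the Kafka Streams API or of this KIP. It shows one initializer and one store, with each input stream contributing only its own aggregator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class CogroupSketch {
    /** Hypothetical stand-in for an aggregator: folds one record into the aggregate. */
    interface Agg<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    /** Hypothetical single store shared by every input stream. */
    static final class SharedStore<K, A> {
        private final Map<K, A> store = new HashMap<>();
        private final Supplier<A> initializer;

        SharedStore(Supplier<A> initializer) {
            this.initializer = initializer;
        }

        /** Applies the aggregator belonging to the stream the record arrived on. */
        <V> A process(K key, V value, Agg<K, V, A> aggregator) {
            A current = store.containsKey(key) ? store.get(key) : initializer.get();
            A updated = aggregator.apply(key, value, current);
            store.put(key, updated);
            return updated;
        }

        A get(K key) {
            return store.get(key);
        }

        int size() {
            return store.size();
        }
    }

    /** The final aggregate (CG); no intermediate V1/V2/V3 types are needed. */
    static final class Totals {
        final List<String> names = new ArrayList<>();
        int clicks;
        double spend;
    }

    /** Feeds records from three differently typed "streams" into one store. */
    static SharedStore<Long, Totals> run() {
        SharedStore<Long, Totals> store = new SharedStore<>(Totals::new);
        Agg<Long, String, Totals> nameAgg = (k, v, t) -> { t.names.add(v); return t; };
        Agg<Long, Integer, Totals> clickAgg = (k, v, t) -> { t.clicks += v; return t; };
        Agg<Long, Double, Totals> spendAgg = (k, v, t) -> { t.spend += v; return t; };

        store.process(1L, "alice", nameAgg);
        store.process(1L, 3, clickAgg);
        store.process(2L, 5, clickAgg);
        store.process(1L, 2.5, spendAgg);
        store.process(1L, 4, clickAgg);
        return store;
    }

    public static void main(String[] args) {
        Totals t = run().get(1L);
        System.out.println(t.names + " clicks=" + t.clicks + " spend=" + t.spend);
    }
}
```

Each call touches the shared store once per record, whichever stream the record came from, which is the property the proposed API aims for.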

...

Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.
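
To make the difference concrete, the processor counts quoted for the two approaches can be tabulated. This is a small sketch, assuming only the processors named in the text are counted (sources, sinks, and repartition nodes are ignored); `TopologyCount` and its method names are hypothetical.

```java
public class TopologyCount {
    /** Current API: N KStreamAggregates, 2*(N-1) outer joins, N-1 join mergers. */
    static int currentApi(int n) {
        return n + 2 * (n - 1) + (n - 1);
    }

    /** Proposed API: N KStreamAggregates plus a single KStreamCogroup. */
    static int proposedApi(int n) {
        return n + 1;
    }

    public static void main(String[] args) {
        for (int n = 2; n <= 5; n++) {
            System.out.printf("N=%d current=%d proposed=%d%n", n, currentApi(n), proposedApi(n));
        }
    }
}
```

For the three-stream examples above this gives 9 processors with the current API and 4 with the proposed one.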


Concrete Example:

Code Block
public class Customer {
    List<Item> cart;
    List<Item> purchases;
    List<Item> wishList;
}

...

Code Block
KGroupedStream<Long, Item> groupedCart = builder.stream("cart").groupByKey();
KGroupedStream<Long, Item> groupedPurchases = builder.stream("purchases").groupByKey();
KGroupedStream<Long, Item> groupedWishList = builder.stream("wish-list").groupByKey();
KTable<Long, Customer> customers = groupedCart.cogroup(Customer::new, CART_AGGREGATOR, materialized)
        .cogroup(groupedPurchases, PURCHASE_AGGREGATOR)
        .cogroup(groupedWishList, WISH_LIST_AGGREGATOR)
        .aggregate();
customers.to("customers");

...

    1L, Item[no:11]
    2L, Item[no:12]
    2L, Item[no:13]
    2L, Item[no:14]
    2L, Item[no:15] 


After all items have flowed through the topology, you could expect to see the following outputs in "customers":

...

Code Block
KGroupedStream {
...
	<T> CogroupedKStream<K, T> cogroup(final Aggregator<? super K, ? super V, T> aggregator);

	<T> CogroupedKStream<K, T> cogroup(final Aggregator<? super K, ? super V, T> aggregator,
	                                   final Materialized<K, T, KeyValueStore<Bytes, byte[]>> materialized);
}


Code Block
/**
* {@code CogroupedKStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
* It is an intermediate representation of one or more {@link KStream}s in order to apply one or more aggregation
* operations on the original {@link KStream} records.
* <p>
* It is an intermediate representation after a grouping of {@link KStream}s, before the aggregations are applied to
* the new partitions resulting in a {@link KTable}.
* <p>
* A {@code CogroupedKStream} must be obtained from a {@link KGroupedStream} via 
* {@link KGroupedStream#cogroup(Aggregator) cogroup(...)}.
*
* @param <K> Type of keys
* @param <V> Type of aggregate values
*/
public interface CogroupedKStream<K, V> {
    <T> CogroupedKStream<K, V> cogroup(final KGroupedStream<K, T> groupedStream,
                                       final Aggregator<? super K, ? super T, V> aggregator);

    KTable<K, V> aggregate(final Initializer<V> initializer,
                           final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

    KTable<K, V> aggregate(final Initializer<V> initializer,
                           final StateStoreSupplier<KeyValueStore> storeSupplier);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final SessionWindows sessionWindows,
                                     final Merger<? super K, V> sessionMerger,
                                     final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final SessionWindows sessionWindows,
                                     final Merger<? super K, V> sessionMerger,
                                     final StateStoreSupplier<SessionStore> storeSupplier);

    <W extends Window> KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                                        final Windows<W> windows,
                                                        final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

    <W extends Window> KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                                        final Windows<W> windows,
                                                        final StateStoreSupplier<WindowStore> storeSupplier);
}

...


Proposed Changes

  1. Construct the above Public Interfaces.
  2. Create an internal.KCogroupedStreamImpl that will keep track of the KeyValueSerde, AggValueSerde, Initializer, and Pairs of (KGroupedStream, Aggregator).
  3. Model the aggregate method of internal.KCogroupedStream after the doAggregate method of KGroupedStream by forcing the KGroupedStreams to repartitionIfRequired and adding the KStreamAggProcessorSupplier for each KGroupedStream. Additionally, ensure that all sources are copartitioned and that the processors have access to the state store, and add a KStreamCogroup processor.
  4. Create a KStreamCogroup that will pass through all outputs from the KStreamAggregates. KStreamCogroup must also be a KStreamAggProcessorSupplier; it will keep track of all of its parent KStreamAggProcessorSuppliers in case it needs to enableSendingOldValues, and it can have one of them create a KTableValueGetterSupplier if view is called.
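
The pass-through idea in step 4 can be pictured with a simplified, Kafka-free sketch. Everything here is an assumption made for illustration: `AggSupplier`, `ParentAggregate`, and `PassThrough` are hypothetical stand-ins and do not reproduce the real KStreamAggProcessorSupplier interface or the eventual KStreamCogroup implementation. The sketch only shows a node that forwards records unchanged and delegates a request for old values to every parent it tracks.

```java
import java.util.ArrayList;
import java.util.List;

public class CogroupPassThroughSketch {
    /** Hypothetical, simplified stand-in for an aggregate processor supplier. */
    interface AggSupplier {
        void enableSendingOldValues();
    }

    /** Hypothetical parent: a per-stream aggregate that remembers the flag. */
    static final class ParentAggregate implements AggSupplier {
        boolean sendOldValues = false;

        @Override
        public void enableSendingOldValues() {
            sendOldValues = true;
        }
    }

    /** Forwards every record unchanged and delegates old-value requests to its parents. */
    static final class PassThrough implements AggSupplier {
        private final List<AggSupplier> parents = new ArrayList<>();
        final List<String> forwarded = new ArrayList<>();

        void addParent(AggSupplier parent) {
            parents.add(parent);
        }

        /** No aggregation happens here; the parents have already done that work. */
        void process(String key, String change) {
            forwarded.add(key + "=" + change);
        }

        @Override
        public void enableSendingOldValues() {
            for (AggSupplier parent : parents) {
                parent.enableSendingOldValues();
            }
        }
    }

    public static void main(String[] args) {
        ParentAggregate first = new ParentAggregate();
        ParentAggregate second = new ParentAggregate();
        PassThrough cogroup = new PassThrough();
        cogroup.addParent(first);
        cogroup.addParent(second);

        cogroup.process("k1", "v1");
        cogroup.enableSendingOldValues();
        System.out.println(cogroup.forwarded + " " + first.sendOldValues + " " + second.sendOldValues);
    }
}
```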

...