...

  1. Reduce the number of gets from state stores. With multiple joins, a new value arriving on any of the streams sets off a chain reaction in which ValueGetters keep calling ValueGetters until all state stores have been accessed.
  2. Slight performance increase. As described above, every ValueGetter is called, which in turn causes every ValueJoiner to be called and forces a recalculation of the current joined value of all other streams.

 

Example with Current API:

KTable<K, V1> table1 = builder.stream("topic1").groupByKey().aggregate(initializer1, aggregator1, aggValueSerde1, storeName1);
KTable<K, V2> table2 = builder.stream("topic2").groupByKey().aggregate(initializer2, aggregator2, aggValueSerde2, storeName2);
KTable<K, V3> table3 = builder.stream("topic3").groupByKey().aggregate(initializer3, aggregator3, aggValueSerde3, storeName3);
KTable<K, CG> cogrouped = table1.outerJoin(table2, joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);

As you can see, this creates 3 StateStores and requires 3 initializers and 3 aggValueSerdes. It also puts pressure on the user to define what the intermediate values (V1, V2, V3) are going to be. They are left with a couple of choices: first, make V1, V2, and V3 all the same as CG, so that the two joiners act more like mergers; or second, make them intermediate states such as Topic1Map, Topic2Map, and Topic3Map and have the joiners use those to build the final aggregate CG value. This is something the user could avoid thinking about with this KIP.

When a new input arrives, let's say at "topic1", it first goes through a KStreamAggregate, which grabs the current aggregate from storeName1. It produces the first intermediate value, which gets sent through a KTableKTableOuterJoin, where the current value of the key is looked up in storeName2. The first joiner uses these to calculate the second intermediate value, which goes through an additional KTableKTableOuterJoin. There the current value of the key is looked up in storeName3, and the second joiner builds the final aggregate value.

If you think through all the possibilities for incoming topics, you will see that no matter which topic a record comes in through, all three stores are queried and all of the joiners must be used.
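The walk-through above can be modelled without Kafka at all. The following is a minimal sketch, not Kafka Streams code: `JoinChainModel` and its fields are hypothetical names, the three state stores are plain maps, each aggregator simply counts records, and each joiner sums its two inputs. It only illustrates that every incoming record touches all three stores and both joiners.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the three-table outer-join chain (not Kafka Streams code).
class JoinChainModel {
    private final List<Map<String, Integer>> stores = new ArrayList<>();
    int storeGets = 0;   // number of state store reads so far
    int joinerCalls = 0; // number of joiner invocations so far

    JoinChainModel() {
        for (int i = 0; i < 3; i++) {
            stores.add(new HashMap<>());
        }
    }

    private Integer get(int store, String key) {
        storeGets++;
        return stores.get(store).get(key);
    }

    // Stand-in for a ValueJoiner: sums both sides, treating a missing side as 0.
    private Integer join(Integer left, Integer right) {
        joinerCalls++;
        return (left == null ? 0 : left) + (right == null ? 0 : right);
    }

    // Processes one record arriving on topic index 0, 1, or 2 and returns the joined value.
    int process(int topic, String key) {
        // Aggregate step: read the current aggregate for this topic, update it, write it back.
        Integer current = get(topic, key);
        Integer updated = (current == null ? 0 : current) + 1;
        stores.get(topic).put(key, updated);

        // Join chain: (table1 outerJoin table2) outerJoin table3.
        // The other two tables are looked up in their own stores.
        Integer v1 = topic == 0 ? updated : get(0, key);
        Integer v2 = topic == 1 ? updated : get(1, key);
        Integer v3 = topic == 2 ? updated : get(2, key);
        return join(join(v1, v2), v3);
    }

    public static void main(String[] args) {
        JoinChainModel model = new JoinChainModel();
        model.process(0, "k");
        System.out.println("gets=" + model.storeGets + ", joiner calls=" + model.joinerCalls);
    }
}
```

In this model a single record costs three store reads and two joiner calls regardless of the topic it arrives on.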

Topology-wise, for N incoming streams the best-case scenario creates N KStreamAggregates, 2 * Log2N KTableKTableOuterJoins, and Log2N KTableKTableJoinMergers. This holds only if the user understands that the best case is building a binary tree of joins. Otherwise, the worst case could be 2*(N-1) KTableKTableOuterJoins and N-1 KTableKTableJoinMergers.

 

Example with Proposed API:

KGroupedStream<K, V1> grouped1 = builder.stream("topic1").groupByKey();
KGroupedStream<K, V2> grouped2 = builder.stream("topic2").groupByKey();
KGroupedStream<K, V3> grouped3 = builder.stream("topic3").groupByKey();
KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1, storeName1)
        .cogroup(grouped2, aggregator2)
        .cogroup(grouped3, aggregator3)
        .aggregate();

As you can see, this creates 1 StateStore and requires 1 initializer and 1 aggValueSerde. The user no longer has to worry about the intermediate values and the joiners. All they have to think about is how each stream impacts the creation of the final CG object.

When a new input arrives, let's say at "topic1", it first goes through a KStreamAggregate, which grabs the current aggregate from storeName1. It adds the incoming object to the aggregate, updates the store, and passes the new aggregate on. The new aggregate then goes through the KStreamCogroup, which is essentially a pass-through processor, and you are done.
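The same flow can be sketched in plain Java. This is an illustrative model only, assuming integer values and a single shared map in place of storeName1; `CogroupModel`, `example()`, and the three lambda aggregators are hypothetical and are not part of the proposed API. Each topic has its own aggregator, but all of them read and update the same aggregate.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of a cogroup: one shared store, one aggregator per input topic.
class CogroupModel {
    private final Map<String, Integer> store = new HashMap<>();
    // Each aggregator takes (incoming value, current aggregate) and returns the new aggregate.
    private final List<BiFunction<Integer, Integer, Integer>> aggregators;
    int storeGets = 0; // number of state store reads so far

    CogroupModel(List<BiFunction<Integer, Integer, Integer>> aggregators) {
        this.aggregators = aggregators;
    }

    // Example wiring with three made-up aggregators for three topics.
    static CogroupModel example() {
        List<BiFunction<Integer, Integer, Integer>> aggs = new ArrayList<>();
        aggs.add((value, agg) -> agg + value);     // topic1 adds to the aggregate
        aggs.add((value, agg) -> agg - value);     // topic2 subtracts from it
        aggs.add((value, agg) -> agg + 2 * value); // topic3 adds twice the value
        return new CogroupModel(aggs);
    }

    // Processes one record from topic index 0, 1, or 2 and returns the new aggregate.
    int process(int topic, String key, int value) {
        storeGets++;
        Integer current = store.get(key);
        int aggregate = current == null ? 0 : current; // stand-in for the single initializer
        int updated = aggregators.get(topic).apply(value, aggregate);
        store.put(key, updated);
        return updated; // passed downstream unchanged
    }

    public static void main(String[] args) {
        CogroupModel model = CogroupModel.example();
        model.process(0, "k", 5);
        model.process(1, "k", 2);
        System.out.println("aggregate=" + model.process(2, "k", 1) + ", gets=" + model.storeGets);
    }
}
```

Here every record costs exactly one store read, and no joiner is involved.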

Topology-wise, for N incoming streams the new API will only ever create N KStreamAggregates and 1 KStreamCogroup.
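To put numbers on the comparison, the node counts quoted in the two examples can be written as simple arithmetic. This is a hypothetical helper, not part of Kafka Streams; it uses only the worst-case figures for the current API and the figures for the proposed API as stated in the text.

```java
// Hypothetical helper: processor-node counts for N input streams, per the figures in the text.
class TopologyNodeCounts {
    // Current API, worst case (a linear chain of joins):
    // N KStreamAggregates + 2*(N-1) KTableKTableOuterJoins + (N-1) KTableKTableJoinMergers.
    static int currentApiWorstCase(int n) {
        return n + 2 * (n - 1) + (n - 1);
    }

    // Proposed API: N KStreamAggregates + 1 KStreamCogroup.
    static int proposedApi(int n) {
        return n + 1;
    }

    public static void main(String[] args) {
        for (int n = 2; n <= 5; n++) {
            System.out.println("N=" + n + ": current worst case=" + currentApiWorstCase(n)
                    + ", proposed=" + proposedApi(n));
        }
    }
}
```

For the three-topic examples above, this gives 9 processor nodes for the chained joins against 4 for the cogroup.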

Public Interfaces

KGroupedStream { //Copy of aggregate method signatures.
...
<T> KCogroupedStream<K, K, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Serde<T> aggValueSerde, final String storeName);

...