...

Code Block
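// Group each input topic by its existing key; the value types of the inputs may differ.
KGroupedStream<K, V1> grouped1 = builder.<K, V1>stream("topic1").groupByKey();
KGroupedStream<K, V2> grouped2 = builder.<K, V2>stream("topic2").groupByKey();
KGroupedStream<K, V3> grouped3 = builder.<K, V3>stream("topic3").groupByKey();
// One initializer and one aggregate Serde; one aggregator per input stream.
KTable<K, CG> cogrouped = grouped1.cogroup(initializer1, aggregator1, aggValueSerde1)
        .cogroup(grouped2, aggregator2)
        .cogroup(grouped3, aggregator3)
        .aggregate(initializer1, aggValueSerde1, storeName1); // backed by a single state store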

As shown above, this creates a single state store and needs only one initializer and one aggValueSerde. The user no longer has to manage the intermediate aggregate values or the joiners between them; they only have to define how each stream contributes to the final CG object.
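
The single-store behavior can be illustrated without a Kafka cluster. The following is a minimal in-memory sketch, not the proposed implementation: `CogroupSemanticsSketch`, the hypothetical `CustomerActivity` aggregate (standing in for CG), and the `Agg` interface (shaped like Kafka's `Aggregator`) are all assumptions made for illustration. One `HashMap` plays the role of the single state store, one initializer creates the empty aggregate, and each input stream supplies only its own aggregator.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical in-memory model of cogroup semantics; not Kafka Streams code. */
final class CogroupSemanticsSketch {

    /** Hypothetical aggregate standing in for CG: one field per contributing stream. */
    static final class CustomerActivity {
        int views;
        int clicks;
        double spend;
    }

    /** Shaped like Kafka's Aggregator: (key, value, aggregate) -> new aggregate. */
    interface Agg<K, T, V> {
        V apply(K key, T value, V aggregate);
    }

    // The single "state store" shared by every input stream.
    final Map<String, CustomerActivity> store = new HashMap<>();
    // The single initializer, used the first time any stream sees a key.
    final Supplier<CustomerActivity> initializer = CustomerActivity::new;

    // One aggregator per input stream; each updates only the part of CG it is responsible for.
    final Agg<String, String, CustomerActivity> viewAgg = (key, page, agg) -> { agg.views++; return agg; };
    final Agg<String, Integer, CustomerActivity> clickAgg = (key, n, agg) -> { agg.clicks += n; return agg; };
    final Agg<String, Double, CustomerActivity> purchaseAgg = (key, amount, agg) -> { agg.spend += amount; return agg; };

    /** Applies one record, from whichever stream it arrived on, to the shared aggregate for its key. */
    <T> CustomerActivity process(String key, T value, Agg<String, T, CustomerActivity> aggregator) {
        CustomerActivity current = store.computeIfAbsent(key, k -> initializer.get());
        CustomerActivity updated = aggregator.apply(key, value, current);
        store.put(key, updated);
        return updated;
    }
}
```

Records from the three streams may arrive in any interleaving; each one updates the same per-key object, so no intermediate tables or joiners are needed.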

...

Code Block
public interface KGroupedStream<K, V> {
    // ... existing methods ...

    // Mirrors the parameters of the existing aggregate(...) overloads.
    <T> CogroupedKStream<K, T> cogroup(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> aggregator,
                                       final Serde<T> aggValueSerde);
}
Code Block
/**
* {@code CogroupedKStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
* It is an intermediate representation of one or more {@link KStream}s in order to apply one or more aggregation
* operations on the original {@link KStream} records.
* <p>
* It is an intermediate representation after a grouping of {@link KStream}s, before the aggregations are applied to
* the new partitions resulting in a {@link KTable}.
* <p>
* A {@code CogroupedKStream} must be obtained from a {@link KGroupedStream} via
* {@link KGroupedStream#cogroup(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde) cogroup(...)}.
*
* @param <K> Type of keys; windowed aggregations return tables keyed by {@code Windowed<K>}
* @param <V> Type of aggregate values
*/
public interface CogroupedKStream<K, V> {

    <T> CogroupedKStream<K, V> cogroup(final KGroupedStream<K, T> groupedStream,
                                       final Aggregator<? super K, ? super T, V> aggregator);

    KTable<K, V> aggregate(final Initializer<V> initializer,
                           final Serde<V> valueSerde,
                           final String storeName);

    KTable<K, V> aggregate(final Initializer<V> initializer,
                           final StateStoreSupplier<KeyValueStore> storeSupplier);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final Serde<V> valueSerde,
                                     final String storeName);

    KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                     final Merger<? super K, V> sessionMerger,
                                     final SessionWindows sessionWindows,
                                     final StateStoreSupplier<SessionStore> storeSupplier);

    <W extends Window> KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                                        final Windows<W> windows,
                                                        final Serde<V> valueSerde,
                                                        final String storeName);

    <W extends Window> KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
                                                        final Windows<W> windows,
                                                        final StateStoreSupplier<WindowStore> storeSupplier);
}

Expected use:

Code Block
KTable<K, V> cogroupedTable = groupedStream1.cogroup(initializer, aggregator1, aggValueSerde)
        .cogroup(groupedStream2, aggregator2)
        .cogroup(groupedStream3, aggregator3)
        /* ... one cogroup(...) call per additional stream ... */
        .cogroup(groupedStreamN, aggregatorN)
        .aggregate(initializer, aggValueSerde, "aggValue");

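The chained calls above can be modeled without Kafka to show why `cogroup` is generic in its own `<T>` while the aggregate type `V` stays fixed: each input stream may carry a different value type, but every aggregator must return the shared aggregate. This is a hypothetical in-memory stand-in; `FluentCogroupSketch`, its `Agg` interface, and the use of `Map.Entry` lists as input streams are assumptions, not Kafka APIs. For simplicity the initializer is supplied only to `aggregate`, and each input is folded in turn, so the sketch reproduces the final table but not the interleaved updates a real topology would emit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/**
 * Hypothetical in-memory model of the chained cogroup(...).aggregate(...) calls.
 * Each input may have its own value type T; every aggregator must produce V.
 */
final class FluentCogroupSketch<K, V> {

    /** Shaped like Kafka's Aggregator: (key, value, aggregate) -> new aggregate. */
    interface Agg<AK, AT, AV> {
        AV apply(AK key, AT value, AV aggregate);
    }

    // One folding step per registered input; all steps write into the same store.
    private final List<BiConsumer<Map<K, V>, Supplier<V>>> steps = new ArrayList<>();

    /** Mirrors CogroupedKStream#cogroup: registers one more input and its aggregator. */
    <T> FluentCogroupSketch<K, V> cogroup(List<Map.Entry<K, T>> stream,
                                          Agg<? super K, ? super T, V> aggregator) {
        steps.add((store, initializer) -> {
            for (Map.Entry<K, T> record : stream) {
                K key = record.getKey();
                // A key seen for the first time starts from the single shared initializer.
                V current = store.containsKey(key) ? store.get(key) : initializer.get();
                store.put(key, aggregator.apply(key, record.getValue(), current));
            }
        });
        return this;
    }

    /** Mirrors CogroupedKStream#aggregate: one initializer, one resulting table. */
    Map<K, V> aggregate(Supplier<V> initializer) {
        Map<K, V> store = new HashMap<>();
        for (BiConsumer<Map<K, V>, Supplier<V>> step : steps) {
            step.accept(store, initializer);
        }
        return store;
    }
}
```

For example, an `Integer` click stream and a `String` page-view stream can both feed a `String` aggregate, and the compiler rejects any aggregator that does not return the shared type.
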
Proposed Changes

  1. Add the public interfaces above.
  2. Create an internals.CogroupedKStreamImpl that keeps track of the key Serde, the aggregate value Serde, the Initializer, and the (KGroupedStream, Aggregator) pairs.
  3. Model the aggregate methods of internals.CogroupedKStreamImpl on the doAggregate method of KGroupedStreamImpl: force each KGroupedStream to repartitionIfRequired and add a KStreamAggProcessorSupplier for each one. Additionally, ensure that all sources are co-partitioned, that every processor has access to the shared state store, and add a KStreamCogroup processor downstream of them.
  4. Create a KStreamCogroup processor that passes through all outputs from the KStreamAggregate processors. KStreamCogroup must itself be a KStreamAggProcessorSupplier: it keeps track of all of its parent KStreamAggProcessorSuppliers so that it can call enableSendingOldValues on each of them when needed, and, if view is called, it can delegate creation of the KTableValueGetterSupplier to one of them, since they all share the same state store.
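
The processor layout in steps 2 to 4 can be sketched in plain Java. This is a hypothetical model under stated assumptions, not the actual Kafka internals: `AggProcessor` stands in for the per-stream KStreamAggregate processors, `CogroupNode` for the proposed KStreamCogroup, `Change` loosely mirrors the internal new/old value pair, and a `HashMap` replaces the real state store. It shows the three responsibilities named above: all parents write to one store, `enableSendingOldValues()` is propagated to every parent, and `view()` is served by any one parent because they all read the same store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical model: per-stream aggregate processors sharing one store, feeding one pass-through node. */
final class CogroupTopologySketch {

    interface Agg<K, T, V> { V apply(K key, T value, V aggregate); }

    /** New value plus, when enabled, the previous value (loosely modeled on Kafka's internal Change). */
    static final class Change<V> {
        final V newValue;
        final V oldValue;
        Change(V newValue, V oldValue) { this.newValue = newValue; this.oldValue = oldValue; }
    }

    /** Stands in for the KStreamAggregate processor created for one input stream. */
    static final class AggProcessor<K, T, V> {
        private final CogroupNode<K, V> node;   // owns the shared store and the single initializer
        private final Agg<K, T, V> aggregator;  // this stream's own aggregator
        private boolean sendOldValues = false;

        AggProcessor(CogroupNode<K, V> node, Agg<K, T, V> aggregator) {
            this.node = node;
            this.aggregator = aggregator;
        }

        void enableSendingOldValues() { sendOldValues = true; }

        void process(K key, T value) {
            V old = node.store.get(key);
            V updated = aggregator.apply(key, value, old == null ? node.initializer.get() : old);
            node.store.put(key, updated);
            node.forward(key, new Change<>(updated, sendOldValues ? old : null));
        }

        /** Stands in for a value getter over the shared store. */
        V get(K key) { return node.store.get(key); }
    }

    /** Stands in for the proposed KStreamCogroup: forwards every change unchanged. */
    static final class CogroupNode<K, V> {
        final Map<K, V> store = new HashMap<>();  // the single shared state store
        final Supplier<V> initializer;            // the single initializer
        final List<AggProcessor<K, ?, V>> parents = new ArrayList<>();
        final List<Change<V>> emitted = new ArrayList<>();

        CogroupNode(Supplier<V> initializer) { this.initializer = initializer; }

        /** Creates and tracks one parent processor per cogrouped input stream. */
        <T> AggProcessor<K, T, V> addParent(Agg<K, T, V> aggregator) {
            AggProcessor<K, T, V> parent = new AggProcessor<>(this, aggregator);
            parents.add(parent);
            return parent;
        }

        /** Any parent may update the store, so old values must be enabled on all of them. */
        void enableSendingOldValues() {
            for (AggProcessor<K, ?, V> parent : parents) {
                parent.enableSendingOldValues();
            }
        }

        /** All parents read the same store, so delegating to one of them is enough. */
        Function<K, V> view() {
            AggProcessor<K, ?, V> first = parents.get(0);
            return first::get;
        }

        /** Pass-through: a real processor would forward (key, change) downstream; here it is recorded. */
        void forward(K key, Change<V> change) { emitted.add(change); }
    }
}
```

If only some parents sent old values, changes originating from the other streams would reach downstream processors without them, which is why the sketch propagates the flag to every parent.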

...