Status

Current state: Under Discussion

Discussion thread: TBD

JIRA: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

When multiple streams aggregate together to form a single larger object, it is very difficult to accommodate this in the Kafka Streams DSL. For example, a shopping website may have a cart stream, a wish list stream, and a purchases stream that together make up a Customer. Today this generally requires grouping and aggregating each stream into a KTable and then making multiple outerJoin calls to end up with a KTable of the desired object. This creates a state store for each stream and a long chain of ValueJoiners that every new record must pass through to reach the final object.
Creating a cogroup method that uses a single state store will:

  1. Reduce the number of gets from state stores. With multiple joins, a new value arriving on any of the streams sets off a chain reaction in which ValueGetters keep calling ValueGetters until every state store has been accessed.
  2. Slightly improve performance. As described above, all ValueGetters are called, which in turn causes all ValueJoiners to be called, forcing a recalculation of the current joined value of all the other streams.
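
The difference in store accesses can be illustrated with a deliberately simplified model in plain Java. This is only a sketch: `ChainedJoins` and `SingleStore` are hypothetical classes that use in-memory maps in place of state stores, and they do not reproduce Kafka Streams internals. The chained-join model is assumed to read every per-stream store to rebuild the joined value, while the cogroup model reads its single store once per record.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class StoreGetComparison {

    /** Hypothetical model of chained outer joins: one store per stream. */
    static final class ChainedJoins {
        private final List<Map<String, List<String>>> stores = new ArrayList<>();
        int gets = 0; // counts only the reads needed to rebuild the joined value

        ChainedJoins(int streamCount) {
            for (int i = 0; i < streamCount; i++) {
                stores.add(new HashMap<>());
            }
        }

        /** Applies a record from one stream, then rebuilds the joined value. */
        List<String> process(int streamIndex, String key, String value) {
            // Update the aggregate of the stream that received the record.
            stores.get(streamIndex).computeIfAbsent(key, k -> new ArrayList<>()).add(value);
            // Rebuilding the joined result touches every per-stream store.
            List<String> joined = new ArrayList<>();
            for (Map<String, List<String>> store : stores) {
                gets++;
                List<String> part = store.get(key);
                if (part != null) {
                    joined.addAll(part);
                }
            }
            return joined;
        }
    }

    /** Hypothetical model of cogroup: every stream folds into one shared store. */
    static final class SingleStore {
        private final Map<String, List<String>> store = new HashMap<>();
        int gets = 0;

        List<String> process(String key, String value) {
            gets++; // one read of the shared store per incoming record
            List<String> aggregate = store.get(key);
            if (aggregate == null) {
                aggregate = new ArrayList<>();
                store.put(key, aggregate);
            }
            aggregate.add(value);
            return new ArrayList<>(aggregate);
        }
    }

    public static void main(String[] args) {
        ChainedJoins chained = new ChainedJoins(3);
        SingleStore single = new SingleStore();
        String[] records = {"cart:book", "wishlist:lamp", "purchase:pen"};
        for (int i = 0; i < records.length; i++) {
            chained.process(i, "alice", records[i]);
            single.process("alice", records[i]);
        }
        System.out.println("chained gets = " + chained.gets
                + ", single-store gets = " + single.gets);
    }
}
```

In this model the number of reads per record grows with the number of streams in the chained case and stays constant in the single-store case, which is the effect item 1 describes.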

 

Public Interfaces

KGroupedStream { // The cogroup overloads below mirror the existing aggregate method signatures.
...
<T> KCogroupedStream<K, K, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Serde<T> aggValueSerde, final String storeName);

<T> KCogroupedStream<K, K, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final StateStoreSupplier<KeyValueStore> storeSupplier);

<T> KCogroupedStream<K, Windowed<K>, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final Serde<T> aggValueSerde, final String storeName);

<T> KCogroupedStream<K, Windowed<K>, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Merger<? super K, T> sessionMerger, final SessionWindows sessionWindows, final StateStoreSupplier<SessionStore> storeSupplier);

<W extends Window, T> KCogroupedStream<K, Windowed<K>, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Windows<W> windows, final Serde<T> aggValueSerde, final String storeName);

<W extends Window, T> KCogroupedStream<K, Windowed<K>, T> cogroup(final Initializer<T> initializer, final Aggregator<? super K, ? super V, T> aggregator, final Windows<W> windows, final StateStoreSupplier<WindowStore> storeSupplier);
}

 

/**
* {@code KCogroupedStream} is an abstraction of multiple <i>grouped</i> record streams of {@link KeyValue} pairs.
* It is an intermediate representation of one or more {@link KStream}s in order to apply one or more aggregation
* operations on the original {@link KStream} records.
* <p>
* It is an intermediate representation after a grouping of {@link KStream}s, before the aggregations are applied to
* the new partitions resulting in a {@link KTable}.
* <p>
* A {@code KCogroupedStream} must be obtained from a {@link KGroupedStream} via
* {@link KGroupedStream#cogroup(Initializer, Aggregator, org.apache.kafka.common.serialization.Serde, String) cogroup(...)}.
*
* @param <K> Type of keys
* @param <RK> Type of key in the table, either K or Windowed&lt;K&gt;
* @param <V> Type of aggregate values
*/
public interface KCogroupedStream<K, RK, V> {

/**
* @return this KCogroupedStream so you can chain calls
*/
<T> KCogroupedStream<K, RK, V> cogroup(KGroupedStream<K, T> groupedStream, Aggregator<? super K, ? super T, V> aggregator);

KTable<RK, V> aggregate();
}

Expected use:
KTable<K, V> cogroupedTable = groupedStream1
    .cogroup(initializer, aggregator1, aggValueSerde, "aggValue")
    .cogroup(groupedStream2, aggregator2)
    .cogroup(groupedStream3, aggregator3)
    ...
    .cogroup(groupedStreamN, aggregatorN)
    .aggregate();
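
To make the chaining concrete, here is a minimal in-memory sketch of the call pattern using the shopping example from the Motivation. Everything in it is a hypothetical stand-in rather than the proposed Kafka Streams API: `GroupedStream` is a list of keyed records, `Cogrouped` plays the role of KCogroupedStream by collecting (stream, aggregator) pairs, and `aggregate()` returns a plain `Map` instead of a KTable. Serdes, store names, windowing, and repartitioning are left out, and each input is replayed in turn, whereas real streams would interleave.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class CogroupChainSketch {

    /** Same shape as an aggregator: (key, value, aggregate) -> new aggregate. */
    interface Aggregator<K, V, T> {
        T apply(K key, V value, T aggregate);
    }

    /** Hypothetical combined object built from three streams. */
    static final class Customer {
        final List<String> cart = new ArrayList<>();
        final List<String> wishList = new ArrayList<>();
        double spent = 0.0;
    }

    /** Hypothetical stand-in for a grouped stream: a list of keyed records. */
    static final class GroupedStream<K, V> {
        final List<Map.Entry<K, V>> records = new ArrayList<>();

        GroupedStream<K, V> add(K key, V value) {
            records.add(new SimpleImmutableEntry<>(key, value));
            return this;
        }

        /** Starts a cogroup with this stream as the first input. */
        <T> Cogrouped<K, T> cogroup(Supplier<T> initializer, Aggregator<K, V, T> aggregator) {
            return new Cogrouped<K, T>(initializer).cogroup(this, aggregator);
        }
    }

    /** Hypothetical stand-in for KCogroupedStream. */
    static final class Cogrouped<K, T> {
        private final Supplier<T> initializer;
        // One step per (stream, aggregator) pair; each step folds into the shared store.
        private final List<Consumer<Map<K, T>>> steps = new ArrayList<>();

        Cogrouped(Supplier<T> initializer) {
            this.initializer = initializer;
        }

        /** Registers another input and returns this object so calls can be chained. */
        <V> Cogrouped<K, T> cogroup(GroupedStream<K, V> stream, Aggregator<K, V, T> aggregator) {
            steps.add(store -> {
                for (Map.Entry<K, V> record : stream.records) {
                    T current = store.get(record.getKey());
                    if (current == null) {
                        current = initializer.get();
                    }
                    store.put(record.getKey(),
                            aggregator.apply(record.getKey(), record.getValue(), current));
                }
            });
            return this;
        }

        /** Runs every registered step against a single shared store. */
        Map<K, T> aggregate() {
            Map<K, T> store = new HashMap<>();
            for (Consumer<Map<K, T>> step : steps) {
                step.accept(store);
            }
            return store;
        }
    }

    static Map<String, Customer> demo() {
        GroupedStream<String, String> carts =
                new GroupedStream<String, String>().add("alice", "book");
        GroupedStream<String, String> wishLists =
                new GroupedStream<String, String>().add("alice", "lamp").add("bob", "desk");
        GroupedStream<String, Double> purchases =
                new GroupedStream<String, Double>().add("alice", 9.5);
        Supplier<Customer> initializer = Customer::new;
        return carts
                .cogroup(initializer, (key, item, customer) -> { customer.cart.add(item); return customer; })
                .cogroup(wishLists, (key, item, customer) -> { customer.wishList.add(item); return customer; })
                .cogroup(purchases, (key, amount, customer) -> { customer.spent += amount; return customer; })
                .aggregate();
    }

    public static void main(String[] args) {
        Map<String, Customer> customers = demo();
        System.out.println("customers aggregated: " + customers.size());
    }
}
```

Each input keeps its own value type and aggregator while all of them write into the same aggregate type, which is what allows one store to replace the per-stream stores.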

Proposed Changes

  1. Construct the Public Interfaces described above.
  2. Create an internal.KCogroupedStreamImpl that keeps track of the StateStoreSupplier, the Initializer, the pairs of (KGroupedStream, Aggregator), and, if needed, either the Windows or the SessionMerger and SessionWindows.
  3. Model the aggregate method of internal.KCogroupedStreamImpl after the doAggregate method of KGroupedStream: force each KGroupedStream to repartitionIfRequired and add a KStreamAggProcessorSupplier for each KGroupedStream. Additionally, ensure that all sources are co-partitioned and that the processors have access to the state store, and add a KStreamCogroup processor.
  4. Create a KStreamCogroup that passes through all outputs from the KStreamAggregate processors. KStreamCogroup must also be a KStreamAggProcessorSupplier; it keeps track of all of its parent KStreamAggProcessorSuppliers in case it needs to call enableSendingOldValues, and it can have one of them create a KTableValueGetterSupplier if view is called.

Compatibility, Deprecation, and Migration Plan

  • Users must upgrade to the new version if they want to use this functionality.

Rejected Alternatives

TBD
