Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Before we go and add many more overloaded methods it is worth while exploring other options to see if we can provide a more concise and intuitive API.

Public Interfaces

New methods added to existing interfaces:

Code Block
languagejava
titleKStream
void print(final PrintOptions<K, V> printOptions);

KStream<K, V> through(final String topic, final TopicOptions<K, V> topicOptions);

void to(final String topic, final TopicOptions<V, V> topicOptions);

KGroupedStream<K, V> groupByKey(final GroupByOptions<K, V> groupByOptions);

<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector, GroupByOptions<KR, V> groupByOptions);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<K, V, VO> options);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final JoinOptions<K, V, VT> options);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<K, V, VO> options);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<K, V, VT> options);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<K, V, VO> options);

<VT, VR> KStream<K, VR> outerJoin(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<K, V, VT> options);

...

Code Block
languagejava
titleKGroupedStream
<W extends Window> WindowedKStream<K, V> windowedBy(Windows<W> timeWindows);

SessionWindowedKStream<K, V> sessionWindowedBy(SessionWindows sessionWindows);

KTable<K, Long> count(final Materialized materialized);

KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final Materialized<K, V, KeyValueStore<K, V>> materialized);

New classes and interfaces:

Code Block
languagejava
titleWindowedKStream
public interface WindowedKStream<K, V> {

    KTable<Windowed<K>, Long> count();

    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<K, Long>> materializedAs);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator,
                                           final Materialized<K, VR, WindowStore<K, VR>> materializedAs);


    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                  final Materialized<K, V, WindowStore<K, V>> materializedAs);


} 

...