Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleKStream
void print(final PrintOptions<KPrinted<K, V> printOptionsprinted);

KStream<K, V> through(final String topic, final TopicOptions<KPartitioned<K, V> topicOptionspartitioned);

void to(final String topic, final TopicOptions<VPartitioned<V, V> topicOptionspartitioned);

KGroupedStream<K, V> groupByKey(final GroupByOptions<KSerialized<K, V> groupByOptionsserialized);

<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector, GroupByOptions<KRSerialized<KR, V> groupByOptionsserialized);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<KJoined<K, V, VO> options);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final JoinOptions<KJoined<K, V, VT> options);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<KJoined<K, V, VO> options);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<KJoined<K, V, VT> options);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<KJoined<K, V, VO> options);

<VT, VR> KStream<K, VR> outerJoin(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final JoinWindows windows, final JoinOptions<KJoined<K, V, VT> options);
Code Block
languagejava
titleKTable
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, GroupByOptions<KRSerialized<KR, VR> groupByOptionsserialized);

KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized);

KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized);

<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Materialized<K, V, KeyValueStore<K, V>> materialized);

void to(final String topic, final TopicOptions<VPartitioned<V, V> options);

KTable<K, V> through(final String topic, final Materialized<K, V> options);

<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final Materialized<K, VVR, KeyValueStore<K, VR>> materialized);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final Materialized<K, VVR, KeyValueStore<K, VR>> materialized);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final Materialized<K, VVR, KeyValueStore<K, VR>> materialized);

...

We add some new helper methods to Stores so people can conveniently and quickly create basic StateStoreSuppliers for use in the DSL or PAPI. We will also deprecate the existing Stores.create(...)

Code Block
languagejava
titleStores
public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> persistentKeyValueStore(final String name,
                                                                                     final Serde<K> keySerde,
                                                                                     final Serde<V> valueSerde) 

public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> inMemoryKeyValueStore(final String name,
                                                                                final Serde<K> keySerde,
                                                                                final Serde<V> valueSerde)

public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final String name,
                                                                final int capacity,
                                                                final Serde<K> keySerde,
                                                                final Serde<V> valueSerde) 

public static <K, V> StateStoreSupplier<WindowStore<K, V>> persistentWindowStore(final String name,
                                                                            final Windows windows,
                                                                            final Serde<K> keySerde,
                                                                            final Serde<V> valueSerde)

public static <K, V> StateStoreSupplier<SessionStore<K, V>> persistentSessionStore(final String name,
                                                                              final SessionWindows windows,
                                                                              final Serde<K> keySerde,
                                                                              final Serde<V> valueSerde) 

/**
 *  The following methods are for use with the PAPI. They allow building of StateStores that can be wrapped with
 *  caching, logging, and any other convenient wrappers provided by the KafkaStreams library
 */ 
public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<K, V>> supplier)

public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier)

public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier)

...

Code Block
languagejava
titleKGroupedStream
<W extends Window> WindowedKStream<K, V> windowedBy(Windows<W> timeWindows);

SessionWindowedKStream<K, V> sessionWindowedBy(SessionWindows sessionWindows);

KTable<K, Long> count(final Materialized materialized);

KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final Materialized<K, VVR, KeyValueStore<K, V>>VR>> materialized);
Code Block
languagejava
titleKGroupedTable
KTable<K, Long> count(final Materialized<K, V, KeyValueStore<K, V>> materialized);

KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Materialized<K, V, KeyValueStore<K, V>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Aggregator<? super K, ? super V, VR> subtractor,
                             final Materialized<K, VVR, KeyValueStore<K, V>>VR>> materialized);

 

New classes and interfaces:

...

Code Block
languagejava
titleGroupByOptionsSerialized
/**
 * Optional params that can be passed to groupBy and groupByKey operations
 */
public class GroupByOptions<KSerialized<K, V> {

    public static <K, V> GroupByOptions<KSerialized<K, V> serdeswith(final Serde<K> keySerde, final Serde<V> valueSerde) {

    public static <K, V> returnSerialized<K, newV> GroupByOptions<>with();
final Serde<K> keySerde, final }
Serde<V> valueSerde, final Serde<V> otherValueSerde)

    public GroupByOptions<KSerialized<K, V> withKeySerde(final Serde<K> keySerde)

    public GroupByOptions<KSerialized<K, V> withValueSerde(final Serde valueSerde)
}
Code Block
languagejava
titleJoinOptionsJoined
/**
 * Optional params that can be passed to join, leftJoin, outerJoin operations
 */
public class JoinOptions<KJoined<K, V, VO> {

    public static <K, V, VO> JoinOptions<KJoined<K, V, VO> serdeswith(final Serde<K> keySerde, final Serde <V> valueSerde, final Serde<VO> otherValueSerde)

    public JoinOptions<K static <K, V, VO> Joined<K, V, VO> withKeySerdekeySerde(final Serde<K> keySerde)

    public JoinOptions<Kstatic <K, V, VO> withValueSerde(final Serde<V> valueSerde) Joined<K, V, VO> valueSerde(final Serde<V> valueSerde)

    public JoinOptions<K static <K, V, VO> Joined<K, V, VO> withOtherValueSerdeotherValueSerde(final Serde<VO>Serde<V> otherValueSerdevalueSerde)
}
Code Block
languagejava
titleTopicOptions
/**
 * Optional arguments thatpublic canJoined<K, beV, specifiedVO> whenwithKeySerde(final doing to and through operations
 */
public class TopicOptions<K, V> {Serde<K> keySerde)

    public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde)

    public static <KJoined<K, V> TopicOptions<KV, V>VO> serdeswithOtherValueSerde(final Serde<VO> otherValueSerde)
}
Code Block
languagejava
titlePartitioned
/**
 * Optional arguments that can be specified when doing to and through operations
 */
public static <K, V> Partitioned<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)

Serde<K> keySerde, final Serde<V> valueSerde) {
        return new TopicOptions<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde);
    }

    public static <K, V> TopicOptions<KPartitioned<K, V> optionswith(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde) {

public static <K, V> Partitioned<K, V> keySerde(final Serde<K> keySerde)

public static return<K, newV> TopicOptions<KPartitioned<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde).withStreamPartitioner(partitioner);
    }

    public TopicOptions<K valueSerde(final Serde<V> valueSerde) 

public static <K, V> Partitioned<K, V> withStreamPartitionerstreamPartitioner(final StreamPartitioner<K, V> partitioner) {

public Partitioned<K, V> withStreamPartitioner(final StreamPartitioner<K, V>   return null;
    }

    public TopicOptions<Kpartitioner) 
public Partitioned<K, V> withValueSerde(final Serde<V> valueSerde) {
public Partitioned<K, V> withKeySerde(final     return null;
    }

    public TopicOptions<K, V> withKeySerde(final Serde<K> keySerde) {
        return null;
    }
}Serde<K> keySerde)
code
Code Block
languagejava
titlePrinted
 
/**
 * options that can be used when printing to stdout our writing to a file
 */
public class PrintOptions<K, V> {
    public static <K, V> PrintOptions<K, V> labeled(final String label) 
    
    public static <K, V> PrintOptions<K, V> toFile(final String filepath)

    public static <K, V> PrintOptions<K, V> sysOut(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label) 

 class Printed<K, V> {
    public static <K, V> PrintOptions<KPrinted<K, V> toFile(final String filePath, final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String labelfilepath)

    public static <K, V> Printed<K, V> toSysOut()

    public PrintOptions<KPrinted<K, V> withLabel(final String label) 

    publicprivate PrintOptions<KPrinted<K, V> withFile(final String filepath)

    public PrintOptions<KPrinted<K, V> withKeySerde(final Serde<K> keySerde)

    public PrintOptions<KPrinted<K, V> withValueSerde(final Serde<V> keySerde)

    public PrintOptions<KPrinted<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper)

}

 

...


Code Block
/**
 * Implementations of this will provide the ability to wrap a given StateStore
 * with or without caching/loggging etc.
 */
public interface StateStoreBuilder<T extends StateStore> {

    StateStoreBuilder<T> withCachingEnabled();
    StateStoreBuilder<T> withCachingDisabled();
    StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
    StateStoreBuilder<T> withLoggingDisabled();
    T build();
}

...