Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleStores
/**
 * Factory methods returning suppliers of byte-level stores: every supplier below is typed on
 * {@code Bytes} keys and {@code byte[]} values.
 */
public static <K, V> BytesStoreSupplier<KeyValueStore<Bytes, byte[]>> persistentKeyValueStore(final String name)

public static <K, V> BytesStoreSupplier<KeyValueStore<Bytes, byte[]>> inMemoryKeyValueStore(final String name)

public static <K, V> BytesStoreSupplier<KeyValueStore<Bytes, byte[]>> lruMap(final String name)

public static <K, V> BytesStoreSupplier<WindowStore<Bytes, byte[]>> persistentWindowStore(final String name, final Windows windows)

// NOTE(review): none of the other Bytes-based suppliers take a serde; keySerde here may be
// left over from the previous revision of this signature - confirm before relying on it.
public static <K, V> BytesStoreSupplier<SessionStore<Bytes, byte[]>> persistentSessionStore(final String name,
                                                                                            final Serde<K> keySerde,
                                                                                            final SessionWindows windows)

/**
 *  The following methods are for use with the PAPI. They allow building of StateStores that can be wrapped with
 *  caching, logging, and any other convenient wrappers provided by the KafkaStreams library
 */
public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final BytesStoreSupplier<WindowStore<Bytes, byte[]>> supplier,
                                                                      final Serde<K> keySerde,
                                                                      final Serde<V> valueSerde)

public static <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final BytesStoreSupplier<KeyValueStore<Bytes, byte[]>> supplier,
                                                                                 final Serde<K> keySerde,
                                                                                 final Serde<V> valueSerde)

public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final BytesStoreSupplier<SessionStore<Bytes, byte[]>> supplier,
                                                                        final Serde<K> keySerde,
                                                                        final Serde<V> valueSerde)
Code Block
languagejava
titleTopologyBuilder
/**
 * Registers a global store described by a {@code StateStoreBuilder}, together with the source
 * (name, timestamp extractor, deserializers, topic) and the processor (name, supplier) given
 * as arguments.
 */
public synchronized <K, V> TopologyBuilder addGlobalStore(final StateStoreBuilder storeSupplier,
                                                          final String sourceName,
                                                          final TimestampExtractor timestampExtractor,
                                                          final Deserializer keyDeserializer,
                                                          final Deserializer valueDeserializer,
                                                          final String topic,
                                                          final String processorName,
                                                          final ProcessorSupplier stateUpdateSupplier)

/**
 * Registers a store described by a {@code StateStoreBuilder} for the named processors.
 */
public synchronized final TopologyBuilder addStateStore(final StateStoreBuilder supplier, final String... processorNames)
 

 

 

Code Block
languagejava
titleKGroupedStream
<W extends Window> WindowedKStream<K, V> windowedBy(Windows<W> timeWindows);

SessionWindowedKStream<K, V> sessionWindowedBy(SessionWindows sessionWindows);

KTable<K, Long> count(final Materialized<K, Long> materialized);

KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized);

<VR> KTable<K,        VR> aggregate(final Serde<K>Initializer<VR> keySerdeinitializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
        final Serde<V> valueSerde)

public static <K, V> StateStoreSupplier<SessionStore<K, V>> persistentSessionStore(final String name,
             final Materialized<K, VR, KeyValueStore<K, VR>> materialized);
Code Block
languagejava
titleKGroupedTable
// The result is KTable<K, Long>, so the materialized store is typed on Long values,
// matching the count(...) overloads of WindowedKStream and SessionWindowedKStream.
KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<K, Long>> materialized);

KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Materialized<K, V, KeyValueStore<K, V>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Aggregator<? super K, ? super V, VR> subtractor,
                             final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

 

New classes and interfaces:

Code Block
languagejava
titleWindowedKStream
public interface WindowedKStream<K, V> {

   final Serde<K> keySerdeKTable<Windowed<K>,
 Long> count();

    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<K, Long>> materializedAs);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator);

    <VR> KTable<Windowed<K>, VR> aggregate(final Serde<V> valueSerde) 

/**Initializer<VR> initializer,
 *  The following methods are for use with the PAPI. They allow building of StateStores that can be   wrapped with
 *  caching, logging, and any other convenient wrappers provided by the KafkaStreams library
 */ 
public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<KAggregator<? super K, V>> supplier)

public <K? super V, V>VR> StateStoreBuilder<KeyValueStore<Kaggregator,
  V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier)

public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier)
Code Block
languagejava
titleTopologyBuilder
public synchronized <K, V> TopologyBuilder addGlobalStore(final StateStoreBuilder storeSupplier,
                      final Materialized<K, VR, WindowStore<K, VR>> materializedAs);


    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
               final String sourceName,
                 final Materialized<K, V, WindowStore<K, V>> materializedAs);


} 
Code Block
languagejava
titleSessionWindowedKStream
public interface SessionWindowedKStream<K, V> {

    KTable<Windowed<K>, Long> count();

    KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<K, Long>> materializedAs);

    <VR, T> KTable<Windowed<K>,         VR> aggregate(final TimestampExtractorInitializer<VR> timestampExtractorinitializer,
                                              final Aggregator<? super K, ? super V,      final Deserializer keyDeserializerVR> aggregator,
                                              final Merger<? super K, T> sessionMerger);

    <VR, T> KTable<Windowed<K>, VR> aggregate(final DeserializerInitializer<VR> valueDeserializerinitializer,
                                              final Aggregator<? super K, ? super V, VR> aggregator,
     final String topic,
                                       final Merger<? super K, T> sessionMerger,
              final String processorName,
                              final Materialized<K, VR, SessionStore<K, VR>> materializedAs);


    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
       final ProcessorSupplier stateUpdateSupplier)
 
public synchronized final TopologyBuilder addStateStore(final StateStoreBuilder supplier, final String... processorNames)
 

 

...

                           final Materialized<K, V, SessionStore<K, V>> materializedAs);
}
Code Block
languagejava
titleKGroupedStream
/** Returns the windowed view of this grouped stream for the given windows. */
<W extends Window> WindowedKStream<K, V> windowedBy(Windows<W> timeWindows);

/** Returns the session-windowed view of this grouped stream for the given session windows. */
SessionWindowedKStream<K, V> sessionWindowedBy(SessionWindows sessionWindows);

// Materialized is declared with three type parameters (key, value, store type).
KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<K, Long>> materialized);

KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final Materialized<K, VR, KeyValueStore<K, VR>> materialized);
Code Block
languagejava
titleKGroupedTable
// The result is KTable<K, Long>, so the materialized store is typed on Long values,
// matching the count(...) overloads of WindowedKStream and SessionWindowedKStream.
KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<K, Long>> materialized);

KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Materialized<K, V, KeyValueStore<K, V>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Aggregator<? super K, ? super V, VR> subtractor,
                             final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

 

New classes and interfaces:

Code Block
languagejava
titleWindowedKStream
/**
 * Aggregation operations whose results are {@code KTable}s keyed by {@code Windowed<K>}.
 * Each operation comes in two forms: one without and one with a {@code Materialized}
 * argument typed on {@code WindowStore}.
 *
 * @param <K> key type
 * @param <V> value type
 */
public interface WindowedKStream<K, V> {

    /** @return a table of {@code Long} values per windowed key */
    KTable<Windowed<K>, Long> count();

    /** Same result type as {@link #count()}, taking a {@code Materialized} typed on {@code Long} values. */
    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<K, Long>> materializedAs);

    /** @param <VR> type of the aggregate produced from the initializer and aggregator */
    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator);

    /** Same as the two-argument form, taking a {@code Materialized} typed on {@code VR} values. */
    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator,
                                           final Materialized<K, VR, WindowStore<K, VR>> materializedAs);


    /** @return a table whose values keep the input value type {@code V} */
    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    /** Same as the one-argument form, taking a {@code Materialized} typed on {@code V} values. */
    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                  final Materialized<K, V, WindowStore<K, V>> materializedAs);


} 
Code Block
languagejava
titleSessionWindowedKStream
/**
 * Aggregation operations whose results are {@code KTable}s keyed by {@code Windowed<K>}.
 * Each operation comes in two forms: one without and one with a {@code Materialized}
 * argument typed on {@code SessionStore}.
 *
 * @param <K> key type
 * @param <V> value type
 */
public interface SessionWindowedKStream<K, V> {

    /** @return a table of {@code Long} values per windowed key */
    KTable<Windowed<K>, Long> count();

    /** Same result type as {@link #count()}, taking a {@code Materialized} typed on {@code Long} values. */
    KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<K, Long>> materializedAs);

    // NOTE(review): T appears only in the Merger parameter while the aggregate type is VR;
    // confirm whether the merger is meant to be Merger<? super K, VR>.
    <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                              final Aggregator<? super K, ? super V, VR> aggregator,
                                              final Merger<? super K, T> sessionMerger);

    /** Same as the three-argument form, taking a {@code Materialized} typed on {@code VR} values. */
    <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                              final Aggregator<? super K, ? super V, VR> aggregator,
                                              final Merger<? super K, T> sessionMerger,
                                              final Materialized<K, VR, SessionStore<K, VR>> materializedAs);


    /** @return a table whose values keep the input value type {@code V} */
    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    /** Same as the one-argument form, taking a {@code Materialized} typed on {@code V} values. */
    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                  final Materialized<K, V, SessionStore<K, V>> materializedAs);
}
Code Block
languagejava
titleMaterialized
/**
 * Used when materializing a state store
 *
 * @param <K> key type
 * @param <V> value type
 * @param <S> type of the {@code StateStore} being materialized
 */
public class Materialized<K, V, S extends StateStore> {
    /** Static factory taking the store name. */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName)

    /** Static factory taking a supplier of the store type {@code S}. */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier)

    /** @param valueSerde serde for values of type {@code V} */
    public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde)

    /** @param keySerde serde for keys of type {@code K} */
    public Materialized<K, V, S> withKeySerde(final Serde<K> keySerde)

    /** @param topicConfig string-to-string configuration; presumably changelog topic settings - TODO confirm */
    public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> topicConfig)

    public Materialized<K, V, S> withLoggingDisabled()

    public Materialized<K, V, S> withCachingEnabled()

    public Materialized<K, V, S> withCachingDisabled()

}
Code Block
languagejava
titleSerialized
/**
 * Optional params that can be passed to groupBy and groupByKey operations
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Serialized<K, V> {

    /** Static factory taking both the key serde and the value serde. */
    public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)

    // NOTE(review): Serialized declares no second value type, so it is unclear what
    // otherValueSerde applies to; this overload resembles Joined.with(...) - confirm it is intended.
    public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final Serde<V> otherValueSerde)

    /** @param keySerde serde for keys of type {@code K} */
    public Serialized<K, V> withKeySerde(final Serde<K> keySerde)

    /** @param valueSerde serde for values of type {@code V} */
    public Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
}
Code Block
languagejava
titleJoined
/**
 * Optional params that can be passed to join, leftJoin, outerJoin operations
 *
 * @param <K>  key type
 * @param <V>  value type
 * @param <VO> other value type
 */
public class Joined<K, V, VO> {

    /** Static factory taking the key serde, the value serde and the other value serde. */
    public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde, final Serde<V> valueSerde, final Serde<VO> otherValueSerde)

    public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> keySerde)

    public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde)

    /** @param otherValueSerde serde for values of type {@code VO}, as in {@code with} and {@code withOtherValueSerde} */
    public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO> otherValueSerde)

    public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde)

    public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde)

    public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde)
}
Code Block
languagejava
titlePartitioned
/**
 * Optional arguments that can be specified when doing to and through operations
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Partitioned<K, V> {

    /** Static factory taking the key serde and the value serde. */
    public static <K, V> Partitioned<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)

    /** Static factory taking a partitioner in addition to both serdes. */
    public static <K, V> Partitioned<K, V> with(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde)

    public static <K, V> Partitioned<K, V> keySerde(final Serde<K> keySerde)

    public static <K, V> Partitioned<K, V> valueSerde(final Serde<V> valueSerde)

    public static <K, V> Partitioned<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)

    // Instance variants: K and V come from the enclosing class declaration.
    public Partitioned<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner)

    public Partitioned<K, V> withValueSerde(final Serde<V> valueSerde)

    public Partitioned<K, V> withKeySerde(final Serde<K> keySerde)
}
Code Block
languagejava
titlePrinted
 
/**
 * Options that can be used when printing to stdout or writing to a file
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Printed<K, V> {
    /** Static factory taking the path of the output file. */
    public static <K, V> Printed<K, V> toFile(final String filepath)

    /** Static factory taking no arguments. */
    public static <K, V> Printed<K, V> toSysOut()

    public Printed<K, V> withLabel(final String label)

    private Printed<K, V> withFile(final String filepath)

    /** @param keySerde serde for keys of type {@code K} */
    public Printed<K, V> withKeySerde(final Serde<K> keySerde)

    /** @param valueSerde serde for values of type {@code V} */
    public Printed<K, V> withValueSerde(final Serde<V> valueSerde)

    /** @param mapper maps a key and a value to a {@code String} */
    public Printed<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper)

}
Materialized
/**
 * Used when materializing a state store
 *
 * @param <K> key type
 * @param <V> value type
 * @param <S> type of the {@code StateStore} being materialized
 */
public class Materialized<K, V, S extends StateStore> {
    /** Static factory taking the store name. */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName)

    /** Static factory taking a supplier of the store type {@code S}. */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier)

    /** @param valueSerde serde for values of type {@code V} */
    public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde)

    /** @param keySerde serde for keys of type {@code K} */
    public Materialized<K, V, S> withKeySerde(final Serde<K> keySerde)

    /** @param topicConfig string-to-string configuration; presumably changelog topic settings - TODO confirm */
    public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> topicConfig)

    public Materialized<K, V, S> withLoggingDisabled()

    public Materialized<K, V, S> withCachingEnabled()

    public Materialized<K, V, S> withCachingDisabled()

}
Code Block
languagejava
titleSerialized
/**
 * Optional params that can be passed to groupBy and groupByKey operations
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Serialized<K, V> {

    /** Static factory taking both the key serde and the value serde. */
    public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)

    // NOTE(review): Serialized declares no second value type, so it is unclear what
    // otherValueSerde applies to; this overload resembles Joined.with(...) - confirm it is intended.
    public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final Serde<V> otherValueSerde)

    /** @param keySerde serde for keys of type {@code K} */
    public Serialized<K, V> withKeySerde(final Serde<K> keySerde)

    /** @param valueSerde serde for values of type {@code V} */
    public Serialized<K, V> withValueSerde(final Serde<V> valueSerde)
}
Code Block
languagejava
titleJoined
/**
 * Optional params that can be passed to join, leftJoin, outerJoin operations
 *
 * @param <K>  key type
 * @param <V>  value type
 * @param <VO> other value type
 */
public class Joined<K, V, VO> {

    /** Static factory taking the key serde, the value serde and the other value serde. */
    public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde, final Serde<V> valueSerde, final Serde<VO> otherValueSerde)

    public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> keySerde)

    public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde)

    /** @param otherValueSerde serde for values of type {@code VO}, as in {@code with} and {@code withOtherValueSerde} */
    public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO> otherValueSerde)

    public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde)

    public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde)

    public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde)
}
Code Block
languagejava
titlePartitioned
/**
 * Optional arguments that can be specified when doing to and through operations
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Partitioned<K, V> {

    /** Static factory taking the key serde and the value serde. */
    public static <K, V> Partitioned<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)

    /** Static factory taking a partitioner in addition to both serdes. */
    public static <K, V> Partitioned<K, V> with(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde)

    public static <K, V> Partitioned<K, V> keySerde(final Serde<K> keySerde)

    public static <K, V> Partitioned<K, V> valueSerde(final Serde<V> valueSerde)

    public static <K, V> Partitioned<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)

    // Instance variants: K and V come from the enclosing class declaration.
    public Partitioned<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner)

    public Partitioned<K, V> withValueSerde(final Serde<V> valueSerde)

    public Partitioned<K, V> withKeySerde(final Serde<K> keySerde)
}
Code Block
languagejava
titlePrinted
 
/**
 * Options that can be used when printing to stdout or writing to a file
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Printed<K, V> {
    /** Static factory taking the path of the output file. */
    public static <K, V> Printed<K, V> toFile(final String filepath)

    /** Static factory taking no arguments. */
    public static <K, V> Printed<K, V> toSysOut()

    public Printed<K, V> withLabel(final String label)

    private Printed<K, V> withFile(final String filepath)

    /** @param keySerde serde for keys of type {@code K} */
    public Printed<K, V> withKeySerde(final Serde<K> keySerde)

    /** @param valueSerde serde for values of type {@code V} */
    public Printed<K, V> withValueSerde(final Serde<V> valueSerde)

    /** @param mapper maps a key and a value to a {@code String} */
    public Printed<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper)

}
Code Block
languagejava
titleStateStoreBuilder
/**
 * Implementations of this will provide the ability to wrap a given StateStore
 * with or without caching/logging etc.
 *
 * @param <T> the type of {@code StateStore} produced by {@link #build()}
 */
public interface StateStoreBuilder<T extends StateStore> {

    /** @return a builder of the same store type, so calls can be chained */
    StateStoreBuilder<T> withCachingEnabled();
    /**
     * @param config string-to-string configuration; presumably changelog topic settings - TODO confirm
     * @return a builder of the same store type, so calls can be chained
     */
    StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
    /** @return the store of type {@code T} */
    T build();
}
Code Block
languagejava
titleBytesStoreSupplier
public interface BytesStoreSupplier<T extends StateStore> {

    /**
     * Return the name of this state store supplier.
     * This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'
     *
     * @return the name of this state store supplier
     */
    String name();

    /**
     * Return a new {@link StateStore} instance.
     *
     * @return a new {@link StateStore} instance of type T
     */
    T get();
Code Block
/**
 * Implementations of this will provide the ability to wrap a given StateStore
 * with or without caching/logging etc.
 *
 * @param <T> the type of {@code StateStore} produced by {@link #build()}
 */
public interface StateStoreBuilder<T extends StateStore> {

    /** @return a builder of the same store type, so calls can be chained */
    StateStoreBuilder<T> withCachingEnabled();
    /**
     * @param config string-to-string configuration; presumably changelog topic settings - TODO confirm
     * @return a builder of the same store type, so calls can be chained
     */
    StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
    /** @return the store of type {@code T} */
    T build();
}

 

Proposed Changes

Add the above methods, interfaces, classes to the DSL. Deprecate the existing overloads.

...