THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
public static <K, V> StateStoreSupplier<KeyValueStore<KBytesStoreSupplier<KeyValueStore<Bytes, V>>byte[]>> persistentKeyValueStore(final String name,) public static <K, V> BytesStoreSupplier<KeyValueStore<Bytes, byte[]>> inMemoryKeyValueStore(final String name) public static <K, V> BytesStoreSupplier<KeyValueStore<Bytes, byte[]>> lruMap(final String name) public static <K, V> BytesStoreSupplier<WindowStore<Bytes, byte[]>> persistentWindowStore(final String name, final Windows windows) public static <K, V> BytesStoreSupplier<SessionStore<Bytes, byte[]>> persistentSessionStore(final String name, final Serde<K> keySerde, final SessionWindows windows) /** * The following methods are for use with the PAPI. They allow building of StateStores that can be wrapped with * caching, logging, and any other convenient wrappers provided by the KafkaStreams library */ public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final BytesStoreSupplier<WindowStore<Bytes, byte[]>> supplier, final Serde<K> keySerde, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<KeyValueStore<KStateStoreBuilder<KeyValueStore<K, V>> inMemoryKeyValueStorekeyValueStoreBuilder(final String name, BytesStoreSupplier<KeyValueStore<Bytes, byte[]>> supplier, final Serde<K> keySerde, final Serde<V> valueSerde) public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final BytesStoreSupplier<SessionStore<Bytes, byte[]>> supplier, final Serde<K> keySerde, final Serde<V> valueSerde) |
Code Block | ||||
---|---|---|---|---|
| ||||
public synchronized <K, V> TopologyBuilder addGlobalStore(final StateStoreBuilder storeSupplier, final Serde<K> keySerde, final String sourceName, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final StringTimestampExtractor nametimestampExtractor, final intDeserializer capacitykeyDeserializer, final Serde<K>Deserializer keySerdevalueDeserializer, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<WindowStore<K, V>> persistentWindowStore(final String name, String topic, final String processorName, final Windows windows, final ProcessorSupplier stateUpdateSupplier) public synchronized final TopologyBuilder addStateStore(final StateStoreBuilder supplier, final String... processorNames) |
Code Block | ||||
---|---|---|---|---|
| ||||
<W extends Window> WindowedKStream<K, V> windowedBy(Windows<W> timeWindows); SessionWindowedKStream<K, V> sessionWindowedBy(SessionWindows sessionWindows); KTable<K, Long> count(final Materialized<K, Long> materialized); KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized); <VR> KTable<K, VR> aggregate(final Serde<K>Initializer<VR> keySerdeinitializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final Serde<V> valueSerde) public static <K, V> StateStoreSupplier<SessionStore<K, V>> persistentSessionStore(final String name, final Materialized<K, VR, KeyValueStore<K, VR>> materialized); |
Code Block | ||||
---|---|---|---|---|
| ||||
KTable<K, Long> count(final Materialized<K, V, KeyValueStore<K, V>> materialized); KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Materialized<K, V, KeyValueStore<K, V>> materialized); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super finalV, SessionWindowsVR> windowsaggregator, final Aggregator<? super K, ? super V, VR> subtractor, final Materialized<K, VR, KeyValueStore<K, VR>> materialized); |
New classes and interfaces:
Code Block | ||||
---|---|---|---|---|
| ||||
public interface WindowedKStream<K, V> { final Serde<K> keySerdeKTable<Windowed<K>, Long> count(); KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<K, Long>> materializedAs); <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator); <VR> KTable<Windowed<K>, VR> aggregate(final Serde<V> valueSerde) /**Initializer<VR> initializer, * The following methods are for use with the PAPI. They allow building of StateStores that can be wrapped with * caching, logging, and any other convenient wrappers provided by the KafkaStreams library */ public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<KAggregator<? super K, V>> supplier) public <K? super V, V>VR> StateStoreBuilder<KeyValueStore<Kaggregator, V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier) public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier) | ||||
Code Block | ||||
| ||||
public synchronized <K, V> TopologyBuilder addGlobalStore(final StateStoreBuilder storeSupplier, final Materialized<K, VR, WindowStore<K, VR>> materializedAs); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final String sourceName, final Materialized<K, V, WindowStore<K, V>> materializedAs); } |
Code Block | ||||
---|---|---|---|---|
| ||||
public interface SessionWindowedKStream<K, V> { KTable<Windowed<K>, Long> count(); KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<K, Long>> materializedAs); <VR, T> KTable<Windowed<K>, VR> aggregate(final TimestampExtractorInitializer<VR> timestampExtractorinitializer, final Aggregator<? super K, ? super V, final Deserializer keyDeserializerVR> aggregator, final Merger<? super K, T> sessionMerger); <VR, T> KTable<Windowed<K>, VR> aggregate(final DeserializerInitializer<VR> valueDeserializerinitializer, final Aggregator<? super K, ? super V, VR> aggregator, final String topic, final Merger<? super K, T> sessionMerger, final String processorName, final Materialized<K, VR, SessionStore<K, VR>> materializedAs); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final ProcessorSupplier stateUpdateSupplier) public synchronized final TopologyBuilder addStateStore(final StateStoreBuilder supplier, final String... processorNames) |
...
final Materialized<K, V, SessionStore<K, V>> materializedAs);
} |
Code Block | ||||
---|---|---|---|---|
| ||||
<W extends Window> WindowedKStream<K, V> windowedBy(Windows<W> timeWindows);
SessionWindowedKStream<K, V> sessionWindowedBy(SessionWindows sessionWindows);
KTable<K, Long> count(final Materialized<K, Long> materialized);
KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Serde<VR> aggValueSerde,
final Materialized<K, VR, KeyValueStore<K, VR>> materialized); |
Code Block | ||||
---|---|---|---|---|
| ||||
KTable<K, Long> count(final Materialized<K, V, KeyValueStore<K, V>> materialized);
KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Materialized<K, V, KeyValueStore<K, V>> materialized);
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Aggregator<? super K, ? super V, VR> subtractor,
final Materialized<K, VR, KeyValueStore<K, VR>> materialized); |
New classes and interfaces:
Code Block | ||||
---|---|---|---|---|
| ||||
public interface WindowedKStream<K, V> {
KTable<Windowed<K>, Long> count();
KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<K, Long>> materializedAs);
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator);
<VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Materialized<K, VR, WindowStore<K, VR>> materializedAs);
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Materialized<K, V, WindowStore<K, V>> materializedAs);
} |
Code Block | ||||
---|---|---|---|---|
| ||||
public interface SessionWindowedKStream<K, V> {
KTable<Windowed<K>, Long> count();
KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<K, Long>> materializedAs);
<VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Merger<? super K, T> sessionMerger);
<VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Merger<? super K, T> sessionMerger,
final Materialized<K, VR, SessionStore<K, VR>> materializedAs);
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);
KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
final Materialized<K, V, SessionStore<K, V>> materializedAs);
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Used when materializing a state store
*/
public class Materialized<K, V, S extends StateStore> {
public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName)
public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier)
public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde)
public Materialized<K, V, S> withKeySerde(final Serde<K> valueSerde)
public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> topicConfig)
public Materialized<K, V, S> withLoggingDisabled()
public Materialized<K, V, S> withCachingEnabled()
public Materialized<K, V, S> withCachingDisabled()
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Optional params that can be passed to groupBy and groupByKey operations
*/
public class Serialized<K, V> {
public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final Serde<V> otherValueSerde)
public Serialized<K, V> withKeySerde(final Serde<K> keySerde)
public Serialized<K, V> withValueSerde(final Serde valueSerde)
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Optional params that can be passed to join, leftJoin, outerJoin operations
*/
public class Joined<K, V, VO> {
public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde, final Serde <V> valueSerde, final Serde<VO> otherValueSerde)
public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> keySerde)
public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde)
public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<V> valueSerde)
public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde)
public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde)
public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde)
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Optional arguments that can be specified when doing to and through operations
*/
public static <K, V> Partitioned<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
public static <K, V> Partitioned<K, V> with(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde)
public static <K, V> Partitioned<K, V> keySerde(final Serde<K> keySerde)
public static <K, V> Partitioned<K, V> valueSerde(final Serde<V> valueSerde)
public static <K, V> Partitioned<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)
public Partitioned<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner)
public Partitioned<K, V> withValueSerde(final Serde<V> valueSerde)
public Partitioned<K, V> withKeySerde(final Serde<K> keySerde) |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* options that can be used when printing to stdout our writing to a file
*/
public class Printed<K, V> {
public static <K, V> Printed<K, V> toFile(final String filepath)
public static <K, V> Printed<K, V> toSysOut()
public Printed<K, V> withLabel(final String label)
private Printed<K, V> withFile(final String filepath)
public Printed<K, V> withKeySerde(final Serde<K> keySerde)
public Printed<K, V> withValueSerde(final Serde<V> keySerde)
public Printed<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper)
}
|
| |
/**
* Used when materializing a state store
*/
public class Materialized<K, V, S extends StateStore> {
public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName)
public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier)
public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde)
public Materialized<K, V, S> withKeySerde(final Serde<K> valueSerde)
public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> topicConfig)
public Materialized<K, V, S> withLoggingDisabled()
public Materialized<K, V, S> withCachingEnabled()
public Materialized<K, V, S> withCachingDisabled()
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Optional params that can be passed to groupBy and groupByKey operations
*/
public class Serialized<K, V> {
public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final Serde<V> otherValueSerde)
public Serialized<K, V> withKeySerde(final Serde<K> keySerde)
public Serialized<K, V> withValueSerde(final Serde valueSerde)
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Optional params that can be passed to join, leftJoin, outerJoin operations
*/
public class Joined<K, V, VO> {
public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde, final Serde <V> valueSerde, final Serde<VO> otherValueSerde)
public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> keySerde)
public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde)
public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<V> valueSerde)
public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde)
public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde)
public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde)
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Optional arguments that can be specified when doing to and through operations
*/
public static <K, V> Partitioned<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
public static <K, V> Partitioned<K, V> with(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde)
public static <K, V> Partitioned<K, V> keySerde(final Serde<K> keySerde)
public static <K, V> Partitioned<K, V> valueSerde(final Serde<V> valueSerde)
public static <K, V> Partitioned<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)
public Partitioned<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner)
public Partitioned<K, V> withValueSerde(final Serde<V> valueSerde)
public Partitioned<K, V> withKeySerde(final Serde<K> keySerde) |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* options that can be used when printing to stdout our writing to a file
*/
public class Printed<K, V> {
public static <K, V> Printed<K, V> toFile(final String filepath)
public static <K, V> Printed<K, V> toSysOut()
public Printed<K, V> withLabel(final String label)
private Printed<K, V> withFile(final String filepath)
public Printed<K, V> withKeySerde(final Serde<K> keySerde)
public Printed<K, V> withValueSerde(final Serde<V> keySerde)
public Printed<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper)
}
|
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Implementations of this will provide the ability to wrap a given StateStore
* with or without caching/loggging etc.
*/
public interface StateStoreBuilder<T extends StateStore> {
StateStoreBuilder<T> withCachingEnabled();
StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
T build();
} |
Code Block | ||||
---|---|---|---|---|
| ||||
public interface BytesStoreSupplier<T extends StateStore> {
/**
* Return the name of this state store supplier.
* This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-'
*
* @return the name of this state store supplier
*/
String name();
/**
* Return a new {@link StateStore} instance.
*
* @return a new {@link StateStore} instance of type T
*/
T get();
| ||||
Code Block | ||||
/**
* Implementations of this will provide the ability to wrap a given StateStore
* with or without caching/loggging etc.
*/
public interface StateStoreBuilder<T extends StateStore> {
StateStoreBuilder<T> withCachingEnabled();
StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
T build();
} |
Proposed Changes
Add the above methods, interfaces, classes to the DSL. Deprecate the existing overloads.
...