THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<K, Long>> materialized); KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Materialized<K, V, KeyValueStore<K, V>> materialized); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Aggregator<? super K, ? super V, VR> subtractor, final Materialized<K, VR, KeyValueStore<K, VR>> materialized); |
...
For StreamsBuilder we remove all stream, table, and globalTable overloads that take more than a single argument and replace them with:
Code Block | |
---|---|
java | |
public synchronized <K, V> KStream<K, V> stream(final Consumed<K, V> options, final String... topics) public synchronized <K, V> KStream<K, V> stream(final Pattern pattern, final Consumed<K, V> options) public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed) public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V> materialized) public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed) public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed, final Materialized<K, V> materialized) |
New classes and interfaces:
Code Block | ||||
---|---|---|---|---|
| ||||
public interface WindowedKStream<K, V> { KTable<Windowed<K>, Long> count(); KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<K, Long>> materializedAs); <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator); <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Materialized<K, VR, WindowStore<K, VR>> materializedAs); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, WindowStore<K, V>> materializedAs); } |
Code Block | ||||
---|---|---|---|---|
| ||||
public interface SessionWindowedKStream<K, V> { KTable<Windowed<K>, Long> count(); KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<K, Long>> materializedAs); <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Merger<? super K, T> sessionMerger); <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Merger<? super K, T> sessionMerger, final Materialized<K, VR, SessionStore<K, VR>> materializedAs); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, SessionStore<K, V>> materializedAs); } |
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Used when materializing a state store */ public class Materialized<K, V, S extends StateStore> { public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName) public static <K, V, S extends StateStore> Materialized<K, V, S> as(final BytesStoreSupplier<S> supplier) public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde) public Materialized<K, V, S> withKeySerde(final Serde<K> keySerde) public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> topicConfig) public Materialized<K, V, S> withLoggingDisabled() public Materialized<K, V, S> withCachingEnabled() public Materialized<K, V, S> withCachingDisabled() } |
Code Block | ||||
---|---|---|---|---|
Code Block | ||||
| ||||
/** * Optional params that can be passed to groupBy and groupByKey operations */ public class Serialized<K, V> { public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) public Serialized<K, V> withKeySerde(final Serde<K> keySerde) public Serialized<K, V> withValueSerde(final Serde<V> valueSerde) } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Options that can be used when printing to stdout or writing to a file */ public class Printed<K, V> { public static <K, V> Printed<K, V> toFile(final String filepath) public static <K, V> Printed<K, V> toSysOut() public Printed<K, V> withLabel(final String label) public Printed<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper) } |
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Options for consuming a topic as a KStream or KTable */ public class Consumed<K, V> { public static <K, V> Consumed<K, V> deserializedWith(final Serde<K> keySerde, final Serde<V> valueSerde) public static <K, V> Consumed<K, V> timestampedWith(final TimestampExtractor extractor) public static <K, V> Consumed<K, V> offsetResetWith(final Topology.AutoOffsetReset resetPolicy) public Consumed<K, V> withKeySerde(final Serde<K> keySerde) public Consumed<K, V> withValueSerde(final Serde<V> valueSerde) public Consumed<K, V> withTimestampExtractor(final TimestampExtractor timestampExtractor) public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy) } |
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Implementations of this will provide the ability to wrap a given StateStore * with or without caching/logging etc. */ public interface StateStoreBuilder<T extends StateStore> { StateStoreBuilder<T> withCachingEnabled(); StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config); T build(); } |
...