...
Code Block | ||||
---|---|---|---|---|
| ||||
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, Serialized<KR, VR> serialized); KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<KKeyValueStore<Bytes, V>>byte[]>> materialized); KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<KKeyValueStore<Bytes[], V>>byte[]>> materialized); <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Materialized<K, V, KeyValueStore<KKeyValueStore<Bytes[], V>>byte[]>> materialized); void to(final String topic, final Produced<V, V> options); KTable<K, V> through(final String topic, final Materialized<K, V>V, KeyValueStore<Bytes, byte[]> options); <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VR, KeyValueStore<KKeyValueStore<Bytes, VR>>byte[]>> materialized); <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VR, KeyValueStore<KKeyValueStore<Bytes, VR>>byte[]>> materialized); <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VR, KeyValueStore<KKeyValueStore<Bytes, VR>>byte[]>> materialized); |
We add some new helper methods to Stores so that people can conveniently and quickly create basic StateStoreSuppliers for use in the DSL or PAPI. We will also deprecate the existing Stores.create(...) method.
Code Block | ||||
---|---|---|---|---|
| ||||
public static <K, V> BytesStoreSupplier<KeyValueStore<Bytes, byte[]>> KeyValueBytesStoreSupplier persistentKeyValueStore(final String name) public static <K, V> BytesStoreSupplier<KeyValueStore<Bytes, byte[]>>KeyValueBytesStoreSupplier inMemoryKeyValueStore(final String name) public static <K, V> BytesStoreSupplier<KeyValueStore<Bytes, byte[]>>KeyValueBytesStoreSupplier lruMap(final String name) public static <K, V> BytesStoreSupplier<WindowStore<Bytes, byte[]>>WindowBytesStoreSupplier persistentWindowStore(final String name, final Windows windows) public static <K, V> BytesStoreSupplier<SessionStore<Bytes, byte[]>>long retentionPeriod, final int numSegments, final long windowSize, final boolean retainDuplicates) public static <K, V> SessionBytesStoreSupplier persistentSessionStore(final String name, final long retentionPeriod) /** * The following methods are for use with the PAPI. They allow building of StateStores that can be wrapped with * caching, logging, and any other convenient wrappers provided by the KafkaStreams library */ public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final WindowBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde) public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier, final Serde<K> keySerde, final SessionWindowsSerde<V> windowsvalueSerde) /** * The following methods are for use with the PAPI. They allow building of StateStores that can be wrapped with * caching, logging, and any other convenient wrappers provided by the KafkaStreams library */ public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final BytesStoreSupplier<WindowStore<Bytes, byte[]>> supplier, final Serde<K> keySerde, public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final SessionBytesStoreSupplier supplier, final Serde<K> keySerde, final Serde<V> valueSerde) |
Code Block | ||||
---|---|---|---|---|
| ||||
public synchronized <K, V> Topology addGlobalStore(final StateStoreBuilder storeSupplier, final Serde<V> valueSerde, final String metricsScope) public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final BytesStoreSupplier<KeyValueStore<Bytes, byte[]>> supplier, final Serde<K> keySerde, final Serde<V>String valueSerdesourceName, final String metricsScope) public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final BytesStoreSupplier<SessionStore<Bytes, byte[]>> supplier, final Serde<K> keySerde, final Serde<V> valueSerde, final String metricsScope) | ||||
Code Block | ||||
| ||||
final TimestampExtractor timestampExtractor,
final Deserializer keyDeserializer,
final Deserializer valueDeserializer,
final String topic,
final String processorName,
final ProcessorSupplier stateUpdateSupplier)
public synchronized final Topology addStateStore(final StateStoreBuilder supplier, final String... processorNames)
|
Code Block | ||||
---|---|---|---|---|
| ||||
<W extends Window> WindowedKStream<K, V> windowedBy(Windows<W> timeWindows); SessionWindowedKStream<K, V> windowedBy(SessionWindows sessionWindows); KTable<K, Long> count(final Materialized<K, Long> materialized); KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, public synchronized <K, V> Topology addGlobalStore(final StateStoreBuilder storeSupplier, final Aggregator<? super K, ? super V, VR> aggregator, final StringSerde<VR> sourceNameaggValueSerde, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); |
Code Block | ||||
---|---|---|---|---|
| ||||
KTable<K, Long> count(final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final TimestampExtractor timestampExtractor, final Aggregator<? super K, ? super V, VR> aggregator, final Aggregator<? super K, ? super finalV, DeserializerVR> keyDeserializersubtractor, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); |
For StreamsBuilder we remove all stream, table, and globalTable overloads that take more than a single argument and replace them with:
Code Block | ||||
---|---|---|---|---|
| ||||
/* StreamsBuilder replacements for the removed multi-argument stream, table, and globalTable overloads. */ public synchronized <K, V> KStream<K, V> stream(final String topic) public synchronized <K, V> KStream<K, V> stream(final String topic, final Consumed<K, V> options) public synchronized <K, V> KStream<K, V> stream(final Collection<String> topic, final Consumed<K, V> options) public synchronized <K, V> KStream<K, V> stream(final Pattern pattern, final Consumed<K, V> options) public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed) public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed) public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) |
New classes and interfaces:
Code Block | ||||
---|---|---|---|---|
| ||||
public interface WindowedKStream<K, V> { KTable<Windowed<K>, Long> count(); KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materializedAs); <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Deserializer valueDeserializer, final String topic, final String processorName, final Aggregator<? super K, ? super V, VR> aggregator); <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final ProcessorSupplier stateUpdateSupplier) public synchronized final Topology addStateStore(final StateStoreBuilder supplier, final String... processorNames) |
Code Block | ||||
---|---|---|---|---|
| ||||
<W extends Window> WindowedKStream<K, V> windowedBy(Windows<W> timeWindows); SessionWindowedKStream<K, V> windowedBy(SessionWindows sessionWindows); KTable<K, Long> count(final Materialized<K, Long> materialized); KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer final Aggregator<? super K, ? super V, VR> aggregator, final Aggregator<? super K, ? super V, VR> aggregator, final Materialized<K, VR, WindowStore<Bytes, byte[]>> materializedAs); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Serde<VR> aggValueSerde, final Materialized<K, VRV, KeyValueStore<KWindowStore<Bytes, VR>> materialized);byte[]>> materializedAs); } |
Code Block | |||||
---|---|---|---|---|---|
| |||||
public interface SessionWindowedKStream<K, V> { KTable<Windowed<K>, Long> count(KTable<K, Long> count(final Materialized<K, V, KeyValueStore<K, V>> materialized); KTable<K, V> reduce(final Reducer<V> adderKTable<Windowed<K>, final Reducer<V> subtractor, Long> count(final Materialized<K, VLong, KeyValueStore<KSessionStore<Bytes, V>> materializedbyte[]>> materializedAs); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Aggregator<? super K, ? super V, VR> subtractoraggregator, final Materialized<K, VR, KeyValueStore<K, VR>> materialized); |
For StreamsBuilder we remove all stream, table, and globalTable overloads that take more than a single argument and replace them with:
Code Block | ||||
---|---|---|---|---|
| ||||
// StreamsBuilder replacements for the removed multi-argument stream, table, and globalTable overloads.
// KStream sources: a single topic name, a collection of topic names, or a topic-name pattern.
public synchronized <K, V> KStream<K, V> stream(final String topic)
public synchronized <K, V> KStream<K, V> stream(final String topic, final Consumed<K, V> options)
public synchronized <K, V> KStream<K, V> stream(final Collection<String> topic, final Consumed<K, V> options)
public synchronized <K, V> KStream<K, V> stream(final Pattern pattern, final Consumed<K, V> options)
// KTable sources: with Consumed options, and optionally a Materialized argument.
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed)
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V> materialized)
// GlobalKTable sources: with Consumed options, and optionally a Materialized argument.
public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed)
public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed, final Materialized<K, V> materialized) |
New classes and interfaces:
Code Block | ||||
---|---|---|---|---|
| ||||
public interface WindowedKStream<K, V> { KTable<Windowed<K>, Long> count(); KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<K, Long>> materializedAs); <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer final Merger<? super K, T> sessionMerger); <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Merger<? super K, T> sessionMerger, final Aggregator<? superfinal KMaterialized<K, ? super VVR, SessionStore<Bytes, VR> aggregatorbyte[]>> materializedAs); <VR> KTable<Windowed<K>, VR>V> aggregatereduce(final Initializer<VR> initializer,Reducer<V> reducer); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Aggregator<? super KMaterialized<K, ? super V, VR> aggregatorSessionStore<Bytes, byte[]>> materializedAs); } |
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Used when materializing a state store, i.e, during an aggregation operation or KTable operations */ public class Materialized<K, V, S extends StateStore> { public static <K, V, S extends finalStateStore> Materialized<K, VRV, WindowStore<K, VR>> materializedAs); S> as(final String storeName) public KTable<Windowed<K>static <K, V> reduce Materialized<K, V, WindowStore<Bytes, byte[]>> as(final Reducer<V>WindowStateStoreSupplier reducersupplier); public static KTable<Windowed<K><K, V> reduce(final Reducer<V> reducer, Materialized<K, V, SessionStore<Bytes, byte[]>> as(final SessionStateStoreSupplier supplier); public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> as(final KeyValueStateStoreSupplier supplier) public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde) finalpublic Materialized<K, V, WindowStore<K, V>> materializedAs); } | ||||
Code Block | ||||
| ||||
public interface SessionWindowedKStream<K, V> {S> withKeySerde(final Serde<K> valueSerde) public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> topicConfig) KTable<Windowed<K>public Materialized<K, V, Long>S> countwithLoggingDisabled(); KTable<Windowed<K>, Long> count(finalpublic Materialized<K, LongV, SessionStore<K, Long>> materializedAs);S> withCachingEnabled() <VRpublic Materialized<K, T> KTable<Windowed<K>V, VR>S> aggregate(final Initializer<VR> initializer,withCachingDisabled() public StateStoreBuilder storeBuilder() } |
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Optional params that can be passed to groupBy and groupByKey operations */ public class Serialized<K, V> { public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) public Serialized<K, V> withKeySerde(final Serde<K> keySerde) public Serialized<K, V> withValueSerde(final Serde<V> valueSerde) } |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Optional params that can be passed to join, leftJoin, and outerJoin operations.
 *
 * @param <K>  key type (the type handled by the key serde)
 * @param <V>  value type (the type handled by the value serde)
 * @param <VO> other value type (the type handled by the other-value serde)
 */
public class Joined<K, V, VO> {
    // Static factories: each takes the serde(s) to use and returns a Joined<K, V, VO>.
    public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde, final Serde<V> valueSerde, final Serde<VO> otherValueSerde)
    public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> keySerde)
    public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde)
    // The other value has type VO, so its serde is Serde<VO>, matching with(...) and withOtherValueSerde(...).
    public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO> otherValueSerde)

    // Instance methods: each takes a single serde and returns a Joined<K, V, VO>.
    public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde)
    public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde)
    public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde)
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Optional arguments that can be specified when doing to and through operations
*/
public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final StreamPartitioner<K, V> partitioner)
public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde)
public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde)
public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)
public Produced<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner)
public Produced<K, V> withValueSerde(final Serde<V> valueSerde)
public Produced<K, V> withKeySerde(final Serde<K> keySerde) |
Code Block | ||||
---|---|---|---|---|
| ||||
/** * options that can be used when printing to stdout our writing to a file */ public class Printed<K, V> { public static <K, V> Printed<K, V> toFile(final String filepath) public static <K, V> Printed<K, V> toSysOut() public Printed<K, V> withLabel(final String label) public Printed<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper) } final Merger<? super K, T> sessionMerger); <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Merger<? super K, T> sessionMerger, final Materialized<K, VR, SessionStore<K, VR>> materializedAs); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, SessionStore<K, V>> materializedAs); } |
Code Block | ||||
---|---|---|---|---|
| ||||
/** * UsedOptions whenfor materializingconsuming a statetopic store, i.e, during an aggregation operation as a KStream or KTable operations */ public class Materialized<KConsumed<K, V, S extends StateStore> V> { public static <K, V> Consumed<K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName) V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final TimestampExtractor extractor, final Topology.AutoOffsetReset resetPolicy) public static <K, V> V Consumed<K, SV> extendswith(final StateStore>Serde<K> Materialized<KkeySerde, V, S> as(final BytesStoreSupplier<S>Serde<V> suppliervalueSerde) public Materialized<Kstatic <K, VV> Consumed<K, S>V> withValueSerdewith(final Serde<V>TimestampExtractor valueSerdeextractor) public Materialized<Kstatic <K, VV> Consumed<K, S>V> withKeySerdewith(final Serde<K> valueSerde) Topology.AutoOffsetReset resetPolicy) public Materialized<KConsumed<K, V, S> withLoggingEnabledV> withKeySerde(final Map<String, String> topicConfigSerde<K> keySerde) public Materialized<KConsumed<K, V, S> withLoggingDisabled( V> withValueSerde(final Serde<V> valueSerde) public Materialized<KConsumed<K, V, S> withCachingEnabled( V> withTimestampExtractor(final TimestampExtractor timestampExtractor) public Materialized<KConsumed<K, V, S> withCachingDisabled() } V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy) } |
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Optional params that can be passed to groupBy and groupByKey operations * Implementations of this will provide the ability to wrap a given StateStore * with or without caching/loggging etc. */ public class Serialized<K, V>interface StateStoreBuilder<T extends StateStore> { public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) StateStoreBuilder<T> withCachingEnabled(); StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config); public Serialized<K, V> withKeySerde(final Serde<K> keySerde) T build(); public Serialized<KMap<String, V>String> withValueSerde(final Serde valueSerde)logConfig(); boolean loggingEnabled(); } |
Code Block | |||||
---|---|---|---|---|---|
| |||||
public interface StoreSupplier<T extends StateStore>/** * Optional params that can be passed to join, leftJoin, outerJoin operations */ public class Joined<K, V, VO> { public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde, final Serde <V> valueSerde, final Serde<VO> otherValueSerde) /** * Return the name of this state store supplier. public static* <K,This V,must VO>be Joined<K,a V,valid VO>Kafka keySerde(final Serde<K> keySerde) public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde) topic name; valid characters are ASCII alphanumerics, '.', '_' and '-' * public static* <K,@return V,the VO>name Joined<K,of V,this VO>state otherValueSerde(final Serde<V> valueSerde) store supplier public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde)*/ String name(); public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde) /** public* Joined<K,Return V,a VO> withOtherValueSerde(final Serde<VO> otherValueSerde) } |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Optional arguments that can be specified when doing to and through operations
*/
public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final StreamPartitioner<K, V> partitioner)
public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde)
public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde)
public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner)
public Produced<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner)
public Produced<K, V> withValueSerde(final Serde<V> valueSerde)
public Produced<K, V> withKeySerde(final Serde<K> keySerde) |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Options that can be used when printing to stdout or writing to a file.
*/
public class Printed<K, V> {
// Static factories: one takes a file path, the other takes no arguments.
public static <K, V> Printed<K, V> toFile(final String filepath)
public static <K, V> Printed<K, V> toSysOut()
// Instance methods: each takes a single option and returns a Printed<K, V>.
public Printed<K, V> withLabel(final String label)
// The mapper receives a key and a value and produces a String.
public Printed<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper)
}
|
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Options for consuming a topic as a KStream or KTable.
*
* @param <K> key type (the type handled by the key serde)
* @param <V> value type (the type handled by the value serde)
*/
public class Consumed<K, V> {
// Static factories: accept serdes, a timestamp extractor, and/or an offset reset policy.
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final TimestampExtractor extractor, final Topology.AutoOffsetReset resetPolicy)
public static <K, V> Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
public static <K, V> Consumed<K, V> with(final TimestampExtractor extractor)
public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy)
// Instance methods: each takes a single option and returns a Consumed<K, V>.
public Consumed<K, V> withKeySerde(final Serde<K> keySerde)
public Consumed<K, V> withValueSerde(final Serde<V> valueSerde)
public Consumed<K, V> withTimestampExtractor(final TimestampExtractor timestampExtractor)
public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy)
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Implementations of this will provide the ability to wrap a given StateStore
* with or without caching/logging etc.
*
* @param <T> the type of StateStore returned by build()
*/
public interface StateStoreBuilder<T extends StateStore> {
StateStoreBuilder<T> withCachingEnabled();
// config is a String-to-String map; presumably the changelog topic configuration — TODO confirm
StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
T build();
Map<String, String> logConfig();
boolean loggingEnabled();
} |
new {@link StateStore} instance.
*
* @return a new {@link StateStore} instance of type T
*/
T get();
/**
* Return a String that is used as the scope for metrics recorded by Metered stores
* @return metricsScope
*/
String metricsScope();
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* A store supplier that can be used to create one or more {@link WindowStore} instances of type <Bytes, byte[]>
*/
public interface WindowBytesStoreSupplier extends StoreSupplier<WindowStore<Bytes, byte[]>> {
/**
* The number of segments the store has. If your store is segmented then this should be the number of segments
* in the underlying store. It is also used to reduce the amount of data that is scanned when caching is enabled.
*
* @return number of segments
*/
int segments();
/**
* The size of the windows that any store created from this supplier will use.
* @return window size
*/
long windowSize();
/**
* Whether or not this store is retaining duplicate keys. Usually only true if the store is being used
* for joins. Note that this should return false if caching is enabled.
* @return true if duplicates should be retained
*/
boolean retainDuplicates();
/**
* The time period for which the {@link WindowStore} will retain historic data
* @return retentionPeriod
*/
long retentionPeriod();
} |
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* A store supplier that can be used to create one or more {@link KeyValueStore} instances of type <Bytes, byte[]>
*/
public interface KeyValueBytesStoreSupplier extends StoreSupplier<KeyValueStore<Bytes, byte[]>> {
} |
Code Block | ||||
---|---|---|---|---|
/**
* A store supplier that can be used to create one or more {@link SessionStore} instances of type <Byte, byte[]>
*/
public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<Bytes, byte[]>> {
/**
* The size of a segment, in milliseconds. Used when caching is enabled to segment the cache
* and reduce the amount of data that needs to be scanned when performing range queries | ||||
Code Block | ||||
| ||||
public interface BytesStoreSupplier<T extends StateStore> { /** * Return the name of this state store supplier. * This must be a valid Kafka topic name; valid characters are ASCII alphanumerics, '.', '_' and '-' * * @return the name of this state store supplier */ String name(); /** * Return a new {@link StateStore} instance. * * @return asegmentInterval new {@link StateStore} instance of type Tin milliseconds */ Tlong getsegmentInterval(); } |
Proposed Changes
Add the above methods, interfaces, and classes to the DSL. Deprecate existing overloads on KStream, KTable, and KGroupedStream that take more than the required parameters; for example, KTable#filter(Predicate, String) and KTable#filter(Predicate, StateStoreSupplier) will be deprecated. StateStoreSupplier will also be deprecated.
...