Table of Contents |
---|
Status
Current state: Under Discussion Adopted (1.0)
Discussion thread: [DISCUS] KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engineshere [Change the link from the KIP proposal email archive to your own email thread]
JIRA: KAFKA-5651
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block | ||||
---|---|---|---|---|
| ||||
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, Serialized<KR, VR> serialized); KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<Bytes[], byte[]>> materialized); <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Materialized<K, V, KeyValueStore<Bytes[], byte[]>> materialized); <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); |
We add some new helper methods to Stores so people can conveniently and quickly create basic StateStoreSuppliers for use in the DSL or PAPI. We will also deprecate the existing Stores.create(...)
...
Code Block | ||||
---|---|---|---|---|
| ||||
<W extends Window> WindowedKStream<KTimeWindowedKStream<K, V> windowedBy(Windows<W> timeWindows); SessionWindowedKStream<K, V> windowedBy(SessionWindows sessionWindows); KTable<K, Long> count(final Materialized<K, Long> materialized); KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized); <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Serde<VR> aggValueSerde, final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); |
...
Code Block | ||||
---|---|---|---|---|
| ||||
public synchronized <K, V> KStream<K, V> stream(final String topic) public synchronized <K, V> KStream<K, V> stream(final String topic, final Consumed<K, V> options) public synchronized <K, V> KStream<K, V> stream(final Collection<String> topic, final Consumed<K, V> options) public synchronized <K, V> KStream<K, V> stream(final Collection<String> topic) public synchronized <K, V> KStream<K, V> stream(final Pattern pattern, final Consumed<K, V> options) public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed) public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed) public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) |
...
Code Block | ||||
---|---|---|---|---|
| ||||
public interface WindowedKStream<KTimeWindowedKStream<K, V> { KTable<Windowed<K>, Long> count(); KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materializedAs); <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator); <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator, final Materialized<K, VR, WindowStore<Bytes, byte[]>> materializedAs); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer); KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, final Materialized<K, V, WindowStore<Bytes, byte[]>> materializedAs); } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Implementations of this will provide the ability to wrap a given StateStore * with or without caching/loggging etc. */ public interface StateStoreBuilder<TStoreBuilder<T extends StateStore> { StateStoreBuilder<T>StoreBuilder<T> withCachingEnabled(); StateStoreBuilder<T>StoreBuilder<T> withLoggingEnabled(Map<String, String> config); StateStoreBuilder<T>StoreBuilder<T> withLoggingDisabled() T build(); Map<String, String> logConfig(); boolean loggingEnabled(); } |
...
Code Block |
---|
/** * A store supplier that can be used to create one or more {@link SessionStore} instances of type <Byte, byte[]> */ public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<Bytes, byte[]>> { /** * The size of a segment, in milliseconds. Used when caching is enabled to segment the cache * and reduce the amount of data that needs to be scanned when performing range queries * * @return segmentInterval in milliseconds */ long segmentIntervalsegmentIntervalMs(); } |
Proposed Changes
Add the above methods, interfaces, classes to the DSL. Deprecate existing overloads on KStream, KTable, and KGroupedStream that take more than the required parameters, for example, KTable#filter(Predicate, String) and KTable#filter(Predicate, StateStoreSupplier) will be deprecated. StateStoreSupplier will also be deprecated deprecated. All versions of KTable#through and KTable#to will be deprecated in favour of using KTable#toStream()#through and KTable#toStream()#to
The new Interface BytesStoreSupplier supersedes the existing StateStoreSupplier (which will remain untouched). This so we can provide a convenient way for users creating custom state stores to wrap them with caching/logging etc if they chose. In order to do this we need to force the inner most store, i.e, the custom store, to be a store of type `<Bytes, byte[]>`.
...