Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: Under Discussion Adopted (1.0)

Discussion thread: [DISCUS] KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engineshere [Change the link from the KIP proposal email archive to your own email thread]

JIRA: KAFKA-5651 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
titleKStream
void print(final Printed<K, V> printed);

KStream<K, V> through(final String topic, final Produced<K, V> partitioned);

void to(final String topic, final Produced<V, V> partitioned);

KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized);

<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector, Serialized<KR, V> serialized);

<VO, VR> KStream<K, VR> join(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Joined<K, V, VO> options);

<VT, VR> KStream<K, VR> join(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Joined<K, V, VT> options);

<VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Joined<K, V, VO> options);

<VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, final Joined<K, V, VT> options);

<VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, final Joined<K, V, VO> options);

<VT


Code Block
languagejava
titleKTable
<KR, VR> KStream<KKGroupedTable<KR, VR> outerJoingroupBy(final KTable<K,KeyValueMapper<? VT>super otherK, final ValueJoiner<? super V, ?KeyValue<KR, superVR>> VTselector, ? extends VR> joiner, final Joined<K, V, VT> options);
Code Block
languagejava
titleKTable
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, Serialized<KR, VR> serialized);Serialized<KR, VR> serialized);

KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<Bytes[], byte[]>> materialized);

<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Materialized<K, V, KeyValueStore<Bytes[], byte[]>> materialized);

void to(final String topic, final Produced<V, V> options);

KTable<K, V> through(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]> options);

<VO, VR> KTable<K, VR> join(final KTable<K<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
 

 

We add some new helper methods to Stores so people can conveniently and quickly create basic StateStoreSuppliers for use in the DSL or PAPI. We will also deprecate the existing Stores.create(...)

...

Code Block
languagejava
titleKGroupedStream
<W extends Window> WindowedKStream<KTimeWindowedKStream<K, V> windowedBy(Windows<W> timeWindows);

SessionWindowedKStream<K, V> windowedBy(SessionWindows sessionWindows);

KTable<K, Long> count(final Materialized<K, Long> materialized);

KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR>Materialized<K, aggValueSerdeVR,
 KeyValueStore<Bytes,                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);byte[]>> materialized);


Code Block
Code Block
languagejava
titleKGroupedTable
KTable<K, Long> count(final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Aggregator<? super K, ? super V, VR> subtractor,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

...

Code Block
languagejava
titleStreamsBuilder
public synchronized <K, V> KStream<K, V> stream(final String topic)
 
public synchronized <K, V> KStream<K, V> stream(final String topic, final Consumed<K, V> options)
 
public synchronized <K, V> KStream<K, V> stream(final Collection<String> topic, final Consumed<K, V> options)
 
public synchronized <K, V> KStream<K, V> stream(final Collection<String> topic)

public synchronized <K, V> KStream<K, V> stream(final Pattern pattern, final Consumed<K, V> options)

public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed)

public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
 
public synchronized <K, V> GlobalKTable<KKTable<K, V> globalTabletable(final String topic, final Consumed<KMaterialized<K, V> consumed)

public V, KeyValueStore<Bytes, byte[]>> materialized)

public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed)

public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
 
public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)

 


New classes and interfaces:

Code Block
languagejava
titleWindowedKStream
public interface WindowedKStream<KTimeWindowedKStream<K, V> {

    KTable<Windowed<K>, Long> count();

    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<Bytes, byte[]>> materializedAs);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator,
                                           final Materialized<K, VR, WindowStore<Bytes, byte[]>> materializedAs);


    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                  final Materialized<K, V, WindowStore<Bytes, byte[]>> materializedAs);


} 

...

Code Block
languagejava
titleMaterialized
/**
 * Used when materializing a state store, i.e, during an aggregation operation or KTable operations
 */
public class operations
 */
public class Materialized<K, V, S extends StateStore> {

	public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName)

    public static <K, V> Materialized<K, V, WindowStore<Bytes, S extends StateStore> {byte[]>> as(final WindowBytesStoreSupplier supplier);
 
    public static <K, V, S extends StateStore> V> Materialized<K, V, SessionStore<Bytes, S>byte[]>> as(final StringSessionBytesStoreSupplier storeNamesupplier);
 
    public static <K, V> Materialized<K, V, WindowStore<BytesKeyValueStore<Bytes, byte[]>> as(final WindowStateStoreSupplierKeyValueBytesStoreSupplier supplier);
 
    public staticMaterialized<K, <KV, V>S> withValueSerde(final Serde<V> valueSerde)

    public Materialized<K, V, SessionStore<Bytes, byte[]>> asS> withKeySerde(final SessionStateStoreSupplierSerde<K> suppliervalueSerde);
 
    public static <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> asS> withLoggingEnabled(final KeyValueStateStoreSupplier supplierMap<String, String> topicConfig)
 
    public Materialized<K, V, S> withValueSerdewithLoggingDisabled(final Serde<V> valueSerde)

    public Materialized<K, V, S> withKeySerdewithCachingEnabled(final Serde<K> valueSerde)

    public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> topicConfig)

    public Materialized<K, V, S> withLoggingDisabled()

    public Materialized<K, V, S> withCachingEnabled()

    public Materialized<K, V, S> withCachingDisabled()

    public StateStoreBuilder storeBuilder withCachingDisabled()

	public String storeName();

	public StoreSupplier<S> storeSupplier()

	public Serde<K> keySerde()

	public Serde<V> valueSerde()

	public boolean loggingEnabled()

	public Map<String, String> logConfig()

	public boolean cachingEnabled()

} 


Code Block
languagejava
titleSerialized
/**
 * Optional params that can be passed to groupBy and groupByKey operations
 */
public class Serialized<K, V> {

    public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)

    public Serialized<K, V> withKeySerde(final withKeySerde(final Serde<K> keySerde)

    public Serialized<K, V> withValueSerde(final Serde valueSerde)
 
	public Serde<K> keySerde()
 
    public Serialized<K, V> withValueSerde(final Serde valueSerdeSerde<V> valueSerde()
}


Code Block
languagejava
titleJoined
/**
 * Optional params that can be passed to join, leftJoin, outerJoin operations
 */
public class Joined<K, V, VO> {

    public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde, final Serde <V> valueSerde, final Serde<VO> otherValueSerde)

    public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> keySerde)

    public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde)

    public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<V> valueSerde)

    public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde)

    public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde)

    public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde)
 
	public Serde<K> keySerde()
 
    public Serde<V> valueSerde()
 
	public Serde<VO> otherValueSerde()
}


Code Block
languagejava
titleProduced
/**
 * Optional arguments that can be specified when doing to and through operations
 */
public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)

public static <K, V> Produced<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final StreamPartitioner<K, V> partitioner) 

public static <K, V> Produced<K, V> keySerde(final Serde<K> keySerde)

public static <K, V> Produced<K, V> valueSerde(final Serde<V> valueSerde) 

public static <K, V> Produced<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner) 

public Produced<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner) 
public Produced<K, V> withValueSerde(final Serde<V> valueSerde) 
public Produced<K V> withValueSerde(final Serde<V> valueSerde) 
public Produced<K, V> withKeySerde(final Serde<K> keySerde)
public Serde<K> keySerde()
public Serde<K> valueSerde()
public StreamPartitioner<K, V> withKeySerdestreamPartitioner(final Serde<K> keySerde)


Code Block
languagejava
titlePrinted
 
/**
 * options that can be used when printing to stdout our writing to a file
 */
public class Printed<K, V> {
    public static <K, V> Printed<K, V> toFile(final String filepath)

    public static <K, V> Printed<K, V> toSysOut()

    public Printed<K, V> withLabel(final String label)

    public Printed<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper)

    public ProcessorSupplier<K, V> build(final String processorName)
}

...

Code Block
languagejava
titleConsumed
/**
 * Options for consuming a topic as a KStream or KTable
 */
public class Consumed<K, V> {
    public static <K, V>  Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde, final TimestampExtractor extractor, final Topology.AutoOffsetReset resetPolicy)
    public static <K, V>  Consumed<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)
    public static <K, V>  Consumed<K, V> with(final TimestampExtractor extractor)
    public static <K, V>  Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy)
    
    public Consumed<K, V> withKeySerde(final Serde<K> keySerde)

    public Consumed<K, V> withValueSerde(final(final Serde<V> valueSerde)

    public Consumed<K, V> withTimestampExtractor(final TimestampExtractor timestampExtractor)

    public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy)
 
	public Serde<K> keySerde()
 
	public Serde<V> valueSerde()
 
    public Consumed<K, V> withTimestampExtractor(final 	public TimestampExtractor timestampExtractor()
 
    	public Consumed<K, V> withOffsetResetPolicy(final TopologyToploogy.AutoOffsetReset resetPolicyoffsetResetPolicy()
}


Code Block
languagejava
titleStateStoreBuilder
/**
 * Implementations of this will provide the ability to wrap a given StateStore
 * with or without caching/loggging etc.
 */
public interface StateStoreBuilder<TStoreBuilder<T extends StateStore> {

    StateStoreBuilder<T>StoreBuilder<T> withCachingEnabled();
    StateStoreBuilder<T>StoreBuilder<T> withLoggingEnabled(Map<String, String> config);
    StoreBuilder<T> withLoggingDisabled()
    T build();
    Map<String, String> logConfig();
    boolean loggingEnabled();
}

...

Code Block
/**
 * A store supplier that can be used to create one or more {@link SessionStore} instances of type &lt;Byte, byte[]&gt;
 */
public interface SessionBytesStoreSupplier extends StoreSupplier<SessionStore<Bytes, byte[]>> {

    /**
     * The size of a segment, in milliseconds. Used when caching is enabled to segment the cache
     * and reduce the amount of data that needs to be scanned when performing range queries
     *
     * @return segmentInterval in milliseconds
     */
    long segmentIntervalsegmentIntervalMs();
}

 

Proposed Changes

Add the above methods, interfaces, classes to the DSL. Deprecate existing overloads on KStream, KTable, and KGroupedStream that take more than the required parameters, for example, KTable#filter(Predicate, String) and KTable#filter(Predicate, StateStoreSupplier) will be deprecated. StateStoreSupplier will also be deprecated deprecated. All versions of KTable#through and KTable#to will be deprecated in favour of using KTable#toStream()#through and  KTable#toStream()#to  

The new Interface BytesStoreSupplier supersedes the existing StateStoreSupplier (which will remain untouched). This so we can provide a convenient way for users creating custom state stores to wrap them with caching/logging etc if they chose. In order to do this we need to force the inner most store, i.e, the custom store, to be a store of type `<Bytes, byte[]>`. 

...