Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleKGroupedTable
KTable<K, Long> count(final Materialized<K, V, KeyValueStore<K, V>> materialized);

KTable<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Materialized<K, V, KeyValueStore<K, V>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Aggregator<? super K, ? super V, VR> subtractor,
                             final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

 

...

For StreamsBuilder we remove all stream, table, and globalTable overloads that take more than a single argument and replace them with:

language
Code Block
java
titleWindowedKStream
public interfacesynchronized WindowedKStream<K<K, V> {

KStream<K, V> stream(final Consumed<K, KTable<Windowed<K>V> options, final String... topics)
public synchronized <K, V> KStream<K, V> stream(final Pattern pattern, final Consumed<K, V> options)

public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed)

public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V> materialized)

public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed)

public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic, final Consumed<K, V> consumed, final Materialized<K, V> materialized)

 


New classes and interfaces:

Code Block
languagejava
titleWindowedKStream
public interface WindowedKStream<K, V> {

    KTable<Windowed<K>, Long> count();

    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<K, Long>> materializedAs);

    <VR> KTable<Windowed<K>, VR> Long> count();

    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<K, Long>> materializedAs);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator,);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                  final Materialized<K, VR, WindowStore<K, VR>> materializedAs);


    KTable<Windowed<K>final Aggregator<? super K, V> reduce(final Reducer<V> reducer);

? super V, VR> aggregator,
     KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                  final Materialized<K, VVR, WindowStore<K, V>>VR>> materializedAs);


Code Block
languagejava
titleSessionWindowedKStream
public interface SessionWindowedKStream<K, V> {

    KTable<Windowed<K>, Long>V> countreduce(final Reducer<V> reducer);

    KTable<Windowed<K>, Long>V> countreduce(final Materialized<K,Reducer<V> Long, SessionStore<K, Long>> materializedAs);
reducer,
    <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                        final Materialized<K, V, WindowStore<K, V>> materializedAs);


} 
Code Block
languagejava
titleSessionWindowedKStream
public interface SessionWindowedKStream<K, V> {

    KTable<Windowed<K>, Long> count();

    KTable<Windowed<K>, Long>  count(final Aggregator<? super K, ? super VMaterialized<K, Long, SessionStore<K, Long>> materializedAs);

    <VR, T> KTable<Windowed<K>, VR> aggregatoraggregate(final Initializer<VR> initializer,
                                              final Merger<Aggregator<? super K, T> sessionMerger);

    <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer? super V, VR> aggregator,
                                              final Aggregator<Merger<? super K, ? super V, VR> aggregatorT> sessionMerger);

    <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                              final Merger<Aggregator<? super K, ? super V, T>VR> sessionMergeraggregator,
                                              final Materialized<K, VR, SessionStore<KMerger<? super K, VR>> materializedAs);


    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

T> sessionMerger,
        KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                  final Materialized<K, VVR, SessionStore<K, V>>VR>> materializedAs);
}
Code Block
languagejava
titleMaterialized
/**

 *  Used whenKTable<Windowed<K>, materializingV> areduce(final state storeReducer<V> reducer);

 */
public class Materialized<K, VKTable<Windowed<K>, SV> extendsreduce(final StateStore>Reducer<V> {reducer,
     public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName)

            public static <K, V, S extends StateStore>final Materialized<K, V, SessionStore<K, V>> materializedAs);
}
Code Block
languagejava
titleMaterialized
/**
 * Used when materializing a state store
 */
public classS> as(final BytesStoreSupplier<S> supplier)

    public Materialized<K, V, S>S withValueSerde(finalextends Serde<V>StateStore> valueSerde){

    public static <K, V, S extends StateStore> Materialized<K, V, S> withKeySerdeas(final Serde<K>String valueSerdestoreName)

    public static <K, V, S extends StateStore> Materialized<K, V, S> withLoggingEnabledas(final Map<String, String> topicConfigBytesStoreSupplier<S> supplier)

    public Materialized<K, V, S> withLoggingDisabledwithValueSerde(final Serde<V> valueSerde)

    public Materialized<K, V, S> withCachingEnabledwithKeySerde(final Serde<K> valueSerde)

    public Materialized<K, V, S> withCachingDisabled()

} withLoggingEnabled(final Map<String, String> topicConfig)

    public Materialized<K, V, S> withLoggingDisabled()

    public Materialized<K, V, S> withCachingEnabled()

    public Materialized<K, V, S> withCachingDisabled()

} 
Code Block
Code Block
languagejava
titleSerialized
/**
 * Optional params that can be passed to groupBy and groupByKey operations
 */
public class Serialized<K, V> {

    public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)

    public static <K, V> Serialized<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde)

    public Serialized<K, V> withKeySerde(final Serde<K> keySerde)

    public Serialized<K, V> withValueSerde(final Serde valueSerde)
}

...

Code Block
languagejava
titlePrinted
 
/**
 * options that can be used when printing to stdout our writing to a file
 */
public class Printed<K, V> {
 Printed<K, V> {   public static <K, V> Printed<K, V> toFile(final String filepath)

    public static <K, V> Printed<K, V> toSysOut()

    public Printed<K, V> withLabel(final String label)

    public Printed<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper)

}
Code Block
languagejava
titleConsumed
/**
 * Options for consuming a topic as a KStream or KTable
 */
public class Consumed<K, V> {
    public static <K, V>  Consumed<K, V> deserializedWith(final Serde<K> keySerde, final Serde<V> valueSerde)
    public static <K, V>  Consumed<K, V> timestampedWith(final TimestampExtractor extractor)
    public static <K, V> Printed<K Consumed<K, V> toFileoffsetResetWith(final StringTopology.AutoOffsetReset filepathresetPolicy)
    
    public static <KConsumed<K, V> Printed<K, V> toSysOut(withKeySerde(final Serde<K> keySerde)

    public Printed<KConsumed<K, V> withLabelwithValueSerde(final StringSerde<V> labelvalueSerde)

    public Printed<KConsumed<K, V> withKeyValueMapperwithTimestampExtractor(final KeyValueMapper<? super K, ? super V, String> mapper)

}
TimestampExtractor timestampExtractor)

    public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy)
}
Code Block
languagejava
titleStateStoreBuilder
/**
 * Implementations of this will provide the ability to wrap a given StateStore
 * with or without caching/loggging etc.
 */
public interface StateStoreBuilder<T extends StateStore> {

    StateStoreBuilder<T> withCachingEnabled();
    StateStoreBuilder<T> withLoggingEnabled(Map<String, String> config);
    T build();
}

...