Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleKTable
// Re-groups the table through selector; groupByOptions carries the optional groupBy parameters.
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector, final GroupByOptions<KR, VR> groupByOptions);

// filter / filterNot keep the table's key and value types, so the store is typed <K, V>.
KTable<K, V> filter(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized);

KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate, final Materialized<K, V, KeyValueStore<K, V>> materialized);

// The result table holds VR values, so the materialized store is typed on VR rather than V.
<VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper, final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

// Options are typed on the table's own key and value types.
void to(final String topic, final TopicOptions<K, V> options);

// Materialized declares three type parameters (K, V, S extends StateStore); the store type is spelled out here.
// NOTE(review): the parameter is named "options" — confirm whether TopicOptions<K, V> was intended instead.
KTable<K, V> through(final String topic, final Materialized<K, V, KeyValueStore<K, V>> options);

// All three joins return KTable<K, VR>; the materialized store is typed on the joined value type VR.
<VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
                            final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                            final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

<VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

<VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
                                 final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                 final Materialized<K, VR, KeyValueStore<K, VR>> materialized);

Code Block
languagejava
titleStores
public static <K<W extends Window> WindowedKStream<K, V> windowedBy(Windows<W> timeWindows);

SessionWindowedKStream<KStateStoreSupplier<KeyValueStore<K, V>V>> sessionWindowedBypersistentKeyValueStore(SessionWindows sessionWindows);

KTable<K, Long> count(final Materialized materialized);

KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
final String name,
                                                final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR>Serde<K> aggValueSerdekeySerde,
                             final   Materialized<K, V, KeyValueStore<K, V>> materialized);
Code Block
languagejava
titleWindowedKStream
public interface WindowedKStream<K, V> {

    KTable<Windowed<K>, Long> count();

    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<K, Long>> materializedAs);

    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                   final Serde<V> valueSerde) 

public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> inMemoryKeyValueStore(final String name,
             final Aggregator<? super K, ? super V, VR> aggregator);

    <VR>  KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V,final VR>Serde<K> aggregatorkeySerde,
                                           final Materialized<K, VR, WindowStore<K, VR>> materializedAs);


    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                final Serde<V> valueSerde)

public static <K, V> StateStoreSupplier<KeyValueStore<K, V>> lruMap(final String name,
         final Materialized<K, V, WindowStore<K, V>> materializedAs);


} 
Code Block
languagejava
titleSessionWindowedKStream
public interface SessionWindowedKStream<K, V> {

    KTable<Windowed<K>, Long> count();

    KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<K, Long>> materializedAs);

    <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                   final int capacity,
                         final Aggregator<? super K, ? super V, VR> aggregator,
                               final Serde<K> keySerde,
             final Merger<? super K, T> sessionMerger);

    <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                    final Serde<V> valueSerde) 

public static <K, V> StateStoreSupplier<WindowStore<K, V>>  persistentWindowStore(final Aggregator<? super K, ? super V, VR> aggregator,
String name,
                                                    final Merger<? super K, T> sessionMerger,
                   final Windows windows,
                         final Materialized<K, VR, SessionStore<K, VR>> materializedAs);


    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                              final Serde<K> keySerde,
  final Materialized<K, V, SessionStore<K, V>> materializedAs);
}
Code Block
languagejava
titleMaterialized
/**
 * Used when materializing a state store
 */
public class Materialized<K, V, S extends StateStore> {
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName)

                 public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier)

    public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde)

    public Materialized<K, V, S> withKeySerde(final Serde<K>Serde<V> valueSerde)

public static <K, V> public Materialized<KStateStoreSupplier<SessionStore<K, V, S> withLoggingEnabledV>> persistentSessionStore(final Map<StringString name,
 String> topicConfig)

    public Materialized<K, V, S> withLoggingDisabled()

    public Materialized<K, V, S> withCachingEnabled()

    public Materialized<K, V, S> withCachingDisabled()

} 
Code Block
languagejava
titleGroupByOptions
/**
 * Optional params that can be passed to groupBy and groupByKey operations
 */
public class GroupByOptions<K, V> {

                        public static <K, V> GroupByOptions<K, V> serdes(final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new GroupByOptions<>();
    }
  final SessionWindows windows,
    public GroupByOptions<K, V> withKeySerde()

    public GroupByOptions<K, V> withValueSerde()
}
Code Block
languagejava
titleJoinOptions
/**
 * Optional params that can be passed to join, leftJoin, outerJoin operations
 */
public class JoinOptions<K, V, VO> {

    public static <K, V, VO> JoinOptions<K, V, VO> serdes(final Serde<K> keySerde, final Serde <V> valueSerde, final Serde<VO> otherValueSerde)
    public JoinOptions<K, V, VO> withKeySerde(final Serde<K> keySerde)

    public JoinOptions<K, V, VO> withValueSerde(final Serde<V> valueSerde) 

    publicfinal JoinOptions<K,Serde<K> VkeySerde,
 VO> withOtherValueSerde(final Serde<VO> otherValueSerde)
}
Code Block
languagejava
titleTopicOptions
/**
 * Optional arguments that can  be specified when doing to and through operations
 */
public class TopicOptions<K, V> {
    public static <K, V> TopicOptions<K, V> serdes(final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new TopicOptions<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde);
    }

    public static <K, V> TopicOptions<K, V> options(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde) {
    final Serde<V> valueSerde)  return new TopicOptions<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde).withStreamPartitioner(partitioner);
    }

    public TopicOptions<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner) {
        return null;
    }

    public TopicOptions<K, V> withValueSerde(final Serde<V> valueSerde) {
 
public <K, V> StateStoreBuilder<WindowStore<K, V>> windowStoreBuilder(final StateStoreSupplier<WindowStore<K, V>> supplier)

public <K, V> StateStoreBuilder<KeyValueStore<K, V>> keyValueStoreBuilder(final StateStoreSupplier<KeyValueStore<K, V>> supplier)

public <K, V> StateStoreBuilder<SessionStore<K, V>> sessionStoreBuilder(final StateStoreSupplier<SessionStore<K, V>> supplier)
Code Block
languagejava
titleTopologyBuilder
public synchronized <K, V> TopologyBuilder addGlobalStore(final StateStoreBuilder storeSupplier,
        return null;
              }

    public TopicOptions<K, V> withKeySerde(final Serde<K> keySerde) {
        return null;
    }
}
Code Block
 
/**
 * options that can be used when printing to stdout our writing tofinal aString filesourceName,
 */
public  class PrintOptions<K,                                                      final TimestampExtractor timestampExtractor,
                                                          final Deserializer keyDeserializer,
                                                          final Deserializer valueDeserializer,
                                                          final String topic,
                                                          final String processorName,
                                                          final ProcessorSupplier stateUpdateSupplier)
 
public synchronized final TopologyBuilder addStateStore(final StateStoreBuilder supplier, final String... processorNames)
 

 

 

Code Block
// Entry points that return the windowed / session-windowed stream types.
<W extends Window> WindowedKStream<K, V> windowedBy(final Windows<W> timeWindows);

SessionWindowedKStream<K, V> sessionWindowedBy(final SessionWindows sessionWindows);

// Parameterized instead of a raw Materialized: the result table holds Long values keyed by K.
KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<K, Long>> materialized);

KTable<K, V> reduce(final Reducer<V> reducer, final Materialized<K, V, KeyValueStore<K, V>> materialized);

// The aggregate has type VR, so the materialized store is typed on VR rather than V.
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Serde<VR> aggValueSerde,
                             final Materialized<K, VR, KeyValueStore<K, VR>> materialized);
Code Block
languagejava
titleWindowedKStream
/**
 * Operations declared for a windowed stream with key type {@code K} and value type {@code V}.
 * Every method returns a {@code KTable} keyed by {@code Windowed<K>}. Each operation has one
 * overload without and one with a {@code Materialized} argument typed against a {@code WindowStore}.
 */
public interface WindowedKStream<K, V> {

    /**
     * @return a table of {@code Long} values keyed by {@code Windowed<K>}
     *         (presumably the per-key, per-window record count)
     */
    KTable<Windowed<K>, Long> count();

    /**
     * Same result type as {@link #count()}, with an explicit materialization argument.
     *
     * @param materializedAs materialization settings typed for a {@code WindowStore<K, Long>}
     */
    KTable<Windowed<K>, Long> count(final Materialized<K, Long, WindowStore<K, Long>> materializedAs);

    /**
     * @param initializer supplies values of the aggregate type {@code VR}
     * @param aggregator  takes a key, a value and the current {@code VR} aggregate
     * @param <VR>        value type of the returned table
     */
    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator);

    /**
     * Overload of the two-argument {@code aggregate} with an explicit materialization argument.
     *
     * @param materializedAs materialization settings typed for a {@code WindowStore<K, VR>}
     */
    <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                           final Aggregator<? super K, ? super V, VR> aggregator,
                                           final Materialized<K, VR, WindowStore<K, VR>> materializedAs);


    /**
     * @param reducer operates on values of type {@code V}; the result table keeps value type {@code V}
     */
    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    /**
     * Overload of {@link #reduce(Reducer)} with an explicit materialization argument.
     *
     * @param materializedAs materialization settings typed for a {@code WindowStore<K, V>}
     */
    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                  final Materialized<K, V, WindowStore<K, V>> materializedAs);


} 
Code Block
languagejava
titleSessionWindowedKStream
/**
 * Operations declared for a session-windowed stream with key type {@code K} and value type {@code V}.
 * Every method returns a {@code KTable} keyed by {@code Windowed<K>}. Each operation has one
 * overload without and one with a {@code Materialized} argument typed against a {@code SessionStore}.
 */
public interface SessionWindowedKStream<K, V> {

    /**
     * @return a table of {@code Long} values keyed by {@code Windowed<K>}
     *         (presumably the per-key, per-session record count)
     */
    KTable<Windowed<K>, Long> count();

    /**
     * Same result type as {@link #count()}, with an explicit materialization argument.
     *
     * @param materializedAs materialization settings typed for a {@code SessionStore<K, Long>}
     */
    KTable<Windowed<K>, Long> count(final Materialized<K, Long, SessionStore<K, Long>> materializedAs);

    /**
     * @param initializer   supplies values of the aggregate type {@code VR}
     * @param aggregator    takes a key, a value and the current {@code VR} aggregate
     * @param sessionMerger merger typed on the key and on {@code T}
     * @param <VR>          value type of the returned table
     * @param <T>           NOTE(review): only used by {@code sessionMerger}; presumably the merger
     *                      should operate on {@code VR} — confirm whether {@code T} is needed
     */
    <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                              final Aggregator<? super K, ? super V, VR> aggregator,
                                              final Merger<? super K, T> sessionMerger);

    /**
     * Overload of the three-argument {@code aggregate} with an explicit materialization argument.
     *
     * @param materializedAs materialization settings typed for a {@code SessionStore<K, VR>}
     */
    <VR, T> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                              final Aggregator<? super K, ? super V, VR> aggregator,
                                              final Merger<? super K, T> sessionMerger,
                                              final Materialized<K, VR, SessionStore<K, VR>> materializedAs);


    /**
     * @param reducer operates on values of type {@code V}; the result table keeps value type {@code V}
     */
    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer);

    /**
     * Overload of {@link #reduce(Reducer)} with an explicit materialization argument.
     *
     * @param materializedAs materialization settings typed for a {@code SessionStore<K, V>}
     */
    KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                  final Materialized<K, V, SessionStore<K, V>> materializedAs);
}
Code Block
languagejava
titleMaterialized
/**
 * Used when materializing a state store.
 *
 * API sketch: method bodies are intentionally omitted. Every instance method
 * returns {@code Materialized<K, V, S>}, so calls can be chained.
 *
 * @param <K> key type
 * @param <V> value type
 * @param <S> type of the state store being materialized
 */
public class Materialized<K, V, S extends StateStore> {
    /** Static factory taking the name of the store. */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName)

    /** Static factory taking a supplier of the store. */
    public static <K, V, S extends StateStore> Materialized<K, V, S> as(final StateStoreSupplier<S> supplier)

    /** @param valueSerde serde for values of type {@code V} */
    public Materialized<K, V, S> withValueSerde(final Serde<V> valueSerde)

    /** @param keySerde serde for keys of type {@code K} */
    public Materialized<K, V, S> withKeySerde(final Serde<K> keySerde)

    /** @param topicConfig string-to-string configuration passed along when logging is enabled */
    public Materialized<K, V, S> withLoggingEnabled(final Map<String, String> topicConfig)

    public Materialized<K, V, S> withLoggingDisabled()

    public Materialized<K, V, S> withCachingEnabled()

    public Materialized<K, V, S> withCachingDisabled()

}
Code Block
languagejava
titleGroupByOptions
/**
 * Optional params that can be passed to groupBy and groupByKey operations
 */
public class GroupByOptions<K, V> {

    // Static factory.
    // NOTE(review): keySerde and valueSerde are accepted but never used; the instance is
    // created with the no-arg constructor and returned as-is. Presumably both serdes
    // should be applied to the new instance — confirm.
    public static <K, V> GroupByOptions<K, V> serdes(final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new GroupByOptions<>();
    }
    
    // NOTE(review): declared without a parameter and without a body; presumably meant to
    // take a Serde<K> keySerde — confirm.
    public GroupByOptions<K, V> withKeySerde()

    // NOTE(review): declared without a parameter and without a body; presumably meant to
    // take a Serde<V> valueSerde — confirm.
    public GroupByOptions<K, V> withValueSerde()
}
Code Block
languagejava
titleJoinOptions
/**
 * Optional params that can be passed to join, leftJoin, outerJoin operations
 */
public class JoinOptions<K, V, VO> {

    // Static factory taking all three serdes at once: key, this side's value, other side's value.
    // API sketch: bodies are omitted for every method in this class.
    public static <K, V, VO> JoinOptions<K, V, VO> serdes(final Serde<K> keySerde, final Serde <V> valueSerde, final Serde<VO> otherValueSerde)
    // Takes the serde for keys of type K; returns JoinOptions so calls can be chained.
    public JoinOptions<K, V, VO> withKeySerde(final Serde<K> keySerde)

    // Takes the serde for values of type V.
    public JoinOptions<K, V, VO> withValueSerde(final Serde<V> valueSerde) 

    // Takes the serde for the other side's values of type VO.
    public JoinOptions<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde)
}
Code Block
languagejava
titleTopicOptions
/**
 * Optional arguments that can be specified when doing to and through operations.
 *
 * Instances are configured fluently: each {@code with*} method stores its argument
 * and returns this instance, which is what lets the static factories chain the calls.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class TopicOptions<K, V> {
    // Protected rather than private so an internal subclass can read the configured values.
    protected Serde<K> keySerde;
    protected Serde<V> valueSerde;
    protected StreamPartitioner<K, V> partitioner;

    /**
     * Creates options with the key and value serdes set.
     *
     * @param keySerde   serde for keys
     * @param valueSerde serde for values
     * @return a new, configured instance
     */
    public static <K, V> TopicOptions<K, V> serdes(final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new TopicOptions<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde);
    }

    /**
     * Creates options with the partitioner and both serdes set.
     *
     * @param partitioner partitioner typed on {@code K} and {@code V}
     * @param keySerde    serde for keys
     * @param valueSerde  serde for values
     * @return a new, configured instance
     */
    public static <K, V> TopicOptions<K, V> options(final StreamPartitioner<K, V> partitioner, final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new TopicOptions<K, V>().withKeySerde(keySerde).withValueSerde(valueSerde).withStreamPartitioner(partitioner);
    }

    /**
     * @param partitioner partitioner to store
     * @return this instance, so calls can be chained (previously {@code null}, which broke chaining)
     */
    public TopicOptions<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner) {
        this.partitioner = partitioner;
        return this;
    }

    /**
     * @param valueSerde value serde to store
     * @return this instance, so calls can be chained
     */
    public TopicOptions<K, V> withValueSerde(final Serde<V> valueSerde) {
        this.valueSerde = valueSerde;
        return this;
    }

    /**
     * @param keySerde key serde to store
     * @return this instance, so calls can be chained
     */
    public TopicOptions<K, V> withKeySerde(final Serde<K> keySerde) {
        this.keySerde = keySerde;
        return this;
    }
}
Code Block
 
/**
 * options that can be used when printing to stdout or writing to a file
 *
 * API sketch: method bodies are omitted. The list below is the de-duplicated set of
 * signatures that appear in this section.
 */
public class PrintOptions<K, V> {
    // Static factories.
    public static <K, V> PrintOptions<K, V> labeled(final String label)

    public static <K, V> PrintOptions<K, V> toFile(final String filepath)

    public static <K, V> PrintOptions<K, V> sysOut(final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label)

    public static <K, V> PrintOptions<K, V> toFile(final String filePath, final KeyValueMapper<? super K, ? super V, String> mapper, final Serde<K> keySerde, final Serde<V> valSerde, final String label)

    // Fluent setters; each returns PrintOptions so calls can be chained.
    public PrintOptions<K, V> withLabel(final String label)

    public PrintOptions<K, V> withFile(final String filepath)

    public PrintOptions<K, V> withKeySerde(final Serde<K> keySerde)

    public PrintOptions<K, V> withValueSerde(final Serde<V> valueSerde)

    public PrintOptions<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper)
}

 

 

Code Block
/**
 * Implementations of this will provide the ability to wrap a given StateStore
 * with or without caching/loggging etc.
 */
public interface StateStoreBuilder<T extends StateStore> {

    public PrintOptions<K, V> withKeySerde(final Serde<K> keySerde)

StateStoreBuilder<T> withCachingEnabled();
    StateStoreBuilder<T> withCachingDisabled();
    publicStateStoreBuilder<T> PrintOptions<KwithLoggingEnabled(Map<String, V> withValueSerde(final Serde<V> keySerde)

String> config);
    public PrintOptions<K, V> withKeyValueMapper(final KeyValueMapper<? super K, ? super V, String> mapper)StateStoreBuilder<T> withLoggingDisabled();
    T build();
}

 

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

...