This page collects common usage patterns for the Kafka Streams Processor API and DSL.
Feel free to add your own code snippets and/or improve existing examples.
How to compute a (windowed) average?
Kafka Streams DSL aggregations natively support so-called "incremental" or "cumulative" aggregation functions (cf. https://en.wikipedia.org/wiki/Associative_property; also called "distributive" functions by Jim Gray in "Data Cube: A Relational Aggregation Operator Generalizing Group-By, Cross-Tab, and Sub-Totals") such as count, sum, min, and max. However, the average function (a so-called "algebraic" function, cf. the same paper) cannot be computed incrementally like this. It can, however, be composed of distributive functions, namely count and sum, and thus implemented in a two-step approach:
```java
class Tuple2<T1, T2> {
    public T1 value1;
    public T2 value2;

    Tuple2(T1 v1, T2 v2) {
        value1 = v1;
        value2 = v2;
    }
}

final KStreamBuilder builder = new KStreamBuilder();

// first step: compute count and sum in a single aggregation step
// and emit 2-tuples <count,sum> as aggregation result values
final KTable<String, Tuple2<Long, Long>> countAndSum = builder.<String, Long>stream("someInputTopic")
    .groupByKey()
    .aggregate(
        new Initializer<Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> apply() {
                return new Tuple2<>(0L, 0L);
            }
        },
        new Aggregator<String, Long, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> apply(final String key, final Long value, final Tuple2<Long, Long> aggregate) {
                ++aggregate.value1;
                aggregate.value2 += value;
                return aggregate;
            }
        },
        new Tuple2Serde()); // omitted for brevity

// second step: compute average for each 2-tuple
final KTable<String, Double> average = countAndSum.mapValues(
    new ValueMapper<Tuple2<Long, Long>, Double>() {
        @Override
        public Double apply(Tuple2<Long, Long> value) {
            return value.value2 / (double) value.value1;
        }
    });
```
This pattern can also be applied to compute a windowed average or to compose other algebraic functions.
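To illustrate why the decomposition works, here is a minimal plain-Java sketch with no Kafka dependencies (the record values for one key are just a list): it folds the values into a (count, sum) pair the way the Aggregator does, then maps that pair to the average the way the ValueMapper does. The class and variable names are illustrative only.

```java
import java.util.List;

public class AverageSketch {

    // mirrors Tuple2<Long, Long> from the example above
    static class CountAndSum {
        long count; // value1
        long sum;   // value2
    }

    public static void main(String[] args) {
        // stand-in for the stream of records sharing one key
        List<Long> values = List.of(3L, 5L, 10L);

        // first step: incremental (distributive) aggregation,
        // starting from the Initializer's (0, 0)
        CountAndSum agg = new CountAndSum();
        for (long v : values) {
            agg.count++;
            agg.sum += v;
        }

        // second step: derive the algebraic function from the two
        // distributive parts, as the ValueMapper does
        double average = agg.sum / (double) agg.count;
        System.out.println(average); // 6.0
    }
}
```

Because count and sum are each updated one record at a time, the average can be recomputed from them after every update without ever re-reading old records.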
How to compute windowed aggregations over successively increasing timed windows?
Draft
```java
// where Key and Value stand for your actual key and value types
KTable<Windowed<Key>, Value> oneMinuteWindowed = yourKStream
    .groupByKey()
    .reduce(/* your adder */, TimeWindows.of(60*1000, 60*1000), "store1m");
    // where your adder can be as simple as (val, agg) -> agg + val
    // for primitive types, or as complex as you need

KTable<Windowed<Key>, Value> fiveMinuteWindowed = oneMinuteWindowed
    .groupBy(
        (windowedKey, value) -> new KeyValue<>(
            new Windowed<>(
                windowedKey.key(),
                new Window<>(
                    windowedKey.window().start() /1000/60/5 *1000*60*5,
                    windowedKey.window().start() /1000/60/5 *1000*60*5 + 1000*60*5
                    // the above rounds time down to a timestamp divisible by 5 minutes
                )
            ),
            value
        ),
        /* your key serde */,
        /* your value serde */
    )
    .reduce(/* your adder */, /* your subtractor */, "store5m");

KTable<Windowed<Key>, Value> fifteenMinuteWindowed = fiveMinuteWindowed
    .groupBy(
        (windowedKey, value) -> new KeyValue<>(
            new Windowed<>(
                windowedKey.key(),
                new Window<>(
                    windowedKey.window().start() /1000/60/15 *1000*60*15,
                    windowedKey.window().start() /1000/60/15 *1000*60*15 + 1000*60*15
                    // the above rounds time down to a timestamp divisible by 15 minutes
                )
            ),
            value
        ),
        /* your key serde */,
        /* your value serde */
    )
    .reduce(/* your adder */, /* your subtractor */, "store15m");

KTable<Windowed<Key>, Value> sixtyMinuteWindowed = fifteenMinuteWindowed
    .groupBy(
        (windowedKey, value) -> new KeyValue<>(
            new Windowed<>(
                windowedKey.key(),
                new Window<>(
                    windowedKey.window().start() /1000/60/60 *1000*60*60,
                    windowedKey.window().start() /1000/60/60 *1000*60*60 + 1000*60*60
                    // the above rounds time down to a timestamp divisible by 60 minutes
                )
            ),
            value
        ),
        /* your key serde */,
        /* your value serde */
    )
    .reduce(/* your adder */, /* your subtractor */, "store60m");
```
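The rounding expressions above rely on Java's integer division truncating toward zero: dividing an epoch-millisecond timestamp by the window size and multiplying back snaps it down to the nearest window boundary. A standalone sketch of just this arithmetic, with illustrative names and no Kafka dependencies:

```java
public class WindowRounding {

    // rounds an epoch-ms timestamp down to the nearest multiple of windowSizeMs,
    // exactly like the inline expressions above (integer division drops the remainder)
    static long roundDownToWindowStart(long timestampMs, long windowSizeMs) {
        return timestampMs / windowSizeMs * windowSizeMs;
    }

    public static void main(String[] args) {
        long fiveMinutes = 1000L * 60 * 5;
        long t = 1_000_123_456L; // some arbitrary epoch-ms timestamp

        long start = roundDownToWindowStart(t, fiveMinutes);
        long end = start + fiveMinutes;

        System.out.println(start % fiveMinutes == 0); // true: aligned to a 5-minute boundary
        System.out.println(start <= t && t < end);    // true: t falls inside [start, end)
    }
}
```

Every one-minute window start therefore maps deterministically onto exactly one five-minute window, which is what makes the groupBy re-keying in each step well-defined.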