This page collects common usage patterns for the Kafka Streams Processor API and DSL.
Feel free to add your own code snippets and/or improve existing examples.
How to compute a (windowed) average?
Kafka Streams DSL aggregations natively support so-called "incremental" or "cumulative" aggregation functions (cf. https://en.wikipedia.org/wiki/Associative_property – also called "distributive functions" by Jim Gray in "Data Cube: A Relational Aggregation Operator Generalizing Group-By, Cross-Tab, and Sub-Totals") like count, sum, min, and max. The average function, however (a so-called "algebraic" function, cf. the same paper), cannot be computed incrementally, but it can be composed of the distributive functions count and sum. Thus, it can be implemented in a two-step approach:
class Tuple2<T1, T2> {
    public T1 value1;
    public T2 value2;

    Tuple2(T1 v1, T2 v2) {
        value1 = v1;
        value2 = v2;
    }
}

final KStreamBuilder builder = new KStreamBuilder();

// first step: compute count and sum in a single aggregation step
// and emit 2-tuples <count,sum> as aggregation result values
final KTable<String, Tuple2<Long, Long>> countAndSum = builder.stream("someInputTopic")
    .groupByKey()
    .aggregate(
        new Initializer<Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> apply() {
                return new Tuple2<>(0L, 0L);
            }
        },
        new Aggregator<String, Long, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> apply(final String key, final Long value, final Tuple2<Long, Long> aggregate) {
                ++aggregate.value1;
                aggregate.value2 += value;
                return aggregate;
            }
        },
        new Tuple2Serde()); // omitted for brevity

// second step: compute average for each 2-tuple
final KTable<String, Double> average = countAndSum.mapValues(
    new ValueMapper<Tuple2<Long, Long>, Double>() {
        @Override
        public Double apply(Tuple2<Long, Long> value) {
            return value.value2 / (double) value.value1;
        }
    });
This pattern can also be applied to compute a windowed average or to compose other algebraic functions.
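Independent of the Streams API, the core of the trick is that count and sum can be maintained incrementally while average cannot, so the average is derived on read. A minimal plain-Java sketch of the same two-step composition (class and method names are illustrative, not part of any Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

public class TwoStepAverage {
    // per-key running <count, sum>, updated incrementally like the DSL aggregate step
    static final Map<String, long[]> countAndSum = new HashMap<>();

    static void accumulate(String key, long value) {
        long[] agg = countAndSum.computeIfAbsent(key, k -> new long[]{0L, 0L});
        agg[0] += 1;      // count
        agg[1] += value;  // sum
    }

    // second step: derive the average on read, like the mapValues step
    static double average(String key) {
        long[] agg = countAndSum.get(key);
        return agg[1] / (double) agg[0];
    }

    public static void main(String[] args) {
        accumulate("a", 1);
        accumulate("a", 2);
        accumulate("a", 6);
        System.out.println(average("a")); // 3.0
    }
}
```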
How to compute windowed aggregations over successively increasing timed windows
Draft
KTable<Windowed<Key>, Value> oneMinuteWindowed = // where Key and Value stand for your actual key and value types
    yourKStream
        .groupByKey()
        .reduce(/*your adder*/, TimeWindows.of(60*1000, 60*1000), "store1m");
        // where your adder can be as simple as (val, agg) -> agg + val
        // for primitive types or as complex as you need

KTable<Windowed<Key>, Value> fiveMinuteWindowed = oneMinuteWindowed
    .groupBy(
        (windowedKey, value) -> new KeyValue<>(
            new Windowed<>(
                windowedKey.key(),
                new Window<>(
                    windowedKey.window().start() /1000/60/5 *1000*60*5,
                    windowedKey.window().start() /1000/60/5 *1000*60*5 + 1000*60*5
                    // the above rounds time down to a timestamp divisible by 5 minutes
                )
            ),
            value
        ),
        /* your key serde */,
        /* your value serde */
    )
    .reduce(/*your adder*/, /*your subtractor*/, "store5m");
    // where your subtractor can be as simple as (val, agg) -> agg - val for primitive types
    // or as complex as you need,
    // just make sure you get the parameter order right, subtraction is not commutative!

KTable<Windowed<Key>, Value> fifteenMinuteWindowed = fiveMinuteWindowed
    .groupBy(
        (windowedKey, value) -> new KeyValue<>(
            new Windowed<>(
                windowedKey.key(),
                new Window<>(
                    windowedKey.window().start() /1000/60/15 *1000*60*15,
                    windowedKey.window().start() /1000/60/15 *1000*60*15 + 1000*60*15
                    // the above rounds time down to a timestamp divisible by 15 minutes
                )
            ),
            value
        ),
        /* your key serde */,
        /* your value serde */
    )
    .reduce(/*your adder*/, /*your subtractor*/, "store15m");

KTable<Windowed<Key>, Value> sixtyMinuteWindowed = fifteenMinuteWindowed
    .groupBy(
        (windowedKey, value) -> new KeyValue<>(
            new Windowed<>(
                windowedKey.key(),
                new Window<>(
                    windowedKey.window().start() /1000/60/60 *1000*60*60,
                    windowedKey.window().start() /1000/60/60 *1000*60*60 + 1000*60*60
                    // the above rounds time down to a timestamp divisible by 60 minutes
                )
            ),
            value
        ),
        /* your key serde */,
        /* your value serde */
    )
    .reduce(/*your adder*/, /*your subtractor*/, "store60m");
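The repeated `/1000/60/5 *1000*60*5` arithmetic above is simply flooring a millisecond timestamp to the enclosing window start, relying on integer division truncating the remainder. A small plain-Java check of that rounding (no Streams dependency; names are illustrative):

```java
public class WindowRounding {
    // floor a millisecond timestamp to the start of its enclosing window
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs / windowSizeMs * windowSizeMs; // integer division floors
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        // 12:07:30.123 falls into the [12:05, 12:10) window
        long ts = 12 * 3600_000L + 7 * 60_000L + 30_123L;
        System.out.println(windowStart(ts, fiveMinutes)); // start of 12:05
    }
}
```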
TODO: to mitigate infinite state store growth (until a re-balance rebuilds it from the changelog), you can implement the Windowed key serde to store the timestamp(s) before the actual record key and periodically run a ranged query on each of the state stores to find and delete all data older than x (using punctuate() inside a Processor). TBC...
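A sketch of that ranged-deletion idea outside Streams, using a `TreeMap` as a stand-in for a state store whose keys are timestamp-prefixed so entries sort by window time first (in a real Processor the same range scan and delete would run from `punctuate()`); all names here are illustrative:

```java
import java.util.TreeMap;

public class ExpireOldWindows {
    // stand-in for a state store keyed by "<timestampMs>|<recordKey>",
    // so entries sort by window timestamp first
    static final TreeMap<String, Long> store = new TreeMap<>();

    static String storeKey(long timestampMs, String recordKey) {
        // zero-pad so lexicographic order matches numeric order
        return String.format("%020d|%s", timestampMs, recordKey);
    }

    // delete everything strictly older than the cutoff (the "punctuate" body)
    static void expireOlderThan(long cutoffMs) {
        store.headMap(storeKey(cutoffMs, "")).clear();
    }

    public static void main(String[] args) {
        store.put(storeKey(1_000L, "a"), 1L);
        store.put(storeKey(2_000L, "b"), 2L);
        store.put(storeKey(3_000L, "a"), 3L);
        expireOlderThan(2_000L); // drops only the 1_000 ms entry
        System.out.println(store.size()); // 2
    }
}
```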
How to aggregate data from all currently active sessions?
The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is to first do the summation with maximum parallelism and minimize the work needed downstream: a per-partition sum is computed first to limit both the updates the totals topic receives and the summing work done by the interactive queries on the global store.
builder
.stream(/*key serde*/, /*transaction serde*/, "transaction-topic")
.groupByKey(/*key serde*/, /*transaction serde*/)
.aggregate(
() -> /*empty aggregate*/,
aggregator(),
merger(),
SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS*2),
/* aggregate serde */,
txPerCustomerSumStore() // this store can be queried for per customer session data
)
.toStream()
.filter((key, value) -> value != null) // tombstones only come when a session is merged into a bigger session, so ignore them
// the below map/groupByKey/reduce operations are to only propagate updates to the latest session per customer to downstream
.map((windowedCustomerId, agg) -> // this moves timestamp from the windowed key into the value
// so that we can group by customerId only and reduce to the later value
new KeyValue<>(
windowedCustomerId.key(), // just customerId
new WindowedAggsImpl( // this is just like a tuple2 but with nicely named accessors: timestamp() and aggs()
windowedCustomerId.window().end(),
agg
)
)
)
.groupByKey( /*key serde*/, /*windowed aggs serde*/ ) // key is just customerId
.reduce( // take later session value and ignore any older - downstream only cares about current sessions
(val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMEOUT_DELAY_TOLERANCE_MS),
"latest-session-windowed"
)
.groupBy((windowedCustomerId, timeAndAggs) -> // calculate totals with maximum granularity, which is per-partition
new KeyValue<>(
new Windowed<>(
windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS, // KIP-159 would come in handy here, to access partition number instead
windowedCustomerId.window() // will use this in the interactive queries to pick the oldest not-yet-expired window
),
timeAndAggs.aggs()
),
new SessionKeySerde<>(Serdes.Integer()),
/* aggregate serde */
)
.reduce(
(val, agg) -> agg.add(val),
(val, agg) -> agg.subtract(val),
txTotalsStore() // this store can be queried to get totals per partition for all active sessions
);
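One caveat with the `hashCode() % PARTITION_COUNT_FOR_TOTALS` grouping above: `%` in Java can yield a negative result for negative hash codes, which would scatter keys over more buckets than intended. A plain-Java sketch of bucketing keys with `Math.floorMod` to keep buckets in `[0, n)` (the constant name is taken from the example above; the class is illustrative):

```java
public class TotalsBucket {
    static final int PARTITION_COUNT_FOR_TOTALS = 4;

    // map any key to a stable, non-negative bucket in [0, PARTITION_COUNT_FOR_TOTALS)
    static int bucketFor(Object key) {
        return Math.floorMod(key.hashCode(), PARTITION_COUNT_FOR_TOTALS);
    }

    public static void main(String[] args) {
        System.out.println(bucketFor("customer-42")); // always in [0, 4), even for negative hash codes
    }
}
```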
builder.globalTable(
new SessionKeySerde<>(Serdes.Integer()),
/* aggregate serde */,
changelogTopicForStore(TRANSACTION_TOTALS), "totals");
// this global table puts per partition totals on every node, so that they can be easily summed for global totals, picking the oldest not-yet-expired window
TODO: put in StreamPartitioners (with the KTable.through variants added in KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.