
This page collects common usage patterns for the Kafka Streams Processor API and DSL.

Feel free to add your own code snippets and/or improve existing examples.

 

How to compute a (windowed) average?

The Kafka Streams DSL natively supports so-called "incremental" or "cumulative" aggregation functions such as count, sum, min, and max (cf. https://en.wikipedia.org/wiki/Associative_property; Jim Gray calls these "distributive functions" in "Data Cube: A Relational Aggregation Operator Generalizing Group-By, Cross-Tab, and Sub-Totals"). An average, however, is a so-called "algebraic" function in the terminology of the same paper: it cannot be updated from the previous average and the new value alone, but it can be composed of distributive functions, namely count and sum. Thus, it can be implemented in a two-step approach:

Non-Windowed Average
class Tuple2<T1, T2> {
  public T1 value1;
  public T2 value2;
 
  Tuple2(T1 v1, T2 v2) {
    value1 = v1;
    value2 = v2;
  }
}

final KStreamBuilder builder = new KStreamBuilder();
 
// first step: compute count and sum in a single aggregation step and emit 2-tuples <count,sum> as aggregation result values
final KTable<String,Tuple2<Long,Long>> countAndSum = builder.<String, Long>stream("someInputTopic") // explicit types so the aggregator below type-checks
                                                            .groupByKey()
                                                            .aggregate(
                                                                new Initializer<Tuple2<Long, Long>>() {
                                                                    @Override
                                                                    public Tuple2<Long, Long> apply() {
                                                                        return new Tuple2<>(0L, 0L);
                                                                    }
                                                                },
                                                                new Aggregator<String, Long, Tuple2<Long, Long>>() {
                                                                    @Override
                                                                    public Tuple2<Long, Long> apply(final String key, final Long value, final Tuple2<Long, Long> aggregate) {
                                                                            ++aggregate.value1;
                                                                            aggregate.value2 += value;
                                                                            return aggregate;
                                                                    }
                                                                },
                                                                new Tuple2Serde()); // omitted for brevity
 
// second step: compute average for each 2-tuple
final KTable<String,Double> average = countAndSum.mapValues(
                                                     new ValueMapper<Tuple2<Long, Long>, Double>() {
                                                         @Override
                                                         public Double apply(Tuple2<Long, Long> value) {
                                                             return value.value2 / (double) value.value1;
                                                         }
                                                     });

This pattern can also be applied to compute a windowed average or to compose other algebraic functions.
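
To illustrate how the same two-step idea carries over to windows, here is a minimal plain-Java sketch that does not use Kafka Streams at all. The names CountAndSum, WindowedAverageSketch, windowStart, and averagePerWindow are hypothetical and exist only for this illustration; the sketch assumes tumbling windows and non-negative timestamps in milliseconds. It keeps one <count, sum> accumulator per window and derives the average only in the final step.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams: a mutable <count, sum> accumulator.
final class CountAndSum {
    long count;
    long sum;

    // Incremental ("distributive") update: fold one more value into the accumulator.
    CountAndSum add(final long value) {
        count++;
        sum += value;
        return this;
    }

    // Final ("algebraic") step: derive the average from count and sum.
    double average() {
        return count == 0 ? Double.NaN : sum / (double) count;
    }
}

final class WindowedAverageSketch {
    // Start of the tumbling window containing the timestamp (non-negative timestamps assumed).
    static long windowStart(final long timestampMs, final long windowSizeMs) {
        return timestampMs / windowSizeMs * windowSizeMs;
    }

    // Each row of records is {timestampMs, value}; the result maps window start to average.
    static Map<Long, Double> averagePerWindow(final long[][] records, final long windowSizeMs) {
        // first step: one <count, sum> accumulator per window
        final Map<Long, CountAndSum> countAndSum = new TreeMap<>();
        for (final long[] record : records) {
            countAndSum.computeIfAbsent(windowStart(record[0], windowSizeMs), k -> new CountAndSum())
                       .add(record[1]);
        }
        // second step: compute the average for each accumulator
        final Map<Long, Double> averages = new TreeMap<>();
        countAndSum.forEach((start, acc) -> averages.put(start, acc.average()));
        return averages;
    }

    public static void main(final String[] args) {
        final long[][] records = {{1_000L, 10L}, {59_000L, 20L}, {61_000L, 40L}};
        System.out.println(averagePerWindow(records, 60_000L)); // prints {0=15.0, 60000=40.0}
    }
}
```

In a Kafka Streams topology the per-window bookkeeping would be handled by a windowed aggregation rather than by an explicit map; the sketch only shows why count and sum are the state worth keeping.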

How to compute windowed aggregations over successively larger time windows?

Draft

KTable<Windowed<Key>, Value> oneMinuteWindowed = // where Key and Value stand for your actual key and value types
 
	yourKStream
 
	.groupByKey()

	.reduce(/*your adder*/, TimeWindows.of(60 * 1000L), "store1m");
	        // TimeWindows.of(sizeMs) defines tumbling one-minute windows;
	        // your adder can be as simple as (val, agg) -> agg + val
	        // for primitive types or as complex as you need
 
KTable<Windowed<Key>, Value> fiveMinuteWindowed = 
 
	oneMinuteWindowed
	.groupBy( (windowedKey, value) -> 
		new KeyValue<>(
			new Windowed<>(
				windowedKey.key(),
				new Window<>(
					windowedKey.window().start() /1000/60/5 *1000*60*5, 
					windowedKey.window().start() /1000/60/5 *1000*60*5 + 1000*60*5
            		// the above rounds time down to a timestamp divisible by 5 minutes
				)
			),
			value
		),
		/* your key serde */, 
		/* your value serde */
	)
    .reduce(/*your adder*/, /*your subtractor*/, "store5m");
 
KTable<Windowed<Key>, Value> fifteenMinuteWindowed = 
 
	fiveMinuteWindowed

    .groupBy( (windowedKey, value) -> 
		new KeyValue<>(
			new Windowed<>(
				windowedKey.key(),
				new Window<>(
					windowedKey.window().start() /1000/60/15 *1000*60*15,
					windowedKey.window().start() /1000/60/15 *1000*60*15 + 1000*60*15
            		// the above rounds time down to a timestamp divisible by 15 minutes
				)
			),
			value
		),
		/* your key serde */, 
		/* your value serde */
	)
    .reduce(/*your adder*/, /*your subtractor*/, "store15m");

KTable<Windowed<Key>, Value> sixtyMinuteWindowed = 
 
	fifteenMinuteWindowed

    .groupBy( (windowedKey, value) -> 
		new KeyValue<>(
			new Windowed<>(
				windowedKey.key(),
				new Window<>(
					windowedKey.window().start() /1000/60/60 *1000*60*60,
					windowedKey.window().start() /1000/60/60 *1000*60*60 + 1000*60*60
					// the above rounds time down to a timestamp divisible by 60 minutes
				)
			), 
			value
		),
		/* your key serde */, 
		/* your value serde */
	)

	.reduce(/*your adder*/, /*your subtractor*/, "store60m");
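
Two details of the draft above may be easier to see in isolation: the integer-division arithmetic that rounds a window start down to the coarser window, and the reason the coarser reduce steps take both an adder and a subtractor. The following plain-Java sketch models both without using Kafka Streams; WindowRollUpSketch and all of its members are hypothetical names for this illustration, and it assumes non-negative timestamps and a sum as the aggregation. When a fine-grained window's value is updated, its old contribution is subtracted from the coarse window before the new value is added.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only; none of these names are Kafka Streams API.
final class WindowRollUpSketch {
    static final long MINUTE_MS = 60 * 1000L;

    // Rounds a window start down to a multiple of the coarser window size
    // (integer division truncates; non-negative timestamps assumed).
    static long coarseWindowStart(final long fineWindowStartMs, final long coarseSizeMs) {
        return fineWindowStartMs / coarseSizeMs * coarseSizeMs;
    }

    private final long coarseSizeMs;
    // Latest value per fine window start (plays the role of the upstream table).
    private final Map<Long, Long> fine = new HashMap<>();
    // Running sum per coarse window start (plays the role of the downstream table).
    private final Map<Long, Long> coarse = new HashMap<>();

    WindowRollUpSketch(final long coarseSizeMs) {
        this.coarseSizeMs = coarseSizeMs;
    }

    // Applies an update of one fine window: retract its old value (if any), then add the new one.
    void update(final long fineWindowStartMs, final long newValue) {
        final long coarseStart = coarseWindowStart(fineWindowStartMs, coarseSizeMs);
        final Long oldValue = fine.put(fineWindowStartMs, newValue);
        long aggregate = coarse.getOrDefault(coarseStart, 0L);
        if (oldValue != null) {
            aggregate -= oldValue; // subtractor
        }
        aggregate += newValue;     // adder
        coarse.put(coarseStart, aggregate);
    }

    long coarseValue(final long coarseWindowStartMs) {
        return coarse.getOrDefault(coarseWindowStartMs, 0L);
    }

    public static void main(final String[] args) {
        final WindowRollUpSketch fiveMinutes = new WindowRollUpSketch(5 * MINUTE_MS);
        fiveMinutes.update(0 * MINUTE_MS, 3L);  // minute 0 sums to 3
        fiveMinutes.update(1 * MINUTE_MS, 4L);  // minute 1 sums to 4
        fiveMinutes.update(1 * MINUTE_MS, 6L);  // minute 1 is updated to 6; the old 4 is retracted
        fiveMinutes.update(7 * MINUTE_MS, 10L); // minute 7 falls into the next five-minute window
        System.out.println(fiveMinutes.coarseValue(0L));            // prints 9
        System.out.println(fiveMinutes.coarseValue(5 * MINUTE_MS)); // prints 10
    }
}
```

Without the subtractor, the update of minute 1 from 4 to 6 would be counted twice and the first five-minute window would report 13 instead of 9.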

 

 

 
