THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
/**
 * Minimal mutable pair, used below to carry a running (count, sum) through the aggregation.
 *
 * <p>The fields are deliberately public and non-final: the aggregator updates them in place.
 */
class Tuple2<T1, T2> {
    public T1 value1;
    public T2 value2;

    Tuple2(T1 v1, T2 v2) {
        value1 = v1;
        value2 = v2;
    }
}

final KStreamBuilder builder = new KStreamBuilder();

// first step: compute count and sum in a single aggregation step and emit 2-tuples <count,sum>
// as aggregation result values
//
// The explicit <String, Long> type witness is needed because the key/value types cannot be
// inferred through the chained calls; without it the typed Aggregator below does not match.
// NOTE(review): assumes "someInputTopic" holds String keys and Long values readable with the
// configured default serdes -- confirm against the application config.
final KTable<String, Tuple2<Long, Long>> countAndSum = builder.<String, Long>stream("someInputTopic")
    .groupByKey()
    .aggregate(
        new Initializer<Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> apply() {
                // start every key at count = 0, sum = 0
                return new Tuple2<>(0L, 0L);
            }
        },
        // the value type parameter must be Long to agree with apply(...)'s `value` argument
        new Aggregator<String, Long, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> apply(final String key, final Long value, final Tuple2<Long, Long> aggregate) {
                ++aggregate.value1;        // value1 holds the record count
                aggregate.value2 += value; // value2 holds the running sum
                return aggregate;
            }
        },
        new Tuple2Serde()); // omitted for brevity

// second step: compute average for each 2-tuple
final KTable<String, Double> average = countAndSum.mapValues(
    new ValueMapper<Tuple2<Long, Long>, Double>() {
        @Override
        public Double apply(Tuple2<Long, Long> value) {
            // cast the count to double so the division is floating-point: sum / count
            return value.value2 / (double) value.value1;
        }
    });
...