Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleNon-Windowed Average
linenumberstrue
class Tuple2<T1, T2> {
  public T1 value1;
  public T2 value2;
 
  Tuple2(T1 v1, T2 v2) {
    value1 = v1;
    value2 = v2;
  }
}

final KStreamBuilder builder = new KStreamBuilder();
 
// first step: compute count and sum in a single aggregation step and emit 2-tuples <count,sum> as aggregation result values
final KTable<String,Tuple2<Long,Long>> countAndSum = builder.stream("someInputTopic")
                                                            .groupByKey()
                                                            .aggregate(
                                                                new Initializer<Tuple2<Long, Long>>() {
                                                                    @Override
                                                                    public Tuple2<Long, Long> apply() {
                                                                        return new Tuple2<>(0L, 0L);
                                                                    }
                                                                },
                                                                new Aggregator<String, String, Tuple2<Long, Long>>() {
                                                                    @Override
                                                                    public Tuple2<Long, Long> apply(final String key, final Long value, final Tuple2<Long, Long> aggregate) {
                                                                            ++aggregate.value1;
                                                                            aggregate.value2 += value;
                                                                            return aggregate;
                                                                    }
                                                                },
                                                                new Tuple2Serde()); // omitted for brevity
 
// second stopstep: compute average for each 2-tuple
final KTable<String,Double> average = countAndSum.mapValues(
                                                     new ValueMapper<Tuple2<Long, Long>, Double>() {
                                                         @Override
                                                         public Double apply(Tuple2<Long, Long> value) {
                                                             return value.value2 / (double) value.value1;
                                                         }
                                                     });

...