...
Code Block | ||
---|---|---|
| ||
KeyedStream<T, Tuple> localKeyBy(int... fields);
KeyedStream<T, Tuple> localKeyBy(Keys<T> keys);
<K> KeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector);
<K> KeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector, TypeInformation<K> keyType);
|
The following examples show how to use the API:
Code Block | ||
---|---|---|
| ||
// local aggregation with the existing aggregate APIs
source
.localKeyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
// local aggregation with a user-defined AggregateFunction to calculate the average
DataStream<Tuple2<String, Long>> source = null;
source
.localKeyBy(0)
.timeWindow(Time.seconds(5))
.aggregate(new AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {},
new WindowFunction<Tuple2<Long, Long>, Tuple3<String, Long, Long>, String, TimeWindow>() {})
.keyBy(0)
.timeWindow(Time.seconds(5))
.aggregate(new AggregateFunction<Tuple3<String, Long, Long>, Long, Tuple2<String, Long>>() {},
new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {});
//local aggregation based on ProcessFunction
source
.localKeyBy(0)
.process(new ProcessFunction<Tuple2<String,Long>, Tuple2<String, Long>>() {})
.keyBy(0)
.timeWindow(Time.seconds(60))
.aggregate()
//local aggregation based on ProcessWindowFunction
source
.localKeyBy(0)
.timeWindow(Time.seconds(60))
.process(new ProcessWindowFunction<Tuple2<String,Long>, Tuple2<String, Long>, String, TimeWindow>() {})
.keyBy(0)
.timeWindow(Time.seconds(60))
.aggregate() //.process()
|
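The average example needs an intermediate result that can be merged across subtasks, which is presumably why its local stage emits a pair of values rather than a finished average. The following is a minimal sketch of that idea in plain Java, with no Flink classes involved. The `TwoPhaseAverage` class, its `SumCount` accumulator, and the sample records are hypothetical names introduced for illustration, and the sketch assumes the pair holds a running sum and an element count.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of two-phase averaging; hypothetical, not part of the proposed API. */
final class TwoPhaseAverage {

    /** Partial aggregate for one key: running sum and element count (assumed layout). */
    static final class SumCount {
        long sum;
        long count;

        void add(long value) {
            sum += value;
            count++;
        }

        void merge(SumCount other) {
            sum += other.sum;
            count += other.count;
        }
    }

    /** Local phase: one subtask folds its own records into per-key (sum, count) pairs. */
    static Map<String, SumCount> localAggregate(List<Map.Entry<String, Long>> records) {
        Map<String, SumCount> partials = new HashMap<>();
        for (Map.Entry<String, Long> record : records) {
            partials.computeIfAbsent(record.getKey(), k -> new SumCount()).add(record.getValue());
        }
        return partials;
    }

    /** Global phase: merge the partials from all subtasks, then divide once per key. */
    static Map<String, Double> globalAverage(List<Map<String, SumCount>> partialsPerSubtask) {
        Map<String, SumCount> merged = new HashMap<>();
        for (Map<String, SumCount> partials : partialsPerSubtask) {
            partials.forEach((key, partial) ->
                    merged.computeIfAbsent(key, k -> new SumCount()).merge(partial));
        }
        Map<String, Double> averages = new HashMap<>();
        merged.forEach((key, total) -> averages.put(key, (double) total.sum / total.count));
        return averages;
    }

    public static void main(String[] args) {
        // Two subtasks, each holding a slice of the input.
        List<Map.Entry<String, Long>> subtask0 =
                List.of(Map.entry("a", 1L), Map.entry("a", 2L), Map.entry("b", 10L));
        List<Map.Entry<String, Long>> subtask1 =
                List.of(Map.entry("a", 6L));

        Map<String, Double> averages =
                globalAverage(List.of(localAggregate(subtask0), localAggregate(subtask1)));
        System.out.println(averages);
    }
}
```

For key `a`, the merged partials give 9 / 3 = 3.0. Averaging the two local means (1.5 and 6.0) would instead give 3.75, so the local stage has to forward the sum and the count rather than a finished average.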
Design
Users can perform local aggregation with local keyed streams. A local keyed stream resembles a keyed stream in many respects: both partition elements by key and allow access to the state associated with each key. The difference is that partitioning in a local keyed stream is performed locally and does not require a shuffle.
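To make the difference concrete, the sketch below models both kinds of partitioning in plain Java, without any Flink classes. It is an illustration under stated assumptions, not the proposed implementation: `LocalKeyedSketch` and its methods are hypothetical names, subtasks are modeled as in-memory lists, and routing by `hashCode` stands in for Flink's actual key assignment, which is not modeled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of local versus shuffled key partitioning; hypothetical names throughout. */
final class LocalKeyedSketch {

    /** Local keyed step: group one subtask's records by key and sum them in place. */
    static Map<String, Long> localSum(List<Map.Entry<String, Long>> subtaskRecords) {
        Map<String, Long> perKeyState = new HashMap<>();
        for (Map.Entry<String, Long> record : subtaskRecords) {
            perKeyState.merge(record.getKey(), record.getValue(), Long::sum);
        }
        return perKeyState;
    }

    /** Number of records that cross the shuffle when each subtask forwards only its partial sums. */
    static int shuffledRecords(List<Map<String, Long>> partialsPerSubtask) {
        int count = 0;
        for (Map<String, Long> partials : partialsPerSubtask) {
            count += partials.size();
        }
        return count;
    }

    /** Keyed step: route every partial to one downstream subtask by key hash and sum it there. */
    static List<Map<String, Long>> shuffleAndSum(List<Map<String, Long>> partialsPerSubtask, int parallelism) {
        List<Map<String, Long>> downstream = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            downstream.add(new HashMap<>());
        }
        for (Map<String, Long> partials : partialsPerSubtask) {
            partials.forEach((key, value) -> {
                // Simplified routing (assumption); real key assignment is not modeled.
                int target = Math.floorMod(key.hashCode(), parallelism);
                downstream.get(target).merge(key, value, Long::sum);
            });
        }
        return downstream;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> subtask0 =
                List.of(Map.entry("a", 1L), Map.entry("a", 2L), Map.entry("a", 3L), Map.entry("b", 1L));
        List<Map.Entry<String, Long>> subtask1 =
                List.of(Map.entry("a", 4L), Map.entry("b", 2L), Map.entry("b", 3L));

        // Local partitioning happens inside each subtask; only the partial sums are shuffled.
        List<Map<String, Long>> partials = List.of(localSum(subtask0), localSum(subtask1));
        System.out.println("records shuffled after local aggregation: " + shuffledRecords(partials));
        System.out.println("per-subtask totals after the shuffle: " + shuffleAndSum(partials, 2));
    }
}
```

With this sample input, 7 records enter the two upstream subtasks but only 4 partial sums cross the shuffle, and each key still ends up in exactly one downstream subtask with its full total. The saving grows with the number of records per key on each subtask.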
...