...

```java
// Proposed overloads on DataStream<T>, mirroring the existing keyBy(...) variants
KeyedStream<T, Tuple> localKeyBy(int... fields);
KeyedStream<T, Tuple> localKeyBy(Keys<T> keys);
<K> KeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector);
<K> KeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector, TypeInformation<K> keyType);
```

The following examples show how the API can be used:

```java
// All examples assume a stream of (key, value) pairs:
// DataStream<Tuple2<String, Long>> source = ...;

// Local aggregation with the existing aggregation APIs
source
	.localKeyBy(0)
	.timeWindow(Time.seconds(5))
	.sum(1)
	.keyBy(0)
	.timeWindow(Time.seconds(5))
	.sum(1);

// Local aggregation with a user-defined AggregateFunction that computes the average.
// Function bodies are omitted. Positional localKeyBy(0)/keyBy(0) produce a Tuple key.
source
	// Local phase: per key and window, emit (key, partial sum, partial count)
	.localKeyBy(0)
	.timeWindow(Time.seconds(5))
	.aggregate(new AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() { /* accumulate (sum, count) */ },
				new WindowFunction<Tuple2<Long, Long>, Tuple3<String, Long, Long>, Tuple, TimeWindow>() { /* attach the key */ })
	// Global phase: merge the partial (sum, count) pairs and emit (key, average)
	.keyBy(0)
	.timeWindow(Time.seconds(5))
	.aggregate(new AggregateFunction<Tuple3<String, Long, Long>, Tuple2<Long, Long>, Long>() { /* merge partials, compute sum / count */ },
				new WindowFunction<Long, Tuple2<String, Long>, Tuple, TimeWindow>() { /* attach the key */ });

// Local aggregation based on a KeyedProcessFunction
source
	.localKeyBy(0)
	.process(new KeyedProcessFunction<Tuple, Tuple2<String, Long>, Tuple2<String, Long>>() { /* local pre-aggregation */ })
	.keyBy(0)
	.timeWindow(Time.seconds(60))
	.aggregate(/* global AggregateFunction */);

// Local aggregation based on a ProcessWindowFunction
source
	.localKeyBy(0)
	.timeWindow(Time.seconds(60))
	.process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() { /* pre-aggregate the window contents */ })
	.keyBy(0)
	.timeWindow(Time.seconds(60))
	.aggregate(/* global AggregateFunction */);	// or .process(...)
```
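To make the data flow of the average example above concrete, here is a plain-Java sketch (no Flink dependency) of the same two-phase computation: each parallel instance reduces its own elements to per-key (sum, count) partials, and the global phase merges the partials per key before dividing. The class and method names (`TwoPhaseAverage`, `localPhase`, `globalPhase`) are hypothetical, and windowing is left out; each input list stands for what one parallel instance sees within a single window. The point it illustrates is that merging partial sums and counts gives the true average, whereas averaging the local averages would not.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free sketch of the two-phase average computed by the
// localKeyBy(...).aggregate(...) / keyBy(...).aggregate(...) pipeline.
// Windowing is omitted: each partition stands for one parallel instance's input for one window.
final class TwoPhaseAverage {

    // Local phase: runs independently per parallel instance, without a shuffle.
    // Reduces the (key, value) pairs it sees to key -> {partial sum, partial count}.
    static Map<String, long[]> localPhase(List<Map.Entry<String, Long>> partition) {
        Map<String, long[]> partials = new HashMap<>();
        for (Map.Entry<String, Long> e : partition) {
            long[] acc = partials.computeIfAbsent(e.getKey(), k -> new long[2]);
            acc[0] += e.getValue(); // partial sum
            acc[1] += 1;            // partial count
        }
        return partials;
    }

    // Global phase: after keyBy, all partials for a key meet in one place.
    // Merges sums and counts, then divides (integer division, matching the Long output type).
    static Map<String, Long> globalPhase(List<Map<String, long[]>> allPartials) {
        Map<String, long[]> merged = new HashMap<>();
        for (Map<String, long[]> partials : allPartials) {
            for (Map.Entry<String, long[]> e : partials.entrySet()) {
                long[] acc = merged.computeIfAbsent(e.getKey(), k -> new long[2]);
                acc[0] += e.getValue()[0];
                acc[1] += e.getValue()[1];
            }
        }
        Map<String, Long> averages = new HashMap<>();
        for (Map.Entry<String, long[]> e : merged.entrySet()) {
            averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return averages;
    }

    public static void main(String[] args) {
        // Two parallel instances; key "a" appears in both of them.
        List<Map.Entry<String, Long>> p0 =
                List.of(Map.entry("a", 1L), Map.entry("a", 2L), Map.entry("a", 3L), Map.entry("b", 10L));
        List<Map.Entry<String, Long>> p1 = List.of(Map.entry("a", 10L));
        Map<String, Long> avg = globalPhase(List.of(localPhase(p0), localPhase(p1)));
        System.out.println(avg.get("a") + " " + avg.get("b"));
    }
}
```

Here key "a" has local averages 2 and 10, whose mean would be 6, while merging the partials yields the correct 16 / 4 = 4. This is why the local phase in the example emits (sum, count) rather than an average.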



Design

Users can perform local aggregation on local keyed streams. Local keyed streams resemble keyed streams in many respects: both partition elements by key and give access to state scoped to each key. Unlike a keyed stream, however, a local keyed stream partitions elements locally, within each parallel instance, so no shuffle across the network is required.
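The difference between the two partitioning schemes can be sketched in plain Java. This is a simplified model under stated assumptions, not Flink's implementation: each parallel subtask is reduced to a map of per-key state (here, a running count). Under `keyBy`, an element is routed to a subtask chosen from its key (this sketch uses `Math.floorMod(hashCode, parallelism)`; Flink actually assigns keys to subtasks through key groups), so each key's state lives in exactly one subtask. Under local keying, an element stays in the subtask that produced it, so the same key may hold independent state in several subtasks. The names `PartitioningSketch`, `localKeyed`, and `shuffled` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of keyed vs. local keyed partitioning (not Flink's implementation).
// Input: the keys of the elements emitted by each upstream subtask.
// Output: per-subtask keyed state, here a count of elements seen per key.
final class PartitioningSketch {

    // Local keyed stream: every element stays in the subtask that produced it (no shuffle).
    static List<Map<String, Integer>> localKeyed(List<List<String>> keysPerSubtask) {
        List<Map<String, Integer>> state = new ArrayList<>();
        for (List<String> keys : keysPerSubtask) {
            Map<String, Integer> subtaskState = new HashMap<>();
            for (String key : keys) {
                subtaskState.merge(key, 1, Integer::sum);
            }
            state.add(subtaskState);
        }
        return state;
    }

    // Keyed stream: every element is shuffled to the subtask that owns its key.
    // Simplification: owner = floorMod(hashCode, parallelism); Flink uses key groups instead.
    static List<Map<String, Integer>> shuffled(List<List<String>> keysPerSubtask) {
        int parallelism = keysPerSubtask.size();
        List<Map<String, Integer>> state = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            state.add(new HashMap<>());
        }
        for (List<String> keys : keysPerSubtask) {
            for (String key : keys) {
                int owner = Math.floorMod(key.hashCode(), parallelism);
                state.get(owner).merge(key, 1, Integer::sum);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<List<String>> input = List.of(List.of("a", "a", "b"), List.of("a", "c"));
        System.out.println("local:    " + localKeyed(input));
        System.out.println("shuffled: " + shuffled(input));
    }
}
```

With local keying, key "a" ends up with partial state in both subtasks, which is why a global `keyBy` phase is still needed to combine the partial results. With `keyBy`, all three "a" elements cross to a single owning subtask, which is the shuffle that local aggregation aims to shrink.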

...