Status

Current state: "Under Discussion"

Design documentation: https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308

...

Discussion thread:
Vote thread:
JIRA: FLINK-12786

...

Release


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Note that local aggregation only pays off if the aggregated result can be obtained by decomposing the computation into partial results and combining them. Many common aggregations satisfy this condition, e.g., sum, count and topN. A few others, such as cardinality (distinct count), cannot be easily decomposed and combined, and hence will not benefit from local aggregation.
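To illustrate the condition above, here is a minimal plain-Java sketch (not Flink code) that splits one input across two hypothetical subtasks. Per-subtask sums and top-N lists combine into the exact global result, while per-subtask distinct counts cannot simply be added. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: which aggregations survive a split into local partial results.
final class Decomposability {

    // Partial sums from different subtasks are combined by adding them.
    static long sum(List<Long> values) {
        return values.stream().mapToLong(Long::longValue).sum();
    }

    // Top-N keeps the N largest values; the top-N of the union of the local
    // top-N lists equals the top-N of the whole input.
    static List<Long> topN(List<Long> values, int n) {
        return values.stream()
                .sorted(Comparator.reverseOrder())
                .limit(n)
                .collect(Collectors.toList());
    }

    // Exact distinct count: local counts cannot simply be added, because the
    // same value may appear in several subtasks.
    static long distinctCount(List<Long> values) {
        return new HashSet<>(values).size();
    }

    static List<Long> concat(List<Long> a, List<Long> b) {
        List<Long> all = new ArrayList<>(a);
        all.addAll(b);
        return all;
    }

    public static void main(String[] args) {
        List<Long> subtask1 = List.of(5L, 1L, 7L, 3L);
        List<Long> subtask2 = List.of(7L, 2L, 9L, 5L);
        List<Long> all = concat(subtask1, subtask2);

        // sum: combined partials give the exact answer (39 == 39)
        System.out.println(sum(List.of(sum(subtask1), sum(subtask2))) + " == " + sum(all));

        // top-2: [9, 7] == [9, 7]
        List<Long> combinedTop = topN(concat(topN(subtask1, 2), topN(subtask2, 2)), 2);
        System.out.println(combinedTop + " == " + topN(all, 2));

        // distinct count: 4 + 4 = 8, but the true answer is 6
        System.out.println((distinctCount(subtask1) + distinctCount(subtask2)) + " != " + distinctCount(all));
    }
}
```

Merging the per-subtask sets instead of their sizes would give the right distinct count, but then the partial results are as large as the set of distinct values, so little data is saved before the shuffle.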

In addition, we introduce a new kind of keyed state, named "local keyed state", to support the implementation. Local keyed state mainly benefits the implementation of local aggregation, but it also provides more general capabilities for flexible "local computing". Here, "local computing" covers not only local aggregation but also more general processing logic that runs locally with "KeyedProcessFunction", "ProcessWindowFunction" and other stateful APIs. It also enables local aggregation based on the Window API, because in this scenario the window operator uses local keyed state. At the API level, however, local keyed state is used exactly like regular keyed state; no keyed state interface is changed.
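The point that user code does not change can be illustrated with a small plain-Java model. `KeyedValueState` below is a hypothetical stand-in, modelled loosely on Flink's `ValueState` (`value()` and `update()`), not a Flink class. The only difference assumed for local keyed state is that each subtask owns a private state instance, so the same key can have independent state in several subtasks.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of keyed value state: values are scoped to the current key.
final class KeyedValueState<K, V> {
    private final Map<K, V> table = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) { currentKey = key; }
    V value() { return table.get(currentKey); }
    void update(V value) { table.put(currentKey, value); }
}

// Counting logic written only against the state API; the same code would be
// used with global keyed state or with (assumed) local keyed state.
final class CountingFunction {
    private final KeyedValueState<String, Long> count = new KeyedValueState<>();

    long process(String key) {
        // In Flink the runtime, not the user function, sets the current key;
        // it is done inline here to keep the sketch small.
        count.setCurrentKey(key);
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);
        return updated;
    }
}

final class LocalStateDemo {
    public static void main(String[] args) {
        // Assumed local keyed state: each subtask has its own function instance
        // and state, so key "a" is counted independently in both subtasks.
        CountingFunction subtask0 = new CountingFunction();
        CountingFunction subtask1 = new CountingFunction();
        subtask0.process("a");
        subtask0.process("a");
        subtask1.process("a");
        System.out.println(subtask0.process("a")); // 3
        System.out.println(subtask1.process("a")); // 2
    }
}
```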

Public Interfaces

A few APIs to support local aggregation need to be added to DataStream class, list below:

```java
LocalKeyedStream<T, Tuple> localKeyBy(int... fields);
LocalKeyedStream<T, Tuple> localKeyBy(Keys<T> keys);
<K> LocalKeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector);
<K> LocalKeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector, TypeInformation<K> keyType);
```

The following examples show how the API can be used:

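```java
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.windowing.time.Time;

// The examples assume a DataStream<Tuple2<String, Long>> named source, e.g. (word, count) pairs.
// LocalSumCount, AttachKey, MergeSumCount, EmitAverage, LocalCounter and LocalWindowCounter
// are hypothetical user-defined functions; the comments give their type signatures.

// Local aggregation with the existing aggregate APIs
source
	.localKeyBy(0)
	.timeWindow(Time.seconds(5))
	.sum(1)				// partial sum per key, computed inside each subtask
	.keyBy(0)
	.timeWindow(Time.seconds(5))
	.sum(1);			// final sum per key after the shuffle

// Local aggregation with a user-defined AggregateFunction to calculate the average
// LocalSumCount: AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>
//                accumulates (sum, count) per key
// AttachKey:     WindowFunction<Tuple2<Long, Long>, Tuple3<String, Long, Long>, Tuple, TimeWindow>
//                emits (key, sum, count)
// MergeSumCount: AggregateFunction<Tuple3<String, Long, Long>, Tuple2<Long, Long>, Double>
//                merges the partial sums and counts and returns sum / count
// EmitAverage:   WindowFunction<Double, Tuple2<String, Double>, Tuple, TimeWindow>
//                emits (key, average)
source
	.localKeyBy(0)
	.timeWindow(Time.seconds(5))
	.aggregate(new LocalSumCount(), new AttachKey())
	.keyBy(0)
	.timeWindow(Time.seconds(5))
	.aggregate(new MergeSumCount(), new EmitAverage());

// Local aggregation based on a ProcessFunction
// LocalCounter: ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>>
//               user-defined pre-aggregation that emits partial counts per key
source
	.localKeyBy(0)
	.process(new LocalCounter())
	.keyBy(0)
	.timeWindow(Time.seconds(60))
	.sum(1);			// or .aggregate(...) / .process(...)

// Local aggregation based on a ProcessWindowFunction
// LocalWindowCounter: ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>
source
	.localKeyBy(0)
	.timeWindow(Time.seconds(60))
	.process(new LocalWindowCounter())
	.keyBy(0)
	.timeWindow(Time.seconds(60))
	.sum(1);			// or .aggregate(...) / .process(...)
```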
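The average example works because an average can be decomposed into a (sum, count) pair. The following plain-Java sketch, which is not Flink code, shows the two phases outside of Flink: each subtask reduces its records to per-key (sum, count) partials without a shuffle, and the global phase merges the partials for each key before dividing. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-phase average over (key, value) records; not Flink code.
final class TwoPhaseAverage {

    // Partial result passed from the local phase to the global phase.
    static final class SumCount {
        final long sum;
        final long count;

        SumCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        SumCount merge(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }
    }

    static final class Event {
        final String key;
        final long value;

        Event(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Local phase: runs inside one subtask and needs no shuffle.
    static Map<String, SumCount> localAggregate(List<Event> events) {
        Map<String, SumCount> partials = new HashMap<>();
        for (Event e : events) {
            partials.merge(e.key, new SumCount(e.value, 1), SumCount::merge);
        }
        return partials;
    }

    // Global phase: after the shuffle by key, merge the partials per key, then divide.
    static Map<String, Double> globalAverage(List<Map<String, SumCount>> partialsPerSubtask) {
        Map<String, SumCount> merged = new HashMap<>();
        for (Map<String, SumCount> partials : partialsPerSubtask) {
            partials.forEach((key, partial) -> merged.merge(key, partial, SumCount::merge));
        }
        Map<String, Double> averages = new HashMap<>();
        merged.forEach((key, total) -> averages.put(key, (double) total.sum / total.count));
        return averages;
    }

    public static void main(String[] args) {
        Map<String, SumCount> subtask0 = localAggregate(List.of(
                new Event("a", 1), new Event("a", 3), new Event("b", 10)));
        Map<String, SumCount> subtask1 = localAggregate(List.of(new Event("a", 8)));
        // a: (1 + 3 + 8) / 3 = 4.0, b: 10 / 1 = 10.0
        Map<String, Double> averages = globalAverage(List.of(subtask0, subtask1));
        System.out.println(averages.get("a") + " " + averages.get("b"));
    }
}
```

Averaging the local averages instead would be wrong here: (2.0 + 8.0) / 2 = 5.0 for key "a", not 4.0, which is why the partial result carries the count.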

```java
KeyedStream<T, Tuple> localKeyBy(int... fields);
KeyedStream<T, Tuple> localKeyBy(Keys<T> keys);
<K> KeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector);
<K> KeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector, TypeInformation<K> keyType);
```



Design

Users can perform local aggregation with local keyed streams. Local keyed streams resemble keyed streams in many respects: both partition elements according to keys and allow access to state associated with those keys. Unlike keyed streams, however, local keyed streams partition elements locally, within each subtask, so no shuffling is required.
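To make the difference concrete, the following plain-Java sketch, which is not Flink code, counts how many records enter the `keyBy` exchange of a word count with and without a preceding local aggregation step. It assumes every record falls into the same window and that the local stage emits one partial count per key and subtask; the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a word count's keyBy exchange; not Flink code.
final class ShuffleVolume {

    // Total number of records handed to the keyBy exchange.
    static int recordsSent(List<List<String>> subtasks, boolean localAggregation) {
        int sent = 0;
        for (List<String> records : subtasks) {
            if (localAggregation) {
                // Records are grouped by key inside their own subtask (no shuffle),
                // so only one partial count per distinct key is sent on.
                Set<String> keys = new HashSet<>(records);
                sent += keys.size();
            } else {
                // Plain keyBy: every record goes through the exchange.
                sent += records.size();
            }
        }
        return sent;
    }

    // Records for a single key, all of which reach the one subtask owning that key.
    static int recordsSentForKey(List<List<String>> subtasks, String key, boolean localAggregation) {
        int sent = 0;
        for (List<String> records : subtasks) {
            long occurrences = records.stream().filter(key::equals).count();
            if (localAggregation) {
                sent += occurrences > 0 ? 1 : 0;
            } else {
                sent += (int) occurrences;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        // Three upstream subtasks; "a" is a hot key.
        List<List<String>> subtasks = List.of(
                List.of("a", "a", "a", "a", "b"),
                List.of("a", "a", "a", "c"),
                List.of("a", "a", "b"));
        System.out.println(recordsSent(subtasks, false));            // 12
        System.out.println(recordsSent(subtasks, true));             // 6
        System.out.println(recordsSentForKey(subtasks, "a", false)); // 9
        System.out.println(recordsSentForKey(subtasks, "a", true));  // 3
    }
}
```

In this toy input, the records for the hot key "a" that reach its single downstream subtask drop from 9 to 3, at most one per upstream subtask.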

...