Status

Current state: "Under Discussion"

Design documentation: https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308

...

JIRA: FLINK-12786

...

Release


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Note that local aggregation only pays off when the aggregated result can be easily obtained by decomposition and combination, that is, when partial results computed locally can be merged into the final result. This condition is satisfied by many common aggregating operations, e.g., sum, count and topN. A few other aggregating operations, such as cardinality, cannot be easily decomposed and combined, and hence will not benefit from local aggregation.
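To make "decomposition and combination" concrete, the following is a minimal sketch in plain Java, not Flink code. The names `Event`, `localAggregate` and `combine` are hypothetical and exist only for this illustration. Each simulated subtask pre-aggregates sum and count per key, and the partial results are then merged component-wise, giving the same result as aggregating all records in one place.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LocalAggregationSketch {

    // Hypothetical record type: a (key, value) pair seen by one parallel subtask.
    static final class Event {
        final String key;
        final long value;

        Event(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Local step: pre-aggregate per key on one subtask.
    // Each accumulator is {sum, count}.
    static Map<String, long[]> localAggregate(List<Event> events) {
        Map<String, long[]> partials = new HashMap<>();
        for (Event e : events) {
            long[] acc = partials.computeIfAbsent(e.key, k -> new long[2]);
            acc[0] += e.value; // partial sum
            acc[1] += 1;       // partial count
        }
        return partials;
    }

    // Global step: merge the partial results of all subtasks component-wise.
    static Map<String, long[]> combine(List<Map<String, long[]>> allPartials) {
        Map<String, long[]> result = new HashMap<>();
        for (Map<String, long[]> partials : allPartials) {
            for (Map.Entry<String, long[]> entry : partials.entrySet()) {
                long[] acc = result.computeIfAbsent(entry.getKey(), k -> new long[2]);
                acc[0] += entry.getValue()[0];
                acc[1] += entry.getValue()[1];
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two simulated subtasks, each holding part of the records for keys "a" and "b".
        List<Event> subtask1 = Arrays.asList(
                new Event("a", 1), new Event("a", 2), new Event("b", 5));
        List<Event> subtask2 = Arrays.asList(
                new Event("a", 3), new Event("b", 7));

        Map<String, long[]> global = combine(Arrays.asList(
                localAggregate(subtask1), localAggregate(subtask2)));

        for (Map.Entry<String, long[]> e : global.entrySet()) {
            System.out.println(
                    e.getKey() + " sum=" + e.getValue()[0] + " count=" + e.getValue()[1]);
        }
    }
}
```

In this sketch, only one partial result per key leaves each subtask, rather than one record per input element, which is where the reduction in shuffled data for a skewed key would come from.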

In addition, we introduce a new kind of keyed state named "local keyed state" to support our implementation. Local keyed state mainly benefits the implementation of local aggregation, but it also provides more general capabilities for flexible "local computing". Here, "local computing" covers not only local aggregation but also more general logic processed locally by "KeyedProcessFunction", "ProcessWindowFunction" and other stateful APIs. It also supports implementing local aggregation on top of the Window API, because the window operator uses local keyed state in this scenario. At the API level, however, local keyed state is used in the same way as generic keyed state; we do not change any interface of keyed state.

Public Interfaces

A few APIs supporting local aggregation need to be added to the DataStream class, as listed below:

Code Block
languagejava
KeyedStream<T, Tuple> localKeyBy(int... fields);
KeyedStream<T, Tuple> localKeyBy(Keys<T> keys);
<K> KeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector);
<K> KeyedStream<T, K> localKeyBy(KeySelector<T, K> keySelector, TypeInformation<K> keyType);

...