You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 11 Next »


Status

Current state: Under Discussion

Discussion thread:  https://mail-archives.apache.org/mod_mbox/flink-dev/

JIRA: Unable to render Jira issues macro, execution error.

Released: <Flink Version>

Motivation

We can introduce richer merge strategies, one of which is already introduced is PartialUpdateMergeFunction, which completes non-NULL fields when merging. We can introduce more powerful merge strategies, such as support for pre-aggregated merges. Currently the pre-aggregation is used by many big data systems, e.g. Apache Doris, Apache Kylin, Druid, to reduce storage cost and accelerate aggregation query. By introducing pre-aggregated merge to Flink table store, it can acquire the same benefit.  Aggregate functions which we plan to  implement includes sum, max/min, count, replace_if_not_null, replace, concatenate, or/and.

Public Interfaces

Basic usage of pre-aggregated merge

To use pre-aggregated merge in Flink Table Store, two kind of configurations should be added to WITH clause when creating table.

  1. assign 'aggregation' to 'merge-engine' 
  2. designate aggregate function for each column of table.

For example,

--DDL
CREATE TABLE T (
    pk STRING PRIMARY KEY NOT ENFOCED,
    sum_field1 BIGINT,
    max_field1 BIGINT,
    default_field BIGINT )
WITH (
'merge-engine' = 'aggregation',
'aggregate-function' = '{
sum_field1:sum,
max_field2:max
}'   -- sum up all sum_field1 with same pk; get max value of all max_field1 with same pk
);

– DML
INSERT INTO T VALUES ('pk1', 1, 1, 1);
INSERT INTO T VALUES ('pk1', 1, 1, NULL);
– verify
SELECT * FROM T;
=> output 'pk1', 2, 2, NULL

Tips: Columns which do not have designated aggregate functions using newest value to overwrite old value.


Supported aggregate functions

The aggregate functions we propose to implement include sum, max/min, count, replace_if_not_null, replace,  concatenate, or/and. These functions support different data types.

The sum aggregate function supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, INTERVAL(INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND) data types.

The max/min aggregate function fupports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, INTERVAL(INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND), DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ data types.

The count/replace_if_not_null/replace aggregate functions support all data types.

The concatenate aggregate function supports VARCHAR, STRING  data types.

The and/or aggregate function supports BOOLEAN data type.


Future Work
An advanced way of introducing pre-aggregated merge into Flink table store is using materialized view to get pre-aggregated merge result from a source table. Then a stream job is started to synchronize data, consume source data, and write incrementally . This data synchronization job has no state. More information is described in JIRA.

Proposed Changes

An ConfigOption<String> type variable named ‘AGGREGATE_FUNCTION’ is defined in CoreOptions.java to retrieve configuration of 'aggregate-function' in WITH clause.

Adding one more value named 'PRE_AGGREGATE'  to enum MergeEngine type in CoreOptions.java. It acts as one type of the merge-engines supported by Flink table store.

In the constructor of ChangelogWithKeyFileStoreTable, using 'PRE_AGGREGATE' as one more case in switch-case to initialize merge-engine.

A subclass of MergeFunction named AggregateMergeFunction is created in AggregateMergeFunction.java to conduct pre-aggregated merge.

Compatibility, Deprecation, and Migration Plan

This is new feature, no compatibility, deprecation, and migration plan.

Test Plan

Each pre-aggregated merge function will be covered with IT tests.

Rejected Alternatives

None.

  • No labels