Motivation
Flink Table Store can support richer merge strategies. One already exists: PartialUpdateMergeFunction, which fills in non-NULL fields when merging. More powerful strategies are possible, such as pre-aggregated merge. Pre-aggregation is used by many big data systems, e.g. Apache Doris, Apache Kylin, and Druid, to reduce storage cost and accelerate aggregation queries. Introducing pre-aggregated merge to Flink Table Store would bring the same benefits. The aggregate functions we plan to implement include sum, max/min, count, last_non_null_value, last_value, listagg, and bool_or/bool_and.
Public Interfaces
Basic usage of pre-aggregated merge
- assign 'aggregation' to 'merge-engine'
- designate aggregate function for each column of table.
For example,
-- DDL
CREATE TABLE T (
    pk STRING PRIMARY KEY NOT ENFORCED,
    sum_field1 BIGINT,
    max_field1 BIGINT
) WITH (
    'merge-engine' = 'aggregation',
    'fields.sum_field1.function'='sum', -- sum up all sum_field1 with same pk;
    'fields.max_field1.function'='max' -- get max value of all max_field1 with same pk
);

-- DML
INSERT INTO T VALUES ('pk1', 1, 2);
INSERT INTO T VALUES ('pk1', 1, 1);

-- verify
SELECT * FROM T;
=> output 'pk1', 2, 2
Tip: an aggregate function should be designated for each column (in the example above, every column except the primary key has one).
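To make the merge semantics of the example concrete, here is a minimal in-memory sketch in Java. It is not Flink Table Store code: the class `AggregationMergeSketch` and its methods are hypothetical names introduced only for illustration, and the sketch assumes that rows sharing a primary key are merged column by column with the configured function.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory model of table T; not Flink Table Store code. */
class AggregationMergeSketch {
    // pk -> {sum_field1, max_field1}
    private final Map<String, long[]> rows = new LinkedHashMap<>();

    /** Merges an inserted row into the row already stored under the same pk. */
    void insert(String pk, long sumField1, long maxField1) {
        rows.merge(pk, new long[] {sumField1, maxField1}, (oldRow, newRow) -> new long[] {
            oldRow[0] + newRow[0],          // 'fields.sum_field1.function'='sum'
            Math.max(oldRow[1], newRow[1])  // 'fields.max_field1.function'='max'
        });
    }

    /** Returns the merged row for pk, or null if no row was inserted. */
    long[] select(String pk) {
        return rows.get(pk);
    }

    public static void main(String[] args) {
        AggregationMergeSketch t = new AggregationMergeSketch();
        t.insert("pk1", 1, 2);
        t.insert("pk1", 1, 1);
        long[] row = t.select("pk1");
        // prints "pk1, 2, 2", matching the SELECT result above
        System.out.println("pk1, " + row[0] + ", " + row[1]);
    }
}
```

The two inserts share the key 'pk1', so the stored row ends up with 1 + 1 = 2 for sum_field1 and max(2, 1) = 2 for max_field1.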
The aggregate functions we propose to implement include sum, max/min, count, last_non_null_value, last_value, listagg, bool_or/bool_and. These functions support different data types.
The sum aggregate function supports the DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, and INTERVAL (INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND) data types.
The max/min aggregate functions support the DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, INTERVAL (INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND), DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ data types.
The count/last_non_null_value/last_value aggregate functions support all data types.
The listagg aggregate function supports the VARCHAR and STRING data types.
The bool_and/bool_or aggregate functions support the BOOLEAN data type.
Default value of these aggregate functions
sum: default value of corresponding data type
count: 0
max/min/last_non_null_value/last_value: NULL
listagg: ""
bool_and: true
bool_or: false
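One plausible reading of these defaults is that each is the accumulator's starting value before any input has been merged. The Java sketch below illustrates that reading only. The class `DefaultValueSketch`, the comma delimiter for listagg, and the choice to skip NULL inputs are all assumptions made for illustration, not behavior stated in this proposal.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical helpers showing defaults as initial accumulator values. */
class DefaultValueSketch {
    static Long max(List<Long> values) {
        Long acc = null;                              // max default: NULL
        for (Long v : values) {
            if (v != null && (acc == null || v > acc)) {
                acc = v;
            }
        }
        return acc;
    }

    static String listagg(List<String> values) {
        StringBuilder acc = new StringBuilder();      // listagg default: ""
        for (String v : values) {
            if (v == null) {
                continue;                             // assumption: NULL inputs are skipped
            }
            if (acc.length() > 0) {
                acc.append(',');                      // assumption: comma delimiter
            }
            acc.append(v);
        }
        return acc.toString();
    }

    static boolean boolAnd(List<Boolean> values) {
        boolean acc = true;                           // bool_and default: true
        for (Boolean v : values) {
            if (v != null) {
                acc = acc && v;
            }
        }
        return acc;
    }

    static boolean boolOr(List<Boolean> values) {
        boolean acc = false;                          // bool_or default: false
        for (Boolean v : values) {
            if (v != null) {
                acc = acc || v;
            }
        }
        return acc;
    }

    public static void main(String[] args) {
        // With no input, each function returns its default value.
        System.out.println(max(Collections.emptyList()));
        System.out.println(listagg(Collections.emptyList()).isEmpty());
        System.out.println(boolAnd(Collections.emptyList()));
        System.out.println(boolOr(Arrays.asList(false, true)));
    }
}
```

Under this reading, true and false are the identity elements of logical AND and OR, which is why they are natural starting values for bool_and and bool_or.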
Changelog support
In most cases, modifications to Table Store are INSERT changes. However, the input can also be a retract stream, which may include retract messages (UPDATE/DELETE changes).
All of the aforementioned aggregate functions support INSERT changes. In this FLIP, we plan to make only some of the aggregate functions support UPDATE and DELETE changes.
Aggregate functions that support UPDATE changes: sum, count.
Aggregate functions that support DELETE changes: sum, count.
More design work is needed before the other aggregate functions can support UPDATE/DELETE changes.
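A minimal Java sketch of why sum and count are natural candidates for retraction: both can undo an earlier value by applying its inverse. The class `RetractSketch` is hypothetical, and the `Kind` enum is defined locally (its constants follow the changelog kinds named in Flink's RowKind). Treating a retraction as a subtraction is an assumption about how the merge could work, not a description of the final implementation.

```java
/** Hypothetical accumulator for one key; not Flink Table Store code. */
class RetractSketch {
    /** Local stand-in for the four changelog kinds. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private long sum;    // sum default for BIGINT: 0
    private long count;  // count default: 0

    /** Applies one changelog record to the accumulators. */
    void apply(Kind kind, long value) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                sum += value;   // accumulate the new value
                count++;
                break;
            case UPDATE_BEFORE:
            case DELETE:
                sum -= value;   // assumption: retract by applying the inverse
                count--;
                break;
        }
    }

    long sum() {
        return sum;
    }

    long count() {
        return count;
    }

    public static void main(String[] args) {
        RetractSketch acc = new RetractSketch();
        acc.apply(Kind.INSERT, 5);
        acc.apply(Kind.INSERT, 3);
        acc.apply(Kind.UPDATE_BEFORE, 3);  // retract the old value 3
        acc.apply(Kind.UPDATE_AFTER, 10);  // replace it with 10
        acc.apply(Kind.DELETE, 5);         // retract the first insert
        // prints "sum=10, count=1"
        System.out.println("sum=" + acc.sum() + ", count=" + acc.count());
    }
}
```

A function such as max has no such inverse: once the current maximum is retracted, the accumulator alone cannot tell what the next-largest value was, which is one reason the other functions may need additional design.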
Future work
A more advanced way to introduce pre-aggregated merge into Flink Table Store is to use a materialized view that derives the pre-aggregated merge result from a source table. A streaming job is then started to synchronize the data: it consumes the source data and writes incrementally. This data synchronization job is stateless. More information is described in JIRA.
Proposed Changes
A ConfigOption<String> variable named 'AGGREGATE_FUNCTION' is defined in CoreOptions.java to retrieve the configuration of 'aggregate-function' in the WITH clause.
A subclass of MergeFunction named AggregateMergeFunction is created in AggregateMergeFunction.java to conduct pre-aggregated merge.
Compatibility, Deprecation, and Migration Plan
This is a new feature, so no compatibility, deprecation, or migration plan is needed.
Test Plan
Each pre-aggregated merge function will be covered by integration tests (ITs).
Rejected Alternatives
None.