Status

...

Discussion thread: ...

JIRA: FLINK-27626 (ASF JIRA)

...

Release


Motivation

Flink Table Store can support richer merge strategies. One already exists: PartialUpdateMergeFunction, which fills in non-NULL fields when merging. A more powerful strategy is pre-aggregated merge. Pre-aggregation is already used by many big data systems, e.g. Apache Doris, Apache Kylin, and Druid, to reduce storage cost and accelerate aggregation queries. Introducing pre-aggregated merge to Flink Table Store brings the same benefits. The aggregate functions we plan to implement are sum, max/min, count, last_non_null_value, last_value, listagg, and bool_or/bool_and.

Public Interfaces

Basic usage of pre-aggregated merge

...

  1. Assign 'aggregation' to 'merge-engine'.
  2. Designate an aggregate function for each column of the table.

For example,

-- DDL
CREATE TABLE T (
    pk STRING PRIMARY KEY NOT ENFORCED,
    sum_field1 BIGINT,
    max_field1 BIGINT
) WITH (
    'merge-engine' = 'aggregation',
    'fields.sum_field1.function' = 'sum', -- sum up all sum_field1 with same pk
    'fields.max_field1.function' = 'max'  -- get max value of all max_field1 with same pk
);

-- DML
INSERT INTO T VALUES ('pk1', 1, 2);
INSERT INTO T VALUES ('pk1', 1, 1);

-- verify
SELECT * FROM T;
-- => output 'pk1', 2, 2

Tip: every column should be assigned an aggregate function.
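To make the merge semantics of the example concrete, here is a minimal, self-contained Java sketch that replays the two INSERT statements in memory. It is an illustration under stated assumptions, not the Table Store implementation: the class AggregationMergeExample and its insert/select methods are hypothetical, and only the two BIGINT value columns of table T are modelled.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongBinaryOperator;

/** Hypothetical sketch: replays the example table T in memory. */
final class AggregationMergeExample {

    // One aggregate function per value column: sum_field1 -> sum, max_field1 -> max.
    private static final LongBinaryOperator[] FUNCTIONS = {Long::sum, Math::max};

    // Current merged row (value columns only) per primary key.
    private final Map<String, long[]> rows = new HashMap<>();

    /** Merges the incoming values into the row that shares the same primary key. */
    void insert(String pk, long... values) {
        rows.merge(pk, values.clone(), (accumulated, incoming) -> {
            long[] merged = new long[accumulated.length];
            for (int i = 0; i < merged.length; i++) {
                merged[i] = FUNCTIONS[i].applyAsLong(accumulated[i], incoming[i]);
            }
            return merged;
        });
    }

    long[] select(String pk) {
        return rows.get(pk);
    }

    public static void main(String[] args) {
        AggregationMergeExample t = new AggregationMergeExample();
        t.insert("pk1", 1, 2);
        t.insert("pk1", 1, 1);
        System.out.println(Arrays.toString(t.select("pk1"))); // prints [2, 2]
    }
}
```

The result matches the SELECT output above: sum_field1 is 1 + 1 = 2 and max_field1 is max(2, 1) = 2.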

...

The aggregate functions we propose to implement include sum, max/min, count, last_non_null_value, last_value, listagg, and bool_or/bool_and. These functions support different data types.

The sum aggregate function supports the DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, and INTERVAL (INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND) data types.

The max/min aggregate functions support the DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, INTERVAL (INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND), DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ data types.

The count/last_non_null_value/last_value aggregate functions support all data types.

The listagg aggregate function supports the VARCHAR and STRING data types.

The bool_and/bool_or aggregate functions support the BOOLEAN data type.
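The type rules above could be checked when a table is created. The following Java sketch encodes them as a simple lookup. It is a sketch under assumptions: the class AggregateTypeSupport is hypothetical, and type names are compared as plain strings without precision or length parameters.

```java
import java.util.Set;

/** Hypothetical sketch: which aggregate function accepts which data type. */
final class AggregateTypeSupport {

    private static final Set<String> NUMERIC_AND_INTERVAL = Set.of(
            "DECIMAL", "TINYINT", "SMALLINT", "INTEGER", "BIGINT", "FLOAT", "DOUBLE",
            "INTERVAL YEAR TO MONTH", "INTERVAL DAY TO SECOND");

    private static final Set<String> TEMPORAL =
            Set.of("DATE", "TIME", "TIMESTAMP", "TIMESTAMP_LTZ");

    private static final Set<String> ANY_TYPE_FUNCTIONS =
            Set.of("count", "last_non_null_value", "last_value");

    /** Returns true if the function is listed as supporting the given type name. */
    static boolean supports(String function, String typeName) {
        switch (function) {
            case "sum":
                return NUMERIC_AND_INTERVAL.contains(typeName);
            case "max":
            case "min":
                return NUMERIC_AND_INTERVAL.contains(typeName) || TEMPORAL.contains(typeName);
            case "listagg":
                return typeName.equals("VARCHAR") || typeName.equals("STRING");
            case "bool_and":
            case "bool_or":
                return typeName.equals("BOOLEAN");
            default:
                // count, last_non_null_value and last_value accept every type;
                // unknown function names are rejected.
                return ANY_TYPE_FUNCTIONS.contains(function);
        }
    }

    public static void main(String[] args) {
        System.out.println(supports("sum", "BIGINT")); // prints true
        System.out.println(supports("sum", "DATE"));   // prints false
        System.out.println(supports("max", "DATE"));   // prints true
    }
}
```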

Default values of these aggregate functions

sum: default value of corresponding data type

count: 0

max/min/last_non_null_value/last_value: NULL

listagg: ""

bool_and: true

bool_or: false
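One way to read these defaults is as the starting accumulator of each function: merging zero inputs yields the default, and merging any input into the default yields that input. The Java sketch below illustrates this reading. It is an interpretation rather than a statement of the implementation: the class AggregateDefaultsSketch and its methods are hypothetical, and the "," separator used for listagg is an assumption.

```java
import java.util.List;
import java.util.function.BinaryOperator;

/** Hypothetical sketch: defaults used as the initial accumulator of a merge. */
final class AggregateDefaultsSketch {

    /** Starts from the default value and merges every input into it. */
    static <T> T fold(T defaultValue, BinaryOperator<T> function, List<T> inputs) {
        T accumulated = defaultValue;
        for (T input : inputs) {
            accumulated = function.apply(accumulated, input);
        }
        return accumulated;
    }

    /** max has a NULL default, so null is treated as "no value yet". */
    static Long max(Long a, Long b) {
        if (a == null) return b;
        if (b == null) return a;
        return Math.max(a, b);
    }

    /** listagg starts from ""; the "," separator is an assumption. */
    static String listagg(String a, String b) {
        return a.isEmpty() ? b : a + "," + b;
    }

    public static void main(String[] args) {
        List<Long> noLongs = List.of();
        List<Boolean> noBooleans = List.of();

        Long emptySum = fold(0L, Long::sum, noLongs);
        Long maxValue = fold(null, AggregateDefaultsSketch::max, List.of(3L, 7L));
        String joined = fold("", AggregateDefaultsSketch::listagg, List.of("a", "b"));
        Boolean emptyAnd = fold(true, Boolean::logicalAnd, noBooleans);
        Boolean anyTrue = fold(false, Boolean::logicalOr, List.of(false, true));

        System.out.println(emptySum);  // prints 0
        System.out.println(maxValue);  // prints 7
        System.out.println(joined);    // prints a,b
        System.out.println(emptyAnd);  // prints true
        System.out.println(anyTrue);   // prints true
    }
}
```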


Changelog support

In most cases, the changes written to Table Store are INSERT changes. However, Table Store can also be converted into a retract stream, which may include retract messages (UPDATE/DELETE changes).

All of the aforementioned aggregate functions support INSERT changes. In this FLIP, we plan to make some of them support UPDATE and DELETE changes as well.

Aggregate functions supporting UPDATE changes: sum, count.

Aggregate functions supporting DELETE changes: sum, count.

Making the other aggregate functions support UPDATE/DELETE changes needs more design.
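The reason sum and count are natural first candidates is that both have an exact inverse: a retracted value can be subtracted from the sum, and the count can be decremented. The Java sketch below illustrates this. It rests on assumptions: the class RetractableSumCountSketch is hypothetical, its ChangeKind enum is a local stand-in that mirrors the usual four changelog row kinds, and an UPDATE is modelled as a retraction of the old value followed by an accumulation of the new one.

```java
/** Hypothetical sketch: sum and count that can undo earlier input. */
final class RetractableSumCountSketch {

    /** Local stand-in for the changelog row kinds. */
    enum ChangeKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    private long sum;   // starts at the default value 0
    private long count; // starts at the default value 0

    void apply(ChangeKind kind, long value) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                // Accumulate the new value.
                sum += value;
                count++;
                break;
            case UPDATE_BEFORE:
            case DELETE:
                // Retract a previously accumulated value.
                sum -= value;
                count--;
                break;
        }
    }

    long sum() {
        return sum;
    }

    long count() {
        return count;
    }

    public static void main(String[] args) {
        RetractableSumCountSketch agg = new RetractableSumCountSketch();
        agg.apply(ChangeKind.INSERT, 5);
        agg.apply(ChangeKind.INSERT, 3);
        agg.apply(ChangeKind.UPDATE_BEFORE, 3); // update 3 -> 10
        agg.apply(ChangeKind.UPDATE_AFTER, 10);
        agg.apply(ChangeKind.DELETE, 5);
        System.out.println(agg.sum() + " " + agg.count()); // prints 10 1
    }
}
```

A function such as max has no such inverse: once the current maximum is retracted, the next largest value cannot be recovered from the accumulator alone.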


Future work
A more advanced way of introducing pre-aggregated merge into Flink Table Store is to use a materialized view that holds the pre-aggregated merge result of a source table. A streaming job is then started to synchronize the data: it consumes the source data and writes it incrementally. This data synchronization job has no state. More information is given in the JIRA issue.

Proposed Changes

A ConfigOption<String> named 'AGGREGATE_FUNCTION' is defined in CoreOptions.java to retrieve the configuration of 'aggregate-function' in the WITH clause.
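As an illustration of how per-column functions might be read from the WITH clause, the following Java sketch parses a plain options map. It is not the CoreOptions code: the class FieldFunctionOptionsSketch is hypothetical, it uses no Flink classes, and the key pattern 'fields.<column>.function' is taken from the usage example in this document rather than from the option described above.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch: extracts column -> aggregate function from table options. */
final class FieldFunctionOptionsSketch {

    private static final String PREFIX = "fields.";
    private static final String SUFFIX = ".function";

    private static final Set<String> SUPPORTED = Set.of(
            "sum", "max", "min", "count", "last_non_null_value",
            "last_value", "listagg", "bool_or", "bool_and");

    static Map<String, String> parse(Map<String, String> options) {
        if (!"aggregation".equals(options.get("merge-engine"))) {
            throw new IllegalArgumentException("'merge-engine' must be 'aggregation'");
        }
        // TreeMap keeps the result in a deterministic (sorted) order.
        Map<String, String> functions = new TreeMap<>();
        for (Map.Entry<String, String> option : options.entrySet()) {
            String key = option.getKey();
            boolean isFieldFunction = key.startsWith(PREFIX)
                    && key.endsWith(SUFFIX)
                    && key.length() > PREFIX.length() + SUFFIX.length();
            if (!isFieldFunction) {
                continue;
            }
            String column = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
            String function = option.getValue();
            if (!SUPPORTED.contains(function)) {
                throw new IllegalArgumentException(
                        "Unsupported aggregate function '" + function + "' for column " + column);
            }
            functions.put(column, function);
        }
        return functions;
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of(
                "merge-engine", "aggregation",
                "fields.sum_field1.function", "sum",
                "fields.max_field1.function", "max");
        System.out.println(parse(options)); // prints {max_field1=max, sum_field1=sum}
    }
}
```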

...

A subclass of MergeFunction named AggregateMergeFunction is created in AggregateMergeFunction.java to conduct pre-aggregated merge.

Compatibility, Deprecation, and Migration Plan

This is a new feature, so no compatibility, deprecation, or migration plan is needed.

Test Plan

Each pre-aggregated merge function will be covered by integration tests (ITs).

Rejected Alternatives

None.