Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/3s69dhv3rp4s0kysnslqbvyqo3qf7zq5

Vote thread: https://lists.apache.org/thread/837r63gdwzoqryvp3gbf67941g706s5d

JIRA: FLINK-33397

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-292 [1] extends CompiledPlan so that operator-level state TTL can be configured by editing the JSON plan file. During that discussion, it was noted that a SQL hint could cover some simple scenarios in a way that is user-friendly, convenient, and unambiguous [2]. This FLIP proposes to support that feature.

Specifically, we aim to support SQL hints that configure state TTL for the stream regular join [3] and the group aggregate [4], the two most typical use cases, whose semantics and hint propagation are unambiguous. Other use cases are excluded because their hint propagation is more complex; they can be tackled once the current design reaches consensus.

Public Interfaces

The proposed changes do not introduce interfaces that need to be annotated with Public/PublicEvolving.

Proposed Changes

To configure different state TTLs for a stream regular join or a group aggregate, users specify a new hint (`STATE_TTL`) in the SELECT clause, in the same way as the existing SQL hints [5].

Code Block
languagesql
themeConfluence
query:
  select /*+ STATE_TTL(kv_option[,kv_option]) */ ...
   
kv_option:
  key_string_literal = value_string_literal
   
key_string_literal:
  table_name | view_name | table_alias | query_block_alias
   
value_string_literal:
  ttl_duration
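
To make the `ttl_duration` literal concrete, here is a minimal sketch of how a value such as '1d', '12h' or '36000s' (the forms used in the examples in this document) could be turned into a `java.time.Duration`. The class `TtlDurationSketch`, its method name, and the accepted unit set (ms, s, min, h, d) are assumptions made for illustration; this is not Flink's own duration parsing and the planner may accept more or different units.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Flink class): parses TTL literals like "1d", "12h", "36000s". */
final class TtlDurationSketch {

    // Assumed format: a non-negative integer followed by one of the units below.
    private static final Pattern TTL = Pattern.compile("(\\d+)\\s*(ms|s|min|h|d)");

    static Duration parse(String literal) {
        Matcher m = TTL.matcher(literal.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported TTL literal: " + literal);
        }
        long value = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "ms":
                return Duration.ofMillis(value);
            case "s":
                return Duration.ofSeconds(value);
            case "min":
                return Duration.ofMinutes(value);
            case "h":
                return Duration.ofHours(value);
            default: // "d" is the only remaining unit the pattern allows
                return Duration.ofDays(value);
        }
    }

    public static void main(String[] args) {
        for (String literal : new String[] {"1d", "12h", "36000s"}) {
            System.out.println(literal + " -> " + parse(literal));
        }
    }
}
```

Rejecting unknown literals early, rather than falling back to a default, matches the spirit of the option checking described later in this document, although the actual checker may behave differently.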

...

Code Block
languagesql
themeConfluence
-- hint with table name as key
SELECT /*+ STATE_TTL('orders'= '1d', 'customers' = '20d') */ * FROM orders LEFT OUTER JOIN customers
ON orders.o_custkey = customers.c_custkey;

-- hint with table alias as key
SELECT /*+ STATE_TTL('o'= '12h', 'c' = '3d') */ * FROM orders o LEFT OUTER JOIN customers c
ON o.o_custkey = c.c_custkey;

-- hint with temporary view name as key
CREATE TEMPORARY VIEW left_input AS ...;
CREATE TEMPORARY VIEW right_input AS ...;
SELECT /*+ STATE_TTL('left_input'= '36000s', 'right_input' = '15h') */ * FROM left_input JOIN right_input
ON left_input.join_key = right_input.join_key;

Group Aggregate Example

Code Block
languagesql
themeConfluence
-- hint with table name as key
SELECT /*+ STATE_TTL('orders' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM orders 
GROUP BY o_orderkey;

-- hint with table alias as key
SELECT /*+ STATE_TTL('o' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM orders AS o
GROUP BY o_orderkey;

-- hint with query block alias as key
SELECT /*+ STATE_TTL('tmp' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM (
    SELECT o_orderkey, o_totalprice
    FROM orders
    WHERE o_shippriority = 0
) tmp
GROUP BY o_orderkey;

Stream Regular Join Mixed with Group Aggregate Example

Code Block
languagesql
themeConfluence
-- the group aggregate query block as the input of the regular join
SELECT /*+ STATE_TTL('agg' = '1d', 'customers' = '7d') */ * 
FROM (
    SELECT /*+ STATE_TTL('orders' = '30d') */ o_custkey, COUNT(o_orderkey) AS order_num
    FROM orders
    GROUP BY o_custkey
) agg
LEFT OUTER JOIN customers
ON agg.o_custkey = customers.c_custkey;

-- the regular join query block as the input of the group aggregate
SELECT /*+ STATE_TTL('tmp' = '1d') */ c_custkey, SUM(o_totalprice) AS revenue
FROM (
    SELECT /*+ STATE_TTL('o' = '12h', 'c' = '3d') */ * 
    FROM orders o LEFT OUTER JOIN customers c
    ON o.o_custkey = c.c_custkey
) tmp
GROUP BY c_custkey;

Exception Handling

When users incorrectly use STATE_TTL hints, there are two possible scenarios:

Scenario 1: The user specifies a STATE_TTL hint in a query block that supports it, but with a wrong hint key, such as a non-existent table name, table alias, or query block alias. In this case, an exception is thrown explicitly. For example, the following query throws org.apache.flink.table.api.ValidationException with the message "The options of following hints cannot match the name of input tables or views:\n`foo` in `state_ttl`". For joins, this check is already performed by JoinHintResolver (see JoinHintResolver.java#L193).

Code Block
languagesql
themeConfluence
-- hint with a non-existent table name as key
SELECT /*+ STATE_TTL('foo' = '1d', 'customers' = '20d') */ * FROM orders LEFT OUTER JOIN customers
ON orders.o_custkey = customers.c_custkey;

We can do a similar check for the aggregate node when there is an invalid hint key (this is newly introduced since there has been no aggregate hint check before).
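
The key check for Scenario 1 can be illustrated with a small, self-contained sketch. It assumes the hint options arrive as a key-to-TTL map and that the names visible in the query block (table names, view names, aliases) are available as a set; both shapes, the class `StateTtlHintKeyCheckSketch`, and the use of `IllegalArgumentException` in place of Flink's `ValidationException` are assumptions for illustration and do not mirror the JoinHintResolver implementation. Exact, case-sensitive matching is also an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch (not Flink code): rejects STATE_TTL keys that match no input name. */
final class StateTtlHintKeyCheckSketch {

    static void validateKeys(Map<String, String> hintOptions, Set<String> inputNames) {
        List<String> unmatched = new ArrayList<>();
        for (String key : hintOptions.keySet()) {
            if (!inputNames.contains(key)) {
                unmatched.add("`" + key + "` in `state_ttl`");
            }
        }
        if (!unmatched.isEmpty()) {
            // Stand-in for ValidationException; the message wording follows the text above.
            throw new IllegalArgumentException(
                    "The options of following hints cannot match the name of input tables or views:\n"
                            + String.join(", ", unmatched));
        }
    }

    public static void main(String[] args) {
        Set<String> inputs = new HashSet<>(Arrays.asList("orders", "customers"));
        Map<String, String> options = new LinkedHashMap<>();
        options.put("foo", "1d");
        options.put("customers", "20d");
        try {
            validateKeys(options, inputs);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The same routine would apply unchanged to an aggregate node, where the set of input names contains a single entry.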


Scenario 2: The user includes the STATE_TTL hint in a query block that does not support it. In this case, the hint is ignored without throwing an exception. This is intentional and keeps the behavior consistent with existing hints.

For example, SHUFFLE_MERGE is a batch join hint that only supports joins with an equality join condition (see hints/#shuffle_merge). Nevertheless, a query that uses it in streaming mode with a non-equi join condition still produces a valid plan and execution result; the hint is quietly ignored.

Code Block
languagesql
themeConfluence
-- explain the query under streaming mode
EXPLAIN SELECT /*+ SHUFFLE_MERGE('o') */ * from
orders o LEFT JOIN
lineitem l
ON o.o_totalprice > l.l_extendedprice;

== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[>(o_totalprice, l_extendedprice)], select=[o_orderkey, o_custkey, o_status, o_totalprice, l_linenumber, l_orderkey, l_partkey, l_extendedprice], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[SHUFFLE_MERGE options:[LEFT]]]])
:- Exchange(distribution=[single])
:  +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status, o_totalprice])
+- Exchange(distribution=[single])
   +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])

The reason is explained in FLIP-229: Introduces Join Hint for Flink SQL Batch Job, quoted below.

> 3. Check when optimizing

> In the optimization stage of the optimizer, it is judged that this Join Hint can be applied. If it cannot be applied, the behavior is to print a warning log instead of throwing an exception directly.


Hint Strategy

Code Block
languagejava
themeConfluence
titleFlinkHintStrategies
HintStrategyTable.builder()
    .hintStrategy("STATE_TTL",
            HintStrategy.builder(
                    HintPredicates.or(
                            HintPredicates.JOIN, HintPredicates.AGGREGATE))
                .optionChecker(STATE_TTL_OPTION_CHECKER)
                .build())
    .build();

...

Code Block
languagejava
themeConfluence
titleStateMetadata
    public static List<StateMetadata> getMultiInputOperatorDefaultMeta(
+           Map<Integer, Long> stateTtlFromHint,
            ReadableConfig tableConfig,
            String... stateNameList) {
        Duration ttlFromTableConf = tableConfig.get(ExecutionConfigOptions.IDLE_STATE_RETENTION);
        List<StateMetadata> stateMetadataList = new ArrayList<>(stateNameList.length);
        for (int i = 0; i < stateNameList.length; i++) {
+           Duration stateTtl =
+                   stateTtlFromHint.containsKey(i)
+                           ? Duration.ofMillis(stateTtlFromHint.get(i))
+                           : ttlFromTableConf;
            stateMetadataList.add(new StateMetadata(i, stateTtl, stateNameList[i]));
        }
        return stateMetadataList;
    }
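
The fallback rule in the snippet above can be exercised in isolation. The following sketch reproduces only the per-input decision (use the hinted TTL if one exists for that input index, otherwise the table-level retention) with plain JDK types. The class `StateTtlFallbackSketch` and the method `resolveTtls` are hypothetical names; `StateMetadata`, `ReadableConfig` and the planner wiring are deliberately left out.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch (not Flink code): per-input TTL resolution with a table-level fallback. */
final class StateTtlFallbackSketch {

    /**
     * @param stateTtlFromHint TTL in milliseconds keyed by input index, as in the snippet above
     * @param ttlFromTableConf table-level idle state retention used when no hint applies
     * @param inputCount number of operator inputs (2 for a regular join, 1 for an aggregate)
     */
    static List<Duration> resolveTtls(
            Map<Integer, Long> stateTtlFromHint, Duration ttlFromTableConf, int inputCount) {
        List<Duration> ttls = new ArrayList<>(inputCount);
        for (int i = 0; i < inputCount; i++) {
            Long hinted = stateTtlFromHint.get(i);
            ttls.add(hinted != null ? Duration.ofMillis(hinted) : ttlFromTableConf);
        }
        return ttls;
    }

    public static void main(String[] args) {
        // Only the left join input (index 0) carries a hint; the right input falls back.
        Map<Integer, Long> hints = new HashMap<>();
        hints.put(0, Duration.ofHours(12).toMillis());
        System.out.println(resolveTtls(hints, Duration.ofDays(1), 2));
    }
}
```

Because the lookup is per input index, a hint that names only one side of a join leaves the other side on the table-level setting, which is what the `containsKey` branch in the snippet expresses.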

Affected StreamPhysicalRels

  • StreamPhysicalJoin
  • StreamPhysicalGroupAggregate
  • StreamPhysicalGlobalGroupAggregate (to support two-phase optimization)
  • StreamPhysicalIncrementalGroupAggregate (to support partial-global-agg and final-local-agg combine optimization)

Parameter Granularity and Priority

...