Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/3s69dhv3rp4s0kysnslqbvyqo3qf7zq5

Vote thread: https://lists.apache.org/thread/837r63gdwzoqryvp3gbf67941g706s5d

JIRA: FLINK-33397

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-292 [1] extends CompiledPlan so that operator-level state TTL can be configured by modifying the JSON plan file. During that discussion, it was pointed out that a SQL hint could also cover some specific, simple scenarios in a way that is user-friendly, convenient, and unambiguous [2]. This FLIP proposes to support such a hint.

Specifically, we aim to support configuring state TTL through SQL hints for stream regular joins [3] and group aggregates [4]. These are the most typical use cases, and their semantics and hint propagation are unambiguous. Other use cases are excluded because their hint propagation is more complex; we may tackle them once the current design reaches consensus.

Public Interfaces

The proposed changes do not introduce interfaces that need to be annotated with Public/PublicEvolving.

Proposed Changes

To configure different state TTLs for a stream join or group aggregate, users specify a new hint, `STATE_TTL`, in the SELECT clause, similar to the existing SQL hints [5].
 
```sql
query:
  select /*+ STATE_TTL(kv_option[,kv_option]) */ ...

kv_option:
  key_string_literal = value_string_literal

key_string_literal:
  table_name | view_name | table_alias | query_block_alias

value_string_literal:
  ttl_duration
```

...
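To make the grammar concrete, here is a minimal Java sketch that turns the kv_options of one STATE_TTL hint into durations. The class StateTtlHintOptions and its methods are hypothetical. The accepted units (ms, s, min, h, d) are a simplified assumption and do not reproduce Flink's actual duration syntax.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: parses the kv_options of a STATE_TTL hint. */
final class StateTtlHintOptions {
    // <amount><unit>, e.g. "12h" or "30 min"; the unit set is an assumed, simplified subset.
    private static final Pattern TTL_PATTERN =
            Pattern.compile("\\s*(\\d+)\\s*(ms|s|min|h|d)\\s*");

    private StateTtlHintOptions() {}

    /** Parses one ttl_duration literal such as '1d'. */
    static Duration parseTtl(String value) {
        Matcher m = TTL_PATTERN.matcher(value);
        if (!m.matches()) {
            throw new IllegalArgumentException("Invalid state TTL: '" + value + "'");
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            case "h":
                return Duration.ofHours(amount);
            default: // "d"
                return Duration.ofDays(amount);
        }
    }

    /** Parses every key = value option, preserving the order written in the hint. */
    static Map<String, Duration> parse(Map<String, String> options) {
        if (options.isEmpty()) {
            throw new IllegalArgumentException("STATE_TTL requires at least one key-value option");
        }
        Map<String, Duration> ttls = new LinkedHashMap<>();
        for (Map.Entry<String, String> option : options.entrySet()) {
            ttls.put(option.getKey(), parseTtl(option.getValue()));
        }
        return ttls;
    }
}
```

For example, the options of STATE_TTL('o' = '12h', 'c' = '3d') parse to 12 hours for `o` and 3 days for `c`. An unsupported literal such as '5 weeks' is rejected.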

```sql
-- hint with table name as key
SELECT /*+ STATE_TTL('orders' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM orders
GROUP BY o_orderkey;

-- hint with table alias as key
SELECT /*+ STATE_TTL('o' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM orders AS o
GROUP BY o_orderkey;

-- hint with query block alias as key
SELECT /*+ STATE_TTL('tmp' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM (
    SELECT o_orderkey, o_totalprice
    FROM orders
    WHERE o_shippriority = 0
) tmp
GROUP BY o_orderkey;
```

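A key can be a table name, a table alias, or a query block alias, so the planner has to map each key to the input it names. The Java sketch below shows one way to do this for a join. It produces the input-index-to-TTL map (in milliseconds) that getMultiInputOperatorDefaultMeta consumes, assuming index 0 is the left input and 1 is the right. JoinStateTtlResolver and the per-input name sets are hypothetical, and exact, case-sensitive name matching is an assumption.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: resolves STATE_TTL keys to join input indices. */
final class JoinStateTtlResolver {
    private JoinStateTtlResolver() {}

    /**
     * @param hintTtls   parsed STATE_TTL options, keyed by the name written in the hint
     * @param inputNames for each input (assumed 0 = left, 1 = right), every name it can be
     *                   referenced by: table name, table alias, or query block alias
     * @return input index -> TTL in milliseconds; keys naming no input are skipped here
     */
    static Map<Integer, Long> resolve(Map<String, Duration> hintTtls, List<Set<String>> inputNames) {
        Map<Integer, Long> ttlByInput = new HashMap<>();
        for (Map.Entry<String, Duration> hint : hintTtls.entrySet()) {
            for (int i = 0; i < inputNames.size(); i++) {
                if (inputNames.get(i).contains(hint.getKey())) {
                    ttlByInput.put(i, hint.getValue().toMillis());
                    break; // the first matching input wins
                }
            }
        }
        return ttlByInput;
    }
}
```

With the inputs {orders, o} and {customers, c}, the hint STATE_TTL('o' = '12h', 'c' = '3d') resolves to 43,200,000 ms for input 0 and 259,200,000 ms for input 1.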
Stream Regular Join Mixed with Group Aggregate Example

```sql
-- the group aggregate query block as the input of the regular join
SELECT /*+ STATE_TTL('agg' = '1d', 'customers' = '7d') */ *
FROM (
    SELECT /*+ STATE_TTL('orders' = '30d') */ o_custkey, COUNT(o_orderkey) AS order_num
    FROM orders
    GROUP BY o_custkey
) agg
LEFT OUTER JOIN customers
ON agg.o_custkey = customers.c_custkey;

-- the regular join query block as the input of the group aggregate
SELECT /*+ STATE_TTL('tmp' = '1d') */ c_custkey, SUM(o_totalprice) AS revenue
FROM (
    SELECT /*+ STATE_TTL('o' = '12h', 'c' = '3d') */ *
    FROM orders o LEFT OUTER JOIN customers c
    ON o.o_custkey = c.c_custkey
) tmp
GROUP BY c_custkey;
```

Exception Handling

When users incorrectly use STATE_TTL hints, there are two possible scenarios:

Scenario 1: The user specifies STATE_TTL in a query block where it can be applied, but with a wrong hint key, for example a non-existent table name, table alias, or query block alias. In this case, an exception is thrown explicitly. For example, the following query throws org.apache.flink.table.api.ValidationException with the message "The options of following hints cannot match the name of input tables or views:\n`foo` in `state_ttl`". For joins, this check is already performed by JoinHintResolver (see JoinHintResolver.java#L193).

```sql
-- hint with a non-existent table name as key
SELECT /*+ STATE_TTL('foo' = '1d', 'customers' = '20d') */ *
FROM orders LEFT OUTER JOIN customers
ON orders.o_custkey = customers.c_custkey;
```

A similar check will be added for the aggregate node to reject invalid hint keys. This check is new, since aggregate hints have not been validated before.

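The new aggregate check could mirror the join check. Below is a hedged Java sketch for a single-input aggregate. It collects every STATE_TTL key that matches none of the input's table names, aliases, or query block aliases, and fails with a message in the same shape as the one quoted above. AggregateStateTtlValidator is hypothetical. It throws IllegalArgumentException as a stand-in for Flink's ValidationException, and the comma-separated layout used when several keys are invalid is an assumption.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch of the proposed STATE_TTL key check for an aggregate node. */
final class AggregateStateTtlValidator {
    private AggregateStateTtlValidator() {}

    /**
     * @param hintOptions STATE_TTL options as written in the hint (key -> TTL literal)
     * @param inputNames  names the aggregate's single input can be referenced by
     */
    static void validate(Map<String, String> hintOptions, Set<String> inputNames) {
        // Every key that names no input, sorted and back-quoted, e.g. "`foo`".
        String invalidKeys = hintOptions.keySet().stream()
                .filter(key -> !inputNames.contains(key))
                .sorted()
                .map(key -> "`" + key + "`")
                .collect(Collectors.joining(", "));
        if (!invalidKeys.isEmpty()) {
            // Stand-in for org.apache.flink.table.api.ValidationException.
            throw new IllegalArgumentException(
                    "The options of following hints cannot match the name of input tables or views:\n"
                            + invalidKeys
                            + " in `state_ttl`");
        }
    }
}
```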

Scenario 2: The user includes the STATE_TTL hint in a query block that does not support it. In this case, the hint is ignored and no exception is thrown. This is intentional, to stay consistent with the current hint behavior.

For example, SHUFFLE_MERGE is a batch join hint that only supports joins with an equi-join condition (see hints/#shuffle_merge). Even so, the query below, which uses a non-equi condition in streaming mode, still produces a valid plan and execution result; the hint is quietly ignored.

```sql
-- explain the query under streaming mode
EXPLAIN SELECT /*+ SHUFFLE_MERGE('o') */ * FROM
orders o LEFT JOIN
lineitem l
ON o.o_totalprice > l.l_extendedprice;

== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[>(o_totalprice, l_extendedprice)], select=[o_orderkey, o_custkey, o_status, o_totalprice, l_linenumber, l_orderkey, l_partkey, l_extendedprice], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[SHUFFLE_MERGE options:[LEFT]]]])
:- Exchange(distribution=[single])
:  +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status, o_totalprice])
+- Exchange(distribution=[single])
   +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])
```

The reason is explained in FLIP-229: Introduces Join Hint for Flink SQL Batch Job, quoted as follows.

> 3. Check when optimizing
>
> In the optimization stage of the optimizer, it is judged that this Join Hint can be applied. If it cannot be applied, the behavior is to print a warning log instead of throwing an exception directly.


Hint Strategy

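In FlinkHintStrategies:

```java
import org.apache.calcite.rel.hint.HintPredicates;
import org.apache.calcite.rel.hint.HintStrategy;
import org.apache.calcite.rel.hint.HintStrategyTable;

// Register STATE_TTL for join and aggregate nodes only; other nodes do not receive the hint.
// STATE_TTL_OPTION_CHECKER validates the hint's key-value options (definition not shown).
HintStrategyTable.builder()
    .hintStrategy("STATE_TTL",
            HintStrategy.builder(
                    HintPredicates.or(
                            HintPredicates.JOIN, HintPredicates.AGGREGATE))
                .optionChecker(STATE_TTL_OPTION_CHECKER)
                .build())
    .build();
```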

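The effect of HintPredicates.or(HintPredicates.JOIN, HintPredicates.AGGREGATE) can be pictured with the simplified Java model below, which also covers Scenario 2: on a node kind the strategy does not accept, the STATE_TTL options are dropped rather than rejected. The NodeKind enum and the StateTtlStrategyModel class are hypothetical stand-ins, not Calcite or Flink types.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical model of which nodes keep a STATE_TTL hint. */
final class StateTtlStrategyModel {
    enum NodeKind { JOIN, AGGREGATE, PROJECT, FILTER }

    private StateTtlStrategyModel() {}

    /** Mirrors the predicate registered for STATE_TTL: joins or aggregates only. */
    static boolean accepts(NodeKind kind) {
        return kind == NodeKind.JOIN || kind == NodeKind.AGGREGATE;
    }

    /** Options that actually reach the node; unsupported nodes silently get none. */
    static Map<String, String> effectiveOptions(NodeKind kind, Map<String, String> options) {
        return accepts(kind) ? options : Collections.emptyMap();
    }
}
```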
...

Currently, SqlToRelConverter attaches an alias hint at the root of a query block when a join hint is detected (see SqlToRelConverter.java#L2289). ClearJoinHintWithInvalidPropagationShuttle then ensures that no join hint is propagated into an inner query block. It clears the invalid hints propagated from the outer query block by checking the hint strategy and by comparing each hint's inherit path with that of the query block's alias hint. This ensures that hints from other nodes are not attached to the current join node.

The STATE_TTL hint follows a similar approach: the hint only affects the current query block and does not affect the state TTL of subqueries or views, which aligns with the current hint behavior. Likewise, `ClearJoinHintWithInvalidPropagationShuttle` will override the visit method for FlinkLogicalAggregate to remove propagated hints.

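The inherit-path comparison can be modelled in a few lines of Java. In this simplified sketch, which is not Flink's actual shuttle, each hint carries an inherit path like Calcite's RelHint.inheritPath. A STATE_TTL hint whose path is longer than that of the node's query block alias hint is treated as propagated from an outer query block and is cleared. The class names and the "ALIAS" hint name are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of clearing STATE_TTL hints propagated from outer query blocks. */
final class PropagatedStateTtlFilter {
    static final String ALIAS_HINT = "ALIAS"; // assumed name of the query block alias hint
    static final String STATE_TTL_HINT = "STATE_TTL";

    /** Minimal stand-in for a hint: its name plus the path it was inherited along. */
    static final class Hint {
        final String name;
        final List<Integer> inheritPath;

        Hint(String name, List<Integer> inheritPath) {
            this.name = name;
            this.inheritPath = inheritPath;
        }
    }

    private PropagatedStateTtlFilter() {}

    /** Drops STATE_TTL hints that travelled farther than the query block's alias hint. */
    static List<Hint> clearPropagated(List<Hint> hints) {
        Hint alias = null;
        for (Hint hint : hints) {
            if (ALIAS_HINT.equals(hint.name)) {
                alias = hint;
                break;
            }
        }
        if (alias == null) {
            return hints; // no query block boundary to compare against
        }
        List<Hint> kept = new ArrayList<>();
        for (Hint hint : hints) {
            boolean fromOuterBlock =
                    STATE_TTL_HINT.equals(hint.name)
                            && hint.inheritPath.size() > alias.inheritPath.size();
            if (!fromOuterBlock) {
                kept.add(hint);
            }
        }
        return kept;
    }
}
```

In the first mixed example, this kind of filtering is what keeps the outer block's 'agg' and 'customers' keys from reaching the inner aggregate, which only sees its own 'orders' hint.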
...

StateMetadata (lines prefixed with `+` are the proposed additions):

```diff
    public static List<StateMetadata> getMultiInputOperatorDefaultMeta(
+           Map<Integer, Long> stateTtlFromHint,
            ReadableConfig tableConfig,
            String... stateNameList) {
        Duration ttlFromTableConf = tableConfig.get(ExecutionConfigOptions.IDLE_STATE_RETENTION);
        List<StateMetadata> stateMetadataList = new ArrayList<>(stateNameList.length);
        for (int i = 0; i < stateNameList.length; i++) {
+           Duration stateTtl =
+                   stateTtlFromHint.containsKey(i)
+                           ? Duration.ofMillis(stateTtlFromHint.get(i))
+                           : ttlFromTableConf;
            stateMetadataList.add(new StateMetadata(i, stateTtl, stateNameList[i]));
        }
        return stateMetadataList;
    }
```

Affected StreamPhysicalRels

  • StreamPhysicalJoin
  • StreamPhysicalGroupAggregate
  • StreamPhysicalGlobalGroupAggregate (to support two-phase optimization)
  • StreamPhysicalIncrementalGroupAggregate (to support partial-global-agg and final-local-agg combine optimization)

Parameter Granularity and Priority

...