
Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/thread/3s69dhv3rp4s0kysnslqbvyqo3qf7zq5

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-292 [1] extends CompiledPlan so that operator-level state TTL can be configured by modifying the serialized JSON file. During the discussion, it was pointed out that a SQL hint could also cover some specific, simple scenarios in a way that is user-friendly, convenient, and unambiguous [2]. In this FLIP, we propose to support this feature.

Specifically, we aim to support using SQL hints to configure state TTL for stream regular join [3] and group aggregate [4], the most typical use cases with unambiguous semantics and hint propagation. We have excluded other use cases because their hint propagation is more complex; we may tackle them after the current design reaches consensus.

Public Interfaces

The proposed changes do not introduce interfaces that need to be annotated with Public/PublicEvolving.

Proposed Changes

To enable configuring different state TTLs for stream join or group aggregate, users need to specify a new hint (`STATE_TTL`) in the SELECT clause, similar to the existing SQL hints [5].

query:
  select /*+ STATE_TTL(kv_option[,kv_option]) */ ...
   
kv_option:
  key_string_literal = value_string_literal
   
key_string_literal:
  table_name | view_name | table_alias | query_block_alias
   
value_string_literal:
  ttl_duration
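To make the grammar concrete, here is a minimal Java sketch that splits a STATE_TTL option list into an ordered key/value map. `StateTtlHintOptions` is a hypothetical helper, not a Flink or Calcite class (in Flink the hint options are parsed by the SQL parser), and it assumes that keys and values never contain commas or single quotes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only: parses "'k1' = 'v1', 'k2' = 'v2'".
final class StateTtlHintOptions {
    // A single kv_option: a quoted key, '=', and a quoted value, with optional whitespace.
    private static final Pattern KV_OPTION =
            Pattern.compile("\\s*'([^']*)'\\s*=\\s*'([^']*)'\\s*");

    private StateTtlHintOptions() {}

    static Map<String, String> parse(String optionList) {
        // LinkedHashMap keeps the keys in the order they were written in the hint.
        Map<String, String> options = new LinkedHashMap<>();
        // Assumes literals never contain commas (true for names and durations like '1d').
        for (String kvOption : optionList.split(",")) {
            Matcher matcher = KV_OPTION.matcher(kvOption);
            if (!matcher.matches()) {
                throw new IllegalArgumentException("Malformed kv_option: " + kvOption);
            }
            // Duplicate keys are not addressed by the grammar; this sketch keeps the last value.
            options.put(matcher.group(1), matcher.group(2));
        }
        return options;
    }
}
```

For example, parsing `'orders'= '1d', 'customers' = '20d'` yields `{orders=1d, customers=20d}`, while an unquoted option such as `orders = 1d` is rejected.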

Stream Regular Join Example

-- hint with table name as key
SELECT /*+ STATE_TTL('orders'= '1d', 'customers' = '20d') */ * FROM orders LEFT OUTER JOIN customers
ON orders.o_custkey = customers.c_custkey;

-- hint with table alias as key
SELECT /*+ STATE_TTL('o'= '12h', 'c' = '3d') */ * FROM orders o LEFT OUTER JOIN customers c
ON o.o_custkey = c.c_custkey;

-- hint with temporary view name as key
CREATE TEMPORARY VIEW left_input AS ...;
CREATE TEMPORARY VIEW right_input AS ...;
SELECT /*+ STATE_TTL('left_input'= '36000s', 'right_input' = '15h') */ * FROM left_input JOIN right_input
ON left_input.join_key = right_input.join_key;

Group Aggregate Example

-- hint with table name as key
SELECT /*+ STATE_TTL('orders' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM orders 
GROUP BY o_orderkey;

-- hint with table alias as key
SELECT /*+ STATE_TTL('o' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM orders AS o
GROUP BY o_orderkey;

-- hint with query block alias as key
SELECT /*+ STATE_TTL('tmp' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue
FROM (
    SELECT o_orderkey, o_totalprice
    FROM orders
    WHERE o_shippriority = 0
) tmp
GROUP BY o_orderkey;

Stream Regular Join Mixed with Group Aggregate Example

-- the group aggregate query block as the input of the regular join
SELECT /*+ STATE_TTL('agg' = '1d', 'customers' = '7d') */ * 
FROM (
    SELECT /*+ STATE_TTL('orders' = '30d') */ o_custkey, COUNT(o_orderkey) AS order_num
    FROM orders
    GROUP BY o_custkey
) agg
LEFT OUTER JOIN customers
ON agg.o_custkey = customers.c_custkey;

-- the regular join query block as the input of the group aggregate
SELECT /*+ STATE_TTL('tmp' = '1d') */ c_custkey, SUM(o_totalprice) AS revenue
FROM (
    SELECT /*+ STATE_TTL('o' = '12h', 'c' = '3d') */ * 
    FROM orders o LEFT OUTER JOIN customers c
    ON o.o_custkey = c.c_custkey
) tmp
GROUP BY c_custkey;

Exception Handling

When users incorrectly use STATE_TTL hints, there are two possible scenarios:

Scenario 1: The user specifies a STATE_TTL hint that can be applied to the current query block, but with a wrong hint key, for example, a non-existent table name, table alias, or query block alias. In this case, an exception should be thrown explicitly. For example, the following query will throw org.apache.flink.table.api.ValidationException with the message "The options of following hints cannot match the name of input tables or views:\n`foo` in `state_ttl`". This check is already performed by JoinHintResolver (see JoinHintResolver.java#L193).

-- hint with a non-existent table name as key
SELECT /*+ STATE_TTL('foo'= '1d', 'customers' = '20d') */ * FROM orders LEFT OUTER JOIN customers
ON orders.o_custkey = customers.c_custkey;

A similar check will be added for the aggregate node to reject invalid hint keys. This check is new, since no hint validation has been performed for aggregates before.
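The key check for Scenario 1 can be sketched as follows. This is a simplified, hypothetical stand-in: the real check lives in JoinHintResolver (plus the new aggregate-side check proposed here), Flink throws ValidationException rather than the IllegalArgumentException used below, keys are matched case-sensitively, and the way several unmatched keys are joined in the message is an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: reject STATE_TTL keys that match no input of the hinted query block.
final class StateTtlKeyValidator {
    private StateTtlKeyValidator() {}

    /**
     * @param kvOptions  hint options, e.g. {foo=1d, customers=20d}
     * @param inputNames table names, view names, table aliases or query block aliases
     *                   visible as inputs of the hinted join or aggregate
     */
    static void validate(Map<String, String> kvOptions, Set<String> inputNames) {
        List<String> unmatched =
                kvOptions.keySet().stream()
                        .filter(key -> !inputNames.contains(key))
                        .map(key -> "`" + key + "` in `state_ttl`")
                        .collect(Collectors.toList());
        if (!unmatched.isEmpty()) {
            // Message text follows the example quoted above; Flink throws ValidationException.
            throw new IllegalArgumentException(
                    "The options of following hints cannot match the name of input tables or views:\n"
                            + String.join("\n", unmatched));
        }
    }
}
```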


Scenario 2: The user includes the STATE_TTL hint in a query block that does not support it. In this case, the hint is ignored without throwing an exception. This is intentional, to remain consistent with the current hint behavior.

For example, SHUFFLE_MERGE is a batch join hint that only supports joins with an equi-join condition (see hints/#shuffle_merge). Nevertheless, the following query with a non-equi condition still produces a valid plan and execution result in streaming mode; the hint is silently ignored.

-- explain the query under streaming mode
EXPLAIN SELECT /*+ SHUFFLE_MERGE('o') */* from
orders o LEFT JOIN
lineitem l
ON o.o_totalprice > l.l_extendedprice;

== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[>(o_totalprice, l_extendedprice)], select=[o_orderkey, o_custkey, o_status, o_totalprice, l_linenumber, l_orderkey, l_partkey, l_extendedprice], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[SHUFFLE_MERGE options:[LEFT]]]])
:- Exchange(distribution=[single])
:  +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status, o_totalprice])
+- Exchange(distribution=[single])
   +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])

The reason is explained in FLIP-229: Introduces Join Hint for Flink SQL Batch Job, quoted as follows:

> 3. Check when optimizing

> In the optimization stage of the optimizer, it is judged that this Join Hint can be applied. If it cannot be applied, the behavior is to print a warning log instead of throwing an exception directly.
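Scenario 2 can be pictured as a filter that keeps STATE_TTL only on node kinds that support it and drops it elsewhere without raising an error. The `NodeKind` enum and `applicableHints` method are hypothetical; in Flink this decision comes from the hint strategy predicate registered in FlinkHintStrategies, and whether a warning is logged for STATE_TTL is not specified by this sketch.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class StateTtlApplicability {
    // Hypothetical classification of relational nodes, for illustration only.
    enum NodeKind { JOIN, AGGREGATE, PROJECT, FILTER, SCAN }

    // STATE_TTL is only meaningful on joins and aggregates.
    private static final Set<NodeKind> SUPPORTED = EnumSet.of(NodeKind.JOIN, NodeKind.AGGREGATE);

    private StateTtlApplicability() {}

    /** Returns the hints that stay on the node; unsupported STATE_TTL hints are dropped silently. */
    static List<String> applicableHints(NodeKind kind, List<String> hintNames) {
        return hintNames.stream()
                .filter(name -> !"STATE_TTL".equals(name) || SUPPORTED.contains(kind))
                .collect(Collectors.toList());
    }
}
```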


Hint Strategy

FlinkHintStrategies
HintStrategyTable.builder()
    .hintStrategy("STATE_TTL",
            HintStrategy.builder(
                    HintPredicates.or(
                            HintPredicates.JOIN, HintPredicates.AGGREGATE))
                .optionChecker(STATE_TTL_OPTION_CHECKER)
                .build())
    .build();

The STATE_TTL_OPTION_CHECKER will ensure the hint options are non-empty and the value is a valid duration.
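A minimal sketch of what such an option checker could verify, written as plain Java rather than against Calcite's HintOptionChecker interface. The duration grammar below (an integer followed by ms, s, min, h or d, with a bare number read as milliseconds) is a simplified assumption; Flink's own duration parsing accepts more unit spellings. The normalized millisecond values agree with those in the plans later in this document, e.g. '10d' becomes 864000000.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical checker sketch; not the actual STATE_TTL_OPTION_CHECKER implementation.
final class StateTtlOptionCheckerSketch {
    // Simplified duration syntax: digits, optional whitespace, optional unit.
    private static final Pattern DURATION = Pattern.compile("(\\d+)\\s*(ms|s|min|h|d)?");

    private StateTtlOptionCheckerSketch() {}

    /** Normalizes a duration literal such as "12h" to milliseconds. */
    static long toMillis(String literal) {
        Matcher m = DURATION.matcher(literal.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Invalid duration: " + literal);
        }
        long amount = Long.parseLong(m.group(1));
        String unit = m.group(2) == null ? "ms" : m.group(2);
        switch (unit) {
            case "ms": return amount;
            case "s": return Math.multiplyExact(amount, 1_000L);
            case "min": return Math.multiplyExact(amount, 60_000L);
            case "h": return Math.multiplyExact(amount, 3_600_000L);
            default: return Math.multiplyExact(amount, 86_400_000L); // "d"
        }
    }

    /** Options must be non-empty and every value must be a valid duration. */
    static void check(Map<String, String> kvOptions) {
        if (kvOptions.isEmpty()) {
            throw new IllegalArgumentException("STATE_TTL hint options must not be empty");
        }
        kvOptions.values().forEach(StateTtlOptionCheckerSketch::toMillis);
    }
}
```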

Hint Propagation in Optimizer

Currently, SqlToRelConverter attaches an alias hint at the root of a query block if a join hint is detected (see SqlToRelConverter.java#L2289). ClearJoinHintWithInvalidPropagationShuttle then clears the invalid hints propagated from the outer query block by checking the hint strategy and comparing each hint's inherit path with that of the query block's alias hint. This ensures that hints from other query blocks are not attached to the current join node.

The STATE_TTL hint follows a similar approach: the hint only affects the current query block and does not affect the state TTL in its subqueries, which aligns with the current hint behavior. Likewise, `ClearJoinHintWithInvalidPropagationShuttle` will override the visit method for LogicalAggregate to remove the hints propagated from the outer query block.

ClearJoinHintWithInvalidPropagationShuttle
    @Override
    public RelNode visit(LogicalAggregate aggregate) {
        List<RelHint> hints = ((Hintable) aggregate).getHints();
        Optional<RelHint> firstAliasHint =
                hints.stream()
                        .filter(hint -> FlinkHints.HINT_ALIAS.equals(hint.hintName))
                        .findFirst();
        if (!firstAliasHint.isPresent()) {
            return super.visit(aggregate);
        }
        // compare the inheritPath of the alias hint and other hints to find hints propagated from the outer query block
        List<RelHint> joinHintsFromOuterQueryBlock = ...;
        if (joinHintsFromOuterQueryBlock.isEmpty()) {
            return super.visit(aggregate);
        }
        RelNode newAggregate = aggregate;
        ClearOuterJoinHintShuttle clearOuterJoinHintShuttle;

        for (RelHint outerJoinHint : joinHintsFromOuterQueryBlock) {
            clearOuterJoinHintShuttle = new ClearOuterJoinHintShuttle(outerJoinHint);
            newAggregate = newAggregate.accept(clearOuterJoinHintShuttle);
        }
        return super.visit(newAggregate);
    }
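One way to express an inherit-path comparison is the following self-contained sketch. It assumes, as a simplification, that a hint propagated from an outer query block has travelled through more nodes and therefore carries a longer inherit path than the current block's alias hint. `Hint` is a hypothetical stand-in for Calcite's RelHint, and the literal hint names are illustrative.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class OuterHintFinder {
    // Hypothetical stand-in for Calcite's RelHint: a hint name plus its inherit path.
    static final class Hint {
        final String name;
        final List<Integer> inheritPath;

        Hint(String name, List<Integer> inheritPath) {
            this.name = name;
            this.inheritPath = inheritPath;
        }
    }

    static final String ALIAS_HINT = "ALIAS"; // illustrative name for the alias hint
    static final String STATE_TTL_HINT = "STATE_TTL";

    private OuterHintFinder() {}

    /** Returns STATE_TTL hints on this node that were propagated from an outer query block. */
    static List<Hint> stateTtlHintsFromOuterQueryBlock(List<Hint> hintsOnNode) {
        Optional<Hint> aliasHint =
                hintsOnNode.stream().filter(h -> ALIAS_HINT.equals(h.name)).findFirst();
        if (!aliasHint.isPresent()) {
            // No alias hint: nothing to compare against, so nothing is reported.
            return List.of();
        }
        int aliasPathLength = aliasHint.get().inheritPath.size();
        return hintsOnNode.stream()
                .filter(h -> STATE_TTL_HINT.equals(h.name))
                // Assumption: a longer inherit path means the hint came from an outer block.
                .filter(h -> h.inheritPath.size() > aliasPathLength)
                .collect(Collectors.toList());
    }
}
```

The returned hints are the ones a shuttle would clear from the node before translation.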

Expected Physical Plan

We elaborate on the details as follows.

1. Cascade Joins

For cascade joins like SELECT /*+ STATE_TTL('A' = '...', 'B' = '...', 'C' = '...')*/ * FROM A JOIN B ON ... JOIN C ON ..., the specified state TTLs will be interpreted (in bottom-up order) as the left and right state TTLs of the first join operator and the right state TTL of the second join operator. The left state TTL of the second join operator will be taken from the configuration table.exec.state.ttl.

EXPLAIN SELECT /*+ STATE_TTL('o' = '3d', 'l' = '1d', 'c' = '10d') */* FROM
orders o LEFT OUTER JOIN
lineitem l
ON o.o_orderkey = l.l_orderkey
LEFT OUTER JOIN customers c
ON o.o_custkey = c.c_custkey;

== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[=(o_custkey, c_custkey)], select=[o_orderkey, o_custkey, o_status, l_linenumber, l_orderkey, l_partkey, l_extendedprice, c_custkey, c_address], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{RIGHT=864000000}]]])
:- Exchange(distribution=[hash[o_custkey]])
:  +- Join(joinType=[LeftOuterJoin], where=[=(o_orderkey, l_orderkey)], select=[o_orderkey, o_custkey, o_status, l_linenumber, l_orderkey, l_partkey, l_extendedprice], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{LEFT=259200000, RIGHT=86400000}]]])
:     :- Exchange(distribution=[hash[o_orderkey]])
:     :  +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status])
:     +- Exchange(distribution=[hash[l_orderkey]])
:        +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])
+- Exchange(distribution=[hash[c_custkey]])
   +- TableSourceScan(table=[[default_catalog, default_database, customers]], fields=[c_custkey, c_address])  
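The bottom-up interpretation of a cascade (left-deep) join hint can be sketched as follows. This hypothetical helper takes the join inputs in FROM-clause order, the hint's TTLs already normalized to milliseconds, and the table.exec.state.ttl value, and returns the (left, right) TTL of each join; inputs without a hinted value fall back to the configured default, mirroring StateMetadata#getMultiInputOperatorDefaultMeta.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of how a cascade-join STATE_TTL hint maps onto join operators.
final class CascadeJoinTtl {
    private CascadeJoinTtl() {}

    /**
     * @param inputs    join inputs in FROM-clause order (at least two), e.g. [o, l, c]
     * @param hintTtlMs STATE_TTL options normalized to milliseconds
     * @param defaultMs value of table.exec.state.ttl in milliseconds
     * @return one {leftTtl, rightTtl} pair per join operator, bottom-up
     */
    static long[][] resolve(List<String> inputs, Map<String, Long> hintTtlMs, long defaultMs) {
        long[][] ttls = new long[inputs.size() - 1][2];
        for (int join = 0; join < ttls.length; join++) {
            // Only the first join has a named left input; later joins consume an
            // intermediate result, so their left TTL always comes from the default.
            ttls[join][0] = join == 0 ? hintTtlMs.getOrDefault(inputs.get(0), defaultMs) : defaultMs;
            // The right input of join i is the (i + 1)-th named input.
            ttls[join][1] = hintTtlMs.getOrDefault(inputs.get(join + 1), defaultMs);
        }
        return ttls;
    }
}
```

With the hint from the example above ('o' = '3d', 'l' = '1d', 'c' = '10d'), this yields (259200000, 86400000) for the first join and (table.exec.state.ttl, 864000000) for the second, matching the joinHints in the plan.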

If users need to set a specific value for the left state TTL of the second join, they can split the cascade join clause into separate query blocks, for example:

CREATE TEMPORARY VIEW V AS SELECT /*+ STATE_TTL('A' = '${ttl_A}', 'B' = '${ttl_B}')*/ * FROM A JOIN B ON...;
SELECT /*+ STATE_TTL('V' = '${ttl_V}', 'C' = '${ttl_C}')*/ * FROM V JOIN C ON ...;

2. Group Aggregate

EXPLAIN SELECT /*+ STATE_TTL('lineitem' = '1d')*/ l_partkey, SUM(l_extendedprice) AS revenue
FROM lineitem
GROUP BY l_partkey;

== Optimized Physical Plan ==
GroupAggregate(groupBy=[l_partkey], select=[l_partkey, SUM(l_extendedprice) AS revenue], hints=[[[state_ttl options:{lineitem=86400000}]]])
+- Exchange(distribution=[hash[l_partkey]])
   +- Calc(select=[l_partkey, l_extendedprice])
      +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])

-- the right input subquery contains a group aggregate
EXPLAIN
SELECT /*+ STATE_TTL('o' = '3d', 'agg' = '1d', 'c' = '10d') */* FROM
orders o 
LEFT OUTER JOIN
(
    SELECT /*+ STATE_TTL('lineitem' = '1h')*/ l_orderkey, l_partkey, SUM(l_extendedprice) AS revenue
    FROM lineitem
    GROUP BY l_orderkey, l_partkey
) agg
ON o.o_orderkey = agg.l_orderkey
LEFT OUTER JOIN customers c
ON o.o_custkey = c.c_custkey;

== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[=(o_custkey, c_custkey)], select=[o_orderkey, o_custkey, o_status, l_orderkey, l_partkey, revenue, c_custkey, c_address], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{RIGHT=864000000}]]])
:- Exchange(distribution=[hash[o_custkey]])
:  +- Join(joinType=[LeftOuterJoin], where=[=(o_orderkey, l_orderkey)], select=[o_orderkey, o_custkey, o_status, l_orderkey, l_partkey, revenue], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey], joinHints=[[[state_ttl options:{LEFT=259200000, RIGHT=86400000}]]])
:     :- Exchange(distribution=[hash[o_orderkey]])
:     :  +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status])
:     +- Exchange(distribution=[hash[l_orderkey]])
:        +- GroupAggregate(groupBy=[l_orderkey, l_partkey], select=[l_orderkey, l_partkey, SUM(l_extendedprice) AS revenue], hints=[[[state_ttl options:{lineitem=3600000}]]])
:           +- Exchange(distribution=[hash[l_orderkey, l_partkey]])
:              +- Calc(select=[l_orderkey, l_partkey, l_extendedprice])
:                 +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])
+- Exchange(distribution=[hash[c_custkey]])
   +- TableSourceScan(table=[[default_catalog, default_database, customers]], fields=[c_custkey, c_address])

-- the subquery contains regular join operations
EXPLAIN
SELECT /*+ STATE_TTL('tmp' = '48h')*/ o_orderkey, c_custkey, SUM(l_extendedprice) AS revenue
FROM (
    SELECT /*+ STATE_TTL('o' = '3d', 'l' = '1d', 'c' = '10d') */* 
    FROM orders o LEFT OUTER JOIN lineitem l
    ON o.o_orderkey = l.l_orderkey
    LEFT OUTER JOIN customers c
    ON o.o_custkey = c.c_custkey 
) tmp
GROUP BY o_orderkey, c_custkey;

== Optimized Physical Plan ==
GroupAggregate(groupBy=[o_orderkey, c_custkey], select=[o_orderkey, c_custkey, SUM_RETRACT(l_extendedprice) AS revenue], hints=[[[state_ttl options:{tmp=172800000}]]])
+- Exchange(distribution=[hash[o_orderkey, c_custkey]])
   +- Calc(select=[o_orderkey, c_custkey, l_extendedprice])
      +- Join(joinType=[LeftOuterJoin], where=[=(o_custkey, c_custkey)], select=[o_orderkey, o_custkey, l_extendedprice, c_custkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{RIGHT=864000000}]]])
         :- Exchange(distribution=[hash[o_custkey]])
         :  +- Calc(select=[o_orderkey, o_custkey, l_extendedprice])
         :     +- Join(joinType=[LeftOuterJoin], where=[=(o_orderkey, l_orderkey)], select=[o_orderkey, o_custkey, l_orderkey, l_extendedprice], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[state_ttl options:{LEFT=259200000, RIGHT=86400000}]]])
         :        :- Exchange(distribution=[hash[o_orderkey]])
         :        :  +- Calc(select=[o_orderkey, o_custkey])
         :        :     +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status])
         :        +- Exchange(distribution=[hash[l_orderkey]])
         :           +- Calc(select=[l_orderkey, l_extendedprice])
         :              +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])
         +- Exchange(distribution=[hash[c_custkey]])
            +- Calc(select=[c_custkey])
               +- TableSourceScan(table=[[default_catalog, default_database, customers]], fields=[c_custkey, c_address])

Pass Hint Value to the Operator's StateMetadata

Take stream regular join as an example. JoinHintResolver will validate and resolve the state TTL hints and normalize the TTL duration to milliseconds, which aligns with the default time unit of table.exec.state.ttl.

StreamPhysicalJoinRule will pass the validated RelHint as an argument when creating StreamPhysicalJoin. StreamPhysicalJoin#translateToExecNode then converts the STATE_TTL hint to a map from the input ordinal to the specified TTL value.

StreamPhysicalJoin
  override def translateToExecNode(): ExecNode[_] = {
+   val ttlFromHint = new util.HashMap[JInt, JLong]
+   getHints
+     .filter(hint => StateTtlHint.isStateTtlHint(hint.hintName))
+     .forEach {
+       hint =>
+         hint.kvOptions.forEach(
+           (input, ttl) =>
+             ttlFromHint.put(if (input == JoinStrategy.LEFT_INPUT) 0 else 1, ttl.toLong))
+     }
    new StreamExecJoin(
      unwrapTableConfig(this),
      joinSpec,
      getUpsertKeys(left, joinSpec.getLeftKeys),
      getUpsertKeys(right, joinSpec.getRightKeys),
      InputProperty.DEFAULT,
      InputProperty.DEFAULT,
+     ttlFromHint,
      FlinkTypeFactory.toLogicalRowType(getRowType),
      getRelDetailedDescription)
  }

StreamExecJoin gives priority to the TTL values from the hint; for any input without a hinted value, it falls back to table.exec.state.ttl.

StreamExecJoin
    public StreamExecJoin(
            ReadableConfig tableConfig,
            JoinSpec joinSpec,
            List<int[]> leftUpsertKeys,
            List<int[]> rightUpsertKeys,
            InputProperty leftInputProperty,
            InputProperty rightInputProperty,
+           Map<Integer, Long> stateTtlFromHint, // key is the input ordinal, value is the TTL retrieved from hint
            RowType outputType,
            String description) {
        this(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(StreamExecJoin.class),
                ExecNodeContext.newPersistedConfig(StreamExecJoin.class, tableConfig),
                joinSpec,
                leftUpsertKeys,
                rightUpsertKeys,
+               StateMetadata.getMultiInputOperatorDefaultMeta(
+                       stateTtlFromHint, tableConfig, LEFT_STATE_NAME, RIGHT_STATE_NAME),
                Lists.newArrayList(leftInputProperty, rightInputProperty),
                outputType,
                description);
    }

StateMetadata
    public static List<StateMetadata> getMultiInputOperatorDefaultMeta(
+           Map<Integer, Long> stateTtlFromHint,
            ReadableConfig tableConfig,
            String... stateNameList) {
        Duration ttlFromTableConf = tableConfig.get(ExecutionConfigOptions.IDLE_STATE_RETENTION);
        List<StateMetadata> stateMetadataList = new ArrayList<>(stateNameList.length);
        for (int i = 0; i < stateNameList.length; i++) {
+           Duration stateTtl =
+                   stateTtlFromHint.containsKey(i)
+                           ? Duration.ofMillis(stateTtlFromHint.get(i))
+                           : ttlFromTableConf;
            stateMetadataList.add(new StateMetadata(i, stateTtl, stateNameList[i]));
        }
        return stateMetadataList;
    }

Affected StreamPhysicalRels

  • StreamPhysicalJoin
  • StreamPhysicalGroupAggregate
  • StreamPhysicalGlobalGroupAggregate (to support two-phase optimization)
  • StreamPhysicalIncrementalGroupAggregate (to support partial-global-agg and final-local-agg combine optimization)

Parameter Granularity and Priority

Different ways to change the state TTL of a Table API/SQL pipeline:

  • SET 'table.exec.state.ttl' = '...'
    Granularity: The value applies to the whole pipeline; all stateful operators use it as their state TTL by default.
    Priority: This is the default state TTL configuration. It can be overridden by the STATE_TTL hint or by modifying the value in the serialized CompiledPlan.
  • SELECT /*+ STATE_TTL(...) */ ...
    Granularity: The value only applies to the specific operator (currently join and group aggregate) translated from the hinted query block.
    Priority: The hint takes precedence over the default table.exec.state.ttl. The value is serialized into the CompiledPlan during plan translation.
  • Modify the serialized JSON content of the CompiledPlan
    Granularity: The TTL of each stateful operator is explicitly serialized as a JSON entry, so modifying the JSON file can change the TTL of any stateful operator.
    Priority: The serialized state metadata value derives from either (1) table.exec.state.ttl or (2) the STATE_TTL hint. However, if users first enable the hint, compile the query to a JSON file, manually change the TTL of the join operator, and then submit the job via the plan, the deserialized CompiledPlan uses the last modified value as the final TTL.
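The priority rules above can be summarized by a small two-phase sketch: compiling the plan fixes each operator's TTL from the hint or, failing that, from table.exec.state.ttl; a value edited in the JSON afterwards wins when the plan is restored. The class and method names are hypothetical and do not correspond to Flink APIs.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical summary of the TTL priority rules; not a Flink API.
final class StateTtlPriority {
    private StateTtlPriority() {}

    /** Compile phase: the hint takes precedence over table.exec.state.ttl. */
    static Duration compiledTtl(Optional<Duration> fromHint, Duration fromTableConfig) {
        return fromHint.orElse(fromTableConfig);
    }

    /** Restore phase: a TTL manually edited in the JSON plan overrides the compiled value. */
    static Duration effectiveTtl(Duration compiled, Optional<Duration> editedInJson) {
        return editedInJson.orElse(compiled);
    }
}
```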


Compatibility, Deprecation, and Migration Plan

This feature will be introduced as an opt-in enhancement and will not affect existing SQL queries or applications. The hint will be backward-compatible, allowing users to adopt the feature in their queries gradually.

Test Plan

Comprehensive unit tests will be added to ensure the correctness of parsing and the optimized plan of the STATE_TTL hint. Integration tests will also be conducted to validate the functionality of stream join and group aggregate operators with state TTL configured using SQL hints.

Rejected Alternatives

Rejected Hint KV Options
SELECT /*+ STATE_TTL('table' = 'orders', 'table.exec.state.ttl' = '1d'), STATE_TTL('table' = 'customers', 'table.exec.state.ttl' = '20d') */ *
FROM orders LEFT OUTER JOIN customers
ON orders.o_custkey = customers.c_custkey;

These KV options are rejected because they are too verbose.

References

[1] FLIP-292: Enhance COMPILED PLAN to support operator-level state TTL configuration

[2] https://lists.apache.org/thread/f4hz13g9gykdcwdvf0ws5xkgkytck2w1

[3] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#regular-joins

[4] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/group-agg/

[5] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/







