...

Code Block (SQL)
-- the group aggregate query block as the input of the regular join
SELECT /*+ STATE_TTL('agg' = '1d', 'customers' = '7d') */ * 
FROM (
    SELECT /*+ STATE_TTL('orders' = '30d') */ o_custkey, COUNT(o_orderkey) AS order_num
    FROM orders
    GROUP BY o_custkey
) agg
LEFT OUTER JOIN customers
ON agg.o_custkey = customers.c_custkey;

-- the regular join query block as the input of the group aggregate
SELECT /*+ STATE_TTL('tmp' = '1d') */ c_custkey, SUM(o_totalprice) AS revenue
FROM (
    SELECT /*+ STATE_TTL('o' = '12h', 'c' = '3d') */ * 
    FROM orders o LEFT OUTER JOIN customers c
    ON o.o_custkey = c.c_custkey
) tmp
GROUP BY c_custkey;

Exception Handling

When users incorrectly use STATE_TTL hints, there are two possible scenarios:

Scenario 1: The user specifies a STATE_TTL hint on a query block that supports it, but with a wrong hint key, for example a non-existent table name, table alias, or query block alias. In this case, an exception should be thrown explicitly. E.g. the following query throws org.apache.flink.table.api.ValidationException with the message "The options of following hints cannot match the name of input tables or views:\n`foo` in `state_ttl`". This check is already performed by JoinHintResolver (see JoinHintResolver.java#L193).

Code Block (SQL)
-- hint with a non-existent table name as key
SELECT /*+ STATE_TTL('foo'= '1d', 'customers' = '20d') */ * FROM orders LEFT OUTER JOIN customers
ON orders.o_custkey = customers.c_custkey;

We can add a similar check for the aggregate node to reject invalid hint keys (this check is new, since aggregate hints have not been validated before).
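
The key check described in Scenario 1 can be sketched roughly as follows. This is a minimal, self-contained illustration and not Flink's actual JoinHintResolver code: the class StateTtlHintKeyCheck and its methods are hypothetical, IllegalArgumentException stands in for ValidationException, keys are assumed to match input names exactly, and the formatting for more than one unmatched key is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the hint key check; not part of Flink.
final class StateTtlHintKeyCheck {

    /** Returns the hint keys that match none of the input table names or aliases. */
    static List<String> unmatchedKeys(Map<String, String> hintOptions, Set<String> inputNames) {
        List<String> unmatched = new ArrayList<>();
        for (String key : hintOptions.keySet()) {
            if (!inputNames.contains(key)) {
                unmatched.add(key);
            }
        }
        return unmatched;
    }

    /** Throws if any hint key is unmatched; the message follows the one quoted above. */
    static void validate(Map<String, String> hintOptions, Set<String> inputNames) {
        List<String> unmatched = unmatchedKeys(hintOptions, inputNames);
        if (unmatched.isEmpty()) {
            return;
        }
        StringBuilder message = new StringBuilder(
                "The options of following hints cannot match the name of input tables or views:");
        for (String key : unmatched) {
            message.append("\n`").append(key).append("` in `state_ttl`");
        }
        // IllegalArgumentException is used here in place of Flink's ValidationException.
        throw new IllegalArgumentException(message.toString());
    }

    public static void main(String[] args) {
        // Mirrors STATE_TTL('foo' = '1d', 'customers' = '20d') on orders JOIN customers.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("foo", "1d");
        options.put("customers", "20d");
        try {
            validate(options, Set.of("orders", "customers"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The same routine could serve the aggregate node by passing the single input's name or alias as inputNames, which is one way to read the "similar check" mentioned above.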


Scenario 2: The user places the STATE_TTL hint in a query block that does not support it. In this case, the hint is ignored and no exception is thrown. This is intentional, to remain consistent with the current hint behavior.

E.g. SHUFFLE_MERGE is a batch join hint that only supports joins with an equality join condition (see hints/#shuffle_merge). Under streaming mode with a non-equi join condition, the query still produces a valid plan and execution result, and the hint is quietly ignored.

Code Block (SQL)
-- explain the query under streaming mode
EXPLAIN SELECT /*+ SHUFFLE_MERGE('o') */ * FROM
orders o LEFT JOIN
lineitem l
ON o.o_totalprice > l.l_extendedprice;

== Optimized Physical Plan ==
Join(joinType=[LeftOuterJoin], where=[>(o_totalprice, l_extendedprice)], select=[o_orderkey, o_custkey, o_status, o_totalprice, l_linenumber, l_orderkey, l_partkey, l_extendedprice], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey], joinHints=[[[SHUFFLE_MERGE options:[LEFT]]]])
:- Exchange(distribution=[single])
:  +- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[o_orderkey, o_custkey, o_status, o_totalprice])
+- Exchange(distribution=[single])
   +- TableSourceScan(table=[[default_catalog, default_database, lineitem]], fields=[l_linenumber, l_orderkey, l_partkey, l_extendedprice])

The reason is explained in FLIP-229: Introduces Join Hint for Flink SQL Batch Job, quoted as follows.

> 3. Check when optimizing

> In the optimization stage of the optimizer, it is judged that this Join Hint can be applied. If it cannot be applied, the behavior is to print a warning log instead of throwing an exception directly.


Hint Strategy

Code Block (Java): FlinkHintStrategies
HintStrategyTable.builder()
    .hintStrategy("STATE_TTL",
            HintStrategy.builder(
                    HintPredicates.or(
                            HintPredicates.JOIN, HintPredicates.AGGREGATE))
                .optionChecker(STATE_TTL_OPTION_CHECKER)
                .build())
    .build();
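
STATE_TTL_OPTION_CHECKER is not spelled out here. As a rough sketch of what such a checker might verify, the example below accepts only non-empty key-value options whose values parse as durations. Everything in it is an assumption for illustration: the class StateTtlOptionCheckSketch, its methods, and the restricted unit set (ms, s, min, h, d) are hypothetical, and it does not use Calcite's or Flink's actual checker or duration-parsing APIs.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of option checking for STATE_TTL; not Flink's implementation.
final class StateTtlOptionCheckSketch {

    // Up to nine digits followed by one of a small, assumed set of units.
    private static final Pattern TTL = Pattern.compile("(\\d{1,9})\\s*(ms|s|min|h|d)");

    /** Parses values such as "12h" or "3d"; throws IllegalArgumentException otherwise. */
    static Duration parseTtl(String text) {
        Matcher m = TTL.matcher(text.trim().toLowerCase(Locale.ROOT));
        if (!m.matches()) {
            throw new IllegalArgumentException("Invalid TTL value: " + text);
        }
        long amount = Long.parseLong(m.group(1));
        switch (m.group(2)) {
            case "ms":
                return Duration.ofMillis(amount);
            case "s":
                return Duration.ofSeconds(amount);
            case "min":
                return Duration.ofMinutes(amount);
            case "h":
                return Duration.ofHours(amount);
            default: // "d"
                return Duration.ofDays(amount);
        }
    }

    /** Returns true only if there is at least one option and every value is a valid TTL. */
    static boolean checkOptions(Map<String, String> kvOptions) {
        if (kvOptions.isEmpty()) {
            return false;
        }
        for (String value : kvOptions.values()) {
            try {
                parseTtl(value);
            } catch (IllegalArgumentException e) {
                return false;
            }
        }
        return true;
    }
}
```

With this sketch, the options of STATE_TTL('o' = '12h', 'c' = '3d') pass the check, while an empty option list or a value such as 'soon' is rejected.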

...