Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


Discussion thread

...

...

...

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-23687

...

Release


Motivation

Look up join is commonly used feature in Flink SQL. We have received many optimization requirements on look up join. For example:
1. Suggests left side of lookup join do a hash partitioner to raise cache hint ratio

...

To enable hash lookup join, user only needs specify a new hint (SHUFFLE_HASH) in select clause in query, just like which is similar with spark[2] sql.

Code Block
languagesql
SELECT /*+ SHUFFLE_HASH('Orders', 'Customers') */ o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;


Note:

  1. Table names name in SHUFFLE_HASH hint is required to avoid mistake hint propagation if there are multiple lookup joinsbuild table name. Lookup join only supports dimension table to be build table, does not support left side to be build table.
  2. The hint only provides a suggestion to the optimization, it is not an enforcer.

...

  1. the correlate is a look up join, other correlate would ignore the hint
  2. the correlate has "Orders" and has  "Customers" as the input table names. In theory, it's better to support both table names and alias names, but the alias name of subquery or table would not be lost by the sql converter and sql optimizer. So here we only support table names.dimension table name. 

The code below shows how we define hint strategy for hash lookupJoin.

Code Block
languagejava
titleSHUFFLE_HASH hint strategy
      builder
        .hintStrategy("SHUFFLE_HASH",
            HintStrategy.builder(
                HintPredicates.and(HintPredicates.CORRELATE, isLookupJoin(), lookupJoinWithFixedTableNamewithBuildTableName())))
        .build();

Note,

it has a blocker on Calcite version upgrade.

...

Code Block
languagesql
SELECT /*+ USE_HASH('Orders', 'Customers') */ o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;


SQL Server[34] uses keyword 'hash' instead of query hint, it's not a good choise for use, so we ignore this.

...

Anyway, the difference between the two solution is only about the internal implementation and has no impact on the user.

Reference

[1] Oracle Hash hint syntaxUSE_Hash hint

Code Block
languagesql
SELECT /*+ USE_HASH(l h) */ *
  FROM orders h, order_items l
  WHERE l.order_id = h.order_id
    AND l.order_id > 3500;


[2] Spark Hash hint syntax[3] SQL Server Hash Keyword syntaxSHUFFLE_HASH hint

Code Block
languagesql
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;

[3] IMPALA SHUFFLE hint

Code Block
languagesql
SELECT straight_join weather.wind_velocity, geospatial.altitude
  FROM weather JOIN /* +SHUFFLE */ geospatial
  ON weather.lat = geospatial.lat AND weather.long = geospatial.long;


[4] SQL Server Hash Keyword

Code Block
languagesql
SELECT p.Name, pr.ProductReviewID FROM Production.Product AS p LEFT OUTER HASH JOIN Production.ProductReview AS pr ON p.ProductID = pr.ProductID ORDER BY ProductReviewID DESC;