Status
...
Motivation
Lookup join is a commonly used feature in Flink SQL. We have received many optimization requests for lookup join. For example:
1. Apply a hash partitioner to the left side of the lookup join to raise the cache hit ratio
...
To enable hash lookup join, the user only needs to specify a new hint (SHUFFLE_HASH) in the SELECT clause of the query, similar to Spark SQL[2].
Code Block | ||
---|---|---|
| ||
SELECT /*+ SHUFFLE_HASH('Orders', 'Customers') */ o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id; |
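To illustrate why hash-partitioning the left side can raise the cache hit ratio, here is a minimal, self-contained simulation. It is only a sketch under stated assumptions and is not Flink code: the class `CacheHitSimulation`, the LRU cache, and all parameter values are hypothetical. Each subtask keeps its own small cache; records are routed either round-robin or by a hash of the join key.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

/** Hypothetical simulation; none of these names come from Flink. */
final class CacheHitSimulation {

    /** Tiny LRU cache standing in for a per-subtask lookup cache. */
    static final class LruCache extends LinkedHashMap<Integer, String> {
        private final int capacity;

        LruCache(int capacity) {
            super(16, 0.75f, true); // true = access order, which gives LRU eviction
            this.capacity = capacity;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
            return size() > capacity;
        }
    }

    /** Returns the overall cache hit ratio for the chosen routing strategy. */
    static double hitRatio(boolean hashPartition, int parallelism, int distinctKeys,
                           int cacheSize, int records, long seed) {
        LruCache[] caches = new LruCache[parallelism];
        for (int i = 0; i < parallelism; i++) {
            caches[i] = new LruCache(cacheSize);
        }
        Random random = new Random(seed);
        long hits = 0;
        for (int i = 0; i < records; i++) {
            int key = random.nextInt(distinctKeys);
            // Hash routing sends a given key to the same subtask every time;
            // round-robin spreads every key over all subtasks.
            int subtask = hashPartition
                    ? Math.floorMod(Integer.hashCode(key), parallelism)
                    : i % parallelism;
            LruCache cache = caches[subtask];
            if (cache.get(key) != null) {
                hits++;
            } else {
                cache.put(key, "row-" + key); // simulated lookup in the dimension table
            }
        }
        return (double) hits / records;
    }

    public static void main(String[] args) {
        double roundRobin = hitRatio(false, 4, 400, 100, 100_000, 42L);
        double hashed = hitRatio(true, 4, 400, 100, 100_000, 42L);
        System.out.printf("round-robin hit ratio: %.3f%n", roundRobin);
        System.out.printf("hash-partitioned hit ratio: %.3f%n", hashed);
    }
}
```

With these assumed parameters, each hash-partitioned subtask sees only a quarter of the keys, so its working set fits in the cache, whereas with round-robin routing every subtask sees the full key space.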
Note:
- The table name in the SHUFFLE_HASH hint is required: it identifies the build table and avoids mistaken hint propagation when there are multiple lookup joins. Lookup join only supports the dimension table as the build table; the left side cannot be the build table.
- The hint is only a suggestion to the optimizer; it is not enforced.
...
- the correlate is a lookup join; other correlates ignore the hint
- the correlate has "Orders" and "Customers" as its input table names. In theory, it would be better to support both table names and alias names, but the alias name of a subquery or table would be lost by the SQL converter and SQL optimizer, so only the dimension table name is supported here.
The code below shows how we define the hint strategy for hash lookup join.
Code Block | ||||
---|---|---|---|---|
| ||||
// isLookupJoin() and withBuildTableName() are custom predicates defined for this hint.
builder
    .hintStrategy(
        "SHUFFLE_HASH",
        HintStrategy.builder(
            HintPredicates.and(
                HintPredicates.CORRELATE, isLookupJoin(), withBuildTableName())))
    .build(); |
Note: this approach is blocked on a Calcite version upgrade.
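The way the three predicates combine can be sketched without Calcite. The following is a simplified, hypothetical model: `Node`, `Hint`, and the predicate bodies are stand-ins invented for illustration and do not reflect the actual Calcite or Flink classes. It only shows that the hint is kept when all three conditions hold.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of the hint predicate; not the Calcite or Flink API. */
final class ShuffleHashHintSketch {

    /** Stand-in for a relational node that a hint may be attached to. */
    static final class Node {
        final String kind;            // e.g. "CORRELATE" or "JOIN"
        final boolean lookupJoin;     // whether the node is a lookup join
        final String dimensionTable;  // name of the dimension (build) table

        Node(String kind, boolean lookupJoin, String dimensionTable) {
            this.kind = kind;
            this.lookupJoin = lookupJoin;
            this.dimensionTable = dimensionTable;
        }
    }

    /** Stand-in for a parsed hint such as SHUFFLE_HASH('Customers'). */
    static final class Hint {
        final String name;
        final List<String> tableNames;

        Hint(String name, List<String> tableNames) {
            this.name = name;
            this.tableNames = tableNames;
        }
    }

    static Predicate<Node> isCorrelate() {
        return node -> "CORRELATE".equals(node.kind);
    }

    static Predicate<Node> isLookupJoin() {
        return node -> node.lookupJoin;
    }

    static Predicate<Node> withBuildTableName(Hint hint) {
        return node -> hint.tableNames.contains(node.dimensionTable);
    }

    /** The hint applies only if every predicate accepts the node. */
    static boolean applies(Hint hint, Node node) {
        return "SHUFFLE_HASH".equals(hint.name)
                && isCorrelate()
                        .and(isLookupJoin())
                        .and(withBuildTableName(hint))
                        .test(node);
    }

    public static void main(String[] args) {
        Hint hint = new Hint("SHUFFLE_HASH", Arrays.asList("Customers"));
        System.out.println(applies(hint, new Node("CORRELATE", true, "Customers")));
        System.out.println(applies(hint, new Node("CORRELATE", true, "Products")));
    }
}
```

Requiring the table name to match is what keeps the hint from propagating to an unrelated lookup join in the same query.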
...
LookupJoinRules check whether the FlinkLogicalJoin contains a SHUFFLE_HASH hint. If it does, the rules require the input to have a hash distribution on the join keys when converting FlinkLogicalJoin to LookupJoin.
...
Code Block | ||
---|---|---|
| ||
SELECT /*+ USE_HASH('Orders', 'Customers') */ o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id; |
SQL Server[4] uses the keyword 'hash' instead of a query hint; it is not a good choice for us, so we ignore this.
...
In any case, the two solutions differ only in their internal implementation; there is no impact on the user.
Reference
[1] Oracle USE_HASH hint
Code Block | ||
---|---|---|
| ||
SELECT /*+ USE_HASH(l h) */ *
FROM orders h, order_items l
WHERE l.order_id = h.order_id
AND l.order_id > 3500; |
[2] Spark SHUFFLE_HASH hint
Code Block | ||
---|---|---|
| ||
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key; |
[3] Impala SHUFFLE hint
Code Block | ||
---|---|---|
| ||
SELECT straight_join weather.wind_velocity, geospatial.altitude
FROM weather JOIN /* +SHUFFLE */ geospatial
ON weather.lat = geospatial.lat AND weather.long = geospatial.long; |
[4] SQL Server Hash keyword syntax
Code Block | ||
---|---|---|
| ||
SELECT p.Name, pr.ProductReviewID
FROM Production.Product AS p
LEFT OUTER HASH JOIN Production.ProductReview AS pr
ON p.ProductID = pr.ProductID
ORDER BY ProductReviewID DESC; |