Status
...
Motivation
Lookup join is a commonly used feature in Flink SQL. We have received many optimization requests for lookup join. For example:
1. Suggest that the left side of a lookup join be hash-partitioned to raise the cache hit ratio
...
To enable hash lookup join, the user only needs to specify a new hint (SHUFFLE_HASH) in the SELECT clause of the query, which is similar to Spark SQL [2].
Code Block | ||
---|---|---|
| ||
SELECT /*+ SHUFFLE_HASH('Customers') */ o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id; |
Note:
- The table name in the SHUFFLE_HASH hint is the build table name. Lookup join only supports the dimension table as the build table; the left side cannot be the build table.
- The hint is only a suggestion to the optimizer; it is not enforced.
Proposed Changes
Define Hint Strategy
The "SHUFFLE_HASH" hint can only be applied to CORRELATE relations which satisfy the following conditions:
- the correlate is a lookup join; other correlates ignore the hint
- the correlate has "Customers" as the dimension table name.
The code below shows how we define the hint strategy for hash lookup join.
Code Block | ||||
---|---|---|---|---|
| ||||
builder
    .hintStrategy("SHUFFLE_HASH",
        HintStrategy.builder(
                HintPredicates.and(HintPredicates.CORRELATE, isLookupJoin(), withBuildTableName()))
            .build())
    .build(); |
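To make the two conditions concrete, here is a minimal plain-Java sketch of the matching logic. It does not use the Calcite API: the `Hint` and `Correlate` classes and the `applies` method are hypothetical stand-ins introduced only for illustration, and the real predicates would operate on `RelHint` and `RelNode` instances.

```java
import java.util.Arrays;
import java.util.List;

class ShuffleHashHintMatchSketch {

    /** Hypothetical stand-in for a parsed hint: a name plus its list options. */
    static final class Hint {
        final String name;
        final List<String> options;

        Hint(String name, String... options) {
            this.name = name;
            this.options = Arrays.asList(options);
        }
    }

    /** Hypothetical stand-in for a correlate node in the plan. */
    static final class Correlate {
        final boolean lookupJoin;
        final String dimensionTable;

        Correlate(boolean lookupJoin, String dimensionTable) {
            this.lookupJoin = lookupJoin;
            this.dimensionTable = dimensionTable;
        }
    }

    /** The hint applies only to a lookup join whose dimension table is named in the hint. */
    static boolean applies(Hint hint, Correlate correlate) {
        return "SHUFFLE_HASH".equals(hint.name)
                && correlate.lookupJoin
                && hint.options.contains(correlate.dimensionTable);
    }

    public static void main(String[] args) {
        Hint hint = new Hint("SHUFFLE_HASH", "Customers");
        System.out.println(applies(hint, new Correlate(true, "Customers")));
        System.out.println(applies(hint, new Correlate(false, "Customers")));
    }
}
```

Any correlate that fails either condition simply ignores the hint, which matches the rule that the hint is a suggestion and not an enforcer.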
Note: this has a blocker on the Calcite version upgrade.
Calcite translates the above SQL into Correlate instead of Join. Correlate is not a kind of RelNode that can attach RelHints, so the hint cannot be propagated to Correlate nodes until version 1.30.0.
I have reported CALCITE-4967 in Calcite, which has been merged and will be published in Calcite 1.30.0.
Hint Propagation in Optimizer
We need to ensure the hint is not lost before it is finally used to require the distribution on the inputs of LookupJoin.
...
LookupJoinRules check whether FlinkLogicalJoin contains the SHUFFLE_HASH hint. If it does, the rules require the input to have hash distribution on the join keys when converting FlinkLogicalJoin to LookupJoin.
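The effect of hash distribution on the lookup cache can be illustrated with a small plain-Java sketch. It assumes a simplified routing function (`hashCode` modulo parallelism) and models the unhinted case as round-robin by record position; both are assumptions for illustration and not Flink's actual partitioners. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class HashRoutingSketch {

    /** Simplified routing (assumption): equal join keys always map to the same subtask. */
    static int hashSubtask(Object joinKey, int parallelism) {
        return Math.floorMod(joinKey.hashCode(), parallelism);
    }

    /** Largest number of distinct join keys that any single subtask has to look up and cache. */
    static int maxDistinctKeysPerSubtask(int[] joinKeys, int parallelism, boolean useHash) {
        List<Set<Integer>> seen = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            seen.add(new HashSet<>());
        }
        for (int i = 0; i < joinKeys.length; i++) {
            // Without hash distribution, records are modeled as spread round-robin by position.
            int subtask = useHash ? hashSubtask(joinKeys[i], parallelism) : i % parallelism;
            seen.get(subtask).add(joinKeys[i]);
        }
        int max = 0;
        for (Set<Integer> keys : seen) {
            max = Math.max(max, keys.size());
        }
        return max;
    }

    public static void main(String[] args) {
        int[] keys = new int[12];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = i % 3; // three distinct customer ids, repeated
        }
        System.out.println("hash: " + maxDistinctKeysPerSubtask(keys, 4, true));
        System.out.println("round-robin: " + maxDistinctKeysPerSubtask(keys, 4, false));
    }
}
```

With hash routing each subtask sees fewer distinct keys, so repeated lookups for the same key are more likely to hit that subtask's cache.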
Note: If the input stream is not an insert-only stream, that is, it may contain update_before, update_after, or delete records, and its upsert key differs from the join key, then update_before and update_after records may be sent to different tasks after hash partitioning, which may lead to wrong results.
So hash lookup join requires that the input stream be insert-only or that its upsert keys contain the hash keys.
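The requirement above can be read as a simple eligibility check. The sketch below is one interpretation of that sentence in plain Java: it treats "upsert keys contain the hash keys" as "some upsert key is a superset of the hash keys". The class, method, and parameter names are hypothetical and not part of the Flink planner.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class HashLookupJoinEligibilitySketch {

    /**
     * Returns true if shuffling the input by hashKeys keeps all changes of one row on one task.
     * upsertKeys is the list of candidate upsert keys of the input, each a set of column names.
     */
    static boolean canShuffleByHashKeys(
            boolean insertOnly, List<Set<String>> upsertKeys, Set<String> hashKeys) {
        if (insertOnly) {
            // No update_before/update_after pairs exist, so nothing can be split across tasks.
            return true;
        }
        for (Set<String> upsertKey : upsertKeys) {
            // Rows sharing this upsert key also share every hash key column.
            if (upsertKey.containsAll(hashKeys)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<String> hashKeys = Collections.singleton("customer_id");
        List<Set<String>> orderIdOnly =
                Collections.singletonList(Collections.singleton("order_id"));
        List<Set<String>> orderAndCustomer =
                Collections.singletonList(new HashSet<>(Arrays.asList("order_id", "customer_id")));
        System.out.println(canShuffleByHashKeys(false, orderIdOnly, hashKeys));
        System.out.println(canShuffleByHashKeys(false, orderAndCustomer, hashKeys));
    }
}
```

In the first call an update may change `customer_id` for the same `order_id`, so the update_before and update_after records could be routed to different tasks; in the second call they cannot.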
Compatibility, Deprecation, and Migration Plan
...
In addition, we will add integration tests for connectors to verify that the feature works with existing source/sink implementations.
Rejected Alternatives
Hint Syntax
We could use the name 'use_hash', just like the hint in Oracle [1].
Code Block | ||
---|---|---|
| ||
SELECT /*+ USE_HASH('Customers') */ o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id; |
SQL Server [4] uses the keyword 'hash' in the join clause instead of a query hint. This is not a good choice for us, so we do not consider it.
Implementation
There is a simpler but slightly hacky implementation, which is also what we apply in our internal version.
That is, propagate the 'SHUFFLE_HASH' hint to the TableScan with the matching table name. In this way, the hint is not lost by the Flink optimizer before it is needed in LookupJoinRules.
...
Compared with the previous solution, this solution has two advantages:
- It does not need a Calcite version upgrade, see CALCITE-4967.
- It does not need code refactoring to ensure the hint is not lost by the Flink optimizer.
...
In any case, the difference between the two solutions concerns only the internal implementation and has no impact on the user.
Reference
[1] Oracle USE_HASH hint
Code Block | ||
---|---|---|
| ||
SELECT /*+ USE_HASH(l h) */ *
FROM orders h, order_items l
WHERE l.order_id = h.order_id
AND l.order_id > 3500; |
[2] Spark SQL SHUFFLE_HASH hint
Code Block | ||
---|---|---|
| ||
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key; |
[3] Impala SHUFFLE hint
Code Block | ||
---|---|---|
| ||
SELECT straight_join weather.wind_velocity, geospatial.altitude
FROM weather JOIN /* +SHUFFLE */ geospatial
ON weather.lat = geospatial.lat AND weather.long = geospatial.long; |
[4] SQL Server HASH join hint
Code Block | ||
---|---|---|
| ||
SELECT p.Name, pr.ProductReviewID FROM Production.Product AS p LEFT OUTER HASH JOIN Production.ProductReview AS pr ON p.ProductID = pr.ProductID ORDER BY ProductReviewID DESC; |