Status
Current state: [Under Discussion]
Discussion thread: https://lists.apache.org/list?dev@flink.apache.org:lte=1M:Introduce%20Hash%20Lookup%20Join
Vote thread: -
JIRA:
Released: -
Motivation
Lookup join is a commonly used feature in Flink SQL. We have received many optimization requests for lookup join. For example:
1. Enforce hash partitioning on the left side of the lookup join to raise the cache hit ratio
2. Solve the data skew problem that arises once hash lookup join is introduced
3. Enable mini-batch optimization to reduce the number of RPC calls
This FLIP focuses on point 1. Points 2 and 3 will be discussed in later FLIPs.
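To make the reasoning behind point 1 concrete, here is a small toy simulation. It is not Flink code: the class, method names, and parameters are all hypothetical, and it assumes each subtask holds its own LRU cache over uniformly distributed keys. With hash routing each key always reaches the same subtask, so each cache only has to hold its own share of the keys.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

/** Toy simulation; every name here is hypothetical and none of it is Flink code. */
final class CacheHitRatioSketch {

    /** Minimal LRU cache standing in for the per-subtask lookup cache. */
    static final class LruCache extends LinkedHashMap<Integer, Boolean> {
        private final int capacity;

        LruCache(int capacity) {
            super(16, 0.75f, true); // access order gives LRU eviction
            this.capacity = capacity;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<Integer, Boolean> eldest) {
            return size() > capacity;
        }
    }

    /** Returns the fraction of lookups served from cache under the chosen routing. */
    static double hitRatio(boolean hashPartitioned, int parallelism, int distinctKeys,
                           int cacheSize, int records, long seed) {
        LruCache[] caches = new LruCache[parallelism];
        for (int i = 0; i < parallelism; i++) {
            caches[i] = new LruCache(cacheSize);
        }
        Random keys = new Random(seed);        // generates the join keys
        Random routing = new Random(seed + 1); // used only for non-hash routing
        long hits = 0;
        for (int i = 0; i < records; i++) {
            int key = keys.nextInt(distinctKeys);
            // Hash routing: the same key always goes to the same subtask.
            int subtask = hashPartitioned
                    ? Math.floorMod(Integer.hashCode(key), parallelism)
                    : routing.nextInt(parallelism);
            LruCache cache = caches[subtask];
            if (cache.get(key) != null) {
                hits++;
            } else {
                cache.put(key, Boolean.TRUE); // simulate loading from the dimension table
            }
        }
        return (double) hits / records;
    }

    public static void main(String[] args) {
        System.out.println("hash:   " + hitRatio(true, 4, 400, 100, 100_000, 42L));
        System.out.println("random: " + hitRatio(false, 4, 400, 100, 100_000, 42L));
    }
}
```

In this setup each of the 4 caches holds 100 entries. Under hash routing a subtask only ever sees 100 of the 400 keys, so after warm-up nearly every lookup is a hit. Under random routing every subtask sees all 400 keys and most lookups miss.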
Similar requests for hash lookup join have come up repeatedly on the user mailing list and in JIRA, for example:
SQL Syntax
To enable hash lookup join, the user only needs to specify a new hint (USE_HASH) in the SELECT clause of the query, similar to the USE_HASH hint in Oracle.
SELECT /*+ USE_HASH(Orders, Customers) */
    o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
Note: The table names in the USE_HASH hint are required, to avoid propagating the hint to the wrong join when a query contains multiple lookup joins.
Proposed Changes
Define Hint Strategy
The "USE_HASH" hint can only be applied to Correlate relations that satisfy the following conditions:
- the correlate is a lookup join; other correlates ignore the hint
- the correlate has the table names given in the hint ("Orders" and "Customers" in the example above) as its input table names
The code below shows how we define the hint strategy for hash lookup join.
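// isLookupJoin() and joinWithFixedTableName() are custom predicates, not part of Calcite.
builder
    .hintStrategy(
        "USE_HASH",
        HintStrategy.builder(
                HintPredicates.and(
                    HintPredicates.CORRELATE, isLookupJoin(), joinWithFixedTableName()))
            .build())
    .build();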
Note: this is blocked on a Calcite upgrade.
Calcite translates the SQL above into a Correlate instead of a Join. Currently, Correlate is not a kind of RelNode that can have RelHints attached, and hints cannot be propagated to Correlate nodes either.
I have reported this issue to Calcite.
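The two conditions on the hint can be sketched as a plain predicate. This is a simplified model, not the Calcite API: the Node class and the useHashApplies method are hypothetical, and it assumes that "has the table names as inputs" means the hint's table names equal the node's input table names as a set.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

/** Simplified model of the hint predicate; these types are hypothetical, not Calcite classes. */
final class UseHashPredicateSketch {

    /** Stand-in for a relational node, reduced to the fields the predicate inspects. */
    static final class Node {
        final String kind;              // e.g. "CORRELATE" or "JOIN"
        final boolean lookupJoin;       // whether the node is a lookup join
        final List<String> inputTables; // table names of the node's inputs

        Node(String kind, boolean lookupJoin, String... inputTables) {
            this.kind = kind;
            this.lookupJoin = lookupJoin;
            this.inputTables = Arrays.asList(inputTables);
        }
    }

    /** True only for a correlate that is a lookup join over exactly the hinted tables. */
    static boolean useHashApplies(List<String> hintTables, Node node) {
        boolean isCorrelate = "CORRELATE".equals(node.kind);
        // Assumption: table names are compared as a set, ignoring order.
        boolean tablesMatch =
                new HashSet<>(hintTables).equals(new HashSet<>(node.inputTables));
        return isCorrelate && node.lookupJoin && tablesMatch;
    }

    public static void main(String[] args) {
        List<String> hintTables = Arrays.asList("Orders", "Customers");
        Node lookup = new Node("CORRELATE", true, "Orders", "Customers");
        Node other = new Node("CORRELATE", true, "Orders", "Products");
        System.out.println(useHashApplies(hintTables, lookup));
        System.out.println(useHashApplies(hintTables, other));
    }
}
```

Requiring the table names to match is what keeps the hint from reaching a second lookup join in the same query.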
Hint Propagation in Optimizer
We need to ensure that the hint is not lost before it is finally used to enforce the distribution on the inputs of LookupJoin.
This requires refactoring some existing rules and FlinkLogicalJoin.
Refactor rules in temporal_join_rewrite phase
In the temporal_join_rewrite phase, the rules check whether a LogicalCorrelate is a temporal table join that is implemented by a lookup join. If so, the rules translate it into a LogicalJoin.
Currently the rules call RelBuilder#join in Calcite, which produces a join without any hints, so the hint is lost here.
Specifically, we need to refactor LogicalCorrelateToJoinFromLookupTableRuleWithFilter and LogicalCorrelateToJoinFromLookupTableRuleWithoutFilter.
Refactor FlinkLogicalJoin
The FlinkLogicalJoin constructor does not take any hints at present, so hints are lost when converting a LogicalJoin to a FlinkLogicalJoin. We need to refactor the FlinkLogicalJoin constructor to accept hints.
Refactor FlinkLogicalJoinConverter
Refactor the FlinkLogicalJoin converter rules to propagate the hints of the LogicalJoin. In addition, the other places that create FlinkLogicalJoin instances need to be updated as well.
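The propagation problem described in this section can be illustrated with a toy model. The JoinNode class and both convert methods are hypothetical stand-ins, not the Flink or Calcite classes; the sketch only shows why every conversion step has to hand the hints to the node it creates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model of hint propagation across plan rewrites; not Flink or Calcite code. */
final class HintPropagationSketch {

    /** Stand-in for a plan node such as LogicalCorrelate, LogicalJoin or FlinkLogicalJoin. */
    static final class JoinNode {
        final String type;
        final List<String> hints;

        JoinNode(String type, List<String> hints) {
            this.type = type;
            this.hints = Collections.unmodifiableList(new ArrayList<>(hints));
        }
    }

    /** Models the current behavior: the new node is built without any hints. */
    static JoinNode convertDroppingHints(JoinNode input, String targetType) {
        return new JoinNode(targetType, Collections.emptyList());
    }

    /** Models the proposed behavior: the input's hints are passed to the new node. */
    static JoinNode convertKeepingHints(JoinNode input, String targetType) {
        return new JoinNode(targetType, input.hints);
    }

    public static void main(String[] args) {
        JoinNode correlate = new JoinNode("LogicalCorrelate", Arrays.asList("USE_HASH"));

        JoinNode lost = convertDroppingHints(correlate, "LogicalJoin");
        System.out.println(lost.type + " hints: " + lost.hints);

        JoinNode logical = convertKeepingHints(correlate, "LogicalJoin");
        JoinNode flinkLogical = convertKeepingHints(logical, "FlinkLogicalJoin");
        System.out.println(flinkLogical.type + " hints: " + flinkLogical.hints);
    }
}
```

A single step that drops the hints is enough to lose them for every later phase, which is why both the rewrite rules and the FlinkLogicalJoin constructor are covered by the refactoring.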
Use Hint to require Hash Distribution
LookupJoinRules check whether the FlinkLogicalJoin contains the USE_HASH hint. If so, the rules require the input to have a hash distribution on the join keys when converting the FlinkLogicalJoin to a LookupJoin.
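As a rough sketch of this decision, the following toy code derives the required input distribution from the hints and shows what a hash distribution on the join keys implies for routing. All names are hypothetical, distributions are modeled as plain strings, and the modulo routing is a simplification rather than Flink's actual partitioner.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Toy model of the distribution requirement; not the actual LookupJoinRules code. */
final class RequiredDistributionSketch {

    /** Returns "HASH[keys]" when the USE_HASH hint is present, otherwise "ANY". */
    static String requiredInputDistribution(List<String> hints, List<String> joinKeys) {
        return hints.contains("USE_HASH") ? "HASH" + joinKeys : "ANY";
    }

    /** Simplified routing: records with equal join key values map to the same subtask. */
    static int targetSubtask(List<Object> joinKeyValues, int parallelism) {
        return Math.floorMod(Objects.hash(joinKeyValues.toArray()), parallelism);
    }

    public static void main(String[] args) {
        List<String> joinKeys = Arrays.asList("customer_id");
        System.out.println(requiredInputDistribution(Arrays.asList("USE_HASH"), joinKeys));
        System.out.println(requiredInputDistribution(Collections.<String>emptyList(), joinKeys));

        // Two records with the same customer_id are routed to the same subtask.
        int first = targetSubtask(Arrays.<Object>asList(1001L), 4);
        int second = targetSubtask(Arrays.<Object>asList(1001L), 4);
        System.out.println(first == second);
    }
}
```

Without the hint the rule places no requirement on the input, so existing plans stay unchanged.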
Compatibility, Deprecation, and Migration Plan
Hash LookupJoin and skew LookupJoin are only enabled through the hint syntax, so existing jobs written against older Flink versions are not affected. Their behavior also remains compatible after they are upgraded to the new version.
Test Plan
Each new feature will be covered by unit tests.
In addition, we will add integration tests for connectors to verify that the feature works with existing source/sink implementations.
Other Alternatives
There is a simpler, though somewhat hacky, solution, which is also what we use in our internal version.
It propagates the 'use_hash' hint to the TableScan nodes with matching table names. This way, the Flink optimizer does not lose the hint before LookupJoinRules needs it.
LookupJoinRules then only checks whether the dimension table scan has the 'use_hash' hint. If so, it requires the input to have a hash distribution on the join keys.
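A minimal sketch of this alternative, using hypothetical stand-in types rather than the real planner classes, and assuming the hint is stored under the name "USE_HASH":

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the scan-based alternative; not the actual planner code. */
final class ScanHintSketch {

    /** Stand-in for a table scan node that can carry hints. */
    static final class TableScan {
        final String tableName;
        final Set<String> hints = new HashSet<>();

        TableScan(String tableName) {
            this.tableName = tableName;
        }
    }

    /** Attaches the hint to every scan whose table name is listed in the hint. */
    static void propagate(String hintName, List<String> hintTables, List<TableScan> scans) {
        for (TableScan scan : scans) {
            if (hintTables.contains(scan.tableName)) {
                scan.hints.add(hintName);
            }
        }
    }

    /** The lookup rule then only inspects the dimension table scan. */
    static boolean requiresHashDistribution(TableScan dimensionScan) {
        return dimensionScan.hints.contains("USE_HASH");
    }

    public static void main(String[] args) {
        TableScan orders = new TableScan("Orders");
        TableScan customers = new TableScan("Customers");
        TableScan products = new TableScan("Products");
        propagate("USE_HASH", Arrays.asList("Orders", "Customers"),
                Arrays.asList(orders, customers, products));
        System.out.println(requiresHashDistribution(customers));
        System.out.println(requiresHashDistribution(products));
    }
}
```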
Compared with the previous solution, this solution has two advantages:
- It does not need a Calcite upgrade.
- It does not need code refactoring to ensure that the hint is not lost by the Flink optimizer.
However, this approach is conceptually a bit hacky, because whether hash distribution is enabled is an attribute of the lookup join rather than of the dimension table scan.
In any case, the two solutions differ only in their internal implementation, and the choice has no impact on users.