...
Page properties |
---|
...
|
...
|
...
...
|
...
...
Released: <Flink Version>
...
|
Motivation
In SQL jobs, join is a very common operator. Currently, Flink automatically selects a join strategy for a SQL query based on the statistics of the source tables and generates an execution plan to run the job. However, for various reasons, such as missing or incorrect statistics, the execution plan generated by the optimizer may not be optimal: the optimizer may choose the wrong join strategy, resulting in poor performance or even a failed execution.
...
The SQL syntax is as follows:
Code Block |
---|
query:
select /*+ hint_content[, hint_content] */ ...
hint_content:
hint_strategy_name(hint_item[,hint_item])
hint_strategy_name:
supported_join_hint_name
hint_item:
string_literal |
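To make the grammar concrete, the following is a minimal sketch of how a hint comment breaks down into hint names and hint items. It is a standalone illustration, not Flink's or Calcite's parser: the class `JoinHintSyntaxSketch`, its nested `Hint` type, and the `parse` method are hypothetical names, and the regular expression only covers the simple form shown in the grammar above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Flink class): splits a hint comment into name/items pairs. */
final class JoinHintSyntaxSketch {

    /** One parsed hint_content, e.g. name BROADCAST with items [t1, t3]. */
    static final class Hint {
        final String name;
        final List<String> items;

        Hint(String name, List<String> items) {
            this.name = name;
            this.items = items;
        }

        @Override
        public String toString() {
            return name + items;
        }
    }

    // Matches hint_strategy_name(hint_item[,hint_item])
    private static final Pattern HINT = Pattern.compile("(\\w+)\\s*\\(([^)]*)\\)");

    /** Parses the hints in written order; duplicates are kept. */
    static List<Hint> parse(String hintComment) {
        String body = hintComment.trim();
        if (!body.startsWith("/*+") || !body.endsWith("*/")) {
            throw new IllegalArgumentException("not a hint comment: " + hintComment);
        }
        // Strip the leading "/*+" and the trailing "*/".
        body = body.substring(3, body.length() - 2);
        List<Hint> hints = new ArrayList<>();
        Matcher m = HINT.matcher(body);
        while (m.find()) {
            List<String> items = new ArrayList<>();
            for (String item : m.group(2).split(",")) {
                String trimmed = item.trim();
                if (!trimmed.isEmpty()) {
                    items.add(trimmed);
                }
            }
            hints.add(new Hint(m.group(1), items));
        }
        return hints;
    }

    public static void main(String[] args) {
        // Prints the parsed hints in written order.
        System.out.println(parse("/*+ BROADCAST(t1,t3), SHUFFLE_HASH(t2) */"));
    }
}
```

Keeping the hints in a list rather than a map preserves both the written order and repeated hint names, which matters for the conflict rules described later.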
...
Support Strategies
The following join strategies are currently supported in Flink SQL for batch jobs:
...
- Only supports equi-joins, and does not support Full Outer Join
For example,
Code Block |
---|
// t1 is the broadcast table
select /*+ BROADCAST(t1) */ * from t1
join t2 on t1.a = t2.a
// when join between t1 and t2, t1 will be chosen as the broadcast table
// when join between the result after t1 joins t2 and t3,
// t3 will be chosen as the broadcast table
select /*+ BROADCAST(t1,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c
// BROADCAST does not support non-equivalent joins,
// so the planner's default strategy is adopted
select /*+ BROADCAST(t1) */ * from t1
join t2 on t1.a > t2.a
// BROADCAST does not support full outer join,
// so the planner's default strategy is adopted
select /*+ BROADCAST(t1) */ * from t1
full outer join t2 on t1.a = t2.a |
- SHUFFLE_HASH
- Only supports equi-joins
For example,
Code Block |
---|
// t1 is the build side
select /*+ SHUFFLE_HASH(t1) */ * from t1
join t2 on t1.a = t2.a
// when join between t1 and t2, t1 will be chosen as the build side
// when join between the result after t1 joins t2 and t3,
// t3 will be chosen as the build side
select /*+ SHUFFLE_HASH(t1,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c
// SHUFFLE_HASH does not support non-equivalent join,
// so the planner's default strategy is adopted
select /*+ SHUFFLE_HASH(t1) */ * from t1
join t2 on t1.a > t2.a |
- SHUFFLE_MERGE
- Only supports equi-joins
For example,
Code Block |
---|
// Sort Merge Join is adopted
select /*+ SHUFFLE_MERGE(t1) */ * from t1
join t2 on t1.a = t2.a
// Sort Merge Join is adopted in both of these joins
select /*+ SHUFFLE_MERGE(t1,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c
// SHUFFLE_MERGE does not support non-equivalent join,
// so the planner's default strategy is adopted
select /*+ SHUFFLE_MERGE(t1) */ * from t1
join t2 on t1.a > t2.a |
- NEST_LOOP
- Supports both equi-joins and non-equi-joins
For example,
Code Block |
---|
// t1 is the build side
select /*+ NEST_LOOP(t1) */ * from t1
join t2 on t1.a = t2.a
// when join between t1 and t2, t1 will be chosen as the build side
// when join between the result after t1 joins t2 and t3,
// t3 will be chosen as the build side
select /*+ NEST_LOOP(t1,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c |
Proposed Changes
Join Hint Strategy Definition
According to FLIP-204 [3], Join Hints can be defined directly through FlinkHintStrategies. Taking SHUFFLE_HASH as an example:
Code Block |
---|
HintStrategyTable.builder()
.hintStrategy(
"SHUFFLE_HASH",
HintStrategy.builder(
HintPredicates.or(
HintPredicates.JOIN, HintPredicates.CORRELATE))
.optionChecker(...)
.build())
.build(); |
We can register all the Join Hint strategies that we plan to support in this way and check the number of parameters through the "optionChecker".
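The kind of check an option checker might perform can be sketched in plain Java. This is a standalone illustration rather than Calcite or Flink code: the class `JoinHintOptionCheckSketch`, the method `checkOptions`, the exact-match lookup of upper-case hint names, and the rule that a join hint needs at least one table name are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for an option checker; not part of Calcite or Flink. */
final class JoinHintOptionCheckSketch {

    // The four join hint names proposed in this document.
    private static final Set<String> SUPPORTED =
            new HashSet<>(Arrays.asList("BROADCAST", "SHUFFLE_HASH", "SHUFFLE_MERGE", "NEST_LOOP"));

    /** Throws if the hint name is unknown or the hint carries no table name (assumed rule). */
    static void checkOptions(String hintName, List<String> options) {
        if (!SUPPORTED.contains(hintName)) {
            throw new IllegalArgumentException("Unsupported join hint: " + hintName);
        }
        if (options.isEmpty()) {
            throw new IllegalArgumentException(
                    "Join hint " + hintName + " requires at least one table name");
        }
    }

    public static void main(String[] args) {
        checkOptions("SHUFFLE_HASH", Arrays.asList("t1")); // accepted
        try {
            checkOptions("SHUFFLE_HASH", Arrays.<String>asList());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```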
...
Semantic validation happens before the SQL is optimized, and an exception is thrown when an error is found.
...
The logic above only needs to be added to the "matches" method of the current rules. Part of the pseudocode of the util is as follows:
Code Block |
---|
// Check whether this join strategy is valid. It is invalid if, e.g., the join
// does not contain at least one equi-join condition or the table config
// forbids using SortMergeJoin.
if (!checkJoinStrategyValid(join, tableConfig, joinStrategy)) {
    return false;
}
List<RelHint> hints = join.getHints();
List<JoinStrategy> validHints = new ArrayList<>();
// collect the valid join hints, keeping their written order
hints.forEach(hint -> {
    JoinStrategy strategy = JoinStrategy.getJoinStrategy(hint.hintName);
    if (checkJoinStrategyValid(join, tableConfig, strategy)) {
        validHints.add(strategy);
    }
});
// if all join hints are invalid, treat it like no join hints
if (validHints.isEmpty()) {
    return true;
}
// if multiple hints are valid, pick the first one to apply
return validHints.get(0) == joinStrategy; |
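The pseudocode above can be exercised with a small self-contained model. The sketch below is not Flink code: `JoinHintMatchSketch`, `JoinInfo`, and `isValid` are hypothetical names, the table config checks are omitted, and treating NEST_LOOP as valid for every join type is an assumption. The validity rules for the other strategies follow the restrictions listed under Support Strategies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of the "matches" check; not a Flink class. */
final class JoinHintMatchSketch {

    enum JoinStrategy { BROADCAST, SHUFFLE_HASH, SHUFFLE_MERGE, NEST_LOOP }

    /** Minimal stand-in for the join properties the check needs. */
    static final class JoinInfo {
        final boolean hasEquiCondition;
        final boolean isFullOuter;

        JoinInfo(boolean hasEquiCondition, boolean isFullOuter) {
            this.hasEquiCondition = hasEquiCondition;
            this.isFullOuter = isFullOuter;
        }
    }

    /** Validity rules taken from the strategy list; table config checks are left out. */
    static boolean isValid(JoinInfo join, JoinStrategy strategy) {
        switch (strategy) {
            case BROADCAST:
                return join.hasEquiCondition && !join.isFullOuter;
            case SHUFFLE_HASH:
            case SHUFFLE_MERGE:
                return join.hasEquiCondition;
            case NEST_LOOP:
                return true; // assumption: no restriction modeled here
            default:
                return false;
        }
    }

    /** Returns whether the rule for ruleStrategy should fire on this join. */
    static boolean matches(JoinInfo join, List<JoinStrategy> hintsInWrittenOrder, JoinStrategy ruleStrategy) {
        if (!isValid(join, ruleStrategy)) {
            return false;
        }
        List<JoinStrategy> validHints = new ArrayList<>();
        for (JoinStrategy hint : hintsInWrittenOrder) {
            if (isValid(join, hint)) {
                validHints.add(hint);
            }
        }
        // No usable hint: behave as if no hint was given.
        if (validHints.isEmpty()) {
            return true;
        }
        // Otherwise only the first valid hint in written order wins.
        return validHints.get(0) == ruleStrategy;
    }

    public static void main(String[] args) {
        JoinInfo fullOuterEqui = new JoinInfo(true, true);
        List<JoinStrategy> hints = Arrays.asList(JoinStrategy.BROADCAST, JoinStrategy.SHUFFLE_HASH);
        System.out.println(matches(fullOuterEqui, hints, JoinStrategy.SHUFFLE_HASH));
    }
}
```

In this model each rule asks only whether it is the winner, so the planner's existing rules keep competing as usual when no hint is applicable.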
Special Status With Hints
...
Join Hint only affects the current query block and does not affect the join strategy in subqueries and views. If a join hint really needs to be propagated into a view or subquery, this can be discussed in the future.
For example,
Code Block |
---|
// view
create view view1 as select t1.* from t1 join t2 on t1.a = t2.a
// BROADCAST will not be propagated into view1,
// and view1 will use the planner's default join strategy
select /*+ BROADCAST(t1) */ * from view1 join t3 on view1.a = t3.a
// the join in view1 will use the planner's default join strategy,
// and the join between view1 and t1 will use BROADCAST
select /*+ BROADCAST(t1) */ * from view1 join t1 on view1.a = t1.a |
Support with table alias
Currently, join hints on aliases are not supported. Instead, users can create a view for the subquery with a specific view name and use this view name as the argument of the join hint.
For example,
Code Block |
---|
// an exception will be thrown because the hinted table t1 (an alias) does not exist
select /*+ BROADCAST(t1) */ t1.a from (select * from test1) t1 join test2 on t1.a = test2.a
// use the following equivalent SQL instead
create view t1 as select * from test1;
select /*+ BROADCAST(t1) */ t1.a from t1 join test2 on t1.a = test2.a |
Conflict within the same hint
If a hint declares both sides of the join as the build side, the side written first is treated as the build side. For example,
Code Block |
---|
// the first will be chosen, and t2 is the broadcast table
select /*+ BROADCAST(t2), BROADCAST(t1) */ * from t1
join t2 on t1.a = t2.a
// t1 is the broadcast table
select /*+ BROADCAST(t1,t1) */ * from t1
join t2 on t1.a = t2.a
// the first will be chosen, and t2 is the broadcast table
select /*+ BROADCAST(t2,t1) */ * from t1
join t2 on t1.a = t2.a
// the first will be chosen, and t2 is the broadcast table
select /*+ BROADCAST(t2,t1), BROADCAST(t1,t2) */ * from t1
join t2 on t1.a = t2.a
// just like BROADCAST(t1, t2) + BROADCAST(t3)
// when join between t1 and t2, t1 will be chosen as the broadcast table
// when join between the result after t1 joins t2 and t3,
// t3 will be chosen as the broadcast table
select /*+ BROADCAST(t1,t2,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c
// just like SHUFFLE_HASH(t1, t2) + SHUFFLE_HASH(t3)
// when join between t1 and t2, t1 will be chosen as the build side
// when join between the result after t1 joins t2 and t3,
// t3 will be chosen as the build side
select /*+ SHUFFLE_HASH(t1,t2,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c
// Sort Merge Join is adopted
select /*+ SHUFFLE_MERGE(t1,t1) */ * from t1
join t2 on t1.a = t2.a
// Sort Merge Join is adopted
select /*+ SHUFFLE_MERGE(t2), SHUFFLE_MERGE(t1) */ * from t1
join t2 on t1.a = t2.a
// Sort Merge Join is adopted
select /*+ SHUFFLE_MERGE(t2,t1) */ * from t1
join t2 on t1.a = t2.a
// Sort Merge Join is adopted
select /*+ SHUFFLE_MERGE(t2,t1), SHUFFLE_MERGE(t1,t2) */ * from t1
join t2 on t1.a = t2.a
// Sort Merge Join is adopted in both of these joins
select /*+ SHUFFLE_MERGE(t1,t2,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c
// the first will be chosen, and t2 is the build side
select /*+ NEST_LOOP(t2), NEST_LOOP(t1) */ * from t1
join t2 on t1.a = t2.a
// t1 is the build side
select /*+ NEST_LOOP(t1,t1) */ * from t1
join t2 on t1.a = t2.a
// the first will be chosen, and t2 is the build side
select /*+ NEST_LOOP(t2,t1) */ * from t1
join t2 on t1.a = t2.a
// the first will be chosen, and t2 is the build side
select /*+ NEST_LOOP(t2,t1), NEST_LOOP(t1,t2) */ * from t1
join t2 on t1.a = t2.a
// just like NEST_LOOP(t1, t2) + NEST_LOOP(t3)
// when join between t1 and t2, t1 will be chosen as the build side
// when join between the result after t1 joins t2 and t3,
// t3 will be chosen as the build side
select /*+ NEST_LOOP(t1,t2,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c |
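The "written first wins" rule in the examples above can be sketched as a lookup over the hinted table names in written order. This is a simplified model, not Flink code: `BuildSideSketch` and `chooseBuildSide` are hypothetical names, table names from repeated hints of the same kind are assumed to be concatenated in written order, and an input that is itself a join result is represented by `null` because it has no table name to match.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper (not a Flink class) modeling build side selection for one join. */
final class BuildSideSketch {

    /**
     * Returns the first hinted table, in written order, that is a direct input of the join.
     * Pass null for an input that is not a plain table (e.g. the result of an earlier join).
     */
    static Optional<String> chooseBuildSide(List<String> hintedTables, String leftInput, String rightInput) {
        for (String table : hintedTables) {
            if (table.equals(leftInput) || table.equals(rightInput)) {
                return Optional.of(table);
            }
        }
        // No hinted table takes part in this join: the hint does not apply here.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // BROADCAST(t2), BROADCAST(t1) on "t1 join t2": t2 is written first.
        System.out.println(chooseBuildSide(Arrays.asList("t2", "t1"), "t1", "t2"));
        // BROADCAST(t1,t2,t3): first join picks t1, second join (result join t3) picks t3.
        List<String> hinted = Arrays.asList("t1", "t2", "t3");
        System.out.println(chooseBuildSide(hinted, "t1", "t2"));
        System.out.println(chooseBuildSide(hinted, null, "t3"));
    }
}
```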
Conflict between different hints
If multiple Join Hints are defined and they conflict, the planner tries to apply them one by one in the written order. If none of them can be applied, the planner's default join strategy is used. For example,
Code Block |
---|
// an exception is thrown when parsing this SQL
select /*+ BROADCAST(t1) */ /*+ SHUFFLE_HASH(t1) */ * from t1
join t2 on t1.a = t2.a
// the first hint BROADCAST will be chosen
select /*+ BROADCAST(t1) SHUFFLE_HASH(t1) */ * from t1
join t2 on t1.a = t2.a
// although BROADCAST is first, BROADCAST does not support full outer join,
// so the next join hint SHUFFLE_HASH is chosen
select /*+ BROADCAST(t1) SHUFFLE_HASH(t1) */ * from t1
full outer join t2 on t1.a = t2.a
// although two join hints are defined,
// neither of them supports non-equivalent joins,
// so the planner's default strategy is chosen
select /*+ BROADCAST(t1) SHUFFLE_HASH(t1) */ * from t1
join t2 on t1.a > t2.a |
Dependent Processes
The use of Join Hint currently depends on upgrading Calcite to 1.31, in which the following issue is fixed.
...
For join hint conflicts, there are several different behaviors to choose from. The following table lists the advantages and shortcomings of each behavior. For specific examples corresponding to each behavior, please refer to Appendix [1].
advantage | shortcoming | |
proposal 1: writing order first (we support) |
|
|
proposal 2: high priority first (different join hints have different priority) |
|
|
proposal 3: least cost first |
|
|
proposal 4: only try the first hint |
|
|
proposal 5: ignore conflict hints |
|
|
References
[1] https://docs.google.com/document/d/19PL77ZggPIhz7FMZv2Yx7w_cnYyvmTc-mxUpJYv_934/edit?usp=sharing
...