
...

Page properties

...


Discussion

...

thread

...

...

...

...

Released: <Flink Version>

...

thread/s70cjbbr5565m44f4mfqo9w7xdq09cf1
JIRA: FLINK-27853

Release: 1.16


Motivation

In SQL jobs, join is a very common operator. Currently, Flink automatically selects a join strategy for a SQL query based on the statistics of the source tables, and generates an execution plan to run the job. However, for various reasons, such as missing or incorrect statistics, the optimizer may choose the wrong join strategy, so the generated execution plan is not optimal and the job may perform poorly or even fail to execute.

...

    • Only supports joins with an equivalence condition; Full Outer Join is not supported

For example,

Code Block
// t1 is the broadcast table
select /*+ BROADCAST(t1) */ * from t1
join t2 on t1.a = t2.a

// when join between t1 and t2, t1 will be chosen as the broadcast table
// when join between the result after t1 joins t2 and t3, 
// t3 will be chosen as the broadcast table
select /*+ BROADCAST(t1,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c

// BROADCAST does not support non-equivalent joins, 
// so the planner's default strategy is adopted
select /*+ BROADCAST(t1) */ * from t1
join t2 on t1.a > t2.a

// BROADCAST does not support full outer join,
// so the planner's default strategy is adopted
select /*+ BROADCAST(t1) */ * from t1
full outer join t2 on t1.a = t2.a

  • SHUFFLE_HASH
    • Only supports joins with an equivalence condition

For example,

Code Block
// t1 is the build side
select /*+ SHUFFLE_HASH(t1) */ * from t1
join t2 on t1.a = t2.a

// when join between t1 and t2, t1 will be chosen as the build side
// when join between the result after t1 joins t2 and t3, 
// t3 will be chosen as the build side
select /*+ SHUFFLE_HASH(t1,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c

// SHUFFLE_HASH does not support non-equivalent join, 
// so the planner's default strategy is adopted
select /*+ SHUFFLE_HASH(t1) */ * from t1
join t2 on t1.a > t2.a

  • SHUFFLE_MERGE
    • Only supports joins with an equivalence condition

For example,

Code Block
// Sort Merge Join is adopted
select /*+ SHUFFLE_MERGE(t1) */ * from t1
join t2 on t1.a = t2.a

// Sort Merge Join is adopted in both joins
select /*+ SHUFFLE_MERGE(t1,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c

// SHUFFLE_MERGE does not support non-equivalent join, 
// so the planner's default strategy is adopted
select /*+ SHUFFLE_MERGE(t1) */ * from t1
join t2 on t1.a > t2.a

  • NEST_LOOP
    • Supports both equivalent and non-equivalent joins

For example,

Code Block
// t1 is the build side
select /*+ NEST_LOOP(t1) */ * from t1
join t2 on t1.a = t2.a

// when join between t1 and t2, t1 will be chosen as the build side
// when join between the result after t1 joins t2 and t3, 
// t3 will be chosen as the build side
select /*+ NEST_LOOP(t1,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c
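
The restrictions listed for the four hints can be summarized in a small lookup. The following is only a sketch in plain Java, not planner code: the enum `JoinHintKind`, the method `supports` and its two boolean flags are hypothetical names introduced for illustration. This section states no restriction for NEST_LOOP, so the sketch assumes it accepts every join shape.

```java
// Hypothetical model of the restrictions described above; not Flink planner code.
enum JoinHintKind {
    BROADCAST, SHUFFLE_HASH, SHUFFLE_MERGE, NEST_LOOP;

    /** Returns true if the hint could be honoured for a join of the given shape. */
    boolean supports(boolean hasEquiCondition, boolean isFullOuterJoin) {
        switch (this) {
            case BROADCAST:
                // equivalence condition required, full outer join not supported
                return hasEquiCondition && !isFullOuterJoin;
            case SHUFFLE_HASH:
            case SHUFFLE_MERGE:
                // equivalence condition required
                return hasEquiCondition;
            case NEST_LOOP:
                // assumption: no restriction is stated in this section
                return true;
            default:
                throw new AssertionError(this);
        }
    }
}

class JoinHintSupportDemo {
    public static void main(String[] args) {
        // Print which hints would be honoured for "t1.a > t2.a" (no equivalence condition).
        for (JoinHintKind kind : JoinHintKind.values()) {
            System.out.println(kind + " on non-equivalent join: " + kind.supports(false, false));
        }
    }
}
```

When `supports` returns false, the hint is skipped and the planner's default strategy applies, as in the examples above.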

Proposed Changes

Join Hint Strategy Definition

According to FLIP-204 [3], we can define a Join Hint directly through FlinkHintStrategies. Taking SHUFFLE_HASH as an example:

Code Block
HintStrategyTable.builder()
        .hintStrategy(
                "SHUFFLE_HASH",
                HintStrategy.builder(
                                HintPredicates.or(
                                        HintPredicates.JOIN, HintPredicates.CORRELATE))
                        .optionChecker(...)
                        .build())
        .build();

All the Join Hint strategies that we plan to support can be registered in this way, and the number of parameters of each hint can be checked through the "optionChecker".
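
As an illustration of the kind of check an "optionChecker" could perform, here is a minimal sketch in plain Java that does not use the Calcite API. The class `JoinHintOptionCheck`, the method `check`, and the rule that a join hint needs at least one non-blank table or view name are all assumptions made for this example; the actual rule is whatever the elided `optionChecker(...)` argument implements.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the parameter check; not the Calcite HintOptionChecker API.
class JoinHintOptionCheck {

    /** Returns an error message if the hint options are unusable, otherwise empty. */
    static Optional<String> check(String hintName, List<String> tableNames) {
        // assumption: a join hint must name at least one table or view
        if (tableNames == null || tableNames.isEmpty()) {
            return Optional.of(hintName + " requires at least one table or view name");
        }
        for (String name : tableNames) {
            if (name == null || name.trim().isEmpty()) {
                return Optional.of(hintName + " contains an empty table or view name");
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(check("SHUFFLE_HASH", java.util.Arrays.asList("t1", "t3")).isPresent());
        System.out.println(check("SHUFFLE_HASH", java.util.Collections.<String>emptyList()).isPresent());
    }
}
```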

...

Semantic validation happens before the SQL is optimized, and an exception is thrown when an error is found.

...

The logic above only needs to be added to the "matches" method of the current rules. Part of the pseudocode of the util is as follows:

Code Block
// check whether this join strategy is valid. E.g. it is invalid if the join
// doesn't contain at least one equal-join condition or the table config
// forbids using SortMergeJoin.
if (!checkJoinStrategyValid(join, tableConfig, joinStrategy)) {
  return false;
}

List<RelHint> hints = join.getHints();
List<JoinStrategy> validHints = new ArrayList<>();

// collect the valid join hints, keeping their written order
for (RelHint hint : hints) {
  JoinStrategy strategy = JoinStrategy.getJoinStrategy(hint.hintName);
  if (checkJoinStrategyValid(join, tableConfig, strategy)) {
    validHints.add(strategy);
  }
}

// if all join hints are invalid, treat it like no join hints
if (validHints.isEmpty()) {
  return true;
}

// if multiple hints are valid, pick the first one to apply
return validHints.get(0) == joinStrategy;
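
To make the decision easier to try out, the same logic can be written as a self-contained sketch in plain Java. Strategies are represented as strings and the validity check is passed in as a predicate; the class `FirstValidHintRule` and its parameter names are hypothetical and only stand in for the planner types used in the pseudocode. The sketch assumes the hints arrive in written order, which is the behavior this proposal prefers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified version of the "matches" logic; not planner code.
class FirstValidHintRule {

    /**
     * Decides whether the rule that implements ruleStrategy may fire.
     *
     * @param ruleStrategy        strategy implemented by the current rule
     * @param hintsInWrittenOrder strategy names taken from the query hints, in written order
     * @param isValid             stand-in for checkJoinStrategyValid on the current join
     */
    static boolean matches(
            String ruleStrategy, List<String> hintsInWrittenOrder, Predicate<String> isValid) {
        if (!isValid.test(ruleStrategy)) {
            return false;
        }
        List<String> validHints = new ArrayList<>();
        for (String hint : hintsInWrittenOrder) {
            if (isValid.test(hint)) {
                validHints.add(hint);
            }
        }
        // no usable hint: behave as if the query had no join hints
        if (validHints.isEmpty()) {
            return true;
        }
        // otherwise only the first valid hint may be applied
        return validHints.get(0).equals(ruleStrategy);
    }

    public static void main(String[] args) {
        // Full outer join with an equivalence condition: everything except BROADCAST is valid.
        Predicate<String> fullOuterEqui = s -> !s.equals("BROADCAST");
        List<String> hints = Arrays.asList("BROADCAST", "SHUFFLE_HASH");
        System.out.println(matches("SHUFFLE_HASH", hints, fullOuterEqui));
        System.out.println(matches("SHUFFLE_MERGE", hints, fullOuterEqui));
    }
}
```

With the hints `BROADCAST, SHUFFLE_HASH` on a full outer join, only the rule for SHUFFLE_HASH matches: BROADCAST is invalid for this join, and SHUFFLE_MERGE is valid but is not the first valid hint.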

Special Status With Hints

...

Join Hint only affects the current query block, and does not affect the join strategy in subqueries and views. If a join hint really needs to be propagated into a view or subquery, this can be discussed in the future.

For example,

Code Block
// view
create view view1 as select t1.* from t1 join t2 on t1.a = t2.a

// BROADCAST will not be propagated into view1,
// and view1 will use the planner's default join strategy
select /*+ BROADCAST(t1) */ * from view1 join t3 on view1.a = t3.a

// the join in view1 will use the planner's default join strategy,
// and the join between view1 and t1 will use BROADCAST
select /*+ BROADCAST(t1) */ * from view1 join t1 on view1.a = t1.a

Support with table alias

Currently, join hints on aliases are not supported. Instead, the user can create a view for the subquery with a specific view name and use this view name as the argument of the join hint.

For example,

Code Block
// an exception that t1 does not exist will be thrown,
// because t1 is only an alias
select /*+ BROADCAST(t1) */ t1.a
from (select * from test1) t1 join test2
on t1.a = test2.a

// use the following equivalent sql
create view t1 as select * from test1;
select /*+ BROADCAST(t1) */ t1.a
from t1 join test2 on t1.a = test2.a

Conflict within the same kind of hint

If the hint declares both sides of the join as the build side, the side written first will be treated as the build side. For example,

Code Block
// the first will be chosen, and t2 is the broadcast table
select /*+ BROADCAST(t2), BROADCAST(t1) */ * from t1
join t2 on t1.a = t2.a

// t1 is the broadcast table
select /*+ BROADCAST(t1,t1) */ * from t1
join t2 on t1.a = t2.a

// the first will be chosen, and t2 is the broadcast table
select /*+ BROADCAST(t2,t1) */ * from t1
join t2 on t1.a = t2.a

// the first will be chosen, and t2 is the broadcast table
select /*+ BROADCAST(t2,t1), BROADCAST(t1,t2) */ * from t1
join t2 on t1.a = t2.a

// just like BROADCAST(t1, t2) + BROADCAST(t3)
// when join between t1 and t2, t1 will be chosen as the broadcast table
// when join between the result after t1 joins t2 and t3, 
// t3 will be chosen as the broadcast table
select /*+ BROADCAST(t1,t2,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c

// just like SHUFFLE_HASH(t1, t2) + SHUFFLE_HASH(t3)
// when join between t1 and t2, t1 will be chosen as the build side
// when join between the result after t1 joins t2 and t3, 
// t3 will be chosen as the build side
select /*+ SHUFFLE_HASH(t1,t2,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c

// Sort Merge Join is adopted
select /*+ SHUFFLE_MERGE(t1,t1) */ * from t1
join t2 on t1.a = t2.a

// Sort Merge Join is adopted
select /*+ SHUFFLE_MERGE(t2), SHUFFLE_MERGE(t1) */ * from t1
join t2 on t1.a = t2.a

// Sort Merge Join is adopted
select /*+ SHUFFLE_MERGE(t2,t1) */ * from t1
join t2 on t1.a = t2.a

// Sort Merge Join is adopted
select /*+ SHUFFLE_MERGE(t2,t1), SHUFFLE_MERGE(t1,t2) */ * from t1
join t2 on t1.a = t2.a

// Sort Merge Join is adopted in both joins
select /*+ SHUFFLE_MERGE(t1,t2,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c

// the first will be chosen, and t2 is the build side
select /*+ NEST_LOOP(t2), NEST_LOOP(t1) */ * from t1
join t2 on t1.a = t2.a

// t1 is the build side
select /*+ NEST_LOOP(t1,t1) */ * from t1
join t2 on t1.a = t2.a

// the first will be chosen, and t2 is the build side
select /*+ NEST_LOOP(t2,t1) */ * from t1
join t2 on t1.a = t2.a

// the first will be chosen, and t2 is the build side
select /*+ NEST_LOOP(t2,t1), NEST_LOOP(t1,t2) */ * from t1
join t2 on t1.a = t2.a

// just like NEST_LOOP(t1, t2) + NEST_LOOP(t3)
// when join between t1 and t2, t1 will be chosen as the build side
// when join between the result after t1 joins t2 and t3, 
// t3 will be chosen as the build side
select /*+ NEST_LOOP(t1,t2,t3) */ * from t1
join t2 on t1.a = t2.b
join t3 on t1.a = t3.c
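
The examples above are consistent with one simple rule: go through the hint arguments in written order and take the first one that names a direct input of the join. The sketch below, in plain Java, illustrates that reading. The class `BuildSideChooser`, the method `chooseBuildSide`, and the convention of passing `null` for an input that is itself a join result are assumptions made for this example, not the planner's implementation. For SHUFFLE_MERGE there is no build side, so the choice has no effect there.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration of "the side written first wins"; not planner code.
class BuildSideChooser {

    /**
     * Returns the first hint argument that names a direct input of the join.
     *
     * @param hintArgs   table names from the hint(s), concatenated in written order
     * @param leftInput  table or view name of the left input, or null if it is a join result
     * @param rightInput table or view name of the right input, or null if it is a join result
     */
    static Optional<String> chooseBuildSide(
            List<String> hintArgs, String leftInput, String rightInput) {
        for (String arg : hintArgs) {
            if (arg.equals(leftInput) || arg.equals(rightInput)) {
                return Optional.of(arg);
            }
        }
        // no argument refers to this join: the hint does not apply here
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> hintArgs = Arrays.asList("t1", "t2", "t3");
        // join between t1 and t2
        System.out.println(chooseBuildSide(hintArgs, "t1", "t2"));
        // join between the result of (t1 join t2) and t3
        System.out.println(chooseBuildSide(hintArgs, null, "t3"));
    }
}
```

Under this reading, `BROADCAST(t2), BROADCAST(t1)` and `BROADCAST(t2,t1)` both behave like the argument list `t2, t1`, so t2 is chosen for the join between t1 and t2.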

Conflict between different hints

If multiple Join Hints are defined and conflict, they will be tried one by one in the written order. If none of them can be applied, the planner's default join strategy is used. For example,

Code Block
// an exception is thrown when parsing this SQL
select /*+ BROADCAST(t1) */ /*+ SHUFFLE_HASH(t1) */ * from t1
join t2 on t1.a = t2.a

// the first hint BROADCAST will be chosen
select /*+ BROADCAST(t1) SHUFFLE_HASH(t1) */ * from t1
join t2 on t1.a = t2.a

// although BROADCAST is first, BROADCAST does not support full outer join,
// so the following join hint SHUFFLE_HASH is chosen
select /*+ BROADCAST(t1) SHUFFLE_HASH(t1) */ * from t1
full outer join t2 on t1.a = t2.a

// although two join hints are defined,
// neither of them supports non-equivalent joins,
// so the default strategy that the planner infers is chosen
select /*+ BROADCAST(t1) SHUFFLE_HASH(t1) */ * from t1
join t2 on t1.a > t2.a

Dependent Processes

The use of Join Hint currently depends on upgrading Calcite to 1.31, in which the following issue is fixed.

...

For join hint conflicts, there are many different behaviors to choose from. The following are the advantages and shortcomings of several behaviors. For specific examples corresponding to each behavior, please refer to Appendix [1].


proposal 1: writing order first (we support)

  advantage
  • easy to resolve the hint conflict

  shortcoming
  • the first hint in written order may not be the optimal strategy

proposal 2: high priority first (different join hints have different priority)

  advantage
  • always chooses the better join strategy by the priority we define
  • user can write two or more join hints and all of them can be considered according to the actual scenario

  shortcoming
  • hard to define the priority when a new join strategy is added
  • user can't control the order of trying different join strategies

proposal 3: least cost first

  advantage
  • always chooses the faster join strategy by CBO
  • user can write two or more join hints and all of them can be considered by CBO

  shortcoming
  • depends on CBO and may lead to uncontrollable behavior
  • user can't control the order of trying different join strategies

proposal 4: only try the first hint

  advantage
  • easy to resolve the hint conflict

  shortcoming
  • sometimes the user wants to try both join hints, but this proposal only tries one of them

proposal 5: ignore conflict hints

  advantage
  • easy enough to resolve the hint conflict

  shortcoming
  • sometimes the user wants to try both join hints, but this proposal ignores all of them

References

[1] https://docs.google.com/document/d/19PL77ZggPIhz7FMZv2Yx7w_cnYyvmTc-mxUpJYv_934/edit?usp=sharing

...