Status

Current state: Under Discussion

Discussion thread: https://lists.apache.org/list?dev@flink.apache.org:lte=1M:Introduce%20Hash%20Lookup%20Join

Vote thread: -

JIRA: FLINK-23687

Released: -

Motivation

Lookup join is a commonly used feature in Flink SQL. We have received many optimization requests for lookup join. For example:
1. Hash-partition the left side of the lookup join to raise the cache hit ratio.

2. Solve the data skew problem that can appear once hash lookup join is introduced.

3. In the Hive dimension source, each task currently loads all data into its cache. With the hash partitioner from point 1, each task could load only its own part of the data instead of all of it.

4. Enable mini-batch optimization to reduce RPC calls.

This FLIP focuses on point 1. Points 2, 3 and 4 will be discussed in later FLIPs.

Many lookup table sources, such as the JDBC, CSV and HBase connectors, use a cache to reduce RPC calls.

For those connectors, we could raise cache hit ratio by routing the same lookup keys to the same task instance. This is the purpose of this FLIP.
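
The effect of routing on the cache can be sketched with a small, self-contained simulation. This is not Flink code: the class name, the per-task LRU cache and the cyclic key pattern are all assumptions made for illustration. The cyclic pattern is close to a worst case for LRU, so real workloads will likely show a smaller gap.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative simulation only; not Flink code. All names here are hypothetical. */
class LookupCacheRoutingSketch {

    /** A tiny LRU cache standing in for a connector's per-task lookup cache. */
    static final class LruCache extends LinkedHashMap<Integer, String> {
        private final int capacity;

        LruCache(int capacity) {
            super(16, 0.75f, true); // access order gives LRU eviction
            this.capacity = capacity;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
            return size() > capacity;
        }
    }

    /**
     * Replays a cyclic stream of lookup keys over several tasks and returns the overall
     * cache hit ratio. With hashRouting, a key always goes to the same task; otherwise
     * records are spread round-robin regardless of their key.
     */
    static double hitRatio(boolean hashRouting, int parallelism, int distinctKeys,
                           int cachePerTask, int records) {
        LruCache[] caches = new LruCache[parallelism];
        for (int i = 0; i < parallelism; i++) {
            caches[i] = new LruCache(cachePerTask);
        }
        long hits = 0;
        for (int r = 0; r < records; r++) {
            int key = r % distinctKeys;
            int task = hashRouting
                    ? Math.floorMod(Integer.hashCode(key), parallelism)
                    : r % parallelism;
            LruCache cache = caches[task];
            if (cache.get(key) != null) {
                hits++;
            } else {
                cache.put(key, "row-" + key); // stands in for an RPC to the dimension table
            }
        }
        return (double) hits / records;
    }

    public static void main(String[] args) {
        // 4 tasks, 101 distinct keys, 30 cache entries per task, 10,000 records.
        System.out.println("round-robin: " + hitRatio(false, 4, 101, 30, 10_000));
        System.out.println("hash:        " + hitRatio(true, 4, 101, 30, 10_000));
    }
}
```

With hash routing each task only ever sees about a quarter of the keys, so its cache can hold all of them. With round-robin routing every task eventually sees every key, and a cache of the same size keeps evicting entries before they are reused.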

There are many similar requests for hash lookup join on the user mailing list and in JIRA, for example:

  1. FLINK-23687
  2. FLINK-25396
  3. FLINK-25262

SQL Syntax

To enable hash lookup join, the user only needs to specify a new hint (SHUFFLE_HASH) in the SELECT clause of the query, just like in Spark SQL[2].

Code Block (sql)
SELECT /*+ SHUFFLE_HASH('Orders', 'Customers') */ o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;


Note:

  1. The table names in the SHUFFLE_HASH hint are required, to avoid propagating the hint to the wrong join when the query contains multiple lookup joins.
  2. The hint is only a suggestion to the optimizer; it is not enforced.

Proposed Changes

Define Hint Strategy

The "SHUFFLE_HASH" hint can only be applied to CORRELATE relations that satisfy the following conditions:

  1. The correlate is a lookup join; other correlates ignore the hint.
  2. The correlate has "Orders" and "Customers" as its input table names. In theory it would be better to support both table names and alias names, but the alias of a subquery or table may be lost by the SQL converter and the SQL optimizer, so only table names are supported here.

The code below shows how we define the hint strategy for hash lookup join.

Code Block (java): SHUFFLE_HASH hint strategy
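      // isLookupJoin() and lookupJoinWithFixedTableName() are HintPredicates this FLIP would add.
      builder
        .hintStrategy("SHUFFLE_HASH",
            HintStrategy.builder(
                HintPredicates.and(HintPredicates.CORRELATE, isLookupJoin(), lookupJoinWithFixedTableName()))
                .build()) // hintStrategy expects a built HintStrategy, not its builder
        .build();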

Note: this hint strategy is blocked on a Calcite version upgrade.

Calcite translates the SQL above into a Correlate instead of a Join, and Correlate is not a kind of RelNode that can carry RelHints until version 1.30.0.

I have reported CALCITE-4967 in Calcite. It has been merged and will be published in Calcite 1.30.0.
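
The two conditions on the hint can also be sketched without any Calcite dependency. The types below are hypothetical stand-ins, not planner classes. The sketch further assumes that table names are compared exactly and that their order in the hint does not matter, neither of which this FLIP specifies.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified stand-ins for planner concepts; none of these types exist in Flink or Calcite. */
class ShuffleHashHintMatchSketch {

    /** A hint as written in the query: its name plus the listed table names. */
    static final class Hint {
        final String name;
        final List<String> tableNames;

        Hint(String name, List<String> tableNames) {
            this.name = name;
            this.tableNames = tableNames;
        }
    }

    /** A correlate node reduced to the facts the hint strategy needs. */
    static final class Correlate {
        final boolean lookupJoin;
        final String leftTable;
        final String rightTable;

        Correlate(boolean lookupJoin, String leftTable, String rightTable) {
            this.lookupJoin = lookupJoin;
            this.leftTable = leftTable;
            this.rightTable = rightTable;
        }
    }

    /** Returns true only if the hint may attach to this correlate. */
    static boolean canApply(Hint hint, Correlate correlate) {
        // Condition 1: only lookup joins accept the hint.
        if (!"SHUFFLE_HASH".equals(hint.name) || !correlate.lookupJoin) {
            return false;
        }
        // Condition 2: the hinted table names must be exactly the correlate's inputs.
        // Assumption: exact, order-insensitive comparison.
        Set<String> hinted = new HashSet<>(hint.tableNames);
        Set<String> inputs =
                new HashSet<>(Arrays.asList(correlate.leftTable, correlate.rightTable));
        return hinted.equals(inputs);
    }
}
```

In a query with two lookup joins, a hint naming 'Orders' and 'Customers' would match only the correlate over those two tables, which is why the table names are required.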

Hint Propagation in Optimizer

We need to ensure the hint is not lost before it is finally used to require the distribution on the inputs of LookupJoin.

This involves refactoring some existing rules and FlinkLogicalJoin.

Refactor rules in temporal_join_rewrite phase

In the temporal_join_rewrite phase, the rules check whether a LogicalCorrelate is a temporal table join that is implemented by a lookup join. If so, the rules translate it to a LogicalJoin.

Currently the rules call RelBuilder#join in Calcite, which produces a join without any hints, so the hint is lost here.

Specifically, we need to refactor LogicalCorrelateToJoinFromLookupTableRuleWithFilter and LogicalCorrelateToJoinFromLookupTableRuleWithoutFilter.


Refactor FlinkLogicalJoin


The FlinkLogicalJoin constructor does not take any hints now, so hints are lost when converting LogicalJoin to FlinkLogicalJoin.

We need to refactor the FlinkLogicalJoin constructor.


Refactor FlinkLogicalJoinConverter

Refactor the FlinkLogicalJoin rules to propagate the hints of LogicalJoin. Other places that create FlinkLogicalJoin instances also need to be updated.
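
The refactoring described in this section comes down to passing hints through every conversion step. A minimal sketch, assuming heavily simplified node types that carry only a condition and a list of hint names (the real planner classes and constructors look different):

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical, heavily simplified node types; the real planner classes are richer. */
class HintPropagationSketch {

    static final class LogicalJoinNode {
        final String condition;
        final List<String> hints;

        LogicalJoinNode(String condition, List<String> hints) {
            this.condition = condition;
            this.hints = hints;
        }
    }

    static final class FlinkLogicalJoinNode {
        final String condition;
        final List<String> hints;

        FlinkLogicalJoinNode(String condition, List<String> hints) {
            this.condition = condition;
            this.hints = hints;
        }
    }

    /** Models a conversion whose target takes no hints: they are silently dropped. */
    static FlinkLogicalJoinNode convertWithoutHints(LogicalJoinNode join) {
        return new FlinkLogicalJoinNode(join.condition, Collections.emptyList());
    }

    /** Models the refactored conversion: hints are passed through unchanged. */
    static FlinkLogicalJoinNode convertWithHints(LogicalJoinNode join) {
        return new FlinkLogicalJoinNode(join.condition, join.hints);
    }

    /** What a later rule would ask before requiring hash distribution on the input. */
    static boolean requiresHashDistribution(FlinkLogicalJoinNode join) {
        return join.hints.contains("SHUFFLE_HASH");
    }
}
```

If any single step behaves like convertWithoutHints, the later rule never sees the hint, which is why every place that creates a FlinkLogicalJoin has to be updated.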

Use Hint to require Hash Distribution

LookupJoinRules would check whether the FlinkLogicalJoin contains the SHUFFLE_HASH hint. If so, the rules require the input to have hash distribution on the join keys when converting FlinkLogicalJoin to LookupJoin.

Note: If the input stream is not an insert-only stream, it can contain update_before, update_after or delete records. If its upsert key is different from the join key, the update_before and update_after records of the same row may be sent to different tasks after hash partitioning, which may lead to wrong results.

So hash lookup join requires that the input stream is an insert-only stream or that its upsert keys contain the hash keys.
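
The requirement above can be illustrated with a small sketch. The class, the method names and the column names order_id and customer_id are hypothetical and chosen only to match the example query. The routing function is a plain modulo hash, not Flink's actual partitioner.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;

/** Illustrative only; the class and method names are hypothetical, not planner APIs. */
class HashLookupJoinInputCheckSketch {

    /** Task index a record is routed to when hashing on the given key value. */
    static int taskFor(Object keyValue, int parallelism) {
        return Math.floorMod(Objects.hashCode(keyValue), parallelism);
    }

    /**
     * The input may be hash-partitioned on the join keys if it is insert-only, or if
     * every hash key column is part of the upsert key. In the second case all changes
     * for one upsert key carry the same hash key values and reach the same task.
     */
    static boolean canHashShuffle(boolean insertOnly, Set<String> upsertKey,
                                  Set<String> hashKeys) {
        return insertOnly || upsertKey.containsAll(hashKeys);
    }

    public static void main(String[] args) {
        // Order 1 moves from customer 7 to customer 8.
        // The upsert key is order_id, but the join (hash) key is customer_id.
        int updateBeforeTask = taskFor(7, 4); // -U(order_id=1, customer_id=7)
        int updateAfterTask = taskFor(8, 4);  // +U(order_id=1, customer_id=8)
        System.out.println("same task: " + (updateBeforeTask == updateAfterTask));

        System.out.println("allowed: " + canHashShuffle(
                false,
                Collections.singleton("order_id"),
                Collections.singleton("customer_id")));
    }
}
```

In the example, the update_before and update_after records of the same order hash to different tasks, so their relative order is no longer guaranteed downstream. The check rejects exactly this case.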

Compatibility, Deprecation, and Migration Plan

Hash lookup join and skew lookup join are only enabled by the hint syntax, so existing jobs from older Flink versions are not affected. Their behavior also remains compatible after they are upgraded to the new version.

Test Plan

Each new feature will be covered by unit tests.

Besides, we will add integration tests for connectors to verify that the feature cooperates with existing source/sink implementations.

Rejected Alternatives


Hint Syntax

We could name the hint 'USE_HASH', like the hash hint in Oracle[1].

Code Block (sql)
SELECT /*+ USE_HASH('Orders', 'Customers') */ o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;


SQL Server[3] uses the keyword 'hash' instead of a query hint. It is not a good choice for us, so we ignore it.

Implementation

There is a simpler but slightly hacky implementation, which is also what we use in our internal version.

That is, propagate the 'SHUFFLE_HASH' hint to the TableScan with the matching table name. In this way, the hint is not lost by the Flink optimizer before it is needed in LookupJoinRules.

LookupJoinRules would only check whether the dimension table scan has the 'SHUFFLE_HASH' hint. If so, it would require the input to have hash distribution on the join keys.

Compared with the proposed solution, this solution has two advantages:

  1. It does not need a Calcite version upgrade, see CALCITE-4967.
  2. It does not need code refactoring to ensure the hint is not lost by the Flink optimizer.

However, this method is conceptually a bit hacky, because whether to enable hashing is an attribute of the lookup join rather than of the dimension table scan.

Anyway, the difference between the two solutions is only in the internal implementation and has no impact on the user.

Reference

[1] Oracle Hash hint syntax

[2] Spark Hash hint syntax

[3] SQL Server Hash Keyword syntax