Join Optimization
For a general discussion of Hive joins including syntax, examples, and restrictions, see the Joins wiki doc.
...
Star Schema Example
No Format |
---|
Select count(*) cnt
From store_sales ss
join household_demographics hd on (ss.ss_hdemo_sk = hd.hd_demo_sk)
join time_dim t on (ss.ss_sold_time_sk = t.t_time_sk)
join store s on (s.s_store_sk = ss.ss_store_sk)
Where
t.t_hour = 8
and t.t_minute >= 30
and hd.hd_dep_count = 2
order by cnt;
|
...
Hive supports MAPJOINs, which are well suited for this scenario – at least for dimensions small enough to fit in memory. Before release 0.11, a MAPJOIN could be invoked either through an optimizer hint:
No Format |
---|
select /*+ MAPJOIN(time_dim) */ count(*) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
|
or via auto join conversion:
No Format |
---|
set hive.auto.convert.join=true;
select count(*) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
|
The default value for hive.auto.convert.join was false in Hive 0.10.0. Hive 0.11.0 changed the default to true (HIVE-3297). Note that hive-default.xml.template incorrectly gives the default as false in Hive 0.11.0 through 0.13.1.
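Because the template file can disagree with the actual default, it may be safer to check the value in effect for the session than to rely on hive-default.xml.template. A minimal sketch, assuming an interactive Hive CLI or Beeline session:

```sql
-- Print the value currently in effect for this session.
set hive.auto.convert.join;

-- Set it explicitly if the query depends on a particular behavior.
set hive.auto.convert.join=true;
```

Setting the property explicitly in scripts keeps the behavior the same across Hive versions with different defaults.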
MAPJOINs are processed by loading the smaller table into an in-memory hash map and matching keys with the larger table as they are streamed through. The prior implementation has this division of labor:
...
The following query will produce two separate map-only jobs when executed:
No Format |
---|
select /*+ MAPJOIN(time_dim, date_dim) */ count(*) from
store_sales
join time_dim on (ss_sold_time_sk = t_time_sk)
join date_dim on (ss_sold_date_sk = d_date_sk)
where t_hour = 8 and d_year = 2002
|
...
- Merge M*-MR patterns into a single MR.
- Merge MJ->MJ into a single MJ when possible.
- Merge MJ* patterns into a single Map stage as a chain of MJ operators. (Not yet implemented.)
If hive.auto.convert.join is set to true, the optimizer not only converts joins to mapjoins but also merges MJ* patterns as much as possible.
...
When auto join is enabled, there is no longer a need to provide the map-join hints in the query. The auto join option can be enabled with two configuration parameters:
No Format |
---|
set hive.auto.convert.join.noconditionaltask = true;
set hive.auto.convert.join.noconditionaltask.size = 10000000;
|
The default for hive.auto.convert.join.noconditionaltask is true, which means auto conversion is enabled. (The default was originally false, per HIVE-3784, but HIVE-4146 changed it to true before Hive 0.11.0 was released.)
The size configuration lets the user control how large a table can be and still be treated as fitting in memory. The value (in bytes) is the maximum combined size of the tables that are converted to in-memory hash maps. Currently, n-1 tables of the join have to fit in memory for the map-join optimization to take effect. There is no check for whether a table is compressed or for how large it might be once decompressed. The effect of this assumption on the results is discussed in the next section.
For example, the previous query just becomes:
No Format |
---|
select count(*) from
store_sales
join time_dim on (ss_sold_time_sk = t_time_sk)
join date_dim on (ss_sold_date_sk = d_date_sk)
where t_hour = 8 and d_year = 2002
|
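As a rough illustration of how the size threshold interacts with this query, suppose time_dim and date_dim occupy about 5 MB and 10 MB on disk (hypothetical figures, not taken from any benchmark). Both small tables must fit under the threshold together, so it would need to be at least their sum. The sketch below sets it to 20,000,000 bytes and then uses EXPLAIN to check whether the plan contains map-join operators. The exact plan output varies by Hive version.

```sql
-- Assumed sizes for illustration: time_dim ~5 MB, date_dim ~10 MB on disk.
-- The threshold is compared against the combined size of the small tables.
set hive.auto.convert.join = true;
set hive.auto.convert.join.noconditionaltask = true;
set hive.auto.convert.join.noconditionaltask.size = 20000000;

-- Inspect the plan instead of running the query; look for map-join operators
-- in a single map stage rather than a separate job per join.
explain
select count(*) from
store_sales
join time_dim on (ss_sold_time_sk = t_time_sk)
join date_dim on (ss_sold_date_sk = d_date_sk)
where t_hour = 8 and d_year = 2002;
```

Because the size check does not account for compression, leaving some headroom above the on-disk sizes is a cautious choice.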
...
Auto join conversion also affects the sort-merge-bucket joins.
Info: Hive 0.13.0 introduced a configuration setting for conditional joins. If the input stream from a small alias can be applied directly to the join operator without filtering or projection, it does not need to be pre-staged in the distributed cache via a MapReduce local task.
Current Optimization
- Group as many MJ operators as possible into one MJ.
...
The following configuration settings enable the conversion of an SMB to a map-join SMB:
No Format |
---|
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
|
There is an option to set the big table selection policy using the following configuration:
No Format |
---|
set hive.auto.convert.sortmerge.join.bigtable.selection.policy
= org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;
|
...
The available selection policies are:
No Format |
---|
org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ (default)
org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ
org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ
|
The names describe their uses. The selection policy is especially useful for the fact-fact join (query 82 in the TPC-DS benchmark).
SMB Join across Tables with Different Keys
If the tables have different numbers of sort keys, for example Table A has 2 SORT columns and Table B has 1 SORT column, you might get an index out of bounds exception.
The following query results in an index out of bounds exception because emp_person has, say, 1 sort column while emp_pay_history has 2 sort columns.
Code Block | ||||
---|---|---|---|---|
| ||||
SELECT p.*, py.*
FROM emp_person p INNER JOIN emp_pay_history py
ON p.empid = py.empid
|
With the table order reversed, so that emp_pay_history comes first, the query works fine:
Code Block | ||||
---|---|---|---|---|
| ||||
SELECT p.*, py.*
FROM emp_pay_history py INNER JOIN emp_person p
ON p.empid = py.empid
|
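For context, tables with differing sort columns could be declared roughly as follows. The source does not give the table definitions, so every column other than empid, the column types, and the bucket counts are assumptions made for illustration only.

```sql
-- Hypothetical DDL: emp_person is bucketed on empid and sorted on one column.
CREATE TABLE emp_person (
  empid INT,
  name  STRING
)
CLUSTERED BY (empid) SORTED BY (empid) INTO 4 BUCKETS;

-- Hypothetical DDL: emp_pay_history is bucketed on empid and sorted on two columns.
CREATE TABLE emp_pay_history (
  empid    INT,
  pay_date STRING,
  amount   DOUBLE
)
CLUSTERED BY (empid) SORTED BY (empid, pay_date) INTO 4 BUCKETS;
```

With definitions like these, the two tables share the join key empid but differ in the number of SORT columns, which is the situation described above.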
Generate Hash Tables on the Task Side
...