This page documents the Correlation Optimizer. It was originally introduced by HIVE-2206 and is based on the idea of YSmart [1].
1. Overview
In Hadoop environments, an SQL query submitted to Hive is evaluated in a distributed system. Thus, after generating a query operator tree representing the submitted SQL query, Hive needs to determine which operations can be executed in a task that will be evaluated on a single node. Also, since a MapReduce job can shuffle data only once, Hive needs to cut the tree into multiple MapReduce jobs. It is important to cut the operator tree well, so that the generated plan evaluates the query efficiently.
When generating an operator tree for a given SQL query, Hive decides when to shuffle data by looking at the operations that may need shuffled input. For example, a JOIN operation may need to shuffle the input data if the input tables have not been distributed by the join columns. However, in a complex query, the input of such an operation may already be partitioned in the desired way. For example, consider the query SELECT t1.key, sum(value) FROM t1 JOIN t2 ON (t1.key = t2.key) GROUP BY t1.key. In this example, both the JOIN operation and the GROUP BY operation may need to shuffle their input data. However, because the output of the JOIN operation is the input of the GROUP BY operation and has already been partitioned by t1.key, we do not need to shuffle the data for the GROUP BY operation. Hive is not aware of this correlation between the JOIN operation and the GROUP BY operation, and thus generates two separate MapReduce jobs to evaluate this query, shuffling the data for the GROUP BY operation unnecessarily. In a more complex query, correlation-unaware query planning can generate a very inefficient execution plan and result in poor performance.
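The reasoning above can be illustrated with a small sketch in plain Python (not Hive code). It assumes, for illustration only, that t1 has the columns (key, value), that t2 has the single column (key), and that the shuffle is a simple hash partitioning; the names shuffle and join_then_aggregate are hypothetical. Because the join shuffle already places all rows of a key in one partition, the aggregation can run in the same partition without a second shuffle.

```python
from collections import defaultdict

def shuffle(rows, key_index, num_reducers):
    """Hash-partition rows by the column at key_index (one list per reducer)."""
    partitions = [[] for _ in range(num_reducers)]
    for row in rows:
        partitions[hash(row[key_index]) % num_reducers].append(row)
    return partitions

def join_then_aggregate(t1, t2, num_reducers=2):
    """Sketch of SELECT t1.key, sum(value) FROM t1 JOIN t2 ... GROUP BY t1.key.

    Assumption: t1 rows are (key, value) and t2 rows are (key,).
    Both inputs are shuffled once, by the join key.
    """
    p1 = shuffle(t1, 0, num_reducers)
    p2 = shuffle(t2, 0, num_reducers)
    result = {}
    for part1, part2 in zip(p1, p2):
        # Inner join inside one reducer: count matching t2 rows per key.
        matches = defaultdict(int)
        for (key,) in part2:
            matches[key] += 1
        # GROUP BY in the same reducer: no second shuffle is needed because
        # every row of a given key is already in this partition.
        sums = defaultdict(int)
        for key, value in part1:
            if matches.get(key, 0) > 0:
                sums[key] += value * matches[key]
        # A key handled by this reducer never shows up in another reducer.
        assert not (set(sums) & set(result))
        result.update(sums)
    return result

t1 = [(1, 10), (2, 20), (1, 5), (3, 7)]
t2 = [(1,), (1,), (2,)]
print(join_then_aggregate(t1, t2))
```

The internal assertion checks the property the optimizer relies on: after partitioning by the join key, the groups produced by different reducers are disjoint.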
Before Correlation Optimizer was integrated into Hive, Hive already had the ReduceSink Deduplication Optimizer, which can figure out whether we need to shuffle data for chained operators. However, to support more complex operator trees, we need a more general-purpose optimizer and a mechanism to correctly execute the optimized plan. Thus, we have designed and implemented Correlation Optimizer and two operators for evaluating optimized plans. It is worth noting that it is better to use the ReduceSink Deduplication Optimizer to handle simple cases first and then use Correlation Optimizer to handle more complex cases.
2. Examples
First, let's take a look at three examples. For every query, we show the original operator tree generated by Hive and the optimized operator tree. To be concise, we only show the following operators: FileSinkOperator (FS), GroupByOperator (AGG), HashTableSinkOperator (HT), JoinOperator (JOIN), MapJoinOperator (MJ), and ReduceSinkOperator (RS). Also, in every query, we add comments (e.g. /*JOIN1*/) to indicate the node in the operator tree that an operation belongs to.
2.1 Example 1
SELECT tmp1.key, count(*) FROM (SELECT key, avg(value) AS avg FROM t1 GROUP BY /*AGG1*/ key) tmp1 JOIN /*JOIN1*/ t1 ON (tmp1.key = t1.key) WHERE t1.value > tmp1.avg GROUP BY /*AGG2*/ tmp1.key;
The original operator tree generated by Hive is shown below.
Figure 1: The original operator tree of Example 1 generated by Hive
This plan uses three MapReduce jobs to evaluate this query. However, AGG1, JOIN1, and AGG2 all require the column key to be the partitioning column for shuffling the data. Thus, we do not need to shuffle the data in the same way three times. We only need to shuffle the data once, and thus a single MapReduce job is needed. The optimized operator tree is shown below.
Figure 2: The optimized operator tree of Example 1
Since the input table of AGG1 and the right table of JOIN1 are both t1, when we use a single MapReduce job to evaluate this query, Hive only needs to scan t1 once. In the original plan, by contrast, t1 is used in two MapReduce jobs, and thus it is scanned twice.
2.2 Example 2
SELECT tmp1.key, count(*) FROM t1 JOIN /*JOIN1*/ (SELECT key, avg(value) AS avg FROM t1 GROUP BY /*AGG1*/ key) tmp1 ON (t1.key = tmp1.key) JOIN /*JOIN1*/ t2 ON (tmp1.key = t2.key) WHERE t2.value > tmp1.avg GROUP BY /*AGG2*/ t1.key;
The original operator tree generated by Hive is shown below.
Figure 3: The original operator tree of Example 2 generated by Hive
This example is similar to Example 1. The optimized operator tree only needs a single MapReduce job, which is shown below.
Figure 4: The optimized operator tree of Example 2
2.3 Example 3
SELECT count(distinct ws1.ws_order_number) as order_count, sum(ws1.ws_ext_ship_cost) as total_shipping_cost, sum(ws1.ws_net_profit) as total_net_profit FROM web_sales ws1 JOIN /*MJ1*/ customer_address ca ON (ws1.ws_ship_addr_sk = ca.ca_address_sk) JOIN /*MJ2*/ web_site s ON (ws1.ws_web_site_sk = s.web_site_sk) JOIN /*MJ3*/ date_dim d ON (ws1.ws_ship_date_sk = d.d_date_sk) LEFT SEMI JOIN /*JOIN4*/ (SELECT ws2.ws_order_number as ws_order_number FROM web_sales ws2 JOIN /*JOIN1*/ web_sales ws3 ON (ws2.ws_order_number = ws3.ws_order_number) WHERE ws2.ws_warehouse_sk <> ws3.ws_warehouse_sk) ws_wh1 ON (ws1.ws_order_number = ws_wh1.ws_order_number) LEFT SEMI JOIN /*JOIN4*/ (SELECT wr_order_number FROM web_returns wr JOIN /*JOIN3*/ (SELECT ws4.ws_order_number as ws_order_number FROM web_sales ws4 JOIN /*JOIN2*/ web_sales ws5 ON (ws4.ws_order_number = ws5.ws_order_number) WHERE ws4.ws_warehouse_sk <> ws5.ws_warehouse_sk) ws_wh2 ON (wr.wr_order_number = ws_wh2.ws_order_number)) tmp1 ON (ws1.ws_order_number = tmp1.wr_order_number) WHERE d.d_date >= '2001-05-01' and d.d_date <= '2001-06-30' and ca.ca_state = 'NC' and s.web_company_name = 'pri';
The original operator tree generated by Hive is shown below.
Figure 5: The original operator tree of Example 3 generated by Hive
In this complex query, we first have several MapJoins (MJ1, MJ2, and MJ3) which can be evaluated in the same Map phase. Since JOIN1, JOIN2, JOIN3, and JOIN4 use the same column as the join key, we can use a single MapReduce job to evaluate all operators before AGG1. The second MapReduce job will generate the final results. The optimized operator tree is shown below.
Figure 6: The optimized operator tree of Example 3
3. Intra-query Correlations
In Hive, a submitted SQL query needs to be evaluated in a distributed system. When evaluating a query, data may sometimes need to be shuffled. Based on the nature of different data operations, operators in Hive can be divided into two categories.
- Operators which do not require data shuffling. Examples are TableScanOperator, SelectOperator, and FilterOperator.
- Operators which require data shuffling. Examples are GroupByOperator and JoinOperator.
For an operator requiring data shuffling, Hive will add one or multiple ReduceSinkOperators as parents of this operator (the number of ReduceSinkOperators depends on the number of inputs of the operator requiring data shuffling). Those ReduceSinkOperators form the boundary between the Map phase and the Reduce phase. Then, Hive will cut the operator tree into multiple pieces (MapReduce tasks), and each piece can be executed in a MapReduce job.
For a complex query, it is possible that an input table is used by multiple MapReduce tasks. In this case, this table will be loaded multiple times when the original operator tree is used. Also, when generating those ReduceSinkOperators, Hive does not consider whether the corresponding operator requiring data shuffling really needs re-partitioned input data. For example, in the original operator tree of Example 1 (Figure 1), AGG1, JOIN1, and AGG2 require the data to be shuffled in the same way because all of them require the column key to be the partitioning column in their corresponding ReduceSinkOperators. But Hive is not aware of this correlation between AGG1, JOIN1, and AGG2, and still generates three MapReduce tasks.
Correlation Optimizer aims to exploit the two intra-query correlations mentioned above.
- Input Correlation: An input table is used by multiple MapReduce tasks in the original operator tree.
- Job Flow Correlation: Two dependent MapReduce tasks shuffle the data in the same way.
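The two definitions can be made concrete with a toy model in plain Python. This is only a sketch under stated assumptions: the Task class, its fields, and the task names job1 to job3 are hypothetical and are not part of Hive. The three tasks loosely mirror the original plan of Example 1.

```python
from dataclasses import dataclass, field

@dataclass
class Task:
    """Hypothetical summary of one MapReduce task in the original plan."""
    name: str
    input_tables: set                 # base tables scanned by this task
    shuffle_keys: tuple               # partitioning columns of its shuffle
    depends_on: list = field(default_factory=list)  # upstream task names

def input_correlations(tasks):
    """Pairs of tasks that scan at least one common input table."""
    pairs = []
    for i, a in enumerate(tasks):
        for b in tasks[i + 1:]:
            shared = a.input_tables & b.input_tables
            if shared:
                pairs.append((a.name, b.name, sorted(shared)))
    return pairs

def job_flow_correlations(tasks):
    """Pairs (upstream, downstream) of dependent tasks with the same shuffle keys."""
    by_name = {t.name: t for t in tasks}
    return [
        (up, t.name)
        for t in tasks
        for up in t.depends_on
        if by_name[up].shuffle_keys == t.shuffle_keys
    ]

# Toy model of Example 1: AGG1, JOIN1, and AGG2 each in their own task.
tasks = [
    Task("job1", {"t1"}, ("key",)),            # AGG1
    Task("job2", {"t1"}, ("key",), ["job1"]),  # JOIN1
    Task("job3", set(), ("key",), ["job2"]),   # AGG2
]
print(input_correlations(tasks))
print(job_flow_correlations(tasks))
```

In this model, job1 and job2 have an input correlation on t1, and both dependent pairs have a job flow correlation, which is why Example 1 can collapse into a single MapReduce job.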
4. Correlation Detection
On the optimization side, Correlation Optimizer is implemented in the class CorrelationOptimizer, which is part of the package org.apache.hadoop.hive.ql.optimizer.correlation. It works on the operator tree before this tree is cut into multiple MapReduce tasks. This optimizer detects correlations and transforms the operator tree accordingly. In this section, we first go through correlation detection. In the next section, we will introduce how an operator tree is transformed based on detected correlations.
To detect correlations, we walk the tree starting from the FileSinkOperator (using DefaultGraphWalker) and stop at every ReduceSinkOperator. Then, from this ReduceSinkOperator and its peer ReduceSinkOperators (in the case of handling a JoinOperator), we search for correlated ReduceSinkOperators in the upstream direction (the direction of parent operators), layer by layer. The ReduceSinkOperators from which the search starts are called topReduceSinkOperators. The search from topReduceSinkOperators returns all ReduceSinkOperators at the lowest layers we can reach as a list called bottomReduceSinkOperators. The optimizer then decides whether we have found a sub-tree with correlations by comparing topReduceSinkOperators and bottomReduceSinkOperators. If topReduceSinkOperators and bottomReduceSinkOperators are not the same, we consider that we have found job flow correlations. If we found correlations, we mark the ReduceSinkOperators belonging to the sub-tree with correlations, so the tree walker will not visit these ReduceSinkOperators again. Finally, the optimizer continues to walk the tree. It is worth noting that if hive.auto.convert.join=true, we first check whether any JoinOperator will be automatically converted to a MapJoinOperator later by CommonJoinResolver. Then, in the process of correlation detection, we stop searching a branch if we reach such a JoinOperator.
For example, in Figure 1 (we also show it below), the process of correlation detection is described as follows.
- The tree walker visits RS4. We set topReduceSinkOperators=[RS4].
  - From RS4, we track sorting columns and partitioning columns of RS4 backward until we reach RS2 (because tmp1.key is from the left table of JOIN1).
    - Check if RS4 and RS2 are using the same sorting columns, sorting orders, and same partitioning columns. Also, we check if RS4 and RS2 do not have any conflict on the number of reducers. In this example, all of these checks pass.
    - Because RS4 and RS2 are correlated and the child of RS2 is a JoinOperator, we analyze if we can consider RS3 as a correlated ReduceSinkOperator of RS4. In this example, JOIN1 is an inner join operation. So, RS4 and RS3 are also correlated. Because both parents of JOIN1 are correlated ReduceSinkOperators, we can continue to search ReduceSinkOperators from both RS2 and RS3.
  - From RS2, we track sorting columns and partitioning columns of RS2 backward until we reach RS1.
    - Check if RS2 and RS1 are using the same sorting columns, sorting orders, and same partitioning columns. Also, we check if RS2 and RS1 do not have any conflict on the number of reducers. In this example, all of these checks pass. So, RS2 and RS1 are correlated.
  - Because there is no ReduceSinkOperator we can track backward from RS1, we add RS1 to bottomReduceSinkOperators.
  - Because there is no ReduceSinkOperator we can track backward from RS3, we add RS3 to bottomReduceSinkOperators.
- We have topReduceSinkOperators=[RS4] and bottomReduceSinkOperators=[RS1, RS3]. Because topReduceSinkOperators and bottomReduceSinkOperators are not the same, we have found a sub-tree with correlations. This sub-tree starts from RS1 and RS3, and all ReduceSinkOperators in this sub-tree are RS1, RS2, RS3, and RS4.
- There is no ReduceSinkOperator which needs to be visited. The process of correlation detection stops.
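The walkthrough above can be sketched as a small search in plain Python. This is a simplified illustration, not the CorrelationOptimizer code: the UPSTREAM and SIGNATURE tables are a hypothetical hand-written model of Figure 1, the "no conflict on the number of reducers" check is reduced to plain equality, and the join is assumed to be an inner join so that all of its parent ReduceSinkOperators can be followed.

```python
# Hypothetical model of Figure 1: for each ReduceSinkOperator (RS), the RSs
# reached by tracking its key columns backward. RS4 reaches RS2 and RS3
# through JOIN1 (an inner join); RS2 reaches RS1 through AGG1.
UPSTREAM = {"RS4": ["RS2", "RS3"], "RS2": ["RS1"], "RS1": [], "RS3": []}

# Simplified signature per RS (an assumption of this sketch):
# (sorting columns, sorting orders, partitioning columns, number of reducers).
# -1 is used here as a placeholder for "number of reducers not set".
SIGNATURE = {rs: (("key",), ("+",), ("key",), -1) for rs in UPSTREAM}

def find_bottom(rs, upstream, signature):
    """Return the lowest correlated RSs reachable from rs, layer by layer."""
    parents = upstream[rs]
    # Stop here if nothing is upstream, or if any upstream RS shuffles
    # differently (then none of the parents is treated as correlated).
    if not parents or any(signature[p] != signature[rs] for p in parents):
        return [rs]
    bottom = []
    for parent in parents:
        bottom.extend(find_bottom(parent, upstream, signature))
    return bottom

def detect(top, upstream, signature):
    """Return (bottomReduceSinkOperators, whether correlations were found)."""
    bottom = []
    for rs in top:
        bottom.extend(find_bottom(rs, upstream, signature))
    return bottom, bottom != top

bottom, found = detect(["RS4"], UPSTREAM, SIGNATURE)
print(bottom, found)  # ['RS1', 'RS3'] True
```

As in the walkthrough, the search starting from RS4 ends with bottomReduceSinkOperators=[RS1, RS3], which differs from topReduceSinkOperators=[RS4], so a sub-tree with job flow correlations is reported.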
In the process of searching for correlated ReduceSinkOperators, if the child of a correlated ReduceSinkOperator is a JoinOperator, we analyze whether the other parent ReduceSinkOperators of this JoinOperator can also be considered correlated, in the following way. A JoinOperator has multiple join conditions (joinConds), and each join condition has a left table and a right table. If a correlated ReduceSinkOperator is the left table of a join condition, we consider the ReduceSinkOperator corresponding to the right table to be correlated as well when the join type is INNER_JOIN, LEFT_OUTER_JOIN, or LEFT_SEMI_JOIN. If a correlated ReduceSinkOperator is the right table of a join condition, we consider the ReduceSinkOperator corresponding to the left table to be correlated as well when the join type is INNER_JOIN or RIGHT_OUTER_JOIN. Because a JoinOperator can have multiple join conditions, we recursively search all join conditions until we either have searched all of them or there are no more correlated ReduceSinkOperators to find. After this analysis, if all parent ReduceSinkOperators of the JoinOperator are correlated, we continue to search ReduceSinkOperators along this branch. Otherwise, we stop searching this branch and consider none of the parent ReduceSinkOperators of the JoinOperator to be correlated.
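The propagation rule over join conditions can be sketched in plain Python as follows. This is an illustration under assumptions, not Hive code: join conditions are modeled as hypothetical (left, right, join type) tuples, the function name correlated_parents is made up, and the recursive search described above is replaced here by a simple iterate-until-nothing-changes loop, which visits the same join conditions.

```python
# Join types through which correlation propagates, as described above.
LEFT_TO_RIGHT = {"INNER_JOIN", "LEFT_OUTER_JOIN", "LEFT_SEMI_JOIN"}
RIGHT_TO_LEFT = {"INNER_JOIN", "RIGHT_OUTER_JOIN"}

def correlated_parents(join_conds, start, all_parents):
    """Return the correlated parent RSs of a join, or an empty set.

    join_conds:  list of (left_rs, right_rs, join_type) tuples (hypothetical model)
    start:       the parent RS already known to be correlated
    all_parents: every parent RS of the JoinOperator
    """
    correlated = {start}
    changed = True
    while changed:  # repeat until no join condition adds a new RS
        changed = False
        for left, right, join_type in join_conds:
            if left in correlated and right not in correlated \
                    and join_type in LEFT_TO_RIGHT:
                correlated.add(right)
                changed = True
            if right in correlated and left not in correlated \
                    and join_type in RIGHT_TO_LEFT:
                correlated.add(left)
                changed = True
    # All parents must be correlated; otherwise none of them is.
    return correlated if correlated == set(all_parents) else set()

# JOIN1 of Example 1: an inner join between RS2 (left) and RS3 (right).
print(correlated_parents([("RS2", "RS3", "INNER_JOIN")], "RS2", ["RS2", "RS3"]))
```

For a LEFT_OUTER_JOIN, the same call succeeds when the search arrives from the left parent but returns an empty set when it arrives from the right parent, which matches the asymmetric rule in the text.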
Right now, the process of correlation detection has a few limitations. We should improve these in our future work.
- The conditions for checking whether two ReduceSinkOperators are correlated are very restrictive. Two ReduceSinkOperators are considered correlated only if they have the same sorting columns, sorting orders, and partitioning columns, and they do not conflict on the number of reducers.
- Input correlations are not explicitly detected. Right now, we only explicitly detect job flow correlations. If a sub-tree has job flow correlations, because we use a single MapReduce job to evaluate this sub-tree, input correlations in this sub-tree are automatically exploited. However, there are cases which only have input correlations. Right now, these cases are not optimized.
- If the input operator tree has multiple FileSinkOperators, we do not optimize this tree.
- If the input operator tree already has a MapJoinOperator, we do not optimize this tree.
- In the process of searching ReduceSinkOperators, if we find a GroupByOperator with grouping sets or a PTFOperator in a branch, we stop searching this branch.
5. Operator Tree Transformation
6. Executing Optimized Operator Tree in the Reduce Phase
Currently, blocking operators in the reduce phase operator tree share the same keys. Other cases will be supported in future work.
6.1 Executing Operator Tree with Same Keys
7. Related Jiras
The umbrella jira is HIVE-3667.
7.1 Resolved Jiras
7.2 Unresolved Jiras
8. References
- Rubao Lee, Tian Luo, Yin Huai, Fusheng Wang, Yongqiang He, Xiaodong Zhang. YSmart: Yet another SQL-to-MapReduce Translator, ICDCS, 2011