This page documents the Correlation Optimizer. It was originally introduced by HIVE-2206 and is based on the ideas of YSmart [1]. To turn on this optimizer, you can use ...
    set hive.optimize.correlation=true;
1. Overview
In Hadoop environments, an SQL query submitted to Hive is evaluated in a distributed system. Thus, after generating a query operator tree representing the submitted SQL query, Hive needs to determine which operations can be executed in a task that will be evaluated on a single node. Also, since a MapReduce job can shuffle data only once, Hive needs to cut the tree into multiple MapReduce jobs. It is important to cut an operator tree into multiple MapReduce jobs in a good way, so that the generated plan can evaluate the query efficiently.
...
    SELECT tmp1.key, count(*)
    FROM (SELECT key, avg(value) AS avg
          FROM t1
          GROUP BY /*AGG1*/ key) tmp1
    JOIN /*JOIN1*/ t1 ON (tmp1.key = t1.key)
    WHERE t1.value > tmp1.avg
    GROUP BY /*AGG2*/ tmp1.key;
...
    SELECT tmp1.key, count(*)
    FROM t1
    JOIN /*JOIN1*/ (SELECT key, avg(value) AS avg
                    FROM t1
                    GROUP BY /*AGG1*/ key) tmp1 ON (t1.key = tmp1.key)
    JOIN /*JOIN1*/ t2 ON (tmp1.key = t2.key)
    WHERE t2.value > tmp1.avg
    GROUP BY /*AGG2*/ t1.key;
...
    SELECT count(distinct ws1.ws_order_number) as order_count,
           sum(ws1.ws_ext_ship_cost) as total_shipping_cost,
           sum(ws1.ws_net_profit) as total_net_profit
    FROM web_sales ws1
    JOIN /*MJ1*/ customer_address ca ON (ws1.ws_ship_addr_sk = ca.ca_address_sk)
    JOIN /*MJ2*/ web_site s ON (ws1.ws_web_site_sk = s.web_site_sk)
    JOIN /*MJ3*/ date_dim d ON (ws1.ws_ship_date_sk = d.d_date_sk)
    LEFT SEMI JOIN /*JOIN4*/ (SELECT ws2.ws_order_number as ws_order_number
                              FROM web_sales ws2 JOIN /*JOIN1*/ web_sales ws3
                              ON (ws2.ws_order_number = ws3.ws_order_number)
                              WHERE ws2.ws_warehouse_sk <> ws3.ws_warehouse_sk) ws_wh1
    ON (ws1.ws_order_number = ws_wh1.ws_order_number)
    LEFT SEMI JOIN /*JOIN4*/ (SELECT wr_order_number
                              FROM web_returns wr
                              JOIN /*JOIN3*/ (SELECT ws4.ws_order_number as ws_order_number
                                              FROM web_sales ws4 JOIN /*JOIN2*/ web_sales ws5
                                              ON (ws4.ws_order_number = ws5.ws_order_number)
                                              WHERE ws4.ws_warehouse_sk <> ws5.ws_warehouse_sk) ws_wh2
                              ON (wr.wr_order_number = ws_wh2.ws_order_number)) tmp1
    ON (ws1.ws_order_number = tmp1.wr_order_number)
    WHERE d.d_date >= '2001-05-01' and
          d.d_date <= '2001-06-30' and
          ca.ca_state = 'NC' and
          s.web_company_name = 'pri';
...
- Input Correlation: An input table is used by multiple MapReduce tasks in the original operator tree.
- Job Flow Correlation: Two dependent MapReduce tasks shuffle the data in the same way.
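The two kinds of correlation can be illustrated with a small sketch. It is not Hive code: the `tasks` dictionary, its field names, and the job names are hypothetical, and it assumes a simplified three-job plan for the first example query in which every job shuffles on the same key.

```python
from collections import defaultdict

# Hypothetical, simplified description of an unoptimized plan.
# Each task lists the tables it scans, its shuffle key, and the task it depends on.
tasks = {
    "job1": {"inputs": ["t1"], "shuffle_key": ("key",), "depends_on": None},    # AGG1
    "job2": {"inputs": ["t1"], "shuffle_key": ("key",), "depends_on": "job1"},  # JOIN1
    "job3": {"inputs": [], "shuffle_key": ("key",), "depends_on": "job2"},      # AGG2
}


def input_correlations(tasks):
    """Tables that are scanned by more than one task."""
    users = defaultdict(list)
    for name, task in tasks.items():
        for table in task["inputs"]:
            users[table].append(name)
    return {table: names for table, names in users.items() if len(names) > 1}


def job_flow_correlations(tasks):
    """Pairs of dependent tasks that shuffle their data on the same key."""
    pairs = []
    for name, task in tasks.items():
        parent = task["depends_on"]
        if parent is not None and tasks[parent]["shuffle_key"] == task["shuffle_key"]:
            pairs.append((parent, name))
    return pairs
```

Under these assumptions, `t1` shows an input correlation between the first two jobs, and each pair of dependent jobs shows a job flow correlation.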
4. Correlation Detection
On the optimization side, the Correlation Optimizer is implemented in the class CorrelationOptimizer, which is part of the package org.apache.hadoop.hive.ql.optimizer.correlation. It works on the operator tree before the tree is cut into multiple MapReduce tasks. The optimizer detects correlations and transforms the operator tree accordingly. This section covers correlation detection; the next section describes how an operator tree is transformed based on the detected correlations.
To detect correlations, we walk the tree starting from the FileSinkOperator (using DefaultGraphWalker) and stop at every ReduceSinkOperator. From this ReduceSinkOperator and its peer ReduceSinkOperators (when handling a JoinOperator), we search for correlated ReduceSinkOperators in the upstream direction (the direction of parent operators), layer by layer. The ReduceSinkOperators from which the search starts are called topReduceSinkOperators. The search returns all ReduceSinkOperators at the lowest layers it can reach as a list called bottomReduceSinkOperators. The optimizer then decides whether it has found a sub-tree with correlations by comparing topReduceSinkOperators and bottomReduceSinkOperators. If the two lists are not the same, we consider that we have found job flow correlations. In that case, we mark the ReduceSinkOperators belonging to the sub-tree with correlations, so the tree walker will not visit them again. Finally, the optimizer continues to walk the tree. It is worth noting that if hive.auto.convert.join=true, we first check whether any JoinOperator will be automatically converted to a MapJoinOperator later by CommonJoinResolver. During correlation detection, we stop searching a branch when we reach such a JoinOperator.
For example, in Figure 1 (we also show it below), the process of correlation detection is described as follows.
- The tree walker visits RS4. We set topReduceSinkOperators=[RS4].
- From RS4, we track sorting columns and partitioning columns of RS4 backward until we reach RS2 (because tmp1.key is from the left table of JOIN1).
  - Check if RS4 and RS2 are using the same sorting columns, sorting orders, and same partitioning columns. Also, we check if RS4 and RS2 do not have any conflict on the number of reducers. In this example, all of these checks pass.
  - Because RS4 and RS2 are correlated and the child of RS2 is a JoinOperator, we analyze if we can consider RS3 as a correlated ReduceSinkOperator of RS4. In this example, JOIN1 is an inner join operation. So, RS4 and RS3 are also correlated. Because both parents of JOIN1 are correlated ReduceSinkOperators, we can continue to search ReduceSinkOperators from both RS2 and RS3.
- From RS2, we track sorting columns and partitioning columns of RS2 backward until we reach RS1.
  - Check if RS2 and RS1 are using the same sorting columns, sorting orders, and same partitioning columns. Also, we check if RS2 and RS1 do not have any conflict on the number of reducers. In this example, all of these checks pass. So, RS2 and RS1 are correlated.
- Because there is no ReduceSinkOperator we can track backward from RS1, we add RS1 to bottomReduceSinkOperators.
- Because there is no ReduceSinkOperator we can track backward from RS3, we add RS3 to bottomReduceSinkOperators.
- We have topReduceSinkOperators=[RS4] and bottomReduceSinkOperators=[RS1, RS3]. Because topReduceSinkOperators and bottomReduceSinkOperators are not the same, we have found a sub-tree with correlations. This sub-tree starts from RS1 and RS3, and all ReduceSinkOperators in this sub-tree are RS1, RS2, RS3, and RS4.
- There is no ReduceSinkOperator which needs to be visited. The process of correlation detection stops.
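The walk in this example can be sketched on a toy model of the operator tree. This is only an illustration under stated assumptions, not the logic of CorrelationOptimizer: the `Op` class, the `keys` signature, and the helper names are hypothetical, the tree is a simplified rendering of the example (scans are named TS1 and TS2 here), and the join-type analysis is reduced to requiring that every nearest upstream ReduceSinkOperator has the same signature.

```python
from dataclasses import dataclass, field


@dataclass
class Op:
    name: str
    kind: str                                   # "RS", "JOIN", "GBY", or "TS"
    parents: list = field(default_factory=list)
    keys: tuple = ()                            # simplified shuffle signature of an RS


def nearest_upstream_rs(op):
    """Closest ReduceSinkOperators above `op`, following every parent branch."""
    found = []
    for parent in op.parents:
        if parent.kind == "RS":
            found.append(parent)
        else:
            found.extend(nearest_upstream_rs(parent))
    return found


def find_bottom(rs):
    """Follow correlated RS operators upstream and return the lowest layer reached."""
    candidates = nearest_upstream_rs(rs)
    # Simplification: all candidates must shuffle the same way as `rs`.
    if not candidates or any(c.keys != rs.keys for c in candidates):
        return [rs]
    bottom = []
    for candidate in candidates:
        bottom.extend(find_bottom(candidate))
    return bottom


# Toy version of the example tree: RS1 -> AGG1 -> RS2 -> JOIN1 <- RS3, JOIN1 -> RS4.
ts1 = Op("TS1", "TS")
rs1 = Op("RS1", "RS", [ts1], ("key",))
agg1 = Op("AGG1", "GBY", [rs1])
rs2 = Op("RS2", "RS", [agg1], ("key",))
ts2 = Op("TS2", "TS")
rs3 = Op("RS3", "RS", [ts2], ("key",))
join1 = Op("JOIN1", "JOIN", [rs2, rs3])
rs4 = Op("RS4", "RS", [join1], ("key",))

top = [rs4]
bottom = find_bottom(rs4)
bottom_names = [op.name for op in bottom]
found_correlation = bottom_names != [op.name for op in top]
```

In this toy model the search ends with RS1 and RS3 as the bottom layer, which differs from the top layer, so a sub-tree with correlations is reported.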
While searching for correlated ReduceSinkOperators, if the child of a correlated ReduceSinkOperator is a JoinOperator, we analyze whether the other parent ReduceSinkOperators of this JoinOperator can also be considered correlated, as follows. A JoinOperator has one or more join conditions (joinConds), each with a left table and a right table. If a correlated ReduceSinkOperator is the left table of a join condition, we consider the ReduceSinkOperator corresponding to the right table to be correlated as well when the join type is INNER_JOIN, LEFT_OUTER_JOIN, or LEFT_SEMI_JOIN. If a correlated ReduceSinkOperator is the right table of a join condition, we consider the ReduceSinkOperator corresponding to the left table to be correlated as well when the join type is INNER_JOIN or RIGHT_OUTER_JOIN. Because a JoinOperator can have multiple join conditions, we search the join conditions recursively until either all of them have been searched or no more correlated ReduceSinkOperators can be found. After this analysis, if all parent ReduceSinkOperators of the JoinOperator are correlated, we continue to search ReduceSinkOperators on this branch. Otherwise, we stop searching this branch and consider none of the parent ReduceSinkOperators of the JoinOperator to be correlated.
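The propagation rule across join conditions can be sketched as a small fixed-point loop. This is a minimal illustration, not Hive's implementation: the function name, the tuple layout of a join condition, and the use of strings for join types are assumptions made for the example.

```python
# Join types under which correlation spreads from one side of a condition to the other.
LEFT_TO_RIGHT = {"INNER_JOIN", "LEFT_OUTER_JOIN", "LEFT_SEMI_JOIN"}
RIGHT_TO_LEFT = {"INNER_JOIN", "RIGHT_OUTER_JOIN"}


def correlated_parents(join_conds, start, num_parents):
    """Return the set of correlated parent indexes, or an empty set.

    join_conds: list of (left_index, right_index, join_type) tuples (hypothetical layout).
    start: index of the parent ReduceSinkOperator already known to be correlated.
    """
    correlated = {start}
    changed = True
    while changed:
        changed = False
        for left, right, join_type in join_conds:
            if left in correlated and right not in correlated and join_type in LEFT_TO_RIGHT:
                correlated.add(right)
                changed = True
            if right in correlated and left not in correlated and join_type in RIGHT_TO_LEFT:
                correlated.add(left)
                changed = True
    # All-or-nothing: unless every parent is correlated, none is treated as correlated.
    return correlated if len(correlated) == num_parents else set()
```

For example, with a single LEFT_OUTER_JOIN condition the correlation spreads from the left parent to the right one, but not in the opposite direction.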
Right now, the process of correlation detection has a few limitations. We should improve these in our future work.
- The conditions for checking whether two ReduceSinkOperators are correlated are very strict. Two ReduceSinkOperators are considered correlated only if they have the same sorting columns, sorting orders, and partitioning columns, and they do not conflict on the number of reducers.
- Input correlations are not explicitly detected. Right now, we only explicitly detect job flow correlations. If a sub-tree has job flow correlations, because we use a single MapReduce job to evaluate this sub-tree, input correlations in this sub-tree can be automatically exploited. However, there are cases which only have input correlations. Right now, these cases are not optimized.
- If the input operator tree has multiple FileSinkOperators, we do not optimize this tree.
- If the input operator tree already has a MapJoinOperator, we do not optimize this tree.
- In the process of searching ReduceSinkOperators, if we find a GroupByOperator with grouping sets or a PTFOperator in a branch, we stop searching this branch.
...
In Hive, every query has one or more terminal operators, which are the last operators in the operator tree. These terminal operators are called FileSinkOperators. For ease of explanation, if an operator A is on another operator B's path to a FileSinkOperator, A is downstream of B and B is upstream of A.
For a given operator tree like the one shown in Figure 1, the Correlation Optimizer visits operators in the tree starting from the FileSinkOperators in a depth-first way. The tree walker stops at every ReduceSinkOperator. Then, a correlation detector tries to find a correlation from this ReduceSinkOperator and its siblings by recursively finding the furthest correlated upstream ReduceSinkOperators. If we can find any correlated upstream ReduceSinkOperator, we have found a correlation. Currently, three conditions determine whether an upstream ReduceSinkOperator and a downstream ReduceSinkOperator are correlated:
- emitted rows from these two ReduceSinkOperators are sorted in the same way;
- emitted rows from these two ReduceSinkOperators are partitioned in the same way; and
- these ReduceSinkOperators do not have any conflict on the number of reducers.
Interested readers may refer to our implementation for details.
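As a rough illustration of the three conditions, the check can be written as a comparison of two shuffle descriptions. The `ShuffleSpec` class and its fields are hypothetical, and the sketch assumes that a reducer count of -1 means "not fixed", so a conflict only exists when both sides fix different counts.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ShuffleSpec:
    sort_cols: tuple
    sort_order: str          # assumed encoding: one '+' or '-' per sort column
    partition_cols: tuple
    num_reducers: int = -1   # assumption: -1 means the number of reducers is not fixed


def reducers_conflict(a, b):
    """A conflict exists only when both sides fix a different number of reducers."""
    return a.num_reducers > 0 and b.num_reducers > 0 and a.num_reducers != b.num_reducers


def is_correlated(upstream, downstream):
    """Apply the three conditions: same sorting, same partitioning, no reducer conflict."""
    return (
        upstream.sort_cols == downstream.sort_cols
        and upstream.sort_order == downstream.sort_order
        and upstream.partition_cols == downstream.partition_cols
        and not reducers_conflict(upstream, downstream)
    )
```

With this encoding, two operators that sort and partition on the same column are correlated when only one of them fixes the number of reducers, and not correlated when they fix different numbers.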
During the correlation detection, a JoinOperator or a UnionOperator can introduce branches to the searching path. For a JoinOperator, its parents are all ReduceSinkOperators. When the detector reaches a JoinOperator, it will check if all parents of this JoinOperator are correlated to the downstream ReduceSinkOperator. Because a JoinOperator contains one or multiple 2-way Join operations, for a ReduceSinkOperator, we can determine if another ReduceSinkOperator appearing in the same Join operation is correlated based on the Join type and positions of these ReduceSinkOperators in the Join operation with the following two rules.
- If a ReduceSinkOperator represents the left table of an INNER JOIN, a LEFT OUTER JOIN, or a LEFT SEMI JOIN, the ReduceSinkOperator representing the right table is also considered correlated; and
- If a ReduceSinkOperator represents the right table of an INNER JOIN or a RIGHT OUTER JOIN, the ReduceSinkOperator representing the left table is also considered correlated.
With these two rules, we start analyzing the parent ReduceSinkOperators of the JoinOperator from every ReduceSinkOperator that has columns appearing in the join clause, and then find all correlated ReduceSinkOperators recursively. If, starting from every such ReduceSinkOperator, we find that all parent ReduceSinkOperators are correlated, we continue the correlation detection on this branch. Otherwise, we determine that none of the parent ReduceSinkOperators of the JoinOperator is correlated and stop the correlation detection on this branch.
For a UnionOperator, none of its parents will be a ReduceSinkOperator. So, we check if we can find correlated ReduceSinkOperators for every parent branch of this UnionOperator. If any branch does not have a ReduceSinkOperator, we will determine that we do not find any correlated ReduceSinkOperator at parent branches of this UnionOperator.
During the process of correlation detection, it is possible that the detector visits a JoinOperator which will be converted to a Map Join later. In this case, the detector stops searching the branch containing this Map Join. For example, in Figure 5, the detector knows that MJ1, MJ2, and MJ3 will be converted to Map Joins.
5. Operator Tree Transformation
In a correlation, there are two kinds of ReduceSinkOperators. The first kind are the ReduceSinkOperators at the bottom layer of a query operator tree, which are needed to emit rows to the shuffling phase. For example, in Figure 1, RS1 and RS3 are bottom layer ReduceSinkOperators. The second kind are unnecessary ReduceSinkOperators, which can be removed from the optimized operator tree. For example, in Figure 1, RS2 and RS4 are unnecessary ReduceSinkOperators.
Because the input rows of the Reduce phase may need to be forwarded to different operators and those input rows come from a single stream, we add a new operator called DemuxOperator to dispatch input rows of the Reduce phase to the corresponding operators. In the operator tree transformation, we first connect the children of the bottom layer ReduceSinkOperators to the DemuxOperator and reassign the tags of the bottom layer ReduceSinkOperators (the DemuxOperator is the only child of those bottom layer ReduceSinkOperators). In the DemuxOperator, we record two mappings. The first one is called newTagToOldTag, which maps the new tags assigned to the bottom layer ReduceSinkOperators to their original tags. The original tags are needed to make JoinOperator work correctly. The second mapping is called newTagToChildIndex, which maps the new tags to child indexes. With this mapping, the DemuxOperator knows the operator to which a row needs to be forwarded based on the tag of this row.
The second step of the operator tree transformation is to remove the unnecessary ReduceSinkOperators. To make the operator tree in the Reduce phase work correctly, we add a new operator called MuxOperator at the original place of those unnecessary ReduceSinkOperators. It is worth noting that if an operator has multiple unnecessary ReduceSinkOperators as its parents, we only add a single MuxOperator.
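The role of the two mappings can be sketched with a toy dispatcher. The `Demux` and `Recorder` classes below are hypothetical stand-ins rather than Hive's DemuxOperator, and the example assumes two join-like children that each expect rows tagged 0 and 1, fed by four bottom layer ReduceSinkOperators with new tags 0 to 3.

```python
class Recorder:
    """Stand-in for a child operator; it records the rows and tags it receives."""

    def __init__(self, name):
        self.name = name
        self.received = []

    def process(self, row, tag):
        self.received.append((row, tag))


class Demux:
    """Toy dispatcher that forwards each row to one child using the two mappings."""

    def __init__(self, children, new_tag_to_old_tag, new_tag_to_child_index):
        self.children = children
        self.new_tag_to_old_tag = new_tag_to_old_tag
        self.new_tag_to_child_index = new_tag_to_child_index

    def process(self, row, new_tag):
        child = self.children[self.new_tag_to_child_index[new_tag]]
        # The child sees the original tag, which a join needs to tell its inputs apart.
        child.process(row, self.new_tag_to_old_tag[new_tag])


join_a, join_b = Recorder("JOIN_A"), Recorder("JOIN_B")
demux = Demux(
    [join_a, join_b],
    new_tag_to_old_tag={0: 0, 1: 1, 2: 0, 3: 1},
    new_tag_to_child_index={0: 0, 1: 0, 2: 1, 3: 1},
)
for row, new_tag in [("a", 0), ("b", 1), ("c", 2), ("d", 3)]:
    demux.process(row, new_tag)
```

Each child ends up with its own rows carrying the original tags 0 and 1, even though the single reduce input stream used the new tags 0 to 3.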
6. Executing Optimized Operator Tree in the Reduce Phase
In the Reduce phase, the ExecReducer forwards all reduce input rows to the DemuxOperator first. Then, the DemuxOperator forwards rows to their corresponding operators. Because a Reduce plan optimized by the Correlation Optimizer can be a tree structure, we need to coordinate operators in this tree to make the Reduce phase work correctly. This coordination mechanism is implemented in ExecReducer, DemuxOperator, and MuxOperator. Currently, blocking operators in the reduce phase operator tree share the same keys. Other cases will be supported in future work.
When a new row is sent to the ExecReducer, it checks whether it needs to start a new group of rows by checking the values of the key columns. If a new group of rows is coming, it first invokes DemuxOperator.endGroup. Then, the DemuxOperator asks its children to process their buffered rows and propagates the endGroup call to the operator tree. Finally, the DemuxOperator propagates a processGroup call to the operator tree. Usually, the implementation of processGroup in an operator only propagates this call to its children. MuxOperator is the one that overrides processGroup. When a MuxOperator gets the processGroup call, it checks whether all its parent operators have invoked this call. If so, it asks its child to generate results and propagates processGroup to its child. Once processGroup has been propagated to all operators, DemuxOperator.endGroup returns and the ExecReducer propagates startGroup to the operator tree.
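The way a MuxOperator waits for all of its parents can be sketched as a call counter. The classes below are simplified, hypothetical stand-ins for the Hive operators, and the small tree (a demux feeding an aggregation and a mux, with the aggregation also feeding the mux) is an assumed shape chosen only to give the mux two parents.

```python
class Operator:
    """Default behavior: only propagate process_group to the children."""

    def __init__(self, name, log):
        self.name = name
        self.log = log
        self.children = []

    def process_group(self):
        for child in self.children:
            child.process_group()


class Mux(Operator):
    """Propagates process_group only after every parent has invoked it."""

    def __init__(self, name, log, num_parents):
        super().__init__(name, log)
        self.num_parents = num_parents
        self.calls_seen = 0

    def process_group(self):
        self.calls_seen += 1
        if self.calls_seen < self.num_parents:
            return                      # still waiting for another parent
        self.calls_seen = 0             # reset for the next row group
        super().process_group()


class Join(Operator):
    """Stand-in for a blocking operator that emits results once per row group."""

    def process_group(self):
        self.log.append(self.name + " emits results")
        super().process_group()


log = []
demux = Operator("DEMUX", log)
agg1 = Operator("AGG1", log)
mux = Mux("MUX", log, num_parents=2)
join1 = Join("JOIN1", log)
demux.children = [agg1, mux]
agg1.children = [mux]
mux.children = [join1]

agg1.process_group()                    # first parent reports: nothing is emitted yet
log_after_first_parent = list(log)
mux.process_group()                     # second parent reports: the join may emit
log_after_all_parents = list(log)
```

The join emits only after both parents of the mux have reported, and the counter reset lets the same tree handle the next row group.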
Every row sent to the ExecReducer also has a tag assigned by the corresponding ReduceSinkOperator in the Map phase. In a row group (rows having the same key), rows are also sorted by their tags. When the DemuxOperator sees a new tag, it knows that all child operators associated with tags smaller than this new tag will not receive any more input within the current row group. Thus, it can call endGroup and processGroup of those operators earlier. With this logic, within a row group, the input rows of every operator in the operator tree are also ordered by tags, which is required by JoinOperator. This logic also lets rows in the buffer of an operator be emitted as quickly as possible, which avoids the memory footprint of buffering rows unnecessarily.
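The early-closing logic can be illustrated by simulating one row group. This is a sketch under assumptions, not Hive's code: the function name and event tuples are hypothetical, and a child associated with several tags is assumed to be closable only once the incoming tag is larger than all of its tags.

```python
def simulate_group(tags, new_tag_to_child_index):
    """Replay the tags of one row group (sorted ascending) and record when children close."""
    # Largest tag associated with each child.
    last_tag = {}
    for tag, child in new_tag_to_child_index.items():
        last_tag[child] = max(tag, last_tag.get(child, tag))

    closed, events, current = set(), [], None
    for tag in tags:
        if tag != current:
            current = tag
            # Children whose tags are all smaller than the new tag get no more input.
            for child in sorted(last_tag):
                if last_tag[child] < tag and child not in closed:
                    closed.add(child)
                    events.append(("close", child))
        events.append(("row", tag))
    # Remaining children are closed when the row group ends.
    for child in sorted(last_tag):
        if child not in closed:
            events.append(("close", child))
    return events


events = simulate_group([0, 0, 1, 2], {0: 0, 1: 1, 2: 1})
```

In this run, child 0 is closed as soon as the first row with tag 1 arrives, while child 1, which is associated with tags 1 and 2, stays open until the end of the row group.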
7. Related Jiras
The umbrella jira is HIVE-3667.
...