...

```sql
-- Example 1: count, per key, the rows of t1 whose value exceeds that key's average value.
SELECT tmp1.key, count(*)
FROM (SELECT key, avg(value) AS avg
      FROM t1
      GROUP BY /*AGG1*/ key) tmp1
JOIN /*JOIN1*/ t1 ON (tmp1.key = t1.key)
WHERE t1.value > tmp1.avg
GROUP BY /*AGG2*/ tmp1.key;
```

...

```sql
-- Example 2: per key, count the joined rows whose t2 value exceeds that key's average value in t1.
SELECT tmp1.key, count(*)
FROM t1
JOIN /*JOIN1*/ (SELECT key, avg(value) AS avg
                FROM t1
                GROUP BY /*AGG1*/ key) tmp1 ON (t1.key = tmp1.key)
JOIN /*JOIN1*/ t2 ON (tmp1.key = t2.key)
WHERE t2.value > tmp1.avg
GROUP BY /*AGG2*/ t1.key;
```

...

```sql
-- Example 3: orders shipped from more than one warehouse (ws_wh1) that were also returned (tmp1),
-- restricted to NC addresses, web company 'pri', and ship dates in May-June 2001.
SELECT count(distinct ws1.ws_order_number) as order_count,
       sum(ws1.ws_ext_ship_cost) as total_shipping_cost,
       sum(ws1.ws_net_profit) as total_net_profit
FROM web_sales ws1
JOIN /*MJ1*/ customer_address ca ON (ws1.ws_ship_addr_sk = ca.ca_address_sk)
JOIN /*MJ2*/ web_site s ON (ws1.ws_web_site_sk = s.web_site_sk)
JOIN /*MJ3*/ date_dim d ON (ws1.ws_ship_date_sk = d.d_date_sk)
LEFT SEMI JOIN /*JOIN4*/ (SELECT ws2.ws_order_number as ws_order_number
                          FROM web_sales ws2 JOIN /*JOIN1*/ web_sales ws3
                          ON (ws2.ws_order_number = ws3.ws_order_number)
                          WHERE ws2.ws_warehouse_sk <> ws3.ws_warehouse_sk) ws_wh1
ON (ws1.ws_order_number = ws_wh1.ws_order_number)
LEFT SEMI JOIN /*JOIN4*/ (SELECT wr_order_number
                          FROM web_returns wr
                          JOIN /*JOIN3*/ (SELECT ws4.ws_order_number as ws_order_number
                                          FROM web_sales ws4 JOIN /*JOIN2*/ web_sales ws5
                                          ON (ws4.ws_order_number = ws5.ws_order_number)
                                          WHERE ws4.ws_warehouse_sk <> ws5.ws_warehouse_sk) ws_wh2
                          ON (wr.wr_order_number = ws_wh2.ws_order_number)) tmp1
ON (ws1.ws_order_number = tmp1.wr_order_number)
WHERE d.d_date >= '2001-05-01' and
      d.d_date <= '2001-06-30' and
      ca.ca_state = 'NC' and
      s.web_company_name = 'pri';
```

...

  1. Input Correlation: An input table is used by multiple MapReduce tasks in the original operator tree.
  2. Job Flow Correlation: Two dependent MapReduce tasks shuffle the data in the same way.
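The two kinds of correlation can be illustrated at the level of MapReduce tasks. The Python sketch below is a hypothetical model, not Hive code: `MRTask`, `input_correlations`, and `job_flow_correlations` are made-up names, it assumes the first example query runs as one task per aggregation or join, and it normalizes all shuffle columns to `key`. It reports tables read by more than one task (input correlation) and dependent task pairs that shuffle on the same keys (job flow correlation).

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class MRTask:
    """Hypothetical, plan-level view of one MapReduce task."""
    name: str
    inputs: list                 # tables (or intermediate results) the map phase reads
    shuffle_keys: tuple          # columns used to partition and sort the shuffle
    parents: list = field(default_factory=list)  # names of tasks this task depends on


def input_correlations(tasks):
    """Input correlation: tables read by more than one task."""
    readers = defaultdict(list)
    for task in tasks:
        for table in task.inputs:
            readers[table].append(task.name)
    return {table: names for table, names in readers.items() if len(names) > 1}


def job_flow_correlations(tasks):
    """Job flow correlation: (parent, child) task pairs that shuffle on the same keys."""
    by_name = {task.name: task for task in tasks}
    return [(parent, task.name)
            for task in tasks
            for parent in task.parents
            if by_name[parent].shuffle_keys == task.shuffle_keys]


# The first example query, assuming one task per aggregation or join.
tasks = [
    MRTask("AGG1", ["t1"], ("key",)),
    MRTask("JOIN1", ["tmp1", "t1"], ("key",), parents=["AGG1"]),
    MRTask("AGG2", ["join1_out"], ("key",), parents=["JOIN1"]),
]
print(input_correlations(tasks))     # {'t1': ['AGG1', 'JOIN1']}
print(job_flow_correlations(tasks))  # [('AGG1', 'JOIN1'), ('JOIN1', 'AGG2')]
```

Both correlations show up in this query: t1 is scanned by two tasks, and each task shuffles on the same key as the task it depends on.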

4. Correlation Detection

On the optimizer side, the Correlation Optimizer is implemented in the class CorrelationOptimizer, which is part of the package org.apache.hadoop.hive.ql.optimizer.correlation. It works on the operator tree before the tree is cut into multiple MapReduce tasks. The optimizer detects correlations and transforms the operator tree accordingly. This section covers correlation detection; the next section describes how an operator tree is transformed based on the detected correlations.

To detect correlations, we walk the tree starting from the FileSinkOperator (using DefaultGraphWalker) and stop at every ReduceSinkOperator. From this ReduceSinkOperator and its peer ReduceSinkOperators (when handling a JoinOperator), we search for correlated ReduceSinkOperators in the upstream direction (the direction of parent operators), layer by layer. The ReduceSinkOperators from which the search starts are called topReduceSinkOperators. The search returns all ReduceSinkOperators at the lowest layers it can reach as a list called bottomReduceSinkOperators. The optimizer then evaluates whether it has found a sub-tree with correlations by comparing topReduceSinkOperators and bottomReduceSinkOperators: if they are not the same, we consider that we have found job flow correlations. In that case, we mark the ReduceSinkOperators belonging to the correlated sub-tree so that the tree walker does not visit them again. Finally, the optimizer continues walking the tree.

Note that if hive.auto.convert.join=true, we first check whether any JoinOperator will later be converted to a MapJoinOperator by CommonJoinResolver. During correlation detection, we then stop searching a branch when we reach such a JoinOperator.
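The walk-and-search loop described above can be sketched in Python on a toy operator tree. This is a hypothetical model, not Hive's implementation: `Op`, `upstream_groups`, `search`, and `detect` are made-up names; correlation is reduced to "identical keys"; every JoinOperator that is not a future MapJoin propagates correlation to all of its parents (as an inner join would), all or nothing; and, as an additional assumption, ReduceSinkOperators feeding a future MapJoin are never used as search starts. The tree is a simplified version of Figure 1 without map-side aggregation.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)  # eq=False keeps identity-based hashing, so ops can go in sets
class Op:
    """Hypothetical operator node with only the fields this sketch needs."""
    name: str
    kind: str                   # "TS", "RS", "GBY", "FIL", "JOIN", "FS", ...
    keys: tuple = ()            # sorting/partitioning columns (RS only)
    to_map_join: bool = False   # JOIN expected to become a MapJoin later
    parents: list = field(default_factory=list)
    children: list = field(default_factory=list)


def link(parent, child):
    parent.children.append(child)
    child.parents.append(parent)


def upstream_groups(op):
    """Nearest upstream RSs of op; a JOIN contributes all its parents as one group."""
    groups = []
    for p in op.parents:
        if p.kind == "RS":
            groups.append([p])
        elif p.kind == "JOIN":
            if not p.to_map_join:   # stop the branch at a future MapJoin
                groups.append(list(p.parents))
        else:
            groups.extend(upstream_groups(p))
    return groups


def search(rs, members, bottom):
    """Layer-by-layer upstream search; collects correlated RSs and the bottom layer."""
    members.add(rs)
    extended = False
    for group in upstream_groups(rs):
        # Simplified correlation test: identical keys. A join group is all-or-nothing.
        if all(up.keys == rs.keys for up in group):
            extended = True
            for up in group:
                search(up, members, bottom)
    if not extended and rs not in bottom:
        bottom.append(rs)


def detect(sinks):
    """Depth-first walk from FileSinks; returns (top, bottom, members) per correlation."""
    visited, marked, found = set(), set(), []

    def walk(op):
        if op in visited:
            return
        visited.add(op)
        if op.kind == "RS" and op not in marked:
            child = op.children[0]
            # Assumption: RSs feeding a future MapJoin are not used as search starts.
            if not (child.kind == "JOIN" and child.to_map_join):
                top = list(child.parents) if child.kind == "JOIN" else [op]
                members, bottom = set(), []
                for rs in top:
                    search(rs, members, bottom)
                if set(bottom) != set(top):     # top != bottom: job flow correlation
                    found.append(([o.name for o in top], [o.name for o in bottom],
                                  sorted(o.name for o in members)))
                    marked.update(members)      # the walker skips these from now on
        for p in op.parents:
            walk(p)

    for sink in sinks:
        walk(sink)
    return found


# Simplified operator tree of Figure 1.
ts_a, ts_b = Op("TS_t1_a", "TS"), Op("TS_t1_b", "TS")
rs1, rs2, rs3, rs4 = (Op(f"RS{i}", "RS", ("key",)) for i in range(1, 5))
gby1, join1, fil = Op("GBY1", "GBY"), Op("JOIN1", "JOIN"), Op("FIL", "FIL")
gby2, fs = Op("GBY2", "GBY"), Op("FS", "FS")
for p, c in [(ts_a, rs1), (rs1, gby1), (gby1, rs2), (ts_b, rs3), (rs2, join1),
             (rs3, join1), (join1, fil), (fil, rs4), (rs4, gby2), (gby2, fs)]:
    link(p, c)

print(detect([fs]))
# [(['RS4'], ['RS1', 'RS3'], ['RS1', 'RS2', 'RS3', 'RS4'])]
```

With JOIN1 flagged as a future MapJoin (`join1.to_map_join = True`), the search from RS4 stops at JOIN1 and this sketch reports no correlation.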

For example, correlation detection on the operator tree in Figure 1 proceeds as follows.

  1. The tree walker visits RS4. We set topReduceSinkOperators=[RS4].
  2. From RS4, we track the sorting columns and partitioning columns of RS4 backward until we reach RS2 (because tmp1.key comes from the left table of JOIN1).
    1. We check whether RS4 and RS2 use the same sorting columns, sorting orders, and partitioning columns, and that they do not conflict on the number of reducers. In this example, all of these checks pass.
    2. Because RS4 and RS2 are correlated and the child of RS2 is a JoinOperator, we analyze whether RS3 can also be considered a correlated ReduceSinkOperator of RS4. In this example, JOIN1 is an inner join, so RS4 and RS3 are also correlated. Because both parents of JOIN1 are correlated ReduceSinkOperators, we continue searching from both RS2 and RS3.
  3. From RS2, we track the sorting columns and partitioning columns of RS2 backward until we reach RS1.
    1. We check whether RS2 and RS1 use the same sorting columns, sorting orders, and partitioning columns, and that they do not conflict on the number of reducers. In this example, all of these checks pass, so RS2 and RS1 are correlated.
  4. Because no ReduceSinkOperator can be tracked backward from RS1, we add RS1 to bottomReduceSinkOperators.
  5. Because no ReduceSinkOperator can be tracked backward from RS3, we add RS3 to bottomReduceSinkOperators.
  6. We have topReduceSinkOperators=[RS4] and bottomReduceSinkOperators=[RS1, RS3]. Because they are not the same, we have found a sub-tree with correlations. This sub-tree starts from RS1 and RS3, and its ReduceSinkOperators are RS1, RS2, RS3, and RS4.
  7. No ReduceSinkOperator remains to be visited, so correlation detection stops.

While searching for correlated ReduceSinkOperators, if the child of a correlated ReduceSinkOperator is a JoinOperator, we analyze whether the other ReduceSinkOperators of this JoinOperator can also be considered correlated, as follows. A JoinOperator has one or more join conditions (joinConds), and each join condition has a left table and a right table. If a correlated ReduceSinkOperator is the left table of a join condition, the ReduceSinkOperator corresponding to the right table is also considered correlated when the join type is INNER_JOIN, LEFT_OUTER_JOIN, or LEFT_SEMI_JOIN. If a correlated ReduceSinkOperator is the right table of a join condition, the ReduceSinkOperator corresponding to the left table is also considered correlated when the join type is INNER_JOIN or RIGHT_OUTER_JOIN. Because a JoinOperator can have multiple join conditions, we search the join conditions recursively until we have either searched all of them or found no more correlated ReduceSinkOperators. After this analysis, if all parent ReduceSinkOperators of the JoinOperator are correlated, we continue searching for ReduceSinkOperators on this branch. Otherwise, we stop searching this branch and consider none of the parent ReduceSinkOperators of the JoinOperator correlated.
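The propagation rules in the paragraph above can be written as a small fixed-point computation. This Python sketch is hypothetical: join inputs are identified by position, `JoinCond`, `propagate`, and `correlated_join_inputs` are made-up names, and a loop that repeats until nothing changes stands in for the recursive search over join conditions. It returns the correlated inputs only when every parent of the JoinOperator ends up correlated, and an empty set otherwise.

```python
from dataclasses import dataclass

# Join types for which a correlated left input makes the right input correlated,
# and for which a correlated right input makes the left input correlated.
LEFT_TO_RIGHT = {"INNER_JOIN", "LEFT_OUTER_JOIN", "LEFT_SEMI_JOIN"}
RIGHT_TO_LEFT = {"INNER_JOIN", "RIGHT_OUTER_JOIN"}


@dataclass(frozen=True)
class JoinCond:
    """Hypothetical join condition between two input positions of one JoinOperator."""
    left: int
    right: int
    join_type: str


def propagate(start, conds):
    """Grow the set of correlated inputs until no join condition adds another."""
    correlated = set(start)
    changed = True
    while changed:
        changed = False
        for c in conds:
            if c.left in correlated and c.right not in correlated and c.join_type in LEFT_TO_RIGHT:
                correlated.add(c.right)
                changed = True
            if c.right in correlated and c.left not in correlated and c.join_type in RIGHT_TO_LEFT:
                correlated.add(c.left)
                changed = True
    return correlated


def correlated_join_inputs(start, conds, num_inputs):
    """All inputs correlated: keep searching this branch. Otherwise: none are correlated."""
    correlated = propagate(start, conds)
    return correlated if len(correlated) == num_inputs else set()


# a JOIN b LEFT OUTER JOIN c, with inputs at positions 0, 1, 2.
conds = [JoinCond(0, 1, "INNER_JOIN"), JoinCond(1, 2, "LEFT_OUTER_JOIN")]
print(correlated_join_inputs({0}, conds, 3))  # {0, 1, 2}
print(correlated_join_inputs({2}, conds, 3))  # set()
```

Starting from input 2 finds nothing, because a correlated right table of a LEFT OUTER JOIN does not make its left table correlated.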

Currently, correlation detection has a few limitations, which we plan to address in future work.

...

In Hive, every query has one or more terminal operators, which are the last operators in the operator tree. These terminal operators are called FileSinkOperators. For ease of explanation, if an operator A is on another operator B's path to a FileSinkOperator, we say that A is downstream of B and B is upstream of A.
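To make the direction convention concrete, the following Python sketch tests whether A is downstream of B by following child links from B toward the FileSinkOperator. It is a hypothetical helper: a plain dict of child links stands in for Hive's operator objects, and the operator names loosely follow Figure 1.

```python
def is_downstream(a, b, children):
    """True if operator a lies on some path from operator b toward a FileSinkOperator."""
    stack, seen = list(children.get(b, [])), set()
    while stack:
        op = stack.pop()
        if op == a:
            return True
        if op not in seen:
            seen.add(op)
            stack.extend(children.get(op, []))
    return False


# Child links of a simplified Figure 1 tree.
children = {"RS1": ["GBY1"], "GBY1": ["RS2"], "RS2": ["JOIN1"], "RS3": ["JOIN1"],
            "JOIN1": ["RS4"], "RS4": ["GBY2"], "GBY2": ["FS"]}
print(is_downstream("RS4", "RS1", children))  # True: RS4 is downstream of RS1
print(is_downstream("RS3", "RS2", children))  # False: siblings under JOIN1
```

Siblings such as RS2 and RS3 are neither upstream nor downstream of each other, which is why the detector handles them through the JoinOperator rules rather than through the upstream search.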

For a given operator tree like the one shown in Figure 1, the Correlation Optimizer visits the operators in the tree depth-first, starting from the FileSinkOperators. The tree walker stops at every ReduceSinkOperator. Then, a correlation detector tries to find a correlation from this ReduceSinkOperator and its siblings by recursively finding the furthest correlated upstream ReduceSinkOperators. If it finds any correlated upstream ReduceSinkOperator, a correlation has been found. Currently, an upstream ReduceSinkOperator and a downstream ReduceSinkOperator are considered correlated when the following three conditions hold:

  1. emitted rows from these two ReduceSinkOperators are sorted in the same way;
  2. emitted rows from these two ReduceSinkOperators are partitioned in the same way; and
  3. these ReduceSinkOperators do not conflict on the number of reducers.

Interested readers may refer to our implementation for details.
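The three conditions can be sketched as a pairwise check in Python. This is a hypothetical, simplified model rather than the actual implementation: `RSDesc` and its fields are made-up names, the '+'/'-' order encoding and the use of -1 for an unfixed reducer count are conventions of this sketch, and exact equality of already-traced-back columns stands in for the more detailed comparison.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RSDesc:
    """Hypothetical summary of what a ReduceSinkOperator emits."""
    sort_cols: tuple       # sorting columns, traced back to common source columns
    sort_order: str        # one '+' (ascending) or '-' (descending) per sorting column
    partition_cols: tuple  # partitioning columns, traced back the same way
    num_reducers: int      # -1 means "not fixed" in this sketch


def reducers_conflict(a, b):
    """Two fixed but different reducer counts cannot share one shuffle."""
    return a > 0 and b > 0 and a != b


def correlated(upstream, downstream):
    """Check the three conditions (exact equality is a simplification)."""
    same_sort = (upstream.sort_cols == downstream.sort_cols
                 and upstream.sort_order == downstream.sort_order)         # condition 1
    same_partition = upstream.partition_cols == downstream.partition_cols  # condition 2
    no_conflict = not reducers_conflict(upstream.num_reducers,
                                        downstream.num_reducers)           # condition 3
    return same_sort and same_partition and no_conflict


by_key = RSDesc(("key",), "+", ("key",), -1)
print(correlated(by_key, by_key))                                 # True
print(correlated(by_key, RSDesc(("key",), "-", ("key",), -1)))    # False: order differs
print(correlated(RSDesc(("key",), "+", ("key",), 10),
                 RSDesc(("key",), "+", ("key",), 20)))            # False: 10 vs 20 reducers
print(correlated(RSDesc(("key",), "+", ("key",), 10), by_key))    # True: -1 is not fixed
```

Treating an unfixed reducer count as compatible with any fixed one lets the merged shuffle adopt the fixed value.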

During correlation detection, a JoinOperator or a UnionOperator can introduce branches into the search path. All parents of a JoinOperator are ReduceSinkOperators. When the detector reaches a JoinOperator, it checks whether all parents of this JoinOperator are correlated to the downstream ReduceSinkOperator. Because a JoinOperator contains one or more 2-way join operations, given one ReduceSinkOperator, we can determine whether another ReduceSinkOperator appearing in the same join operation is correlated, based on the join type and the positions of the two ReduceSinkOperators in that join operation, using the following two rules.

  1. If a ReduceSinkOperator represents the left table of an INNER JOIN, a LEFT OUTER JOIN, or a LEFT SEMI JOIN, the ReduceSinkOperator representing the right table is also considered correlated; and
  2. If a ReduceSinkOperator represents the right table of an INNER JOIN or a RIGHT OUTER JOIN, the ReduceSinkOperator representing the left table is also considered correlated.

With these two rules, we analyze the parent ReduceSinkOperators of the JoinOperator starting from every ReduceSinkOperator that has columns appearing in the join clause, and find all correlated ReduceSinkOperators recursively. If, starting from every such ReduceSinkOperator, we find that all parent ReduceSinkOperators are correlated, we continue correlation detection on this branch. Otherwise, we determine that none of the ReduceSinkOperators of the JoinOperator is correlated and stop correlation detection on this branch.

None of the parents of a UnionOperator is a ReduceSinkOperator. So, we check whether we can find correlated ReduceSinkOperators on every parent branch of this UnionOperator. If any branch has no correlated ReduceSinkOperator, we determine that no correlated ReduceSinkOperator is found on the parent branches of this UnionOperator.
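The all-or-nothing rule for a UnionOperator can be sketched in Python as follows. `correlated_rs_through_union` is a hypothetical name, `search_branch` is a hypothetical callback standing in for the upstream search on one parent branch, and the branch names and results are made up for illustration.

```python
from typing import Callable, List, Sequence, TypeVar

Branch = TypeVar("Branch")


def correlated_rs_through_union(union_parents: Sequence[Branch],
                                search_branch: Callable[[Branch], List[str]]) -> List[str]:
    """Search every parent branch of a UnionOperator; the result is all or nothing."""
    per_branch = [search_branch(p) for p in union_parents]
    if any(not found for found in per_branch):
        return []  # one branch without a correlated RS makes the whole union uncorrelated
    return [rs for found in per_branch for rs in found]


# Made-up per-branch search results.
branches = {"SEL_a": ["RS1"], "SEL_b": ["RS2"], "SEL_c": []}
print(correlated_rs_through_union(["SEL_a", "SEL_b"], branches.get))  # ['RS1', 'RS2']
print(correlated_rs_through_union(["SEL_a", "SEL_c"], branches.get))  # []
```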

During correlation detection, the detector may visit a JoinOperator that will later be converted to a Map Join. In this case, the detector stops searching the branch containing this Map Join. For example,
in Figure 5, the detector knows that MJ1, MJ2, and MJ3 will be converted to Map Joins

...

.

5. Operator Tree Transformation

...