This page documents Correlation Optimizer. It was originally introduced by HIVE-2206 (https://issues.apache.org/jira/browse/HIVE-2206) and is based on the idea of YSmart [1].

1. Overview

In Hadoop environments, an SQL query submitted to Hive is evaluated in a distributed system. Thus, after generating a query operator tree representing the submitted SQL query, Hive needs to determine which operations can be executed in a task that will be evaluated on a single node. Also, since a MapReduce job can shuffle data only once, Hive needs to cut the tree into multiple MapReduce jobs. It is important to cut an operator tree into MapReduce jobs well, so that the generated plan can evaluate the query efficiently.
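To make the cutting problem concrete, here is a minimal sketch that splits a chain of operators into MapReduce jobs. It assumes, as a simplification, that the operator tree has already been flattened into a single bottom-up chain (real trees branch at joins), that each ReduceSinkOperator (RS) marks one shuffle, and that TS stands for a table scan. The function `cut_into_jobs` is hypothetical; this is not how Hive's plan compiler is implemented.

```python
from typing import List

def cut_into_jobs(chain: List[str]) -> List[List[str]]:
    """Split a flattened, bottom-up operator chain into MapReduce jobs.

    Simplified model: the first RS in a job is that job's single shuffle;
    operators after it run in the reduce phase. A further RS needs another
    shuffle, so it starts the next job.
    """
    jobs: List[List[str]] = []
    current: List[str] = []
    has_shuffle = False  # whether the current job already used its shuffle
    for op in chain:
        if op == "RS" and has_shuffle:
            jobs.append(current)  # close the job before the second shuffle
            current, has_shuffle = [], False
        current.append(op)
        if op == "RS":
            has_shuffle = True
    if current:
        jobs.append(current)
    return jobs

# Hypothetical chain with three shuffle-requiring operators.
chain = ["TS", "RS", "AGG", "RS", "JOIN", "RS", "AGG", "FS"]
for job in cut_into_jobs(chain):
    print(job)
# ['TS', 'RS', 'AGG']
# ['RS', 'JOIN']
# ['RS', 'AGG', 'FS']
```

Every additional RS costs another job. In Hive, the boundary between two jobs is also materialized as intermediate output written by one job and read by the next; the sketch omits that.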

...

Before Correlation Optimizer was integrated into Hive, Hive had ReduceSink Deduplication Optimizer, which can determine whether a shuffle is needed for chained operators. However, to support more complex operator trees, we need a more general-purpose optimizer and a mechanism to correctly execute the optimized plan. Thus, we have designed and implemented Correlation Optimizer and two operators for evaluating optimized plans. It is worth noting that it is better to use ReduceSink Deduplication Optimizer to handle simple cases first and then use Correlation Optimizer to handle more complex cases.

2. Examples

First, let's look at three examples. For every query, we show the original operator tree generated by Hive and the optimized operator tree. To be concise, we show only the following operators: FileSinkOperator (FS), GroupByOperator (AGG), HashTableSinkOperator (HT), JoinOperator (JOIN), MapJoinOperator (MJ), and ReduceSinkOperator (RS). Also, in every query, we add comments (e.g. /*JOIN1*/) to indicate the node in the operator tree to which an operation belongs.

...

The original operator tree generated by Hive is shown below.


Figure 1: The original operator tree of Example 1 generated by Hive

This plan uses three MapReduce jobs to evaluate this query. However, AGG1, JOIN1, and AGG2 all require the column key to be the partitioning column for shuffling the data. Thus, we do not need to shuffle the data in the same way three times. We only need to shuffle the data once, and thus a single MapReduce job is needed. The optimized operator tree is shown below.


Figure 2: The optimized operator tree of Example 1

Since the input table of AGG1 and the left table of JOIN1 are both t1, when a single MapReduce job evaluates this query, Hive needs to scan t1 only once. In contrast, in the original plan, t1 is used in two MapReduce jobs and is therefore scanned twice.
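Why one shuffle is enough for AGG1, JOIN1, and AGG2 can be checked with a small simulation. The sketch below hash-partitions hypothetical rows of t1 on column key, applies a per-key stand-in for AGG1 in each partition, and verifies that re-partitioning the output on key would leave every row where it already is. It assumes every shuffle uses the same partitioning function and the same number of reducers; the row values and the summing operator are invented for illustration and are not taken from Example 1.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, int]  # (key, value); hypothetical rows standing in for t1

def shuffle(rows: List[Row], num_reducers: int) -> Dict[int, List[Row]]:
    """Hash-partition rows on column key, as a shuffle on key would."""
    partitions: Dict[int, List[Row]] = defaultdict(list)
    for row in rows:
        partitions[hash(row[0]) % num_reducers].append(row)
    return dict(partitions)

def per_key_sum(rows: List[Row]) -> List[Row]:
    """Stand-in for a per-key operator such as AGG1 (here: sum of value)."""
    totals: Dict[int, int] = defaultdict(int)
    for key, value in rows:
        totals[key] += value
    return sorted(totals.items())

t1 = [(1, 10), (2, 5), (1, 7), (3, 4), (2, 1), (4, 8)]
NUM_REDUCERS = 2
first = shuffle(t1, NUM_REDUCERS)
agg1_out = {p: per_key_sum(rows) for p, rows in first.items()}

# Re-shuffling AGG1's output on the same key would send every row to the
# partition it already occupies, so a second shuffle on key moves no data.
stays_put = all(
    hash(key) % NUM_REDUCERS == p
    for p, rows in agg1_out.items()
    for key, _ in rows
)
print(stays_put)  # True
```

The check holds by construction whenever the partitioning columns, the hash function, and the reducer count all match; if any of them differed, the second shuffle could not be skipped.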

...

The original operator tree generated by Hive is shown below.


Figure 3: The original operator tree of Example 2 generated by Hive

This example is similar to Example 1. The optimized operator tree only needs a single MapReduce job, which is shown below.


Figure 4: The optimized operator tree of Example 2

Example 3

SELECT count(distinct ws1.ws_order_number) as order_count,
       sum(ws1.ws_ext_ship_cost) as total_shipping_cost,
       sum(ws1.ws_net_profit) as total_net_profit
FROM web_sales ws1
JOIN /*MJ1*/ customer_address ca ON (ws1.ws_ship_addr_sk = ca.ca_address_sk)
JOIN /*MJ2*/ web_site s ON (ws1.ws_web_site_sk = s.web_site_sk)
JOIN /*MJ3*/ date_dim d ON (ws1.ws_ship_date_sk = d.d_date_sk)
LEFT SEMI JOIN /*JOIN4*/ (SELECT ws2.ws_order_number as ws_order_number
                          FROM web_sales ws2 JOIN /*JOIN1*/ web_sales ws3
                          ON (ws2.ws_order_number = ws3.ws_order_number)
                          WHERE ws2.ws_warehouse_sk <> ws3.ws_warehouse_sk) ws_wh1
ON (ws1.ws_order_number = ws_wh1.ws_order_number)
LEFT SEMI JOIN /*JOIN4*/ (SELECT wr_order_number
                          FROM web_returns wr
                          JOIN /*JOIN3*/ (SELECT ws4.ws_order_number as ws_order_number
                                          FROM web_sales ws4 JOIN /*JOIN2*/ web_sales ws5
                                          ON (ws4.ws_order_number = ws5.ws_order_number)
                                          WHERE ws4.ws_warehouse_sk <> ws5.ws_warehouse_sk) ws_wh2
                          ON (wr.wr_order_number = ws_wh2.ws_order_number)) tmp1
ON (ws1.ws_order_number = tmp1.wr_order_number)
WHERE d.d_date >= '2001-05-01' and
      d.d_date <= '2001-06-30' and
      ca.ca_state = 'NC' and
      s.web_company_name = 'pri';

The original operator tree generated by Hive is shown below.


Figure 5: The original operator tree of Example 3 generated by Hive

In this complex query, the MapJoins MJ1, MJ2, and MJ3 can all be evaluated in the same Map phase. Since JOIN1, JOIN2, JOIN3, and JOIN4 use the same column as the join key, a single MapReduce job can evaluate all operators before AGG1. A second MapReduce job then generates the final results. The optimized operator tree is shown below.


Figure 6: The optimized operator tree of Example 3
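The two-job result for Example 3 can be reproduced with a greedy sketch that walks the operators bottom-up and starts a new MapReduce job only when an operator needs a shuffle on different columns. The flattened operator order and the encoding of AGG1 (no GROUP BY, modeled with an empty key so that all rows go to one reducer) are assumptions for illustration; `assign_jobs` is a hypothetical helper, not part of Hive.

```python
from typing import Dict, List, Optional, Tuple

# (operator name, partitioning columns); None marks an operator that runs
# without a shuffle, such as a MapJoin evaluated in the Map phase.
Op = Tuple[str, Optional[Tuple[str, ...]]]

def assign_jobs(ops: List[Op]) -> Dict[str, int]:
    """Greedily assign bottom-up operators to MapReduce jobs.

    A new job starts only when an operator needs a shuffle on columns that
    differ from the current job's shuffle. Hypothetical sketch of the idea,
    not Hive's Correlation Optimizer.
    """
    assignment: Dict[str, int] = {}
    job = 0
    current_key: Optional[Tuple[str, ...]] = None
    for name, key in ops:
        if key is not None and key != current_key:
            if current_key is not None:
                job += 1  # different partitioning columns: another shuffle
            current_key = key
        assignment[name] = job
    return assignment

# Simplified encoding of Example 3. JOIN1-JOIN4 all join on the order number
# (ws_order_number / wr_order_number); AGG1 is modeled with an empty key.
ORDER = ("order_number",)
example3: List[Op] = [
    ("MJ1", None), ("MJ2", None), ("MJ3", None),
    ("JOIN1", ORDER), ("JOIN2", ORDER), ("JOIN3", ORDER), ("JOIN4", ORDER),
    ("AGG1", ()),
]
jobs = assign_jobs(example3)
print(max(jobs.values()) + 1)  # 2
```

A plan whose shuffle-requiring operators alternated between partitioning columns would get one job per change, which is why the shared join key of JOIN1 through JOIN4 matters here.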

3. Intra-query Correlations

4. Correlation Detection

5. Operator Tree Transformation

6. Executing Optimized Operator Tree in the Reduce Phase

Currently, Correlation Optimizer supports only reduce-phase operator trees in which all blocking operators share the same keys. Other cases will be supported in future work.
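The same-key case can be sketched as follows. Within one reducer, records arrive sorted by key and carry a tag saying which ReduceSink of the original plan produced them; a dispatcher routes each record of a key group to the operator that owns its tag. This is a hypothetical stand-in for the two operators mentioned in the Overview, not their implementation, and it assumes the per-tag operators are independent, whereas in a real plan one operator may consume another's output within the same key group.

```python
from itertools import groupby
from typing import Callable, Dict, Iterable, List, Tuple

# A reduce-side record: (key, tag, value). The tag identifies which ReduceSink
# of the unoptimized plan produced the record (hypothetical encoding).
Record = Tuple[str, int, int]
Handler = Callable[[List[int]], int]

def run_reduce_phase(records: Iterable[Record],
                     handlers: Dict[int, Handler]) -> List[Tuple[str, int, int]]:
    """Evaluate several same-key operators in one pass over sorted records."""
    results: List[Tuple[str, int, int]] = []
    # Records must be sorted by key, as a MapReduce shuffle delivers them.
    for group_key, group in groupby(records, key=lambda r: r[0]):
        by_tag: Dict[int, List[int]] = {tag: [] for tag in handlers}
        for _, tag, value in group:
            by_tag[tag].append(value)  # dispatch the record on its tag
        # Every operator has now seen its complete group for this key.
        for tag, handler in handlers.items():
            results.append((group_key, tag, handler(by_tag[tag])))
    return results

# Two hypothetical operators sharing the key: tag 0 counts, tag 1 sums.
records = sorted([("a", 0, 1), ("b", 1, 5), ("a", 1, 2), ("b", 0, 3), ("a", 0, 4)])
print(run_reduce_phase(records, {0: len, 1: sum}))
# [('a', 0, 2), ('a', 1, 2), ('b', 0, 1), ('b', 1, 5)]
```

Because every operator is keyed the same way, a single sorted pass gives each of them a complete key group at the same time; operators keyed differently would need the data regrouped, which is the case left for future work.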

6.1 Executing Operator Tree with Same Keys

7. Related Jiras

The umbrella jira is HIVE-3667.

7.1 Resolved Jiras

7.2 Unresolved Jiras

8. References

  1. Rubao Lee, Tian Luo, Yin Huai, Fusheng Wang, Yongqiang He, and Xiaodong Zhang. YSmart: Yet Another SQL-to-MapReduce Translator. ICDCS, 2011.