This page documents Correlation Optimizer. It was originally introduced by HIVE-2206 (https://issues.apache.org/jira/browse/HIVE-2206) and is based on the idea of YSmart [1].
1 Overview
In Hadoop environments, an SQL query submitted to Hive is evaluated in a distributed system. Thus, after generating an operator tree that represents the submitted SQL query, Hive needs to determine which operations can be executed in a task that will be evaluated on a single node. Also, since a MapReduce job can shuffle data only once, Hive needs to cut the tree into multiple MapReduce jobs. It is important to cut an operator tree into MapReduce jobs well, so that the generated plan can evaluate the query efficiently.
...
Before Correlation Optimizer was integrated into Hive, Hive had the ReduceSink Deduplication Optimizer, which can figure out whether data needs to be shuffled for chained operators. However, to support more complex operator trees, we need a more general-purpose optimizer and a mechanism to correctly execute the optimized plan. Thus, we have designed and implemented Correlation Optimizer and two operators for evaluating optimized plans. It is worth noting that it is better to use ReduceSink Deduplication Optimizer to handle simple cases first and then use Correlation Optimizer to handle more complex cases.
2 Examples
First, let's look at three examples. For every query, we show the original operator tree generated by Hive and the optimized operator tree. To be concise, we only show the following operators: FileSinkOperator (FS), GroupByOperator (AGG), HashTableSinkOperator (HT), JoinOperator (JOIN), MapJoinOperator (MJ), and ReduceSinkOperator (RS). Also, in every query, we add comments (e.g. /*JOIN1*/) to indicate the node in the operator tree that an operation belongs to.
2.1 Example 1
```sql
SELECT tmp1.key, count(*)
FROM (SELECT key, avg(value) AS avg
      FROM t1
      GROUP BY /*AGG1*/ key) tmp1
JOIN /*JOIN1*/ t1 ON (tmp1.key = t1.key)
WHERE t1.value > tmp1.avg
GROUP BY /*AGG2*/ tmp1.key;
```
...
Since the input table of AGG1 and the table joined with tmp1 in JOIN1 are both t1, Hive only needs to scan t1 once when a single MapReduce job is used to evaluate this query. In the original plan, however, t1 is used in two MapReduce jobs, and thus it is scanned twice.
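To make the single-job idea concrete, here is a minimal Python sketch, not Hive code, that evaluates Example 1 with one scan of t1 and one shuffle on key. It assumes t1 is a hypothetical in-memory list of (key, value) pairs without NULLs. The function name example1_single_job is illustrative only.

```python
from collections import defaultdict


def example1_single_job(t1_rows):
    """Evaluate Example 1 with a single scan of t1 and a single shuffle on key.

    t1_rows is a hypothetical in-memory stand-in for table t1: an iterable of
    (key, value) pairs without NULLs. Returns {key: count(*)}.
    """
    # Map phase: scan t1 once and partition its rows by key (the only shuffle).
    groups = defaultdict(list)
    for key, value in t1_rows:
        groups[key].append(value)

    # Reduce phase: each key group feeds AGG1, JOIN1 and AGG2 in turn,
    # because all three operators partition their input on the same column.
    result = {}
    for key, values in groups.items():
        avg = sum(values) / len(values)           # AGG1: avg(value) per key
        joined = [v for v in values if v > avg]   # JOIN1 plus WHERE t1.value > tmp1.avg
        if joined:                                # keys with no joined rows produce no output
            result[key] = len(joined)             # AGG2: count(*) per key
    return result


print(example1_single_job([(1, 10), (1, 20), (1, 30), (2, 5), (2, 5)]))  # {1: 1}
```

Both sides of JOIN1 are fed from the same scan, which is exactly the saving described above.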
2.2 Example 2
```sql
SELECT t1.key, count(*)
FROM t1
JOIN /*JOIN1*/ (SELECT key, avg(value) AS avg
                FROM t1
                GROUP BY /*AGG1*/ key) tmp1 ON (t1.key = tmp1.key)
JOIN /*JOIN1*/ t2 ON (tmp1.key = t2.key)
WHERE t2.value > tmp1.avg
GROUP BY /*AGG2*/ t1.key;
```
...
Figure 4: The optimized operator tree of Example 2
2.3 Example 3
```sql
SELECT count(distinct ws1.ws_order_number) as order_count,
       sum(ws1.ws_ext_ship_cost) as total_shipping_cost,
       sum(ws1.ws_net_profit) as total_net_profit
FROM web_sales ws1
JOIN /*MJ1*/ customer_address ca ON (ws1.ws_ship_addr_sk = ca.ca_address_sk)
JOIN /*MJ2*/ web_site s ON (ws1.ws_web_site_sk = s.web_site_sk)
JOIN /*MJ3*/ date_dim d ON (ws1.ws_ship_date_sk = d.d_date_sk)
LEFT SEMI JOIN /*JOIN4*/ (SELECT ws2.ws_order_number as ws_order_number
                          FROM web_sales ws2
                          JOIN /*JOIN1*/ web_sales ws3 ON (ws2.ws_order_number = ws3.ws_order_number)
                          WHERE ws2.ws_warehouse_sk <> ws3.ws_warehouse_sk) ws_wh1
  ON (ws1.ws_order_number = ws_wh1.ws_order_number)
LEFT SEMI JOIN /*JOIN4*/ (SELECT wr_order_number
                          FROM web_returns wr
                          JOIN /*JOIN3*/ (SELECT ws4.ws_order_number as ws_order_number
                                          FROM web_sales ws4
                                          JOIN /*JOIN2*/ web_sales ws5 ON (ws4.ws_order_number = ws5.ws_order_number)
                                          WHERE ws4.ws_warehouse_sk <> ws5.ws_warehouse_sk) ws_wh2
                          ON (wr.wr_order_number = ws_wh2.ws_order_number)) tmp1
  ON (ws1.ws_order_number = tmp1.wr_order_number)
WHERE d.d_date >= '2001-05-01' and d.d_date <= '2001-06-30' and ca.ca_state = 'NC' and s.web_company_name = 'pri';
```
...
Figure 6: The optimized operator tree of Example 3
3 Intra-query Correlations
In Hive, a submitted SQL query needs to be evaluated in a distributed system. When evaluating a query, data sometimes needs to be shuffled. Based on the nature of different data operations, operators in Hive can be divided into two categories.
- Operators which do not require data shuffling. Examples are TableScanOperator, SelectOperator and FilterOperator.
- Operators which require data shuffling. Examples are GroupByOperator and JoinOperator.
...
For an operator requiring data shuffling, Hive adds one or more ReduceSinkOperators as parents of this operator (the number of ReduceSinkOperators depends on the number of inputs of the operator requiring data shuffling). Those ReduceSinkOperators form the boundary between the Map phase and the Reduce phase. Then, Hive cuts the operator tree into multiple pieces (MapReduce tasks), and each piece can be executed in a MapReduce job.
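The insertion of ReduceSinkOperators and the cut into MapReduce tasks can be sketched in Python as follows. This is a simplified model, not Hive's planner. The Op class, the SHUFFLE_OPS set and both functions are hypothetical. A task is identified only by the operator whose ReduceSink parents start its Reduce phase. Applied to the original operator tree of Example 1, the model yields one task each for AGG1, JOIN1 and AGG2.

```python
from dataclasses import dataclass, field

# Hypothetical set of operator kinds that need re-partitioned (shuffled) input.
SHUFFLE_OPS = {"GroupBy", "Join"}


@dataclass(eq=False)  # eq=False keeps identity-based hashing, so Ops can go in sets
class Op:
    kind: str                                    # e.g. "TableScan", "GroupBy", "ReduceSink"
    name: str
    parents: list = field(default_factory=list)  # upstream (input) operators


def insert_reduce_sinks(op, seen=None):
    """Give every shuffle-requiring operator one ReduceSink per input."""
    seen = set() if seen is None else seen
    if op in seen:
        return
    seen.add(op)
    for parent in op.parents:
        insert_reduce_sinks(parent, seen)
    if op.kind in SHUFFLE_OPS:
        op.parents = [Op("ReduceSink", f"RS({p.name})", [p]) for p in op.parents]


def mapreduce_tasks(root):
    """Cut at ReduceSink boundaries: each operator fed by ReduceSinks starts
    the Reduce phase of one MapReduce task. Returns those operators' names."""
    tasks, seen, stack = [], set(), [root]
    while stack:
        op = stack.pop()
        if op in seen:
            continue
        seen.add(op)
        if any(p.kind == "ReduceSink" for p in op.parents):
            tasks.append(op.name)
        stack.extend(op.parents)
    return sorted(tasks)


# Original operator tree of Example 1: t1 is read by two separate TableScans.
agg1 = Op("GroupBy", "AGG1", [Op("TableScan", "TS[t1]")])
join1 = Op("Join", "JOIN1", [agg1, Op("TableScan", "TS[t1]")])
agg2 = Op("GroupBy", "AGG2", [join1])
fs = Op("FileSink", "FS", [agg2])

insert_reduce_sinks(fs)
print(mapreduce_tasks(fs))  # ['AGG1', 'AGG2', 'JOIN1']
```

JOIN1 has two inputs and therefore receives two ReduceSinks, matching the rule that the number of ReduceSinkOperators follows the number of inputs.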
For a complex query, an input table may be used by multiple MapReduce tasks. In this case, the table is loaded multiple times when the original operator tree is used. Also, when generating those ReduceSinkOperators, Hive does not consider whether the corresponding operator requiring data shuffling really needs re-partitioned input data. For example, in the original operator tree of Example 1 (Figure 1), AGG1, JOIN1, and AGG2 require the data to be shuffled in the same way, because all of them require the column key to be the partitioning column in their corresponding ReduceSinkOperators. However, Hive is not aware of this correlation between AGG1, JOIN1, and AGG2, and still generates three MapReduce tasks.
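The job flow correlation described for Example 1 can be illustrated with a simple grouping pass. The Python below is a hypothetical simplification, not the detection algorithm used by Correlation Optimizer. It assumes the shuffle stages form a linear chain in dependency order. It also assumes partitioning columns have already been traced back to a common base column, so tmp1.key and t1.key both appear as key.

```python
def merge_job_flow_correlations(stages):
    """Group consecutive dependent shuffle stages with identical partitioning keys.

    stages: list of (operator_name, partition_keys) in dependency order.
    Each returned group could share a single shuffle, i.e. one MapReduce job.
    """
    jobs = []
    prev_keys = None
    for name, keys in stages:
        keys = tuple(keys)
        if jobs and keys == prev_keys:
            jobs[-1].append(name)   # same partitioning: reuse the previous shuffle
        else:
            jobs.append([name])     # different partitioning: a new shuffle is needed
        prev_keys = keys
    return jobs


example1 = [("AGG1", ["key"]), ("JOIN1", ["key"]), ("AGG2", ["key"])]
print(merge_job_flow_correlations(example1))  # [['AGG1', 'JOIN1', 'AGG2']]
```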
Correlation Optimizer aims to exploit the two intra-query correlations mentioned above.
- Input Correlation: An input table is used by multiple MapReduce tasks in the original operator tree.
- Job Flow Correlation: Two dependent MapReduce tasks shuffle the data in the same way.
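Detecting input correlation amounts to finding tables that more than one MapReduce task scans. Here is a minimal Python sketch. It assumes each task is described by a hypothetical mapping from task name to the set of base tables it reads, with intermediate results left out. The function name is illustrative only.

```python
from collections import defaultdict


def find_input_correlations(task_inputs):
    """Return {table: sorted task names} for tables read by two or more tasks.

    task_inputs: dict mapping a task name to the set of base tables it scans.
    """
    readers = defaultdict(set)
    for task, tables in task_inputs.items():
        for table in tables:
            readers[table].add(task)
    return {t: sorted(tasks) for t, tasks in readers.items() if len(tasks) > 1}


# Original plan of Example 1: the AGG1 and JOIN1 tasks both scan t1.
original_plan = {"AGG1 task": {"t1"}, "JOIN1 task": {"t1"}, "AGG2 task": set()}
print(find_input_correlations(original_plan))  # {'t1': ['AGG1 task', 'JOIN1 task']}
```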
4 Correlation Detection
5 Operator Tree Transformation
6 Executing Optimized Operator Tree in the Reduce Phase
Currently, Correlation Optimizer only handles reduce-phase operator trees in which all blocking operators share the same keys. Other cases will be supported in future work.
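When all blocking operators share the same keys, one Reduce phase can serve all of them. Records from the different original ReduceSinkOperators are tagged with their origin, and each key group is split by tag and handed to the matching operator. The Python sketch below shows only this routing idea. The (key, tag, row) record layout, the handlers mapping and the function name are assumptions, not Hive's interfaces.

```python
from itertools import groupby
from operator import itemgetter


def run_reduce_phase(shuffled, handlers):
    """Route tagged records of each key group to the operator that owns the tag.

    shuffled: iterable of (key, tag, row) records produced by one shared shuffle.
    handlers: {tag: fn(key, rows)}, one entry per blocking operator; every tag
    appearing in the records is assumed to have a handler.
    Returns {key: {tag: result of fn}}.
    """
    out = {}
    # A real shuffle delivers records grouped by key; sorting emulates that here.
    for key, group in groupby(sorted(shuffled, key=itemgetter(0)), key=itemgetter(0)):
        rows_by_tag = {tag: [] for tag in handlers}
        for _, tag, row in group:
            rows_by_tag[tag].append(row)
        out[key] = {tag: fn(key, rows_by_tag[tag]) for tag, fn in handlers.items()}
    return out


records = [(1, "AGG", 10), (2, "CNT", "x"), (1, "CNT", "y"), (1, "AGG", 20)]
handlers = {"AGG": lambda k, rows: sum(rows), "CNT": lambda k, rows: len(rows)}
print(run_reduce_phase(records, handlers))
# {1: {'AGG': 30, 'CNT': 1}, 2: {'AGG': 0, 'CNT': 1}}
```

Because every handler sees the complete group for a key, no second shuffle is needed. This only holds under the same-key condition stated above.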
6.1 Executing Operator Tree with Same Keys
7 Related Jiras
The umbrella jira is HIVE-3667.
...
8 References
- [1] Rubao Lee, Tian Luo, Yin Huai, Fusheng Wang, Yongqiang He, Xiaodong Zhang. YSmart: Yet Another SQL-to-MapReduce Translator. ICDCS, 2011.