...
The initial implementation, started in HIVE-14217, focused on 1) enabling the discovery of data that is already stored in Druid from Hive, and 2) being able to query that data, making use of Druid's advanced querying capabilities. For instance, we put special emphasis on pushing as much computation as possible to Druid, and on recognizing the types of queries for which Druid is especially efficient, e.g., timeseries or topN queries.
Future work after this first step is listed in HIVE-14473. If you want to collaborate on this effort, a list of remaining issues can be found at the end of this document.
...
In particular, we implemented our extension to the optimizer in HIVE-14217, which builds upon the work initiated in CALCITE-1121, and extends its logic to identify more complex query patterns (timeseries and topN queries), translate filters on the time dimension to Druid intervals, push limit into Druid select queries, etc.
Currently, we support the recognition of timeseries, topN, groupBy, and select queries.
...
We generate a single Hive split with the corresponding Druid query for timeseries, topN, and groupBy queries, from which we generate the records. Thus, the degree of parallelism is 1 in these cases. However, for simple select queries without a limit (although they might still contain filters or projections), we partition the original query into x queries and generate one split for each of them, thus increasing the degree of parallelism for these queries, which usually return a large number of results, to x. We explain more details later on.
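The interval-partitioning idea behind those select-query splits can be sketched as follows. This is an illustrative sketch only, not Hive's actual implementation; the function and parameter names are invented:

```python
from datetime import datetime

def partition_interval(start, end, num_splits):
    """Split [start, end) into num_splits contiguous sub-intervals.
    Each sub-interval would become the 'intervals' field of an otherwise
    identical Druid select query, giving one Hive split per sub-query."""
    step = (end - start) / num_splits
    bounds = [start + i * step for i in range(num_splits)] + [end]
    return list(zip(bounds[:-1], bounds[1:]))

# Example: partition one day of data into 4 select queries.
for lo, hi in partition_interval(datetime(2015, 9, 1), datetime(2015, 9, 2), 4):
    print(f"{lo.isoformat()}/{hi.isoformat()}")
```

Each generated interval maps to one split, so the degree of parallelism equals the number of sub-intervals.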
...
Code Block |
---|
{
  "queryType":"timeseries",
  "dataSource":"wikiticker",
  "descending":"false",
  "granularity":"NONE",
  "aggregations":[
    {"type":"longMax", "name":"$f1", "fieldName":"delta"},
    {"type":"longSum", "name":"$f2", "fieldName":"added"}
  ],
  "intervals":["-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00"]
} |
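To illustrate what the engine computes for such a timeseries query, here is a sketch of the two aggregators over timestamped rows. The row layout is invented for illustration; with granularity NONE, each distinct timestamp forms its own bucket:

```python
from collections import OrderedDict

# Hypothetical rows: (timestamp, delta, added)
rows = [
    ("2015-09-12T00:46:58Z", 36, 36),
    ("2015-09-12T00:46:58Z", 10, 17),
    ("2015-09-12T00:47:00Z", -26, 0),
]

def timeseries(rows):
    """Per timestamp bucket: longMax(delta) and longSum(added),
    mirroring the two aggregators in the JSON query above."""
    buckets = OrderedDict()
    for ts, delta, added in rows:
        mx, sm = buckets.get(ts, (float("-inf"), 0))
        buckets[ts] = (max(mx, delta), sm + added)
    return buckets

for ts, (f1, f2) in timeseries(rows).items():
    print(ts, f1, f2)
```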
...
TopN queries
The third type of queries we currently support is topN. These queries return a sorted set of results for the values in a single dimension according to some criteria. For this case, topN queries are much faster and more resource-efficient than groupBy queries, which we introduce below.
For instance, the following SQL query will generate a Druid topN query:
Code Block |
---|
SELECT `channel`, `floor_month`(`__time`), max(delta) as m, sum(added)
FROM druid_table_1
GROUP BY `channel`, `floor_month`(`__time`)
ORDER BY m DESC
LIMIT 10; |
Basically, the query is asking for the top 10 maximum values of `delta` for each `channel` with a monthly granularity. It also asks for the `sum` on another column. The generated equivalent Druid JSON query is the following:
Code Block |
---|
{
  "queryType":"topN",
  "dataSource":"wikiticker",
  "granularity":"MONTH",
  "dimension":"channel",
  "metric":"$f2",
  "aggregations":[
    {"type":"longMax","name":"$f2","fieldName":"delta"},
    {"type":"longSum","name":"$f3","fieldName":"added"}],
  "intervals":["-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00"],
  "threshold":10
} |
Observe that we need to use the `metric` field to specify the metric on which we would like to execute the top operation. Finally, we show the results for the query:
Code Block |
---|
hive> SELECT `channel`, `floor_month`(`__time`), max(delta) as m, sum(added)
    > FROM druid_table_1
    > GROUP BY `channel`, `floor_month`(`__time`)
    > ORDER BY m DESC
    > LIMIT 10;
OK
#en.wikipedia	2015-09-01 01:00:00	199818	3045299
#ru.wikipedia	2015-09-01 01:00:00	102719	640698
#es.wikipedia	2015-09-01 01:00:00	94187	634670
#fr.wikipedia	2015-09-01 01:00:00	92182	642555
#ar.wikipedia	2015-09-01 01:00:00	73433	153605
#cs.wikipedia	2015-09-01 01:00:00	57174	132768
#de.wikipedia	2015-09-01 01:00:00	52923	522625
#hu.wikipedia	2015-09-01 01:00:00	49061	166101
#nl.wikipedia	2015-09-01 01:00:00	48573	145634
#ja.wikipedia	2015-09-01 01:00:00	47871	317242
Time taken: 1.038 seconds, Fetched: 10 row(s) |
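To make the topN semantics concrete, the following sketch reproduces the computation over in-memory rows. The row layout and sample values are invented for illustration; Druid executes this natively and far more efficiently:

```python
# Hypothetical rows: (channel, month, delta, added)
rows = [
    ("#en.wikipedia", "2015-09", 199818, 3045299),
    ("#en.wikipedia", "2015-09", 1000, 50),
    ("#ru.wikipedia", "2015-09", 102719, 640698),
    ("#es.wikipedia", "2015-09", 94187, 634670),
]

def topn(rows, threshold=10):
    """Per (channel, month) bucket compute max(delta) and sum(added),
    then sort by the max(delta) metric descending and keep `threshold`
    entries, mirroring the Druid topN query shown above."""
    agg = {}
    for channel, month, delta, added in rows:
        entry = agg.setdefault((channel, month), [float("-inf"), 0])
        entry[0] = max(entry[0], delta)
        entry[1] += added
    ranked = sorted(agg.items(), key=lambda kv: kv[1][0], reverse=True)
    return ranked[:threshold]

for (channel, month), (m, s) in topn(rows):
    print(channel, month, m, s)
```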
GroupBy queries
The final type of queries we currently support is groupBy. This kind of query is more expressive than timeseries and topN queries; however, it is less performant. Thus, we only fall back to groupBy queries when we cannot transform the query into a timeseries or topN query.
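The fallback order described above can be summarized with a simplified decision function. This is a sketch of the idea only, not the actual optimizer code, and the query representation is invented:

```python
def choose_druid_query_type(group_dims, has_order_by_metric, has_limit):
    """Pick the most efficient Druid query type that can express the
    query shape, falling back from timeseries to topN to groupBy."""
    # No grouping dimension besides time: a timeseries query suffices.
    if not group_dims:
        return "timeseries"
    # Exactly one dimension, sorted by a metric with a limit: topN applies.
    if len(group_dims) == 1 and has_order_by_metric and has_limit:
        return "topN"
    # Anything else falls back to the more general groupBy.
    return "groupBy"

print(choose_druid_query_type([], False, False))                   # timeseries
print(choose_druid_query_type(["channel"], True, True))            # topN
print(choose_druid_query_type(["channel", "user"], False, False))  # groupBy
```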
For instance, the following SQL query will generate a Druid groupBy query:
...
SELECT max(delta), sum(added)
FROM druid_table_1
GROUP BY `channel`, `user`;
Code Block |
---|
{
"queryType":"groupBy",
"dataSource":"wikiticker",
"granularity":"ALL",
"dimensions":["channel","user"],
"aggregations":[
{"type":"longMax","name":"$f2","fieldName":"delta"},
{"type":"longSum","name":"$f3","fieldName":"added"}],
"intervals":["-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00"]
} |
Queries across Druid and Hive
Finally, we provide an example of a query that runs across Druid and Hive. In particular, let us create a second table in Hive with some data:
...
CREATE TABLE hive_table_1 (col1 INT, col2 STRING);
INSERT INTO hive_table_1 VALUES(1, '#en.wikipedia');
Assume we want to execute the following query:
...
SELECT a.channel, b.col1
FROM
(
SELECT `channel`, max(delta) as m, sum(added)
FROM druid_table_1
GROUP BY `channel`, `floor_year`(`__time`)
ORDER BY m DESC
LIMIT 1000
) a
JOIN
(
SELECT col1, col2
FROM hive_table_1
) b
ON a.channel = b.col2;
The query is a simple join on columns `channel` and `col2`. The subquery `a` is executed completely in Druid as a topN query. Then the results are joined in Hive with the results of subquery `b`. The query plan and execution in Tez is shown in the following:
Code Block |
---|
hive> explain
    > SELECT a.channel, b.col1
    > FROM
    > (
    >   SELECT `channel`, max(delta) as m, sum(added)
    >   FROM druid_table_1
    >   GROUP BY `channel`, `floor_year`(`__time`)
    >   ORDER BY m DESC
    >   LIMIT 1000
    > ) a
    > JOIN
    > (
    >   SELECT col1, col2
    >   FROM hive_table_1
    > ) b
    > ON a.channel = b.col2;
OK
Plan optimized by CBO.
Vertex dependency in root stage
Map 2 <- Map 1 (BROADCAST_EDGE)
Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Map 2
      File Output Operator [FS_11]
        Select Operator [SEL_10] (rows=1 width=0)
          Output:["_col0","_col1"]
          Map Join Operator [MAPJOIN_16] (rows=1 width=0)
            Conds:RS_7._col0=SEL_6._col1(Inner),HybridGraceHashJoin:true,Output:["_col0","_col2"]
          <-Map 1 [BROADCAST_EDGE]
            BROADCAST [RS_7]
              PartitionCols:_col0
              Filter Operator [FIL_2] (rows=1 width=0)
                predicate:_col0 is not null
                Select Operator [SEL_1] (rows=1 width=0)
                  Output:["_col0"]
                  TableScan [TS_0] (rows=1 width=0)
                    druid@druid_table_1,druid_table_1,Tbl:PARTIAL,Col:NONE,Output:["channel"],properties:{"druid.query.json":"{\"queryType\":\"topN\",\"dataSource\":\"wikiticker\",\"granularity\":\"YEAR\",\"dimension\":\"channel\",\"metric\":\"$f2\",\"aggregations\":[{\"type\":\"longMax\",\"name\":\"$f2\",\"fieldName\":\"delta\"},{\"type\":\"longSum\",\"name\":\"$f3\",\"fieldName\":\"added\"}],\"intervals\":[\"-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00\"],\"threshold\":1000}","druid.query.type":"topN"}
          <-Select Operator [SEL_6] (rows=1 width=15)
              Output:["_col0","_col1"]
              Filter Operator [FIL_15] (rows=1 width=15)
                predicate:col2 is not null
                TableScan [TS_4] (rows=1 width=15)
                  druid@hive_table_1,hive_table_1,Tbl:COMPLETE,Col:NONE,Output:["col1","col2"]
Time taken: 0.924 seconds, Fetched: 31 row(s)
hive> SELECT a.channel, b.col1
    > FROM
    > (
    >   SELECT `channel`, max(delta) as m, sum(added)
    >   FROM druid_table_1
    >   GROUP BY `channel`, `floor_year`(`__time`)
    >   ORDER BY m DESC
    >   LIMIT 1000
    > ) a
    > JOIN
    > (
    >   SELECT col1, col2
    >   FROM hive_table_1
    > ) b
    > ON a.channel = b.col2;
Query ID = user1_20160818202329_e9a8b3e8-18d3-49c7-bfe0-99d38d2402d3
Total jobs = 1
Launching Job 1 out of 1
2016-08-18 20:23:30 Running Dag: dag_1471548210492_0001_1
2016-08-18 20:23:30 Starting to run new task attempt: attempt_1471548210492_0001_1_00_000000_0
Status: Running (Executing on YARN cluster with App id application_1471548210492_0001)
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 ..........   container     SUCCEEDED      1          1        0        0       0       0
Map 2 ..........   container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 0.15 s
----------------------------------------------------------------------------------------------
2016-08-18 20:23:31 Completed running task attempt: attempt_1471548210492_0001_1_00_000000_0
OK
#en.wikipedia	1
Time taken: 1.835 seconds, Fetched: 2 row(s) |
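The plan above broadcasts the small Hive-side relation and probes it with the rows returned by Druid. The map-join step can be sketched as follows; this is an illustrative sketch with invented names, not Hive's Tez operator:

```python
def broadcast_hash_join(druid_rows, hive_rows):
    """Build a hash table on the small (broadcast) Hive side keyed by
    col2, then probe it with each Druid result row's channel, as the
    Tez map-join in the plan above does."""
    table = {}
    for col1, col2 in hive_rows:
        table.setdefault(col2, []).append(col1)
    joined = []
    for channel, *_ in druid_rows:
        for col1 in table.get(channel, []):
            joined.append((channel, col1))
    return joined

druid_rows = [("#en.wikipedia", 199818, 3045299)]  # subquery a (from Druid)
hive_rows = [(1, "#en.wikipedia")]                 # subquery b (from Hive)
print(broadcast_hash_join(druid_rows, hive_rows))  # [('#en.wikipedia', 1)]
```

This matches the single result row returned by the query, `#en.wikipedia 1`.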
...