...

The initial implementation, started in HIVE-14217, focused on 1) enabling the discovery of data that is already stored in Druid from Hive, and 2) being able to query that data, trying to make use of Druid's advanced querying capabilities. For instance, we put special emphasis on pushing as much computation as possible to Druid, and on being able to recognize the types of queries for which Druid is especially efficient, e.g., timeseries or topN queries.

Work that remains after this first step is listed in HIVE-14473. If you want to collaborate on this effort, a list of remaining issues can be found at the end of this document.

...

In particular, we implemented our extension to the optimizer in HIVE-14217, which builds upon the work initiated in CALCITE-1121 and extends its logic to identify more complex query patterns (timeseries and topN queries), translate filters on the time dimension into Druid intervals, push limits into Druid select queries, etc.
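For instance, given a filter on the time dimension along the lines of the following sketch (the query itself is hypothetical, but it follows the wikiticker examples used throughout this document), the filter is translated into the intervals field of the generated Druid query rather than into a Druid filter:

Code Block
sql
SELECT `channel`, sum(added)
FROM druid_table_1
WHERE `__time` >= '2015-09-01 00:00:00' AND `__time` < '2015-10-01 00:00:00'
GROUP BY `channel`;

Under this assumption, the generated Druid query would carry an intervals value along the lines of "intervals":["2015-09-01T00:00:00.000/2015-10-01T00:00:00.000"] instead of the default interval covering all time, so Druid only scans the segments that overlap with the filtered period.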

Currently, we support the recognition of timeseries, topN, groupBy, and select queries.

...

We generate a single Hive split with the corresponding Druid query for timeseries, topN, and groupBy queries, from which we generate the records. Thus, the degree of parallelism is 1 in these cases. However, for simple select queries without a limit (although they might still contain filters or projections), we partition the original query into x queries and generate one split for each of them, thus increasing the degree of parallelism for these queries, which usually return a large number of results, to x. We explain more details later on.
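For illustration, a select query whose time filter covers one day could be partitioned into two queries over consecutive sub-intervals, each one backing a different split. The following is a minimal sketch; the interval boundaries and paging threshold are hypothetical values, not the ones Hive actually computes:

Code Block
{
  "queryType":"select",
  "dataSource":"wikiticker",
  "descending":"false",
  "dimensions":[],
  "metrics":[],
  "granularity":"ALL",
  "intervals":["2015-09-12T00:00:00.000/2015-09-12T12:00:00.000"],
  "pagingSpec":{"pagingIdentifiers":{}, "threshold":16384}
}

The second query would be identical except for "intervals":["2015-09-12T12:00:00.000/2015-09-13T00:00:00.000"], and the two resulting splits can then be processed by different tasks in parallel.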

...

Code Block
{
  "queryType":"timeseries",
  "dataSource":"wikiticker",
  "descending":"false",
  "granularity":"NONE",
  "aggregations":[
    {"type":"longMax", "name":"$f1", "fieldName":"delta"},
    {"type":"longSum", "name":"$f2", "fieldName":"added"}
  ],
  "intervals":["-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00"]
}
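For reference, a Hive query that groups on the raw time column, along the lines of the following sketch, would give rise to a timeseries query with NONE granularity such as the one above; the aggregation names $f1 and $f2 correspond to the positions of the aggregation calls in the SELECT clause:

Code Block
sql
SELECT `__time`, max(delta), sum(added)
FROM druid_table_1
GROUP BY `__time`;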

...

TopN queries

TopN is the third type of queries we currently support. These queries return a sorted set of results for the values in a single dimension according to some criteria. For this case, topN queries are much faster and resource efficient than groupBy queries, which we introduce below. The semantics of topN queries in SQL are as follows:

Code Block
sql
SELECT `channel`, `floor_month`(`__time`), max(delta) as m, sum(added)
FROM druid_table_1
GROUP BY `channel`, `floor_month`(`__time`)
ORDER BY m DESC
LIMIT 10;

Basically, the query is asking for the top 10 maximum values of delta for each channel with a monthly granularity. It also asks for the sum on another column. The generated equivalent Druid JSON query is the following:

Code Block
{
  "queryType":"topN",
  "dataSource":"wikiticker",
  "granularity":"MONTH",
  "dimension":"channel",
  "metric":"$f2",
  "aggregations":[
    {"type":"longMax","name":"$f2","fieldName":"delta"},
    {"type":"longSum","name":"$f3","fieldName":"added"}],
  "intervals":["-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00"],
  "threshold":10
}

Observe that we need to use the metric field to specify the metric on which we would like to execute the top operation. Finally, we show the results for the query:

Code Block
hive> SELECT `channel`, `floor_month`(`__time`), max(delta) as m, sum(added)
    > FROM druid_table_1
    > GROUP BY `channel`, `floor_month`(`__time`)
    > ORDER BY m DESC
    > LIMIT 10;
OK
#en.wikipedia	2015-09-01 01:00:00	199818	3045299
#ru.wikipedia	2015-09-01 01:00:00	102719	640698
#es.wikipedia	2015-09-01 01:00:00	94187	634670
#fr.wikipedia	2015-09-01 01:00:00	92182	642555
#ar.wikipedia	2015-09-01 01:00:00	73433	153605
#cs.wikipedia	2015-09-01 01:00:00	57174	132768
#de.wikipedia	2015-09-01 01:00:00	52923	522625
#hu.wikipedia	2015-09-01 01:00:00	49061	166101
#nl.wikipedia	2015-09-01 01:00:00	48573	145634
#ja.wikipedia	2015-09-01 01:00:00	47871	317242
Time taken: 1.038 seconds, Fetched: 10 row(s)

GroupBy queries

The final type of query we currently support is groupBy. This kind of query is more expressive than timeseries and topN queries; however, it is less performant. Thus, we only fall back to groupBy queries when we cannot transform the query into a timeseries or topN query.

For instance, the following SQL query will generate a Druid groupBy query:

...

SELECT max(delta), sum(added)
FROM druid_table_1
GROUP BY `channel`, `user`;
Code Block
{
  "queryType":"groupBy",
  "dataSource":"wikiticker",
  "granularity":"ALL",
  "dimensions":["channel","user"],
  "aggregations":[
    {"type":"longMax","name":"$f2","fieldName":"delta"},
    {"type":"longSum","name":"$f3","fieldName":"added"}],
  "intervals":["-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00"]
}

Queries across Druid and Hive

Finally, we provide an example of a query that runs across Druid and Hive. In particular, let us create a second table in Hive with some data:

...

CREATE TABLE hive_table_1 (col1 INT, col2 STRING);
INSERT INTO hive_table_1 VALUES(1, '#en.wikipedia');

Assume we want to execute the following query:

Code Block
sql
SELECT a.channel, b.col1
FROM
(
  SELECT `channel`, max(delta) as m, sum(added)
  FROM druid_table_1
  GROUP BY `channel`, `floor_year`(`__time`)
  ORDER BY m DESC
  LIMIT 1000
) a
JOIN
(
  SELECT col1, col2
  FROM hive_table_1
) b
ON a.channel = b.col2;

The query is a simple join on columns channel and col2. The subquery a is executed completely in Druid as a topN query. Then the results are joined in Hive with the results of subquery b. The query plan and execution in Tez are shown in the following:


Code Block
hive> explain
    > SELECT a.channel, b.col1
    > FROM
    > (
    >   SELECT `channel`, max(delta) as m, sum(added)
    >   FROM druid_table_1
    >   GROUP BY `channel`, `floor_year`(`__time`)
    >   ORDER BY m DESC
    >   LIMIT 1000
    > ) a
    > JOIN
    > (
    >   SELECT col1, col2
    >   FROM hive_table_1
    > ) b
    > ON a.channel = b.col2;
OK
Plan optimized by CBO.
Vertex dependency in root stage
Map 2 <- Map 1 (BROADCAST_EDGE)
Stage-0
  Fetch Operator
    limit:-1
    Stage-1
      Map 2
      File Output Operator [FS_11]
        Select Operator [SEL_10] (rows=1 width=0)
          Output:["_col0","_col1"]
          Map Join Operator [MAPJOIN_16] (rows=1 width=0)
            Conds:RS_7._col0=SEL_6._col1(Inner),HybridGraceHashJoin:true,Output:["_col0","_col2"]
          <-Map 1 [BROADCAST_EDGE]
            BROADCAST [RS_7]
              PartitionCols:_col0
              Filter Operator [FIL_2] (rows=1 width=0)
                predicate:_col0 is not null
                Select Operator [SEL_1] (rows=1 width=0)
                  Output:["_col0"]
                  TableScan [TS_0] (rows=1 width=0)
                    druid@druid_table_1,druid_table_1,Tbl:PARTIAL,Col:NONE,Output:["channel"],properties:{"druid.query.json":"{\"queryType\":\"topN\",\"dataSource\":\"wikiticker\",\"granularity\":\"YEAR\",\"dimension\":\"channel\",\"metric\":\"$f2\",\"aggregations\":[{\"type\":\"longMax\",\"name\":\"$f2\",\"fieldName\":\"delta\"},{\"type\":\"longSum\",\"name\":\"$f3\",\"fieldName\":\"added\"}],\"intervals\":[\"-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00\"],\"threshold\":1000}","druid.query.type":"topN"}
          <-Select Operator [SEL_6] (rows=1 width=15)
              Output:["_col0","_col1"]
              Filter Operator [FIL_15] (rows=1 width=15)
                predicate:col2 is not null
                TableScan [TS_4] (rows=1 width=15)
                  druid@hive_table_1,hive_table_1,Tbl:COMPLETE,Col:NONE,Output:["col1","col2"]
Time taken: 0.924 seconds, Fetched: 31 row(s)
hive> SELECT a.channel, b.col1
    > FROM
    > (
    >   SELECT `channel`, max(delta) as m, sum(added)
    >   FROM druid_table_1
    >   GROUP BY `channel`, `floor_year`(`__time`)
    >   ORDER BY m DESC
    >   LIMIT 1000
    > ) a
    > JOIN
    > (
    >   SELECT col1, col2
    >   FROM hive_table_1
    > ) b
    > ON a.channel = b.col2;
Query ID = user1_20160818202329_e9a8b3e8-18d3-49c7-bfe0-99d38d2402d3
Total jobs = 1
Launching Job 1 out of 1
2016-08-18 20:23:30 Running Dag: dag_1471548210492_0001_1
2016-08-18 20:23:30 Starting to run new task attempt: attempt_1471548210492_0001_1_00_000000_0
Status: Running (Executing on YARN cluster with App id application_1471548210492_0001)
----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 .......... container     SUCCEEDED      1          1        0        0       0       0
Map 2 .......... container     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 0.15 s
----------------------------------------------------------------------------------------------
2016-08-18 20:23:31 Completed running task attempt: attempt_1471548210492_0001_1_00_000000_0
OK
#en.wikipedia	1
Time taken: 1.835 seconds, Fetched: 2 row(s)

...