
...

For instance, we might want to create an empty table backed by Druid using a CREATE TABLE statement and then append and overwrite data using INSERT and INSERT OVERWRITE Hive statements, respectively.

Code Block
sql
CREATE EXTERNAL TABLE druid_table_1
(`__time` TIMESTAMP, `dimension1` STRING, `dimension2` STRING, `metric1` INT, `metric2` FLOAT)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler';
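
To sketch the rest of that scenario, the statements below append to and then overwrite the Druid-backed table; the source table hive_source_table and its column names are hypothetical placeholders:

Code Block
sql
-- Append new rows to the Druid datasource backing the table
INSERT INTO TABLE druid_table_1
SELECT `timecolumn` AS `__time`, `dimension1`, `dimension2`, `metric1`, `metric2`
FROM hive_source_table;

-- Replace the existing contents of the Druid datasource
INSERT OVERWRITE TABLE druid_table_1
SELECT `timecolumn` AS `__time`, `dimension1`, `dimension2`, `metric1`, `metric2`
FROM hive_source_table;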

Another possible scenario is that our data is stored in Hive tables and we want to preprocess it and create Druid datasources from Hive to accelerate our SQL query workload. We can do that by executing a Create Table As Select (CTAS) statement. For example:

Code Block
sql
CREATE EXTERNAL TABLE druid_table_1
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
AS
<select `timecolumn` as `__time`, `dimension1`, `dimension2`, `metric1`, `metric2`....>;

...

Further, note that if we do not use EXTERNAL tables, we do not specify the value for the druid.datasource property. In fact, Hive automatically uses the fully qualified name of the table to create the corresponding datasource with the same name.
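
Conversely, to map an EXTERNAL table onto a datasource that already exists in Druid, we can point at it through the druid.datasource table property. A minimal sketch, assuming an existing Druid datasource named wikiticker (the Hive table name is illustrative):

Code Block
sql
CREATE EXTERNAL TABLE druid_table_wiki
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES ("druid.datasource" = "wikiticker");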


Info
titleVersion Info

Version 2.2.0: CREATE TABLE syntax when data is managed via Hive.


CREATE TABLE druid_table_1
(`__time` TIMESTAMP, `dimension1` STRING, `dimension2` STRING, `metric1` INT, `metric2` FLOAT)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler';


 NOTE - Before Hive 3.0.0, we do not use EXTERNAL tables and do not specify the value for the druid.datasource property.

For versions 3.0.0+, all Druid tables are EXTERNAL (HIVE-20085).


Druid Kafka ingestion from Hive

Info
titleVersion Info

Integration with Druid Kafka Indexing Service is introduced in Hive 3.0.0 (HIVE-18976).


Druid Kafka Indexing Service supports exactly-once ingestion from a Kafka topic by managing the creation and lifetime of Kafka indexing tasks. We can manage Druid Kafka ingestion using the Hive CREATE TABLE statement as shown below.

Code Block
languagesql
titleDruid Kafka Ingestion
CREATE EXTERNAL TABLE druid_kafka_table_1(`__time` timestamp, `dimension1` string, `dimension2` string, `metric1` int, `metric2` double ....)
        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
        TBLPROPERTIES (
        "kafka.bootstrap.servers" = "localhost:9092",
        "kafka.topic" = "topic1",
        "druid.kafka.ingestion.useEarliestOffset" = "true",
        "druid.kafka.ingestion.maxRowsInMemory" = "5",
        "druid.kafka.ingestion.startDelay" = "PT1S",
        "druid.kafka.ingestion.period" = "PT1S",
        "druid.kafka.ingestion.consumer.retries" = "2"
        );

 

Observe that we specified the Kafka topic name and the Kafka bootstrap servers as part of the table properties. Other tunings for the Druid Kafka Indexing Service can also be specified by prefixing them with 'druid.kafka.ingestion.', e.g., to configure the duration of the Druid ingestion tasks we can add "druid.kafka.ingestion.taskDuration" = "PT60S" as a table property.
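
Assuming the table already exists, such a tuning can presumably also be set afterwards with a standard ALTER TABLE ... SET TBLPROPERTIES statement; a sketch, reusing the druid_kafka_table_1 table and the taskDuration property mentioned above:

Code Block
languagesql
-- Illustrative only: set an additional Druid Kafka ingestion tuning as a table property
ALTER TABLE druid_kafka_table_1 SET TBLPROPERTIES ('druid.kafka.ingestion.taskDuration' = 'PT60S');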

Start/Stop/Reset Druid Kafka ingestion 

We can start, stop, or reset Druid Kafka ingestion using the SQL statements shown below.

Code Block
languagesql
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'STOP');
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET');

Note: Resetting the ingestion will reset the Kafka consumer offsets maintained by Druid. The consumer offsets will be reset to either the earliest or the latest offset, depending on the druid.kafka.ingestion.useEarliestOffset table property. This can cause duplicate or missing events. We typically only need to reset Kafka ingestion when messages in Kafka at the current consumer offsets are no longer available for consumption and therefore won't be ingested into Druid.

INSERT, INSERT OVERWRITE and DROP statements

Info
titleVersion Info

Version 2.2.0: These statements are supported for Hive managed tables (not external) backed by Druid.

For versions 3.0.0+, all Druid tables are EXTERNAL (HIVE-20085) and these statements are supported for any table.
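
For completeness, dropping a Druid-backed table uses the standard Hive DROP TABLE statement; a minimal sketch against the table created earlier (INSERT and INSERT OVERWRITE examples are shown above):

Code Block
sql
DROP TABLE druid_table_1;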

Querying Druid from Hive

Once we have created our first table stored in Druid using the DruidStorageHandler, we are ready to execute our queries against Druid.

When we express a query over a Druid table, Hive tries to rewrite the query to be executed efficiently by pushing as much computation as possible to Druid. This task is accomplished by the cost optimizer based on Apache Calcite, which identifies patterns in the plan and applies rules to rewrite the input query into a new equivalent query with (hopefully) more operations executed in Druid.

In particular, we implemented our extension to the optimizer in HIVE-14217, which builds upon the work initiated in CALCITE-1121, and extends its logic to identify more complex query patterns (timeseries queries), translate filters on the time dimension to Druid intervals, push limit into Druid select queries, etc.

Currently, we support the recognition of timeseries, groupBy, and select queries.

Once we have completed the optimization, the (sub)plan of operators that needs to be executed by Druid is translated into a valid Druid JSON query, and passed as a property to the Hive physical TableScan operator. The Druid query will be executed within the TableScan operator, which will generate the records out of the Druid query results.

We generate a single Hive split with the corresponding Druid query for timeseries and groupBy, from which we generate the records. Thus, the degree of parallelism is 1 in these cases. However, for simple select queries without limit (although they might still contain filters or projections), we partition the original query into x queries and generate one split for each of them, thus increasing the degree of parallelism for these queries, which usually return a large number of results, to x. We provide more details later on.

 

Consider that depending on the query, it might not be possible to push any computation to Druid. However, our contract is that the query should always be executed. Thus, in those cases, Hive will send a select query to Druid, which basically will read all the segments from Druid, generate records, and then execute the rest of Hive operations on those records. This is also the approach that will be followed if the cost optimizer is disabled (not recommended).

Queries completely executed in Druid

We focus first on queries that can be pushed completely into Druid. In these cases, we end up with a simple plan consisting of a TableScan and a Fetch operator on top. Thus, there is no overhead related to launching containers for the execution.

Select queries

We start with the simplest type of Druid query: select queries. Basically, a select query will be equivalent to a scan operation on the data sources, although operations such as projection, filter, or limit can still be pushed into this type of query.

Consider the following query, a simple select query for 10 rows consisting of all the columns of the table:

Code Block
sql
SELECT * FROM druid_table_1 LIMIT 10;

The Hive plan for the query will be the following:

Code Block
hive> EXPLAIN
    > SELECT * FROM druid_table_1 LIMIT 10;
OK
Plan optimized by CBO.
Stage-0
  Fetch Operator
    limit:-1
    Select Operator [SEL_1]
      Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17","_col18","_col19","_col20","_col21"]
      TableScan [TS_0]
        Output:["__time","added","channel","cityname","comment","count","countryisocode","countryname","deleted","delta","isanonymous","isminor","isnew","isrobot","isunpatrolled","metrocode","namespace","page","regionisocode","regionname","user","user_unique"],properties:{"druid.query.json":"{\"queryType\":\"select\",\"dataSource\":\"wikiticker\",\"descending\":\"false\",\"intervals\":[\"-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00\"],\"dimensions\":[\"channel\",\"cityname\",\"comment\",\"countryisocode\",\"countryname\",\"isanonymous\",\"isminor\",\"isnew\",\"isrobot\",\"isunpatrolled\",\"metrocode\",\"namespace\",\"page\",\"regionisocode\",\"regionname\",\"user\",\"user_unique\"],\"metrics\":[\"added\",\"count\",\"deleted\",\"delta\"],\"pagingSpec\":{\"threshold\":10},\"context\":{\"druid.query.fetch\":true}}","druid.query.type":"select"}
Time taken: 0.141 seconds, Fetched: 10 row(s)

...

Code Block
sql
SELECT `__time`
FROM druid_table_1
WHERE (`__time` BETWEEN '2010-01-01 00:00:00' AND '2011-01-01 00:00:00')
    OR (`__time` BETWEEN '2012-01-01 00:00:00' AND '2013-01-01 00:00:00')
LIMIT 10;
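
As a rough sketch of the translation (the exact millisecond boundaries generated by Hive may differ), the two BETWEEN predicates in the query above would be rendered as an intervals list in the Druid query along these lines:

Code Block
"intervals":[
  "2010-01-01T00:00:00.000/2011-01-01T00:00:00.001",
  "2012-01-01T00:00:00.000/2013-01-01T00:00:00.001"
]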

Furthermore we can infer overlapping intervals too. Finally, the filters that are not specified on the time dimension will be translated into valid Druid filters and included within the query using the filter property.

Partitioning select queries

We can partition Druid select queries that return large results into multiple subqueries that are executed in parallel against Druid. The parallelization depends on the value for the hive.druid.select.threshold configuration parameter.

In particular, we take the number of rows of the result obtained using a segment metadata query. The number of splits for the select query is: number of rows / hive.druid.select.threshold. We split the query along the time dimension, assuming that the distribution of records across time is uniform (we plan to extend this logic in the future). Thus, we consider the time boundaries in the query in order to know how to split the query; if the query is not time bounded, we submit a time boundary query to Druid to obtain them.
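
To make the arithmetic concrete (the figures below are purely illustrative): if the segment metadata query reports 1,000,000 rows and hive.druid.select.threshold is set to 10,000, the select query is partitioned into 100 subqueries, each covering an equal slice of the query's time range. The threshold can be adjusted per session with a standard SET statement:

Code Block
sql
-- Illustrative value; tunes the number of rows per generated Druid select split
SET hive.druid.select.threshold=10000;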

Timeseries queries

Timeseries is one of the types of queries that Druid can execute very efficiently. The following SQL query translates directly into a Druid timeseries query:

...

Basically, we group by a given time granularity and calculate the aggregation results for each resulting group. In particular, the floor_month function over the timestamp dimension __time represents the Druid month granularity format. Currently, we support floor_year, floor_quarter, floor_month, floor_week, floor_day, floor_hour, floor_minute, and floor_second granularities. In addition, we support two special types of granularities, all and none, which we describe below. We plan to extend our integration work to support other important Druid custom granularity constructs, such as duration and period granularities.

The Hive plan for the query will be the following:

Code Block
hive> EXPLAIN
    > SELECT `floor_month`(`__time`), max(delta), sum(added)
    > FROM druid_table_1
    > GROUP BY `floor_month`(`__time`);
OK
Plan optimized by CBO.
Stage-0
  Fetch Operator
    limit:-1
    Select Operator [SEL_1]
      Output:["_col0","_col1","_col2"]
      TableScan [TS_0]
        Output:["__time","$f1","$f2"],
        properties:{"druid.query.json":"{\"queryType\":\"timeseries\",\"dataSource\":\"wikiticker\",\"descending\":\"false\",\"granularity\":\"MONTH\",\"aggregations\":[{\"type\":\"longMax\",\"name\":\"$f1\",\"fieldName\":\"delta\"},{\"type\":\"longSum\",\"name\":\"$f2\",\"fieldName\":\"added\"}],\"intervals\":[\"-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00\"]}","druid.query.type":"timeseries"}
Time taken: 0.116 seconds, Fetched: 10 row(s)

Observe that the Druid query is in the properties attached to the TableScan. For readability, we format it properly:

Code Block
{
  "queryType":"timeseries",
  "dataSource":"wikiticker",
  "descending":"false",
  "granularity":"MONTH",
  "aggregations":[
    {"type":"longMax", "name":"$f1", "fieldName":"delta"},
    {"type":"longSum", "name":"$f2", "fieldName":"added"}
  ],
  "intervals":[`floor_month`(`__time`);
OK
Plan optimized by CBO.
Stage-0
  Fetch Operator
    limit:-1
    Select Operator [SEL_1]
      Output:["_col0","_col1","_col2"]
      TableScan [TS_0]
        Output:["__time","$f1","$f2"],
        properties:{"druid.query.json":"{\"queryType\":\"timeseries\",\"dataSource\":\"wikiticker\",\"descending\":\"false\",\"granularity\":\"MONTH\",\"aggregations\":[{\"type\":\"longMax\",\"name\":\"$f1\",\"fieldName\":\"delta\"},{\"type\":\"longSum\",\"name\":\"$f2\",\"fieldName\":\"added\"}],\"intervals\":[\"-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00\"]
}

Observe that the granularity for the Druid query is MONTH.

 

Two rather special cases are all and none granularities, which we introduce by example below. Consider the following query:

...

-- GRANULARITY: ALL
SELECT max(delta), sum(added)
FROM druid_table_1;
}","druid.query.type":"timeseries"}
Time taken: 0.116 seconds, Fetched: 10 row(s)

As it will do an aggregation on the complete dataset, it translates into a timeseries query with granularity all. In particular, the equivalent Druid query attached to the TableScan operator is the following:

Code Block
{
  "queryType":"timeseries",
  "dataSource":"wikiticker",
  "descending":"false",
  "granularity":"ALLMONTH",
  "aggregations":[
    {"type":"longMax", "name":"$f1", "fieldName":"delta"},
    {"type":"longSum", "name":"$f2", "fieldName":"added"}
  ],
  "intervals":["-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00"]
}

 


 

In turn, given the following query:

Code Block
sql
-- GRANULARITY: NONE
SELECT `__time`, max(delta), sum(added)
FROM druid_table_1
GROUP BY `__time`;

It translates into a timeseries query with granularity none, as it only groups events that happened exactly at the same time. The JSON query is as follows:

Code Block
{
  "queryType":"timeseries",
  "dataSource":"wikiticker",
  "descending":"false",
  "granularity":"NONEALL",
  "aggregations":[
    {"type":"longMax", "name":"$f1", "fieldName":"delta"},
    {"type":"longSum", "name":"$f2", "fieldName":"added"}
  ],
  "intervals":["-146136543-09-08T08:22:17.096-00:01:15/146140482-04-24T16:36:27.903+01:00"]
}

...