...
Note also that if we do not use EXTERNAL tables, we do not specify a value for the druid.datasource property. Instead, Hive automatically uses the fully qualified name of the table to create a datasource with the same name.
Druid Kafka ingestion from Hive
Note: Integration with the Druid Kafka Indexing Service was introduced in Hive 3.0.0 (HIVE-18976).
The Druid Kafka Indexing Service supports exactly-once ingestion from a Kafka topic by managing the creation and lifetime of Kafka indexing tasks. We can manage Druid Kafka ingestion using a Hive CREATE TABLE statement, as shown below.
CREATE EXTERNAL TABLE druid_kafka_table_1(`__time` timestamp, `dimension1` string, `dimension2` string, `metric1` int, `metric2` double ....)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES (
  "kafka.bootstrap.servers" = "localhost:9092",
  "kafka.topic" = "topic1",
  "druid.kafka.ingestion.useEarliestOffset" = "true",
  "druid.kafka.ingestion.maxRowsInMemory" = "5",
  "druid.kafka.ingestion.startDelay" = "PT1S",
  "druid.kafka.ingestion.period" = "PT1S",
  "druid.kafka.ingestion.consumer.retries" = "2"
);
Observe that we specified the Kafka topic name and the Kafka bootstrap servers as table properties. Other tuning options for the Druid Kafka Indexing Service can also be specified by prefixing them with 'druid.kafka.ingestion.'. For example, to configure the duration of Druid ingestion tasks, we can add "druid.kafka.ingestion.taskDuration" = "PT60S" as a table property.
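As a minimal sketch of the prefixing rule, the statement below sets a task duration in a table definition. The table name druid_kafka_table_2 and its column list are hypothetical and chosen only for illustration; the storage handler, the Kafka properties, and the taskDuration property with its value are taken from the text above.

```sql
-- Hypothetical table: the name and columns are placeholders, not part of the original example.
-- The taskDuration setting is passed to the Druid Kafka Indexing Service
-- because it carries the 'druid.kafka.ingestion.' prefix.
CREATE EXTERNAL TABLE druid_kafka_table_2 (
  `__time` timestamp,
  `dimension1` string,
  `metric1` int
)
STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
TBLPROPERTIES (
  "kafka.bootstrap.servers" = "localhost:9092",
  "kafka.topic" = "topic1",
  "druid.kafka.ingestion.taskDuration" = "PT60S"
);
```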
Start/Stop/Reset Druid Kafka ingestion
We can start, stop, or reset Druid Kafka ingestion using the SQL statements shown below.
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'STOP');
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET');
Note: Resetting the ingestion resets the Kafka consumer offsets maintained by Druid. The offsets are set to either the earliest or the latest offset, depending on the druid.kafka.ingestion.useEarliestOffset table property. This can cause duplicate or missing events. We typically only need to reset Kafka ingestion when the messages at the current consumer offsets are no longer available in Kafka and therefore cannot be ingested into Druid.
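Because a reset can duplicate or drop events, it may help to compare what is queryable before and after it. The following is only a sketch: it reuses the druid_kafka_test table from the statements above and assumes that the table has the usual `__time` column. The checks are an illustration, not a required part of the reset, and because ingestion keeps running, the numbers give a rough indication only.

```sql
-- Row count and latest event time before the reset (assumes a `__time` column).
SELECT COUNT(*), MAX(`__time`) FROM druid_kafka_test;

-- Reset the consumer offsets maintained by Druid. The new position depends on
-- the druid.kafka.ingestion.useEarliestOffset table property.
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET');

-- Run the same query again once ingestion has had time to catch up.
-- A sudden jump in the count may indicate re-ingested (duplicate) events.
SELECT COUNT(*), MAX(`__time`) FROM druid_kafka_test;
```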
INSERT, INSERT OVERWRITE and DROP statements
...