Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Added Druid Kafka Indexing Service Section

...

Further, note that if we do not use EXTERNAL tables, we do not specify the value for the druid.datasource property. In fact, Hive automatically uses the fully qualified name of the table to create the corresponding datasource with the same name.

Druid kafka ingestion from Hive

Info
titleVersion Info

Integration with Druid Kafka Indexing Service is introduced in Hive 3.0.0 (HIVE-18976).


Druid Kafka Indexing Service
supports exactly-once ingestion from Kafka topic by managing the creation and lifetime of Kafka indexing tasks. We can manage Druid Kafka Ingestion using Hive CREATE TABLE statement as shown below.

Code Block
languagesql
titleDruid Kafka Ingestion
CREATE EXTERNAL TABLE druid_kafka_table_1(`__time` timestamp,`dimension1` string, `dimension1` string, `metric1` int, `metric2 double ....)
        STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
        TBLPROPERTIES (
        "kafka.bootstrap.servers" = "localhost:9092",
        "kafka.topic" = "topic1",
        "druid.kafka.ingestion.useEarliestOffset" = "true",
        "druid.kafka.ingestion.maxRowsInMemory" = "5",
        "druid.kafka.ingestion.startDelay" = "PT1S",
        "druid.kafka.ingestion.period" = "PT1S",
        "druid.kafka.ingestion.consumer.retries" = "2"
        );

 

Observe that we specified kafka topic name and kafka bootstrap servers as part of the table properties. Other tunings for Druid Kafka Indexing Service can also be specified by prefixing them with 'druid.kafka.ingestion.'  e.g. to configure duration of druid ingestion tasks we can add "druid.kafka.ingestion.taskDuration" = "PT60S" as a table property. 

Start/Stop/Reset Druid Kafka ingestion 

We can Start/Stop/Reset druid kafka ingestion using sql statement shown below. 

Code Block
languagesql
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'STOP');
ALTER TABLE druid_kafka_test SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET');

Note: Reseting the ingestion will reset the kafka consumer offset maintained by druid to the next offset. The consumer offsets maintained by druid will be reset to either the earliest or latest offset depending on druid.kafka.ingestion.useEarliestOffset

 table property. This can cause duplicate/missing events. We typically only need to reset kafka ingestion when messages in Kafka at the current consumer offsets are no longer available for consumption and therefore won't be ingested into Druid.

INSERT, INSERT OVERWRITE and DROP statements

...