What is a Data connector?
Data connectors (referred to as "connector" in Hive Query Language) are top-level objects in Hive in which users define the set of properties required to connect to an external datasource from Hive. This document shows how the data connector framework can be used for SQL query federation between two distinct Hive clusters/installations, or between Hive and another Hive-like compute engine (e.g. EMR).
HIVEJDBC type Data connector
Apache Hive now has a connector for plugging in multiple Hive and Hive-like sources. HIVE-27597 adds a JDBC-based connector of type "HIVEJDBC". Like the other data connectors, this connector needs a URL, driver name, credentials, etc. to be defined as part of the connector definition. Once it is defined, users can use the same connector object to map multiple databases from the remote datasource to the local Hive metastore.
The HIVEJDBC connector requires the following values:
- Name → the local name used to reference the connector. This name is shown in "show connectors" and is used in connector DDLs such as drop/alter and in "create remote database .." statements.
- TYPE → "HIVEJDBC", so that Hive Metastore knows which connector class to use.
- URL → JDBC URL for the remote HiveServer instance.
- DCPROPERTIES → a freeform list of other information, such as credentials and other optional properties. These properties are passed on to the table definitions for the databases created using this connector.
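The connector DDLs mentioned in the list above can be sketched as follows. This is a minimal sketch, assuming a connector named hiveserver_connector (the name used in the how-to that follows) already exists. The statement forms follow the data connector DDL as documented for Hive, so verify them against your Hive version. The password value is a placeholder.

```sql
-- List all connectors defined in the local metastore.
SHOW CONNECTORS;

-- Show the definition of one connector.
DESCRIBE CONNECTOR hiveserver_connector;

-- Update a connector property, for example after a credential change.
-- "new-password" is a placeholder value, not a real credential.
ALTER CONNECTOR hiveserver_connector
SET DCPROPERTIES ("hive.sql.dbcp.password"="new-password");

-- Drop the connector when it is no longer needed.
DROP CONNECTOR IF EXISTS hiveserver_connector;
```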
How do I use it?
1. Create a connector first.
CREATE CONNECTOR hiveserver_connector TYPE 'hivejdbc' URL 'jdbc:hive2://<maskedhost>:10000'
WITH DCPROPERTIES ("hive.sql.dbcp.username"="hive", "hive.sql.dbcp.password"="hive");
2. Create a database of type REMOTE in Hive using the connector from Step 1. This maps a remote database named "default" to a database named "hiveserver_remote" in Hive.
CREATE REMOTE DATABASE hiveserver_remote USING hiveserver_connector
WITH DBPROPERTIES ("connector.remoteDbName"="default");
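Because a connector object is reusable, a second remote database can be mapped through the same connector. The sketch below assumes the remote HiveServer also has a database named "sales"; that name and the local name hiveserver_remote_sales are hypothetical and used only for illustration.

```sql
-- Map a second remote database through the same connector.
-- "sales" and hiveserver_remote_sales are hypothetical names.
CREATE REMOTE DATABASE hiveserver_remote_sales USING hiveserver_connector
WITH DBPROPERTIES ("connector.remoteDbName"="sales");

-- List the tables that the first remote database exposes.
SHOW TABLES IN hiveserver_remote;
```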
3. Use the tables in the REMOTE database much like JDBC storage handler based tables in Hive. One big difference
is that the metadata for these tables is never persisted in Hive. Currently, create/alter/drop table DDLs
are not supported in REMOTE databases.
0: jdbc:hive2://localhost:10000> USE hiveserver_remote;
0: jdbc:hive2://localhost:10000> describe formatted test_emr_tbl;
INFO : Compiling command(queryId=ngangam_20240129181838_56b888dd-a0ba-4bde-a66a-97bba48f6435): describe formatted test_emr_tbl
INFO : Semantic Analysis Completed (retrial = false)
INFO : Created Hive schema: Schema(fieldSchemas:[FieldSchema(name:col_name, type:string, comment:from deserializer), FieldSchema(name:data_type, type:string, comment:from deserializer), FieldSchema(name:comment, type:string, comment:from deserializer)], properties:null)
INFO : Completed compiling command(queryId=ngangam_20240129181838_56b888dd-a0ba-4bde-a66a-97bba48f6435); Time taken: 5.234 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Executing command(queryId=ngangam_20240129181838_56b888dd-a0ba-4bde-a66a-97bba48f6435): describe formatted test_emr_tbl
INFO : Starting task [Stage-0:DDL] in serial mode
INFO : Completed executing command(queryId=ngangam_20240129181838_56b888dd-a0ba-4bde-a66a-97bba48f6435); Time taken: 0.815 seconds
INFO : OK
+-------------------------------+-------------------------------------------------+----------------------------------------------------+
| col_name | data_type | comment |
+-------------------------------+-------------------------------------------------+----------------------------------------------------+
| tblkey | int | from deserializer |
| descr | string | from deserializer |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | emr_db | NULL |
| OwnerType: | USER | NULL |
| Owner: | null | NULL |
| CreateTime: | UNKNOWN | NULL |
| LastAccessTime: | UNKNOWN | NULL |
| Retention: | 0 | NULL |
| Location: | file:/tmp/hive/warehouse/external/test_emr_tbl | NULL |
| Table Type: | EXTERNAL_TABLE | NULL |
| Table Parameters: | NULL | NULL |
| | EXTERNAL | TRUE |
| | hive.sql.database.type | HIVE |
| | hive.sql.dbcp.password | |
| | hive.sql.dbcp.username | hive |
| | hive.sql.jdbc.driver | org.apache.hive.jdbc.HiveDriver |
| | hive.sql.jdbc.url | jdbc:hive2://<maskedIP>.compute-1.amazonaws.com:10000 |
| | hive.sql.schema | default |
| | hive.sql.table | test_emr_tbl |
| | storage_handler | org.apache.hive.storage.jdbc.JdbcStorageHandler |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hive.storage.jdbc.JdbcSerDe | NULL |
| InputFormat: | org.apache.hive.storage.jdbc.JdbcInputFormat | NULL |
| OutputFormat: | org.apache.hive.storage.jdbc.JdbcOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | 0 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
| Storage Desc Params: | NULL | NULL |
| | serialization.format | 1 |
+-------------------------------+-------------------------------------------------+----------------------------------------------------+
33 rows selected (6.099 seconds)
4. To offload the remote table to the local cluster, run a CTAS statement. The example below pulls all of the data
into the local table, but you can pull in selected columns and rows by listing columns and applying predicates.
0: jdbc:hive2://localhost:10000> create table default.emr_clone as select * from test_emr_tbl;
INFO : Compiling command(queryId=ngangam_20240129182608_db20e2bb-1db3-473f-9564-0d81b01228bc): create table default.emr_clone as select * from test_emr_tbl
INFO : Completed compiling command(queryId=ngangam_20240129182608_db20e2bb-1db3-473f-9564-0d81b01228bc); Time taken: 6.42 seconds
INFO : Compiling command(queryId=ngangam_20240129182608_db20e2bb-1db3-473f-9564-0d81b01228bc): create table default.emr_clone as select * from test_emr_tbl
INFO : Semantic Analysis Completed (retrial = false)
INFO : Created Hive schema: Schema(fieldSchemas:[FieldSchema(name:test_emr_tbl.tblkey, type:int, comment:null), FieldSchema(name:test_emr_tbl.descr, type:string, comment:null)], properties:null)
INFO : Completed compiling command(queryId=ngangam_20240129182608_db20e2bb-1db3-473f-9564-0d81b01228bc); Time taken: 1.781 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Executing command(queryId=ngangam_20240129182608_db20e2bb-1db3-473f-9564-0d81b01228bc): create table default.emr_clone as select * from test_emr_tbl
WARN : Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. tez, impala) or using Hive 1.X releases.
INFO : Query ID = ngangam_20240129182608_db20e2bb-1db3-473f-9564-0d81b01228bc
INFO : Total jobs = 3
INFO : Launching Job 1 out of 3
INFO : Starting task [Stage-1:MAPRED] in serial mode
INFO : Number of reduce tasks determined at compile time: 1
INFO : In order to change the average load for a reducer (in bytes):
INFO : set hive.exec.reducers.bytes.per.reducer=<number>
INFO : In order to limit the maximum number of reducers:
INFO : set hive.exec.reducers.max=<number>
INFO : In order to set a constant number of reducers:
INFO : set mapreduce.job.reduces=<number>
INFO : number of splits:1
INFO : Submitting tokens for job: job_local1608643179_0003
INFO : Executing with tokens: []
INFO : The url to track the job: http://localhost:8080/
INFO : Job running in-process (local Hadoop)
INFO : 2024-01-29 18:26:19,582 Stage-1 map = 0%, reduce = 0%
INFO : 2024-01-29 18:26:20,790 Stage-1 map = 100%, reduce = 0%
INFO : 2024-01-29 18:26:21,810 Stage-1 map = 100%, reduce = 100%
INFO : Ended Job = job_local1608643179_0003
INFO : Starting task [Stage-7:CONDITIONAL] in serial mode
INFO : Stage-4 is selected by condition resolver.
INFO : Stage-3 is filtered out by condition resolver.
INFO : Stage-5 is filtered out by condition resolver.
INFO : Starting task [Stage-4:MOVE] in serial mode
INFO : Moving data to directory file:/tmp/hive/warehouse/external/.hive-staging_hive_2024-01-29_18-26-14_861_862309277586351757-1/-ext-10001 from file:/tmp/hive/warehouse/external/.hive-staging_hive_2024-01-29_18-26-14_861_862309277586351757-1/-ext-10003
INFO : Starting task [Stage-0:MOVE] in serial mode
INFO : Moving data to directory file:/tmp/hive/warehouse/external/emr_clone from file:/tmp/hive/warehouse/external/.hive-staging_hive_2024-01-29_18-26-14_861_862309277586351757-1/-ext-10001
INFO : Starting task [Stage-8:DDL] in serial mode
INFO : Starting task [Stage-2:STATS] in serial mode
INFO : Executing stats task
INFO : Table default.emr_clone stats: [numFiles=1, numRows=2, totalSize=18, rawDataSize=16, numFilesErasureCoded=0]
INFO : MapReduce Jobs Launched:
INFO : Stage-Stage-1: HDFS Read: 0 HDFS Write: 0 SUCCESS
INFO : Total MapReduce CPU Time Spent: 0 msec
INFO : Completed executing command(queryId=ngangam_20240129182608_db20e2bb-1db3-473f-9564-0d81b01228bc); Time taken: 6.492 seconds
INFO : OK
2 rows affected (14.802 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from default.emr_clone;
INFO : Compiling command(queryId=ngangam_20240129182647_7544c9d1-c68b-4a34-b6b0-910945a1dba5): select count(*) from default.emr_clone
INFO : Semantic Analysis Completed (retrial = false)
INFO : Created Hive schema: Schema(fieldSchemas:[FieldSchema(name:_c0, type:bigint, comment:null)], properties:null)
INFO : Completed compiling command(queryId=ngangam_20240129182647_7544c9d1-c68b-4a34-b6b0-910945a1dba5); Time taken: 6.282 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Executing command(queryId=ngangam_20240129182647_7544c9d1-c68b-4a34-b6b0-910945a1dba5): select count(*) from default.emr_clone
WARN : Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. tez, impala) or using Hive 1.X releases.
INFO : Query ID = ngangam_20240129182647_7544c9d1-c68b-4a34-b6b0-910945a1dba5
INFO : Total jobs = 1
INFO : Launching Job 1 out of 1
INFO : Starting task [Stage-1:MAPRED] in serial mode
INFO : Number of reduce tasks determined at compile time: 1
INFO : In order to change the average load for a reducer (in bytes):
INFO : set hive.exec.reducers.bytes.per.reducer=<number>
INFO : In order to limit the maximum number of reducers:
INFO : set hive.exec.reducers.max=<number>
INFO : In order to set a constant number of reducers:
INFO : set mapreduce.job.reduces=<number>
INFO : number of splits:1
INFO : Submitting tokens for job: job_local1913531390_0004
INFO : Executing with tokens: []
INFO : The url to track the job: http://localhost:8080/
INFO : Job running in-process (local Hadoop)
INFO : 2024-01-29 18:26:55,764 Stage-1 map = 100%, reduce = 100%
INFO : Ended Job = job_local1913531390_0004
INFO : MapReduce Jobs Launched:
INFO : Stage-Stage-1: HDFS Read: 0 HDFS Write: 0 SUCCESS
INFO : Total MapReduce CPU Time Spent: 0 msec
INFO : Completed executing command(queryId=ngangam_20240129182647_7544c9d1-c68b-4a34-b6b0-910945a1dba5); Time taken: 2.344 seconds
INFO : OK
+------+
| _c0 |
+------+
| 2 |
+------+
1 row selected (8.795 seconds)
0: jdbc:hive2://localhost:10000>
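As noted in step 4, the CTAS does not have to copy the whole table. Here is a minimal sketch, assuming the same test_emr_tbl table with its tblkey and descr columns. The target table name emr_clone_subset is hypothetical.

```sql
USE hiveserver_remote;

-- Copy only selected columns and rows from the remote table into a local table.
CREATE TABLE default.emr_clone_subset AS
SELECT tblkey, descr
FROM test_emr_tbl
WHERE tblkey > 1;
```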
5. To fetch data from the remote tables, run SELECT queries with a column list and predicates, as you would
with any SQL table.
0: jdbc:hive2://localhost:10000> select * from test_emr_tbl where tblkey > 1;
INFO : Compiling command(queryId=ngangam_20240129191217_79b9e874-197d-4c31-8164-1ec2397bbff7): select * from test_emr_tbl where tblkey > 1
INFO : Completed compiling command(queryId=ngangam_20240129191217_79b9e874-197d-4c31-8164-1ec2397bbff7); Time taken: 6.219 seconds
INFO : Compiling command(queryId=ngangam_20240129191217_79b9e874-197d-4c31-8164-1ec2397bbff7): select * from test_emr_tbl where tblkey > 1
INFO : Semantic Analysis Completed (retrial = false)
INFO : Created Hive schema: Schema(fieldSchemas:[FieldSchema(name:test_emr_tbl.tblkey, type:int, comment:null), FieldSchema(name:test_emr_tbl.descr, type:string, comment:null)], properties:null)
INFO : Completed compiling command(queryId=ngangam_20240129191217_79b9e874-197d-4c31-8164-1ec2397bbff7); Time taken: 1.364 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Executing command(queryId=ngangam_20240129191217_79b9e874-197d-4c31-8164-1ec2397bbff7): select * from test_emr_tbl where tblkey > 1
INFO : Completed executing command(queryId=ngangam_20240129191217_79b9e874-197d-4c31-8164-1ec2397bbff7); Time taken: 0.001 seconds
INFO : OK
+----------------------+---------------------+
| test_emr_tbl.tblkey | test_emr_tbl.descr |
+----------------------+---------------------+
| 2 | test 2 |
+----------------------+---------------------+
1 row selected (8.238 seconds)
6. To join with local Hive tables, run SELECT queries that join multiple tables (local or remote), as you would
with any SQL tables.
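Step 6 can be sketched with the tables used earlier: the remote test_emr_tbl and the local default.emr_clone created in step 4. This assumes the current database is hiveserver_remote. The aliases and output column names are illustrative.

```sql
USE hiveserver_remote;

-- Join a remote table (r) with a local table (l) on the shared key column.
SELECT r.tblkey, r.descr AS remote_descr, l.descr AS local_descr
FROM test_emr_tbl r
JOIN default.emr_clone l
  ON r.tblkey = l.tblkey
WHERE r.tblkey > 1;
```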