Hive HBase Bulk Load

This page explains how to use Hive to bulk load data into a new (empty) HBase table per HIVE-1295. (If your build does not contain this functionality yet, you'll need to build from source and make sure that this patch and HIVE-1321 are both applied.)

...

Ideally, bulk load from Hive into HBase would be part of HBaseIntegration, making it as simple as this:

Code Block

CREATE TABLE new_hbase_table(rowkey string, x int, y int) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:x,cf:y");

SET hive.hbase.bulk=true;

INSERT OVERWRITE TABLE new_hbase_table
SELECT rowkey_expression, x, y FROM ...any_hive_query...;

...

The procedure is based on underlying HBase recommendations, and involves the following steps:

...

TBD: provide some example numbers based on Facebook experiments; also reference Hadoop Terasort

Add necessary JARs

You will need to add a couple of JAR files to your path. First, put them into HDFS:

Code Block
hadoop dfs -put /usr/lib/hive/lib/hbase-VERSION.jar /user/hive/hbase-VERSION.jar
hadoop dfs -put /usr/lib/hive/lib/hive-hbase-handler-VERSION.jar /user/hive/hive-hbase-handler-VERSION.jar

Then add them to your hive-site.xml:

Code Block
<property>
  <name>hive.aux.jars.path</name>
  <value>/user/hive/hbase-VERSION.jar,/user/hive/hive-hbase-handler-VERSION.jar</value>
</property>
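
Alternatively, the JARs can be supplied per-session via the Hive CLI's --auxpath option (shown here with the local paths from the copy above; adjust them for your installation):

Code Block

hive --auxpath /usr/lib/hive/lib/hbase-VERSION.jar,/usr/lib/hive/lib/hive-hbase-handler-VERSION.jar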

Prepare Range Partitioning

In order to perform a parallel sort on the data, we need to range-partition it. The idea is to divide the row key space into nearly equal-sized ranges, one per reducer that will be used in the parallel sort. The details will vary according to your source data, and you may need to run a number of exploratory Hive queries to come up with a good enough set of ranges. Here's one example:

Code Block

-- row_sequence is provided by the hive-contrib JAR
add jar lib/hive-contrib-0.7.0.jar;
-- a single reducer is required so that row_sequence numbers rows globally
set mapred.reduce.tasks=1;
create temporary function row_sequence as
'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';
-- sample roughly 0.01% of the table, then keep every 910,000th sampled
-- row as a range boundary key
select transaction_id from
(select transaction_id
from transactions
tablesample(bucket 1 out of 10000 on transaction_id) s
order by transaction_id
limit 10000000) x
where (row_sequence() % 910000)=0
order by transaction_id
limit 11;
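
The constants here are linked: the subquery caps the sample at 10,000,000 rows, the modulus 910,000 spaces the boundary keys nearly evenly through that sample, and the limit 11 caps the result at 11 keys, which define the 12 ranges used by the 12 reducers in the sort step below. To size these constants for your own data, start from the table's row count:

Code Block

select count(*) from transactions;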

...

Once you have your sampling query defined, the next step is to save its results to a properly formatted file which will be used in a subsequent step. To do this, run commands like the following:

Code Block

-- the serde and outputformat below write the boundary keys in the
-- SequenceFile format that TotalOrderPartitioner expects for its partition file
create external table hb_range_keys(transaction_id_range_start string)
row format serde
'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
stored as
inputformat
'org.apache.hadoop.mapred.TextInputFormat'
outputformat
'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
location '/tmp/hb_range_keys';

-- populate the table with the boundary keys from the sampling query;
-- this reuses the row_sequence function and single-reducer setting from above
insert overwrite table hb_range_keys
select transaction_id from
(select transaction_id
from transactions
tablesample(bucket 1 out of 10000 on transaction_id) s
order by transaction_id
limit 10000000) x
where (row_sequence() % 910000)=0
order by transaction_id
limit 11;

...

The second command populates the table using the sampling query previously defined. Use of ORDER BY guarantees that a single file will be produced in the directory /tmp/hb_range_keys. The filename is unknown, but it is necessary to reference the file by name later, so run a command such as the following to copy it to a specific name:

Code Block

dfs -cp /tmp/hb_range_keys/* /tmp/hb_range_key_list;
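
If you'd like to inspect the generated file first, a listing run from the Hive CLI will show it:

Code Block

dfs -ls /tmp/hb_range_keys;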

...

The staging directory for the sort output (/tmp/hbsort in this example) does not actually need to exist beforehand (it is created in the next step), but if it does exist, it should be empty:

Code Block

dfs -rmr /tmp/hbsort;
dfs -mkdir /tmp/hbsort;

...

Now comes the big step: running a sort over all of the data to be bulk loaded. Make sure that your Hive instance has the HBase jars available on its auxpath.

Code Block
-- this procedure relies on MapReduce-specific settings, so force the MR engine
set hive.execution.engine=mr;
-- 12 reducers: one per range defined by the 11 boundary keys above
set mapred.reduce.tasks=12;
-- range-partition rows across reducers using the boundary key file
set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
set total.order.partitioner.path=/tmp/hb_range_key_list;
-- gzip-compress the generated HFiles
set hfile.compression=gz;

-- hfile.family.path determines the column family directory where HFiles are written
create table hbsort(transaction_id string, user_name string, amount double, ...)
stored as
INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf');

-- CLUSTER BY routes rows through the partitioner and sorts within each reducer
insert overwrite table hbsort
select transaction_id, user_name, amount, ...
from transactions
cluster by transaction_id;

...

Once the sort job completes successfully, one final step is required for importing the result files into HBase. Again, we don't know the names of the generated files, so to work with them under a known path, copy them over:

Code Block
dfs -copyToLocal /tmp/hbsort/cf/* /tmp/hbout;

If Hive and HBase are running in different clusters, use distcp to copy the files from one to the other.
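
For example (a sketch only; the NameNode addresses below are placeholders for your two clusters):

Code Block

hadoop distcp hdfs://hive-cluster-nn:8020/tmp/hbsort hdfs://hbase-cluster-nn:8020/tmp/hbsort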

If you are using HBase 0.90.2 or newer, you can use the completebulkload utility to load the data into HBase:

Code Block

hadoop jar hbase-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /tmp/hbsort transactions

In older versions of HBase, use the bin/loadtable.rb script to import them:

Code Block

hbase org.jruby.Main loadtable.rb transactions /tmp/hbsort

The first argument (transactions) specifies the name of the new HBase table. For the second argument, pass the staging directory name, not the column family child directory.
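
In terms of the example layout above, that means:

Code Block

/tmp/hbsort       <- staging directory: pass this
/tmp/hbsort/cf    <- column family child directory: do not pass this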

...

Finally, if you'd like to access the HBase table you just created via Hive:

Code Block

CREATE EXTERNAL TABLE hbase_transactions(transaction_id string, user_name string, amount double, ...) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:user_name,cf:amount,...")
TBLPROPERTIES("hbase.table.name" = "transactions");
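
A quick sanity check (using the example columns above) verifies that the bulk-loaded rows are visible through Hive:

Code Block

SELECT transaction_id, user_name, amount
FROM hbase_transactions
LIMIT 10;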

...