...

Hive HBase Bulk Load

...


This page explains how to use Hive to bulk load data into a new (empty) HBase table per HIVE-1295. (If you're not using a build which contains this functionality yet, you'll need to build from source and make sure this patch and HIVE-1321 are both applied.)

...

Ideally, bulk load from Hive into HBase would be part of HBaseIntegration, making it as simple as this:

Code Block

CREATE TABLE new_hbase_table(rowkey string, x int, y int) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:x,cf:y");

SET hive.hbase.bulk=true;

INSERT OVERWRITE TABLE new_hbase_table
SELECT rowkey_expression, x, y FROM ...any_hive_query...;

...

The procedure is based on underlying HBase recommendations, and involves the following steps:

...

TBD: provide some example numbers based on Facebook experiments; also reference Hadoop Terasort

Add necessary JARs

You will need to add a couple of JAR files to your path. First, put them in DFS:

Code Block
hadoop dfs -put /usr/lib/hive/lib/hbase-VERSION.jar /user/hive/hbase-VERSION.jar
hadoop dfs -put /usr/lib/hive/lib/hive-hbase-handler-VERSION.jar /user/hive/hive-hbase-handler-VERSION.jar

Then add them to your hive-site.xml:

Code Block
<property>
  <name>hive.aux.jars.path</name>
  <value>/user/hive/hbase-VERSION.jar,/user/hive/hive-hbase-handler-VERSION.jar</value>
</property>

Prepare Range Partitioning

In order to perform a parallel sort on the data, we need to range-partition it. The idea is to divide the row key space into nearly equal-sized ranges, one per reducer used in the parallel sort. The details will vary with your source data, and you may need to run a number of exploratory Hive queries to come up with a good enough set of ranges. Here's one example:

Code Block

add jar lib/hive-contrib-0.7.0.jar;
set mapred.reduce.tasks=1;
create temporary function row_sequence as 
'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';
select transaction_id from
(select transaction_id
from transactions
tablesample(bucket 1 out of 10000 on transaction_id) s 
order by transaction_id 
limit 10000000) x
where (row_sequence() % 910000)=0
order by transaction_id
limit 11;

...

Once you have your sampling query defined, the next step is to save its results to a properly formatted file which will be used in a subsequent step. To do this, run commands like the following:

Code Block

create external table hb_range_keys(transaction_id_range_start string)
row format serde 
'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
stored as 
inputformat 
'org.apache.hadoop.mapred.TextInputFormat'
outputformat 
'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
location '/tmp/hb_range_keys';

insert overwrite table hb_range_keys
select transaction_id from
(select transaction_id
from transactions
tablesample(bucket 1 out of 10000 on transaction_id) s 
order by transaction_id 
limit 10000000) x
where (row_sequence() % 910000)=0
order by transaction_id
limit 11;

...

The second command populates it (using the sampling query previously defined). Because ORDER BY forces the query through a single reducer, exactly one file is produced in directory /tmp/hb_range_keys. The filename is unknown, but the file needs to be referenced by name later, so run a command such as the following to copy it to a specific name:

Code Block

dfs -cp /tmp/hb_range_keys/* /tmp/hb_range_key_list;

...

The staging directory (/tmp/hbsort in this example) does not actually need to exist (it will be created automatically in the next step), but if it does exist, it should be empty.

Code Block

dfs -rmr /tmp/hbsort;
dfs -mkdir /tmp/hbsort;

...

Now comes the big step: running a sort over all of the data to be bulk loaded. Make sure that your Hive instance has the HBase jars available on its auxpath.

Code Block
set hive.execution.engine=mr;
set mapred.reduce.tasks=12;
set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
set total.order.partitioner.path=/tmp/hb_range_key_list;
set hfile.compression=gz;

create table hbsort(transaction_id string, user_name string, amount double, ...)
stored as
INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat'
TBLPROPERTIES ('hfile.family.path' = '/tmp/hbsort/cf');

insert overwrite table hbsort
select transaction_id, user_name, amount, ...
from transactions
cluster by transaction_id;

The CREATE TABLE creates a dummy table which controls how the output of the sort is written. Note that it uses HiveHFileOutputFormat to do this, with the table property hfile.family.path used to control the destination directory for the output. Again, be sure to set the inputformat/outputformat exactly as specified. In the example above, we select gzip (gz) compression for the result files; if you don't set the hfile.compression parameter, no compression will be performed. (The other method available is lzo, which compresses less aggressively but does not require as much CPU power.)
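If you would rather use lzo, the only change needed is the compression setting; a minimal sketch of switching the example above to lzo (this assumes an lzo codec is actually installed and configured on your cluster):

Code Block

set hfile.compression=lzo;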

Note that the number of reduce tasks must be exactly one more than the number of partition keys in the range key file (in the example above, 11 keys and 12 reducers); otherwise you will get a "Wrong number of partitions in keyset" error.

There is a parameter, hbase.hregion.max.filesize (default 256MB), which affects how HFiles are generated: if the amount of data (pre-compression) produced by a reducer exceeds this limit, more than one HFile will be generated for that reducer, resulting in unbalanced region files. This does not cause any correctness problems, but if you want balanced region files, either use more reducers or set this parameter to a larger value. Note that when compression is enabled, you may see multiple files generated whose sizes are well below the limit; this is because the overflow check is done pre-compression.
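For example, if you would rather produce fewer, larger HFiles per reducer, you could raise the limit in the same Hive session before running the sort job. This is only a sketch; the 1 GB value is illustrative, and it assumes the setting is picked up from the session configuration like the other job parameters shown above:

Code Block

set hbase.hregion.max.filesize=1073741824;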

...

Once the sort job completes successfully, one final step is required for importing the result files into HBase. Again, we don't know the name of the file, so we copy it over:

Code Block
dfs -copyToLocal /tmp/hbsort/cf/* /tmp/hbout

If Hive and HBase are running in different clusters, use distcp to copy the files from one to the other.
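A minimal sketch of such a copy (the namenode host and port are placeholders for your HBase cluster; adjust paths and addresses to your environment):

Code Block

hadoop distcp /tmp/hbsort/cf hdfs://hbase-cluster-namenode:8020/tmp/hbout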

If you are using HBase 0.90.2 or newer, you can use the completebulkload utility to load the data into HBase:

Code Block
hadoop jar hbase-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /tmp/hbout transactions

In older versions of HBase, once the files are in the HBase cluster, use the bin/loadtable.rb script which comes with HBase to import them:

Code Block

hbase org.jruby.Main loadtable.rb transactions /tmp/hbout

The first argument (transactions) specifies the name of the new HBase table. For the second argument, pass the staging directory name, not the column family child directory.

...

Finally, if you'd like to access the HBase table you just created via Hive:

Code Block

CREATE EXTERNAL TABLE hbase_transactions(transaction_id string, user_name string, amount double, ...) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:user_name,cf:amount,...")
TBLPROPERTIES("hbase.table.name" = "transactions");

...

  • Support sparse tables
  • Support loading binary data representations once HIVE-1245 is fixed
  • Support assignment of timestamps
  • Provide control over file parameters such as compression
  • Support multiple column families once HBASE-1861 is implemented
  • Support loading into existing tables once HBASE-1923 is implemented
  • Wrap it all up into the ideal single-INSERT-with-auto-sampling job...