
...

The only additional Accumulo configuration necessary is the inclusion of hive-accumulo-storage-handler.jar, provided as part of the Hive distribution, on the Accumulo server classpath. This can be accomplished in a variety of ways: copy or symlink the jar into $ACCUMULO_HOME/lib or $ACCUMULO_HOME/lib/ext, or include the path to the jar in general.classpaths in accumulo-site.xml. Be sure to restart the Accumulo tablet servers if the jar is added to the classpath in a non-dynamic fashion (via $ACCUMULO_HOME/lib or general.classpaths in accumulo-site.xml).
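For example, one minimal approach (the paths are illustrative; adjust them to match your Hive and Accumulo installations) is to copy the jar into the dynamically loaded extension directory:

No Format
cp /path/to/hive/lib/hive-accumulo-storage-handler.jar $ACCUMULO_HOME/lib/ext/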

...

To issue queries against Accumulo using Hive, four parameters must be provided by the Hive configuration:

Connection Parameters

accumulo.instance.name
accumulo.zookeepers
accumulo.user.name
accumulo.user.pass

For those familiar with Accumulo, these four configurations are the normal values necessary to connect to Accumulo: the Accumulo instance name, the ZooKeeper quorum (a comma-separated list of hosts), and the Accumulo username and password. The easiest way to provide these values is by using the -hiveconf option to the hive command. It is expected that the provided Accumulo user either has the ability to create new tables, or that the Hive queries will only access existing Accumulo tables.
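For example (a sketch with placeholder values; substitute your own instance name, ZooKeeper hosts, and credentials):

No Format
hive -hiveconf accumulo.instance.name=accumulo \
     -hiveconf accumulo.zookeepers=zk1:2181,zk2:2181,zk3:2181 \
     -hiveconf accumulo.user.name=hiveuser \
     -hiveconf accumulo.user.pass=hivepassword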

...

To access Accumulo tables, a Hive table must be created using the CREATE command with the STORED BY clause. If the EXTERNAL keyword is omitted from the CREATE call, the lifecycle of the Accumulo table is tied to the lifetime of the Hive table: if the Hive table is deleted, so is the Accumulo table. This is the default case. Providing the EXTERNAL keyword will create a Hive table that references an Accumulo table but will not remove the underlying Accumulo table if the Hive table is dropped.

Each Hive row maps to a set of Accumulo keys with the same row ID. One column in the Hive row is designated as a "special" column that is used as the Accumulo row ID. Each of the other Hive columns in the row maps to an Accumulo column (column family and qualifier), with the Hive column value placed in the Accumulo value.

No Format
CREATE TABLE accumulo_table(rowid STRING, name STRING, age INT, weight DOUBLE, height INT)
STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
WITH SERDEPROPERTIES('accumulo.columns.mapping' = ':rowid,person:name,person:age,person:weight,person:height');

In the above statement, normal Hive column name and type pairs are provided, just as in ordinary create table statements. The full AccumuloStorageHandler class name is provided to inform Hive that Accumulo will back this Hive table. A number of properties can be provided to configure the AccumuloStorageHandler via SERDEPROPERTIES or TBLPROPERTIES. The most important property is "accumulo.columns.mapping", which controls how the Hive columns map to Accumulo columns. In this case, the "rowid" Hive column is used to populate the Accumulo row ID component of the Accumulo Key, while the other Hive columns (name, age, weight and height) are all columns within the Accumulo row.

Given the above accumulo_table schema, we could envision a single row in the table:

No Format
hive> select * from accumulo_table;
row1	Steve	32	200	72

The above record would be serialized into Accumulo Key-Value pairs in the following manner given the declared accumulo.columns.mapping:

No Format
user@accumulo accumulo_table> scan
row1	person:age []	32
row1	person:height []	72
row1	person:name []	Steve
row1	person:weight []	200

The power of the column mapping is that multiple Hive tables with differing column mappings can interact with the same Accumulo table and produce different results. When columns are excluded, the performance of Hive queries can be improved through the use of Accumulo locality groups to filter out unwanted data on the server side.
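As a minimal sketch of configuring locality groups from the Accumulo shell (assuming a hypothetical table named people_data with person and address column families; the group names are arbitrary):

No Format
user@accumulo> setgroups person_group=person address_group=address -t people_data
user@accumulo> compact -t people_data

Once the table has been compacted, scans that read only the person family can skip the data belonging to the address group entirely.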

Column Mapping

The column mapping string is a comma-separated list of encoded values whose positions correspond, in order, to the columns in the Hive schema for the table. The order of the columns in the Hive schema can be arbitrary as long as the elements in the column mapping align with the intended Hive columns. For those familiar with Accumulo, each element in the column mapping string resembles a column_family:column_qualifier; however, there are a few variants that allow for additional control.

...

These are set by including a pound sign ('#') after the column mapping element, followed by either the long or short serialization value. The default serialization is 'string'. For example, for the value 10, "person:age#s" is synonymous with "person:age" and would serialize the value as the literal string "10". If "person:age#b" were used instead, the value would be serialized as four bytes: \x00\x00\x00\x0A.
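For example, a minimal sketch of a table definition (the table name is illustrative) that stores the name column as a string and the age column as a binary-encoded integer:

No Format
CREATE TABLE accumulo_binary_example(rowid STRING, name STRING, age INT)
STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
WITH SERDEPROPERTIES('accumulo.columns.mapping' = ':rowid,person:name#s,person:age#b');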

Indexing

Starting in Hive 3.0.0 with HIVE-15795, indexing support has been added to Accumulo-backed Hive tables. Indexing works by using another Accumulo table to store a mapping from field values to the rowIds of the data table. The index table is automatically populated on record insertion via Hive.

Using index tables greatly improves the performance of non-rowId predicate queries by eliminating full table scans. Indexing works for both internally and externally managed tables using either the Tez or MapReduce query engines. The following options control indexing behavior.

Option Name	Description
accumulo.indextable.name	(Required) The name of the index table in Accumulo.
accumulo.indexed.columns	(Optional) A comma-separated list of Hive columns to index, or * to index all columns (default: *).
accumulo.index.rows.max	(Optional) The maximum number of predicate values to scan from the index for each search predicate (default: 20000). See the note below about this value.
accumulo.index.scanner	(Optional) The index scanner implementation (default: org.apache.hadoop.hive.accumulo.AccumuloDefaultIndexScanner).

The indexes are stored in the index table using the following format:

rowId = [field value in data table]

column_family = [field column family in data table] + '_' + [field column qualifier in data table]

column_qualifier = [field rowId in data table]

visibility = [field visibility in data table]

value = [empty byte array]

When using a string-encoded table, indexed field values of numeric types are encoded using Accumulo Lexicoder methods. Otherwise, values are encoded using native binary encoding. This allows applications to insert data and index values into Accumulo outside of Hive while still supporting high-performance queries from within Hive.
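For illustration, if the accumulo_table example above were backed by an index table (hypothetically named accumulo_table_idx), the index entry for the string-typed name field of row1 would appear as follows when scanned from the Accumulo shell:

No Format
user@accumulo accumulo_table_idx> scan
Steve	person_name:row1 []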

When inserting data and indexes outside of Hive, it is important to update both tables within the same unit of work. If the Hive query does not find index matches for any of the query predicates, the query will short-circuit and return empty results without searching the data table.

If the search predicate matches more entries than defined by the option accumulo.index.rows.max (default 20000), the index search results will be abandoned and the query will fall back to a full scan of the data table with predicate filtering. Note that using large values for this option, or having very large rowId values in the data table, may require increasing Hive memory to prevent out-of-memory errors.

Other options

The following options are also valid for use with SERDEPROPERTIES or TBLPROPERTIES for further control over the actions of the AccumuloStorageHandler:

...

No Format
CREATE TABLE hive_map(key int, value map<string,int>) 
STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
WITH SERDEPROPERTIES (
  "accumulo.columns.mapping" = ":rowID,cf:*",
  "accumulo.default.storage" = "binary"
);

...

No Format
CREATE EXTERNAL TABLE countries(key string, name string, country string, country_id int)
STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
WITH SERDEPROPERTIES ("accumulo.columns.mapping" = ":rowID,info:name,info:country,info:country_id");

Create an indexed table

To take advantage of indexing, Hive uses another Accumulo table to maintain a lexicographically sorted index of search terms for each field, allowing for very efficient exact-match and bounded-range searches.

No Format
CREATE TABLE company_stats (
   rowid string,
   active_entry boolean,
   num_offices tinyint,
   num_personel smallint,
   total_manhours int,
   num_shareholders bigint,
   eff_rating float,
   err_rating double,
   yearly_production decimal,
   start_date date,
   address varchar(100),
   phone char(13),
   last_update timestamp )
ROW FORMAT SERDE 'org.apache.hadoop.hive.accumulo.serde.AccumuloSerDe'
STORED BY 'org.apache.hadoop.hive.accumulo.AccumuloStorageHandler'
WITH SERDEPROPERTIES (
   "accumulo.columns.mapping" = ":rowID,a:act,a:off,a:per,a:mhs,a:shs,a:eff,a:err,a:yp,a:sd,a:addr,a:ph,a:lu”,
   "accumulo.table.name"="company_stats",
   "accumulo.indextable.name"="company_stats_idx"
 );
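With the index table in place, an exact-match predicate query such as the following (the value is illustrative) can consult company_stats_idx to locate matching rowIds instead of scanning the entire company_stats table:

No Format
hive> SELECT rowid, num_offices FROM company_stats WHERE num_offices = 5;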

Acknowledgements

I would be remiss not to mention the efforts made by Brian Femiano. His initial prototype for Accumulo-Hive integration was the basis for this storage handler.