You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 17 Next »

Hive HCatalog Streaming API 

Traditionally adding new data into Hive requires gathering a large amount of data onto HDFS and then periodically adding a new partition. This is essentially a “batch insertion”. Insertion of new data into an existing partition is not permitted. Hive Streaming API allows data to be pumped continuously into Hive. The incoming data can be continuously committed in small batches of records into an existing Hive partition or table. Once data is committed it becomes immediately visible to all Hive queries initiated subsequently.

This API is intended for streaming clients such as Flume and Storm, which continuously generate data. Streaming support is built on top of ACID based insert/update support in Hive (see Hive Transactions).

The Classes and interfaces part of the Hive streaming API are broadly categorized into two sets. The first set provides support for connection and transaction management while the second set provides I/O support. Transactions are managed by the metastore. Writes are performed directly to HDFS.

Streaming to unpartitioned tables is also supported. The API supports Kerberos authentication starting in Hive 0.14.

Note on packaging: The APIs are defined in the Java package org.apache.hive.hcatalog.streaming and part of the hive-hcatalog-streaming Maven module in Hive.

Streaming Requirements 

A few things are required to use streaming.

  1. The following settings are required in hive-site.xml to enable ACID support for streaming:
    1. hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
    2. hive.compactor.initiator.on = true
    3. hive.compactor.worker.threads > 0 
  2. “stored as orc” must be specified during table creation. Only ORC storage format is supported currently.
  3. The Hive table must be bucketed, but not sorted. So something like “clustered by (colName) into 10 buckets” must be specified during table creation. The number of buckets is ideally the same as the number of streaming writers.
  4. User of the client streaming process must have the necessary permissions to write to the table or partition and create partitions in the table.
  5. When issuing MapReduce queries on streaming tables, the user must set hive.input.format to org.apache.hadoop.hive.ql.io.HiveInputFormat.
  6. (Temporary requirements) When issuing queries on streaming tables, the client needs to set
    1. hive.vectorized.execution.enabled  to  false     (for hive version < 0.14.0)

    2. hive.input.format  to  org.apache.hadoop.hive.ql.io.HiveInputFormat

Limitations

Out of the box, currently, the streaming API only provides support for streaming delimited input data (such as CSV, tab separated, etc.) and JSON (strict syntax) formatted data. Support for other input formats can be provided by additional implementations of the RecordWriter interface.

Only ORC is currently supported.

API Usage

Transaction and Connection Management

HiveEndPoint

The class HiveEndPoint describes a Hive End Point to connect to. This describes the database, table and partition names. Invoking the newConnection method on it establishes a connection to the Hive MetaStore for streaming purposes. It returns a StreamingConnection object. Multiple connections can be established on the same endpoint. StreamingConnection can then be used to initiate new transactions for performing I/O.

It is very likely that in a setup where data is being streamed continuously the data is added into new partitions periodically. Either the Hive admin can pre-create the necessary partitions or the streaming clients can create them as needed. HiveEndPoint.newConnection() accepts a boolean argument to indicate whether the partition should be auto created. Partition creation being an atomic action, multiple clients can race to create the partition, but only one will succeed, so streaming clients do not have to synchronize when creating a partition. The ‘proxyUser’ argument indicates the user to be impersonated when talking to Hive and HDFS in all subsequent actions performed on the connection. If left null, impersonation will be disabled and all actions are performed as the user of the running process. For impersonation to work, hive-site.xml needs to be configured to allow the user of the streaming client to impersonate the indicated user.

Transactions are implemented slightly differently than traditional database systems. Each transaction has an id and multiple transactions are grouped into a “Transaction Batch”.  After connection, a streaming client first requests a new batch of transactions. In response it receives a set of Transaction Ids that are part of the transaction batch. Subsequently the client proceeds to consume one transaction id at a time by initiating new Transactions. The client will write() one or more records per transaction and either commits or aborts the current transaction before switching to the next one. Each TransactionBatch.write() invocation automatically associates the I/O attempt with the current Txn ID.  The user of the streaming client, or the proxy user if indicated, needs to have write permissions to the partition or table.

Concurrency Note: I/O can be performed on multiple TransactionBatches concurrently. However the transactions within a transaction batch must be consumed sequentially.

See the Javadoc for HiveEndPoint for more information.  Generally a user will establish a connection to the end point with HiveEndPoint and then call newConnection to get a StreamingConnection.

StreamingConnection

The StreamingConnection class is used to acquire batches of transactions.  Once the connection has been provided by HiveEndPoint the application will generally enter a loop where it calls fetchTransactionBatch and writes a series of transactions.  When closing down, the application should call close.  See the Javadoc for more information.  

TransactionBatch

TransactionBatch is used to write a series of transactions.  For each transaction, the application calls beginNextTransactionwrite, and then commit or abort as appropriate. See the Javadoc for details.

Notes about the HiveConf object

HiveEndPoint.newConnection() accepts a HiveConf argument. This can either be set to null, or a pre-created HiveConf object can be provided. If this is null, a HiveConf object will be created internally and used for the connection. When a HiveConf object is instantiated, if the directory containing the hive-site.xml is part of the java classpath, then the HiveConf object will be initialized with values from it. If no hive-site.xml is found, then the object will be initialized with defaults. Pre-creating this object and reusing it across multiple connections may have a noticeable impact on performance if connections are being opened very frequently (for e.g. several times a second). Secure connection relies on 'hive.metastore.kerberos.principal' being set correctly in the HiveConf object.

Regardless of what values are set in hive-site.xml or custom HiveConf, the API will internally override some settings in it to ensure correct streaming behavior. The below is the list of settings that are overridden:

  • hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
  • hive.support.concurrency = true
  • hive.metastore.execute.setugi = true
  • hive.execution.engine = mr

I/O – Writing Data

These classes and interfaces provide support for writing the data to Hive within a transaction.

RecordWriter

RecordWriter is the base interface implemented by all Writers. A Writer is responsible for taking a record in the form of a byte[] containing data in a known format (such as CSV) and writing it out in the format supported by Hive streaming. A RecordWriter may reorder or drop fields from the incoming record if necessary to map them to the corresponding columns in the Hive Table.  A streaming client will instantiate an appropriate RecordWriter type and pass it to TransactionBatch. The streaming client does not directly interact with RecordWriter therafter. The TransactionBatch will thereafter use and manage the RecordWriter instance to perform I/O.  See the Javadoc for details.

A RecordWriter has two primary functions.

  1. Modify input record: This may involve dropping fields from input data if they don’t have corresponding table columns, adding nulls in case of missing fields for certain columns, and changing the order of incoming fields to match the order of fields in the table. This task requires understanding of incoming data format.
  2. Encode modified record: The encoding often involves serialization using an appropriate Hive SerDe. This task is agnostic of the incoming data format.

DelimitedInputWriter

Class DelimitedInputWriter provides support for writing out input data that is in delimited formats (such as CSV).  Class DelimitedInputWriter only implements the input format specific task of modifying the input record.  See the Javadoc for details.

AbstractLazySimpleRecordWriter

The input format agnostic task of encoding the modified record is delegated to its abstract base class AbstractLazySimpleRecordWriter. This class provides the bulk of the RecordWriter implementation. It internally uses a LazySimpleSerde to transform the incoming byte[] into an Object required by the underlying writer APIs.

Example – Non-secure Mode

///// Stream five records in two transactions /////
 
// Assumed HIVE table Schema:
create table alerts ( id int , msg string )
     partitioned by (continent string, country string)
     clustered by (id) into 5 buckets
     stored as orc; // currently ORC is required for streaming
 
 
//-------   MAIN THREAD  ------- //
String dbName = "testing";
String tblName = "alerts";
ArrayList<String> partitionVals = new ArrayList<String>(2);
partitionVals.add("Asia");
partitionVals.add("India");
String serdeClass = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
 
HiveEndPoint hiveEP = new HiveEndPoint("thrift://x.y.com:9083", dbName, tblName, partitionVals);


.. spin up threads ..


//-------   Thread 1  -------//
StreamingConnection connection = hiveEP.newConnection(null, true);
DelimitedInputWriter writer =
                     new DelimitedInputWriter(fieldNames,",", endPt);
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);


///// Batch 1 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();


if(txnBatch.remainingTransactions() > 0) {
///// Batch 1 - Second TXN
txnBatch.beginNextTransaction();
txnBatch.write("3,Roshan Naik".getBytes());
txnBatch.write("4,Alan Gates".getBytes());
txnBatch.write("5,Owen O’Malley".getBytes());
txnBatch.commit();


txnBatch.close();
connection.close();
}


txnBatch = connection.fetchTransactionBatch(10, writer);


///// Batch 2 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("6,David Schorow".getBytes());
txnBatch.write("7,Sushant Sowmyan".getBytes());
txnBatch.commit();


if(txnBatch.remainingTransactions() > 0) {
///// Batch 2 - Second TXN
txnBatch.beginNextTransaction();
txnBatch.write("8,Ashutosh Chauhan".getBytes());
txnBatch.write("9,Thejas Nair" getBytes());
txnBatch.commit();


txnBatch.close();
}


connection.close();


//-------   Thread 2  -------//


StreamingConnection connection2 = hiveEP.newConnection(null, true);
DelimitedInputWriter writer2 =
                     new DelimitedInputWriter(fieldNames,",", endPt);
TransactionBatch txnBatch2= connection.fetchTransactionBatch(10, writer2);


///// Batch 1 - First TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("21,Venkat Ranganathan".getBytes());
txnBatch2.write("22,Bowen Zhang".getBytes());
txnBatch2.commit();


///// Batch 1 - Second TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("23,Venkatesh Seetaram".getBytes());
txnBatch2.write("24,Deepesh Khandelwal".getBytes());
txnBatch2.commit();


txnBatch2.close();
connection.close();



txnBatch = connection.fetchTransactionBatch(10, writer);


///// Batch 2 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("26,David Schorow".getBytes());
txnBatch.write("27,Sushant Sowmyan".getBytes());
txnBatch.commit();


txnBatch2.close();
connection2.close();

Example – Secure Streaming

To connect via Kerberos to a secure Hive metastore, a UserGroupInformation (UGI) object is required. This UGI object must be acquired externally and passed as argument to the EndPoint.newConnection. All subsequent internal operations carried out using that connection object, such as acquiring transaction batch, writes and commits, will be will be automatically wrapped internally in a ugi.doAs block as necessary.

Important:  To connect using Kerberos, the 'authenticatedUser' argument to EndPoint.newConnection() should have been used to do a Kerberos login.  Additionally the 'hive.metastore.kerberos.principal' setting should be set correctly either in hive-site.xml or in the 'conf' argument (if not null). If using hive-site.xml, its directory should be included in the classpath.

 

import org.apache.hadoop.security.UserGroupInformation;

HiveEndPoint hiveEP2 = ... ;
UserGroupInformation ugi = .. authenticateWithKerberos(principal,keytab);
StreamingConnection secureConn = hiveEP2.newConnection(true, null, ugi);

DelimitedInputWriter writer3 = new DelimitedInputWriter(fieldNames, ",", hiveEP2);

TransactionBatch txnBatch3= secureConn.fetchTransactionBatch(10, writer3);

///// Batch 1 - First TXN – over secure connection
txnBatch3.beginNextTransaction();
txnBatch3.write("28,Eric Baldeschwieler".getBytes());
txnBatch3.write("29,Ari Zilka".getBytes());
txnBatch3.commit();

txnBatch3.close();
secureConn.close();
  • No labels