
Hive HCatalog Streaming API 

Traditionally, adding new data into Hive has required gathering a large amount of data onto HDFS and then periodically adding a new partition. This is essentially a “batch insertion”, and insertion of new data into an existing partition is not permitted. The Hive Streaming API allows data to be pumped continuously into Hive. The incoming data can be continuously committed in small batches of records into an existing Hive partition or table. Once data is committed it becomes immediately visible to all Hive queries initiated subsequently.

This API is intended for streaming clients such as Flume and Storm, which continuously generate data. Streaming support is built on top of ACID-based insert/update support in Hive (see Hive Transactions).

The classes and interfaces that make up the Hive streaming API fall broadly into two sets: the first provides support for connection and transaction management, while the second provides I/O support. Transactions are managed by the metastore; writes are performed directly to HDFS.

Streaming to unpartitioned tables is also supported. The API supports Kerberos authentication starting in Hive 0.14.

Note on packaging: The APIs are defined in the Java package org.apache.hive.hcatalog.streaming and are part of the hive-hcatalog-streaming Maven module in Hive.

Streaming Requirements: 

A few things are required to use streaming.

  1. The following settings are required in hive-site.xml to enable ACID support for streaming (see the example hive-site.xml snippet after this list):
    1. hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
    2. hive.compactor.initiator.on = true
    3. hive.compactor.worker.threads > 0
  2. “stored as orc” must be specified during table creation. Only the ORC storage format is currently supported.
  3. The Hive table must be bucketed, but not sorted. So something like “clustered by (colName) into 10 buckets” must be specified during table creation. The number of buckets is ideally the same as the number of streaming writers.
  4. The user of the client streaming process must have the necessary permissions to write to the table or partition and to create partitions in the table.
  5. When issuing MapReduce queries on streaming tables, the user must set hive.input.format to org.apache.hadoop.hive.ql.io.HiveInputFormat.
  6. (Temporary requirement) When issuing queries on streaming tables, the client needs to set:
    1. hive.vectorized.execution.enabled to false
    2. hive.input.format to org.apache.hadoop.hive.ql.io.HiveInputFormat
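
For reference, a minimal sketch of how the settings in item 1 might appear in hive-site.xml (the worker thread count shown is illustrative; any value greater than 0 satisfies the requirement):

<!-- hive-site.xml: ACID settings required for streaming ingest -->
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>  <!-- illustrative; must be greater than 0 -->
</property>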

Limitations:

Out of the box, the streaming API currently only provides support for streaming delimited input data (such as CSV or tab-separated fields) and JSON (strict syntax) formatted data. Support for other input formats can be provided by additional implementations of the RecordWriter interface.

API Usage

Transaction and Connection Management

HiveEndPoint

The class HiveEndPoint describes a Hive end point to connect to: the database, table, and partition names. Invoking the newConnection method on it establishes a connection to the Hive MetaStore for streaming purposes and returns a StreamingConnection object. Multiple connections can be established on the same endpoint. The StreamingConnection can then be used to initiate new transactions for performing I/O.
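
For example, an end point and connection might be set up roughly as follows (a sketch using the metastore URI, database, table, and partition values from the example later on this page; exception handling omitted):

import java.util.Arrays;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;

// End point = metastore URI + database + table + partition values
HiveEndPoint endPt = new HiveEndPoint("thrift://x.y.com:9083", "testing", "alerts",
                                      Arrays.asList("Asia", "India"));

// null proxyUser: act as the user of the running process; true: auto-create the partition if needed
StreamingConnection conn = endPt.newConnection(null, true);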

It is very likely that in a setup where data is being streamed continuously the data is added into new partitions periodically. Either the Hive admin can pre-create the necessary partitions or the streaming clients can create them as needed. HiveEndPoint.newConnection() accepts a boolean argument to indicate whether the partition should be auto created. Partition creation being an atomic action, multiple clients can race to create the partition, but only one will succeed, so streaming clients do not have to synchronize when creating a partition. The ‘proxyUser’ argument indicates the user to be impersonated when talking to Hive and HDFS in all subsequent actions performed on the connection. If left null, impersonation will be disabled and all actions are performed as the user of the running process. For impersonation to work, hive-site.xml needs to be configured to allow the user of the streaming client to impersonate the indicated user.

Transactions are implemented slightly differently than in traditional database systems. Each transaction has an id, and multiple transactions are grouped into a “Transaction Batch”. After connecting, a streaming client first requests a new batch of transactions. In response it receives a set of transaction ids that are part of the transaction batch. Subsequently the client proceeds to consume one transaction id at a time by initiating new transactions. The client writes one or more records per transaction and either commits or aborts the current transaction before switching to the next one. Each TransactionBatch.write() invocation automatically associates the I/O attempt with the current transaction id. The user of the streaming client, or the proxy user if indicated, needs to have write permissions to the partition or table.
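
A minimal sketch of this pattern, using only calls that appear in the example later on this page (conn is the StreamingConnection from above and writer is a RecordWriter such as the DelimitedInputWriter described below; error handling omitted):

// Request a batch of 10 transaction ids, then consume them one at a time
TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);
while (txnBatch.remainingTransactions() > 0) {
    txnBatch.beginNextTransaction();                  // move to the next txn id in the batch
    txnBatch.write("1,Hello streaming".getBytes());   // one or more records per transaction
    txnBatch.commit();                                // or txnBatch.abort()
}
txnBatch.close();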

Concurrency Note: I/O can be performed on multiple TransactionBatches concurrently. However the transactions within a transaction batch must be consumed sequentially.

See the Javadoc for HiveEndPoint for more information.  Generally a user will establish a connection to the end point with HiveEndPoint and then call newConnection to get a StreamingConnection.

StreamingConnection

The StreamingConnection class is used to acquire batches of transactions. Once the connection has been provided by HiveEndPoint, the application will generally enter a loop where it calls fetchTransactionBatch and writes a series of transactions. When shutting down, the application should call close. See the Javadoc for more information.
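
A rough sketch of that loop, reusing endPt and writer from the sketches above; moreDataToStream() and writeTransactions() are hypothetical application-level helpers, not part of the API:

StreamingConnection connection = endPt.newConnection(null, true);
while (moreDataToStream()) {                          // hypothetical application-defined shutdown check
    TransactionBatch batch = connection.fetchTransactionBatch(10, writer);
    writeTransactions(batch);                         // hypothetical helper; see TransactionBatch below
    batch.close();                                    // close each batch before requesting the next
}
connection.close();                                   // release the streaming connection on shutdown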

TransactionBatch

TransactionBatch is used to write a series of transactions. For each transaction, the application calls beginNextTransaction, write, and then commit or abort as appropriate. See the Javadoc for details.
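
For instance, a single transaction within a batch might be handled as follows, where txnBatch is a TransactionBatch obtained from fetchTransactionBatch (a sketch; the records are illustrative and the broad catch is only to show where abort fits):

txnBatch.beginNextTransaction();
try {
    txnBatch.write("1,Hello streaming".getBytes());
    txnBatch.write("2,Welcome to streaming".getBytes());
    txnBatch.commit();            // make the records in this transaction visible to queries
} catch (Exception e) {
    txnBatch.abort();             // discard the records written in this transaction
    throw e;
}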

I/O – Writing Data

These classes and interfaces provide support for writing the data to Hive within a transaction.

RecordWriter

RecordWriter is the base interface implemented by all Writers. A Writer is responsible for taking a record in the form of a byte[] containing data in a known format (such as CSV) and writing it out in the format supported by Hive streaming. A RecordWriter may reorder or drop fields from the incoming record if necessary to map them to the corresponding columns in the Hive table. A streaming client will instantiate an appropriate RecordWriter type and pass it to the TransactionBatch; the streaming client does not directly interact with the RecordWriter thereafter, as the TransactionBatch uses and manages the RecordWriter instance to perform I/O. See the Javadoc for details.

A RecordWriter has two primary functions.

  1. Modify the input record: This may involve dropping fields from the input data if they don’t have corresponding table columns, adding nulls for missing fields of certain columns, and changing the order of incoming fields to match the order of fields in the table. This task requires an understanding of the incoming data format.
  2. Encode modified record: The encoding often involves serialization using an appropriate Hive SerDe. This task is agnostic of the incoming data format.

DelimitedInputWriter

The DelimitedInputWriter class provides support for writing out input data that is in delimited formats (such as CSV). DelimitedInputWriter implements only the input-format-specific task of modifying the input record. See the Javadoc for details.
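
For example, reusing endPt and conn from the sketches above, and assuming fieldNames is an array of the Hive column names that the delimited fields map to (for the table in the example below, "id" and "msg"):

// fieldNames maps the delimited fields to table columns, e.g. {"id", "msg"}
DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", endPt);

// The writer is handed to the connection when fetching a transaction batch
TransactionBatch txnBatch = conn.fetchTransactionBatch(10, writer);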

AbstractLazySimpleRecordWriter

The input-format-agnostic task of encoding the modified record is delegated to its abstract base class, AbstractLazySimpleRecordWriter. This class provides the bulk of the RecordWriter implementation. It internally uses LazySimpleSerDe to transform the incoming byte[] into an Object as required by the underlying writer APIs.

Example - Non-secure mode

///// Stream five records in two transactions /////
 
// Assumed HIVE table Schema:
create table alerts ( id int , msg string )
     partitioned by (continent string, country string)
     clustered by (id) into 5 buckets
     stored as orc; // currently ORC is required for streaming
 
 
//-------   MAIN THREAD  ------- //
String dbName = "testing";
String tblName = "alerts";
ArrayList<String> partitionVals = new ArrayList<String>(2);
partitionVals.add("Asia");
partitionVals.add("India");
String serdeClass = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
String[] fieldNames = {"id", "msg"};   // column names that the delimited fields map to
 
HiveEndPoint hiveEP = new HiveEndPoint("thrift://x.y.com:9083", dbName, tblName, partitionVals);


.. spin up threads ..


//-------   Thread 1  -------//
StreamingConnection connection = hiveEP.newConnection(null, true);
DelimitedInputWriter writer =
                     new DelimitedInputWriter(fieldNames, ",", hiveEP);
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);


///// Batch 1 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();


if (txnBatch.remainingTransactions() > 0) {
    ///// Batch 1 - Second TXN
    txnBatch.beginNextTransaction();
    txnBatch.write("3,Roshan Naik".getBytes());
    txnBatch.write("4,Alan Gates".getBytes());
    txnBatch.write("5,Owen O’Malley".getBytes());
    txnBatch.commit();
}

// close this batch; the connection stays open so the next batch can be fetched below
txnBatch.close();


txnBatch = connection.fetchTransactionBatch(10, writer);


///// Batch 2 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("6,David Schorow".getBytes());
txnBatch.write("7,Sushant Sowmyan".getBytes());
txnBatch.commit();


if (txnBatch.remainingTransactions() > 0) {
    ///// Batch 2 - Second TXN
    txnBatch.beginNextTransaction();
    txnBatch.write("8,Ashutosh Chauhan".getBytes());
    txnBatch.write("9,Thejas Nair".getBytes());
    txnBatch.commit();
}

txnBatch.close();


connection.close();


//-------   Thread 2  -------//


StreamingConnection connection2 = hiveEP.newConnection(null, true);
DelimitedInputWriter writer2 =
                     new DelimitedInputWriter(fieldNames, ",", hiveEP);
TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(10, writer2);


///// Batch 1 - First TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("21,Venkat Ranganathan".getBytes());
txnBatch2.write("22,Bowen Zhang".getBytes());
txnBatch2.commit();


///// Batch 1 - Second TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("23,Venkatesh Seetaram".getBytes());
txnBatch2.write("24,Deepesh Khandelwal".getBytes());
txnBatch2.commit();


txnBatch2.close();


txnBatch2 = connection2.fetchTransactionBatch(10, writer2);


///// Batch 2 - First TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("26,David Schorow".getBytes());
txnBatch2.write("27,Sushant Sowmyan".getBytes());
txnBatch2.commit();




txnBatch2.close();


connection2.close();

 

Example - Secure Streaming

 

To connect via Kerberos to a secure Hive metastore, a UserGroupInformation (UGI) object is required. This UGI object must be acquired externally and passed as an argument to HiveEndPoint.newConnection. All subsequent internal operations carried out using that connection object, such as acquiring a transaction batch, writes, and commits, will be automatically wrapped internally in a ugi.doAs block as necessary.

 

import org.apache.hadoop.security.UserGroupInformation;

HiveEndPoint hiveEP2 = ... ;
UserGroupInformation ugi = .. authenticateWithKerberos(principal,keytab);
StreamingConnection secureConn = hiveEP2.newConnection(true, null, ugi);

DelimitedInputWriter writer3 = new DelimitedInputWriter(fieldNames, ",", hiveEP2);

TransactionBatch txnBatch3= secureConn.fetchTransactionBatch(10, writer3);

///// Batch 1 - First TXN – over secure connection
txnBatch3.beginNextTransaction();
txnBatch3.write("28,Eric Baldeschwieler".getBytes());
txnBatch3.write("29,Ari Zilka".getBytes());
txnBatch3.commit();

txnBatch3.close();
secureConn.close();