Hive 3 Streaming API

Hive 3 Streaming API Documentation - new API available in Hive 3

Hive HCatalog Streaming API

Traditionally adding new data into Hive requires gathering a large amount of data onto HDFS and then periodically adding a new partition. This is essentially a “batch insertion”. Insertion of new data into an existing partition is not permitted. Hive Streaming API allows data to be pumped continuously into Hive. The incoming data can be continuously committed in small batches of records into an existing Hive partition or table. Once data is committed it becomes immediately visible to all Hive queries initiated subsequently.

...

Note on packaging: The APIs are defined in the Java package org.apache.hive.hcatalog.streaming and are part of the hive-hcatalog-streaming Maven module in Hive.

Streaming

...

Mutation API

Starting in release 2.0.0, Hive offers another API for mutating (insert/update/delete) records into transactional tables using Hive’s ACID feature. See HCatalog Streaming Mutation API for details and a comparison with the streaming data ingest API that is described in this document.

Streaming Requirements

A few things are required to use streaming.

  1. The following settings are required in hive-site.xml to enable ACID support for streaming:
    1. hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
    2. hive.compactor.initiator.on = true (See more important details here)
    3. hive.compactor.cleaner.on = true (From Hive 4.0.0 onwards. See more important details here)
    4. hive.compactor.worker.threads > 0 
  2. “stored as orc” must be specified during table creation. Only ORC storage format is supported currently.
  3. tblproperties("transactional"="true") must be set on the table during creation.
  4. The Hive table must be bucketed, but not sorted. So something like “clustered by (colName) into 10 buckets” must be specified during table creation. The number of buckets is ideally the same as the number of streaming writers.
  5. User of the client streaming process must have the necessary permissions to write to the table or partition and create partitions in the table.
  6. (Temporary requirements) When issuing queries on streaming tables, the client needs to set the following (see the sketch after this list):
    1. hive.vectorized.execution.enabled to false (for Hive version < 0.14.0)
    2. hive.input.format to org.apache.hadoop.hive.ql.io.HiveInputFormat
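
For illustration, here is a minimal sketch of applying these temporary client-side settings over JDBC before querying a streaming table. The connection URL, credentials and the alerts table are placeholders borrowed from the example later in this document, and the hive-jdbc driver is assumed to be on the classpath.

Code Block
languagejava
// Hypothetical sketch: set the temporary client-side properties, then query a
// streaming table over JDBC. URL, credentials and table name are placeholders.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class QueryStreamingTable {
  public static void main(String[] args) throws Exception {
    Connection conn = DriverManager.getConnection("jdbc:hive2://x.y.com:10000/testing", "user", "");
    try (Statement stmt = conn.createStatement()) {
      stmt.execute("set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat");
      stmt.execute("set hive.vectorized.execution.enabled=false");  // only for Hive < 0.14.0
      try (ResultSet rs = stmt.executeQuery("select count(*) from alerts")) {
        while (rs.next()) {
          System.out.println("row count: " + rs.getLong(1));
        }
      }
    } finally {
      conn.close();
    }
  }
}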

...

Out of the box, the streaming API currently provides support only for streaming delimited input data (such as CSV or tab-separated data) and JSON (strict syntax) formatted data. Support for other input formats can be provided by additional implementations of the RecordWriter interface.

Currently only ORC is supported for the format of the destination table.

API Usage

Transaction and Connection Management

...

The class HiveEndPoint describes a Hive endpoint to connect to: the database, table and partition names. Invoking the newConnection method on it establishes a connection to the Hive MetaStore for streaming purposes. It returns a StreamingConnection object, and multiple connections can be established on the same endpoint. The StreamingConnection can then be used to initiate new transactions for performing I/O.
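
As a minimal sketch (the MetaStore URI, database, table and partition values are placeholders taken from the example later in this document):

Code Block
languagejava
// Describe the destination and open a streaming connection to the MetaStore.
import java.util.Arrays;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;

HiveEndPoint endPt = new HiveEndPoint("thrift://x.y.com:9083", "testing", "alerts",
                                      Arrays.asList("Asia", "India"));
StreamingConnection conn = endPt.newConnection(true);  // true => create the partition if it does not exist

// ... fetch transaction batches and write data ...

conn.close();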

...

Transactions are implemented slightly differently than in traditional database systems. Each transaction has an id, and multiple transactions are grouped into a “Transaction Batch”. This helps group records from multiple transactions into fewer files (rather than one file per transaction). After connecting, a streaming client first requests a new batch of transactions. In response it receives a set of transaction ids that are part of the transaction batch. Subsequently the client proceeds to consume one transaction id at a time by initiating new transactions. The client will write() one or more records per transaction and either commit or abort the current transaction before switching to the next one. Each TransactionBatch.write() invocation automatically associates the I/O attempt with the current transaction id.  The user of the streaming client process, or the proxy user if indicated, needs to have write permissions to the partition or table. Kerberos-based authentication is required to acquire connections as a specific user. See the secure streaming example below.

Concurrency Note: I/O can be performed on multiple TransactionBatches concurrently. However the transactions within a transaction batch must be consumed sequentially.

See the Javadoc for HiveEndPoint for more information.  Generally a user will describe the destination info with a HiveEndPoint object and then call newConnection to make a connection and get back a StreamingConnection object.

StreamingConnection

The StreamingConnection class is used to acquire batches of transactions.  Once the connection has been provided by HiveEndPoint the application will generally enter a loop where it calls fetchTransactionBatch and writes a series of transactions.  When closing down, the application should call close.  See the Javadoc for more information.  

TransactionBatch

TransactionBatch is used to write a series of transactions.

There is one file created on HDFS per TxnBatch in each bucket. The API examines each record to decide which bucket it belongs to and writes it to the appropriate bucket. If the table has 5 buckets, there will be 5 files (some of them could be empty) for the TxnBatch (before compaction kicks in).  Prior to Hive 1.3.0, a bug in the API's bucket computation logic caused incorrect distribution of records into buckets, which could lead to incorrect data returned from queries using bucket join algorithms.

For each transaction in the TxnBatch, the application calls beginNextTransaction, write, and then commit or abort as appropriate. See the Javadoc for details.  A Transaction cannot include data from more than one partition.
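
A condensed sketch of this per-transaction loop is shown below; the connection and writer are assumed to have been set up as in the full example later in this document.

Code Block
languagejava
// Consume every transaction in the batch: begin, write one or more records,
// then commit (or abort on failure), and finally close the batch.
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
while (txnBatch.remainingTransactions() > 0) {
  txnBatch.beginNextTransaction();
  txnBatch.write("1,Hello streaming".getBytes());
  txnBatch.write("2,Welcome to streaming".getBytes());
  txnBatch.commit();
}
txnBatch.close();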

Transactions in a TransactionBatch are eventually expired by the Metastore if not committed or aborted after hive.txn.timeout seconds. The TransactionBatch class provides a heartbeat() method to prolong the lifetime of unused transactions in the batch.  A good rule of thumb is to call heartbeat() at (hive.txn.timeout/2) intervals after creating a TransactionBatch.  This is sufficient to keep an inactive transaction alive without loading the metastore unnecessarily.
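
For example, a hypothetical helper that heartbeats an in-use TransactionBatch on a background thread; the 150-second period stands in for hive.txn.timeout/2 and is an assumption for illustration, not a value the API provides.

Code Block
languagejava
// Hypothetical sketch: keep idle transactions in the batch alive by calling
// heartbeat() periodically at roughly hive.txn.timeout / 2.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final ScheduledExecutorService heartbeater = Executors.newSingleThreadScheduledExecutor();
heartbeater.scheduleAtFixedRate(() -> {
    try {
      txnBatch.heartbeat();             // TransactionBatch.heartbeat(), as described above
    } catch (Exception e) {
      // log the failure and decide whether to abort the batch
    }
}, 150, 150, TimeUnit.SECONDS);         // assuming hive.txn.timeout is 300 seconds

// ... write transactions as usual ...

heartbeater.shutdown();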

Usage Guidelines

Generally, the more events are included in each transaction, the more throughput can be achieved.  It's common to commit either after a certain number of events or after a certain time interval, whichever comes first.  The latter ensures that when the event flow rate is variable, transactions don't stay open too long.  There is no practical limit on how much data can be included in a single transaction.  The only concern is the amount of data which will need to be replayed if the transaction fails.  The concept of a TransactionBatch serves to reduce the number of files created by the Streaming API in HDFS.  Since all transactions in a given batch write to the same physical file (per bucket), a partition can only be compacted up to the level of the earliest transaction of any batch which contains an open transaction.  Thus TransactionBatches should not be made excessively large.  It makes sense to include a timer to close a TransactionBatch (even if it has unused transactions) after some amount of time.
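
As an illustration of the count-or-interval commit policy described above, here is a rough sketch; the thresholds are arbitrary and incomingRecords stands for whatever event source the application reads from.

Code Block
languagejava
// Hypothetical sketch: commit after N records or after T milliseconds,
// whichever comes first, so transactions never stay open too long.
final int maxEventsPerTxn = 10000;
final long maxTxnOpenMillis = 60000;

txnBatch.beginNextTransaction();
int eventsInTxn = 0;
long txnStart = System.currentTimeMillis();

for (byte[] record : incomingRecords) {            // incomingRecords: the application's event source
  txnBatch.write(record);
  eventsInTxn++;
  if (eventsInTxn >= maxEventsPerTxn
      || System.currentTimeMillis() - txnStart >= maxTxnOpenMillis) {
    txnBatch.commit();
    if (txnBatch.remainingTransactions() == 0) {   // batch exhausted, fetch a new one
      txnBatch.close();
      txnBatch = connection.fetchTransactionBatch(10, writer);
    }
    txnBatch.beginNextTransaction();
    eventsInTxn = 0;
    txnStart = System.currentTimeMillis();
  }
}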

Note: From Hive 1.3.0 onwards, invoking TxnBatch.close() will cause all unused transactions in the current TxnBatch to be aborted.

Notes about the HiveConf Object

HiveEndPoint.newConnection() accepts a HiveConf argument. This can either be set to null, or a pre-created HiveConf object can be provided. If this is null, a HiveConf object will be created internally and used for the connection. When a HiveConf object is instantiated, if the directory containing the hive-site.xml is part of the java classpath, then the HiveConf object will be initialized with values from it. If no hive-site.xml is found, then the object will be initialized with defaults. Pre-creating this object and reusing it across multiple connections may have a noticeable impact on performance if connections are being opened very frequently (for example several times a second). Secure connection relies on 'hive.metastore.kerberos.principal' being set correctly in the HiveConf object.

Regardless of what values are set in hive-site.xml or the custom HiveConf, the API will internally override some settings to ensure correct streaming behavior. Below is the list of settings that are overridden:

  • hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
  • hive.support.concurrency = true
  • hive.metastore.execute.setugi = true
  • hive.execution.engine = mr
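
As noted above, pre-creating the HiveConf and reusing it can help when connections are opened frequently. A minimal sketch (endpoint values are placeholders; the newConnection variant taking a HiveConf is the one described in this section):

Code Block
languagejava
// Create one HiveConf (initialized from hive-site.xml if it is on the classpath)
// and reuse it across connections instead of re-creating it each time.
import java.util.Arrays;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.StreamingConnection;

HiveConf hiveConf = new HiveConf();                 // created once, reused below

HiveEndPoint epAsia = new HiveEndPoint("thrift://x.y.com:9083", "testing", "alerts",
                                       Arrays.asList("Asia", "India"));
HiveEndPoint epEurope = new HiveEndPoint("thrift://x.y.com:9083", "testing", "alerts",
                                         Arrays.asList("Europe", "France"));

StreamingConnection c1 = epAsia.newConnection(true, hiveConf);
StreamingConnection c2 = epEurope.newConnection(true, hiveConf);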

I/O – Writing Data

These classes and interfaces provide support for writing the data to Hive within a transaction.

RecordWriter

RecordWriter is the base interface implemented by all Writers. A Writer is responsible for taking a record in the form of a byte[] containing data in a known format (such as CSV) and writing it out in the format supported by Hive streaming. A RecordWriter may reorder or drop fields from the incoming record if necessary to map them to the corresponding columns in the Hive table.  A streaming client will instantiate an appropriate RecordWriter type and pass it to the TransactionBatch; the streaming client does not directly interact with the RecordWriter thereafter. The TransactionBatch uses and manages the RecordWriter instance to perform I/O.  See the Javadoc for details.

A RecordWriter's primary functions are:

  1. Modify input record: This may involve dropping fields from input data if they don’t have corresponding table columns, adding nulls in case of missing fields for certain columns, and changing the order of incoming fields to match the order of fields in the table. This task requires understanding of incoming data format. Not all formats (for example JSON, which includes field names in the data) need this step.
  2. Encode modified record: The encoding involves serialization using an appropriate Hive SerDe.
  3. Identify the bucket to which the record belongs.
  4. Write encoded record to Hive using the AcidOutputFormat's record updater for the appropriate bucket.

DelimitedInputWriter

Class DelimitedInputWriter implements the RecordWriter interface. It accepts input records that are in delimited formats (such as CSV) and writes them to Hive. It reorders the fields if needed, and converts the record into an Object using LazySimpleSerDe, which is then passed on to the underlying AcidOutputFormat's record updater for the appropriate bucket.  See the Javadoc for details.
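
For example (a sketch using the alerts table from the example below; the fieldNames array names the destination column for each delimited input field, in input order):

Code Block
languagejava
// Input records look like "1,Hello streaming": the first field maps to column
// "id" and the second to column "msg" of the destination table.
String[] fieldNames = new String[] {"id", "msg"};
DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", hiveEP);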

StrictJsonWriter

Class StrictJsonWriter implements the RecordWriter interface. It accepts input records that are in strict JSON format and writes them to Hive. It converts the JSON record directly into an Object using JsonSerDe, which is then passed on to the underlying AcidOutputFormat's record updater for the appropriate bucket.  See the Javadoc for details.
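
A brief sketch, assuming the StrictJsonWriter constructor that takes only a HiveEndPoint (since the JSON keys carry the field names, no field list is needed):

Code Block
languagejava
// JSON records carry their own field names, so the writer only needs the endpoint.
StrictJsonWriter jsonWriter = new StrictJsonWriter(hiveEP);   // assumed single-argument constructor
TransactionBatch jsonBatch = connection.fetchTransactionBatch(10, jsonWriter);
jsonBatch.beginNextTransaction();
jsonBatch.write("{\"id\":10,\"msg\":\"Hello streaming\"}".getBytes());
jsonBatch.commit();
jsonBatch.close();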

StrictRegexWriter

Class StrictRegexWriter implements the RecordWriter interface. It accepts text input records together with a regex and writes them to Hive. It converts the text record into an Object using the given regex and RegexSerDe, which is then passed on to the underlying AcidOutputFormat's record updater for the appropriate bucket. See the Javadoc for details.  Available in Hive 1.2.2+ and 2.3.0+.

AbstractRecordWriter

This is a base class that contains some of the common code needed by RecordWriter objects, such as schema lookup and computing the bucket to which a record belongs.

Error Handling

It's imperative for proper functioning of the system that the client of this API handle errors correctly.  Once a TransactionBatch is obtained, any exception thrown from the TransactionBatch (except SerializationError) should cause the client to call TransactionBatch.abort() to abort the current transaction, then call TransactionBatch.close(), and then start a new batch to write more data and/or redo the work of the last transaction during which the failure occurred.  Not following this may, in rare cases, cause file corruption.  Furthermore, a StreamingException should ideally cause the client to perform exponential back-off before starting a new batch.  This will help the cluster stabilize, since the most likely reason for these failures is HDFS overload.

SerializationError indicates that a given tuple could not be parsed.  The client may choose to throw away such tuples or send them to a dead letter queue.  After seeing this exception, more data can be written to the current transaction and to further transactions in the same TransactionBatch.
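
A sketch of this error handling pattern follows; the exception classes are from the org.apache.hive.hcatalog.streaming package, while the helper method, its parameters and the back-off value are assumptions for illustration.

Code Block
languagejava
// Hypothetical helper: write one transaction, tolerating per-record parse errors
// and aborting/closing the batch on other streaming failures.
import org.apache.hive.hcatalog.streaming.SerializationError;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.TransactionBatch;

void writeOneTransaction(TransactionBatch txnBatch, Iterable<byte[]> records,
                         long backOffMillis) throws Exception {
  try {
    txnBatch.beginNextTransaction();
    for (byte[] record : records) {
      try {
        txnBatch.write(record);
      } catch (SerializationError se) {
        // bad tuple: drop it or send it to a dead letter queue, then keep writing
      }
    }
    txnBatch.commit();
  } catch (StreamingException e) {
    txnBatch.abort();                  // abort the failed transaction
    txnBatch.close();                  // then close the batch
    Thread.sleep(backOffMillis);       // back off (ideally exponentially) before a new batch
    // fetch a new TransactionBatch and redo the work of the failed transaction
  }
}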

Example – Non-secure Mode

Code Block
languagetext
///// Stream five records in two transactions /////
 
// Assumed HIVE table Schema:
create table alerts ( id int , msg string )
     partitioned by (continent string, country string)
     clustered by (id) into 5 buckets
     stored as orc tblproperties("transactional"="true"); // currently ORC is required for streaming
 
 
//-------   MAIN THREAD  ------- //
String dbName = "testing";
String tblName = "alerts";
ArrayList<String> partitionVals = new ArrayList<String>(2);
partitionVals.add("Asia");
partitionVals.add("India");
String serdeClass = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
 
HiveEndPoint hiveEP = new HiveEndPoint("thrift://x.y.com:9083", dbName, tblName, partitionVals);


.. spin up threads ..


//-------   Thread 1  -------//
StreamingConnection connection = hiveEP.newConnection(true, null);
DelimitedInputWriter writer =
                     new DelimitedInputWriter(fieldNames,",", hiveEP);
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);


///// Batch 1 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();


if(txnBatch.remainingTransactions() > 0) {
///// Batch 1 - Second TXN
txnBatch.beginNextTransaction();
txnBatch.write("3,Roshan Naik".getBytes());
txnBatch.write("4,Alan Gates".getBytes());
txnBatch.write("5,Owen O’Malley".getBytes());
txnBatch.commit();


}
txnBatch.close();


txnBatch = connection.fetchTransactionBatch(10, writer);


///// Batch 2 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("6,David Schorow".getBytes());
txnBatch.write("7,Sushant Sowmyan".getBytes());
txnBatch.commit();


if(txnBatch.remainingTransactions() > 0) {
///// Batch 2 - Second TXN
txnBatch.beginNextTransaction();
txnBatch.write("8,Ashutosh Chauhan".getBytes());
txnBatch.write("9,Thejas Nair" getBytes());
txnBatch.commit();


}
txnBatch.close();


connection.close();


//-------   Thread 2  -------//


StreamingConnection connection2 = hiveEP.newConnection(true, null);
DelimitedInputWriter writer2 =
                     new DelimitedInputWriter(fieldNames,",", hiveEP);
TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(10, writer2);


///// Batch 1 - First TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("21,Venkat Ranganathan".getBytes());
txnBatch2.write("22,Bowen Zhang".getBytes());
txnBatch2.commit();


///// Batch 1 - Second TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("23,Venkatesh Seetaram".getBytes());
txnBatch2.write("24,Deepesh Khandelwal".getBytes());
txnBatch2.commit();


txnBatch2.close();


txnBatch2 = connection2.fetchTransactionBatch(10, writer2);


///// Batch 2 - First TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("26,David Schorow".getBytes());
txnBatch2.write("27,Sushant Sowmyan".getBytes());
txnBatch2.commit();


txnBatch2.close();
connection2.close();

...

Important:  To connect using Kerberos, the 'authenticatedUser' argument to EndPoint.newConnection() should have been used to do a Kerberos login.  Additionally the 'hive.metastore.kerberos.principal' setting should be set correctly either in hive-site.xml or in the 'conf' argument (if not null). If using hive-site.xml, its directory should be included in the classpath. 


Code Block
languagetext
import org.apache.hadoop.security.UserGroupInformation;

HiveEndPoint hiveEP2 = ... ;
UserGroupInformation ugi = .. authenticateWithKerberos(principal,keytab);
StreamingConnection secureConn = hiveEP2.newConnection(true, null, ugi);

DelimitedInputWriter writer3 = new DelimitedInputWriter(fieldNames, ",", hiveEP2);

TransactionBatch txnBatch3= secureConn.fetchTransactionBatch(10, writer3);

///// Batch 1 - First TXN – over secure connection
txnBatch3.beginNextTransaction();
txnBatch3.write("28,Eric Baldeschwieler".getBytes());
txnBatch3.write("29,Ari Zilka".getBytes());
txnBatch3.commit();

txnBatch3.close();
secureConn.close();
