Table of Contents

Hive 3 Streaming API

Hive 3 Streaming API Documentation - new API available in Hive 3

Hive HCatalog Streaming API API

Traditionally adding new data into Hive requires gathering a large amount of data onto HDFS and then periodically adding a new partition. This is essentially a “batch insertion”. Insertion of new data into an existing partition is not permitted. Hive Streaming API allows data to be pumped continuously into Hive. The incoming data can be continuously committed in small batches of records into an existing Hive partition or table. Once data is committed it becomes immediately visible to all Hive queries initiated subsequently.


Note on packaging: The APIs are defined in the Java package org.apache.hive.hcatalog.streaming and part of the hive-hcatalog-streaming Maven module in Hive.



Mutation API

Starting in release 2.0.0, Hive offers another API for mutating (insert/update/delete) records into transactional tables using Hive’s ACID feature. See HCatalog Streaming Mutation API for details and a comparison with the streaming data ingest API that is described in this document.

Streaming Requirements

A few things are required to use streaming.

    hive.compactor.initiator.on = true (See more important details here)
    hive.compactor.cleaner.on = true (From Hive 4.0.0 onwards. See more important details here)
    4. hive.compactor.worker.threads > 0 
  2. “stored as orc” must be specified during table creation. Only ORC storage format is supported currently.
  3. tblproperties("transactional"="true") must be set on the table during creation.
  4. The Hive table must be bucketed, but not sorted. So something like “clustered by (colName) into 10 buckets” must be specified during table creation. The number of buckets is ideally the same as the number of streaming writers.
  5. User of the client streaming process must have the necessary permissions to write to the table or partition and create partitions in the table.
  6. (Temporary requirements) When issuing queries on streaming tables, the client needs to set
    hive.vectorized.execution.enabled  to  false     (for Hive version < 0.14.0)

    2. hive.input.format  to


The class HiveEndPoint describes a Hive End Point to connect to. This describes the database, table and partition names. Invoking the newConnection method on it establishes a connection to the Hive MetaStore for streaming purposes. It returns a StreamingConnection object. Multiple connections can be established on the same endpoint. StreamingConnection can then be used to initiate new transactions for performing I/O.


TransactionBatch is used to write a series of transactions. There is one file created on HDFS per TxnBatch in each bucket. The API examines each record to decide which bucket it belongs to and writes it to the appropriate bucket. If the table has 5 buckets, there will be 5 files (some of them could be empty) for the TxnBatch (before compaction kicks in).  Prior to Hive 1.3.0, a bug in the API's bucket computation logic caused incorrect distribution of records into buckets, which could lead to incorrect data returned from queries using bucket join algorithms.

For each transaction in the TxnBatch, the application calls beginNextTransaction, write, and then commit or abort as appropriate. See the Javadoc for details.  A Transaction cannot include data from more than one partition.

Transactions in a TransactionBatch are eventually expired by the Metastore if not committed or aborted after hive.txn.timeout secs. TrasnactionBatch class provides a heartbeat() method to prolong the lifetime of unused transactions in the batch.  A good rule of thumb is to send call heartbeat() at (hive.txn.timeout/2) intervals after creating a TransactionBatch.  This is sufficient to keep an inactive transaction alive but not load the metastore unnecessarily.

Usage Guidelines

Generally, the more events are included in each transaction the more throughput can be achieved.  It's common commit either after a certain number of events or after a certain time interval, whichever comes first.  The later ensures that when event flow rate is variable, transactions don't stay open too long.  There is no practical limit on how much data can be included in a single transaction.  The only concern is amount of data which will need to be replayed if the transaction fails.  The concept of a TransactionBatch serves to reduce the number of files created by SteramingAPI in HDFS.  Since all transactions in a given batch write to the same physical file (per bucket), a partition can only be compacted up to the the level of the earliest transaction of any batch which contains an open transaction.  Thus TranactionBatches should not be made excessively large.  It makes sense to include a timer to close a TransactionBatch (even if it has unused transactions) after some amount of time.

Note: Hive 1.3.0 onwards, invoking TxnBatch.close() will cause all unused transaction in the current TxnBatch to be aborted.

Note: Hive 1.3.0 onwards, invoking TxnBatch.close() will cause all unused transaction in the current TxnBatch to be aborted.


Class StrictJsonWriter  implements the RecordWriter interface. It accepts input records that in strict JSON format and writes them to Hive. It converts the JSON record directly into an Object using JsonSerde, which is then passed on to the underlying AcidOutputFormat's record updater for the appropriate bucket.  See Javadoc.


Class StrictRegexWriter implements the RecordWriter interface. It accepts input records, regex and text and writes them to Hive. It converts the text record using proper regex directly into an Object using RegexSerDe, which is then passed on to the underlying AcidOutputFormat's record updater for the appropriate bucket.  See Javadoc.  Available in Hive 1.2.2+ and 2.3.0+.


This is a base class that contains some of the common code needed by RecordWriter objects such as schema lookup and computing the bucket into which a record should belong.


Code Block
///// Stream five records in two transactions /////
// Assumed HIVE table Schema:
create table alerts ( id int , msg string )
     partitioned by (continent string, country string)
     clustered by (id) into 5 buckets
     stored as orc tblproperties("transactional"="true"); // currently ORC is required for streaming
//-------   MAIN THREAD  ------- //
String dbName = "testing";
String tblName = "alerts";
ArrayList<String> partitionVals = new ArrayList<String>(2);
String serdeClass = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
HiveEndPoint hiveEP = new HiveEndPoint("thrift://", dbName, tblName, partitionVals);

.. spin up threads ..

//-------   Thread 1  -------//
StreamingConnection connection = hiveEP.newConnection(true);
DelimitedInputWriter writer =
                     new DelimitedInputWriter(fieldNames,",", hiveEP);
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);

///// Batch 1 - First TXN
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.write("2,Welcome to streaming".getBytes());

if(txnBatch.remainingTransactions() > 0) {
///// Batch 1 - Second TXN
txnBatch.write("3,Roshan Naik".getBytes());
txnBatch.write("4,Alan Gates".getBytes());
txnBatch.write("5,Owen O’Malley".getBytes());


txnBatch = connection.fetchTransactionBatch(10, writer);

///// Batch 2 - First TXN
txnBatch.write("6,David Schorow".getBytes());
txnBatch.write("7,Sushant Sowmyan".getBytes());

if(txnBatch.remainingTransactions() > 0) {
///// Batch 2 - Second TXN
txnBatch.write("8,Ashutosh Chauhan".getBytes());
txnBatch.write("9,Thejas Nair" getBytes());



//-------   Thread 2  -------//

StreamingConnection connection2 = hiveEP.newConnection(true);
DelimitedInputWriter writer2 =
                     new DelimitedInputWriter(fieldNames,",", hiveEP);
TransactionBatch txnBatch2= connection.fetchTransactionBatch(10, writer2);

///// Batch 1 - First TXN
txnBatch2.write("21,Venkat Ranganathan".getBytes());
txnBatch2.write("22,Bowen Zhang".getBytes());

///// Batch 1 - Second TXN
txnBatch2.write("23,Venkatesh Seetaram".getBytes());
txnBatch2.write("24,Deepesh Khandelwal".getBytes());


txnBatch = connection.fetchTransactionBatch(10, writer);

///// Batch 2 - First TXN
txnBatch.write("26,David Schorow".getBytes());
txnBatch.write("27,Sushant Sowmyan".getBytes());



Important:  To connect using Kerberos, the 'authenticatedUser' argument to EndPoint.newConnection() should have been used to do a Kerberos login.  Additionally the 'hive.metastore.kerberos.principal' setting should be set correctly either in hive-site.xml or in the 'conf' argument (if not null). If using hive-site.xml, its directory should be included in the classpath. 

Code Block

HiveEndPoint hiveEP2 = ... ;
UserGroupInformation ugi = .. authenticateWithKerberos(principal,keytab);
StreamingConnection secureConn = hiveEP2.newConnection(true, null, ugi);

DelimitedInputWriter writer3 = new DelimitedInputWriter(fieldNames, ",", hiveEP2);

TransactionBatch txnBatch3= secureConn.fetchTransactionBatch(10, writer3);

///// Batch 1 - First TXN – over secure connection
txnBatch3.write("28,Eric Baldeschwieler".getBytes());
txnBatch3.write("29,Ari Zilka".getBytes());


