



HCatalog Streaming API 

Traditionally, adding new data to Hive requires gathering a large amount of data onto HDFS and then periodically adding a new partition. This is essentially a “batch insertion”. Inserting new data into an existing partition is not permitted. The Hive Streaming API allows data to be pumped continuously into Hive. The incoming data can be continuously committed in small batches of records into an existing Hive partition or table. Once data is committed, it becomes immediately visible to all Hive queries initiated after the commit.

This API is intended for streaming clients such as Flume and Storm, which continuously generate data. Streaming support is built on top of the ACID-based insert/update support in Hive (see Hive Transactions).

The classes and interfaces that make up the Hive streaming API fall broadly into two sets. The first set provides support for connection and transaction management, while the second set provides I/O support. Transactions are managed by the metastore; writes are performed directly to HDFS.

Note on packaging: The APIs are defined in the Java package org.apache.hive.hcatalog.streaming and are part of the hive-hcatalog-streaming Maven module in Hive.


Streaming currently has two requirements.

1) Only the ORC storage format is supported. So “stored as orc” must be specified during table creation.

2) The Hive table must be bucketed, but not sorted. So something like “clustered by (colName) into 10 buckets” must be specified during table creation. Ideally, the number of buckets is the same as the number of streaming writers.
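To make the bucketing requirement concrete, the sketch below shows how rows of a table declared with “clustered by (id) into 5 buckets” would be spread across buckets. It assumes Hive's classic bucketing hash for an int column, where the hash is the value itself and the bucket index is (hash & Integer.MAX_VALUE) % numBuckets; newer Hive releases may use a different hash. BucketSketch and its methods are hypothetical and are not part of the streaming API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of bucket assignment; not part of the Hive streaming API.
final class BucketSketch {
    // Assumption: for an int bucketing column the hash is the int value itself,
    // and the sign bit is masked off before taking the modulus.
    static int bucketFor(int id, int numBuckets) {
        int hash = Integer.hashCode(id);
        return (hash & Integer.MAX_VALUE) % numBuckets;
    }

    // Groups ids by bucket index 0..numBuckets-1.
    static List<List<Integer>> assign(int[] ids, int numBuckets) {
        List<List<Integer>> buckets = new ArrayList<>();
        for (int b = 0; b < numBuckets; b++) {
            buckets.add(new ArrayList<>());
        }
        for (int id : ids) {
            buckets.get(bucketFor(id, numBuckets)).add(id);
        }
        return buckets;
    }

    public static void main(String[] args) {
        int[] ids = {1, 2, 3, 4, 5, 6, 7, 8, 9};
        System.out.println(assign(ids, 5));
    }
}
```

Running main prints [[5], [1, 6], [2, 7], [3, 8], [4, 9]], that is, ids 1 to 9 grouped by bucket index 0 to 4.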


The class HiveEndPoint describes a Hive endpoint to connect to: the database, table, and partition names. Invoking the newConnection method on it establishes a connection to the Hive metastore for streaming purposes and returns a StreamingConnection object. Multiple connections can be established on the same endpoint. The StreamingConnection can then be used to initiate new transactions for performing I/O.

In a setup where data is streamed continuously, it is very likely that data will be added into new partitions periodically. Either the Hive admin can pre-create the necessary partitions, or the streaming clients can create them as needed. HiveEndPoint.newConnection() accepts a boolean argument that indicates whether the partition should be auto-created. Because partition creation is an atomic action, multiple clients can race to create the same partition but only one will succeed, so streaming clients do not have to synchronize when creating a partition. The ‘proxyUser’ argument indicates the user to be impersonated when talking to Hive and HDFS in all subsequent actions performed on the connection. If it is left null, impersonation is disabled and all actions are performed as the user of the running process. For impersonation to work, hive-site.xml needs to be configured to allow the user of the streaming client to impersonate the indicated user.
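The claim that racing clients need no coordination rests on partition creation being atomic. The in-memory model below stands in for the metastore with a ConcurrentHashMap, whose putIfAbsent is atomic: several threads try to create the same partition at once and exactly one reports success, while the others simply proceed. PartitionRaceSketch, tryCreatePartition, and the partition values are hypothetical; the real metastore call is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stand-in for the metastore; not a Hive API.
final class PartitionRaceSketch {
    // Partition spec (list of partition values) -> name of the client that created it.
    private final ConcurrentMap<List<String>, String> partitions = new ConcurrentHashMap<>();

    // Atomic create-if-absent: true only for the caller whose create took effect.
    boolean tryCreatePartition(List<String> partVals, String client) {
        return partitions.putIfAbsent(partVals, client) == null;
    }

    boolean exists(List<String> partVals) {
        return partitions.containsKey(partVals);
    }

    // Releases `clients` threads at once against the same partition and
    // returns how many of them reported a successful create.
    static int race(int clients) {
        PartitionRaceSketch store = new PartitionRaceSketch();
        List<String> spec = List.of("Asia", "India"); // hypothetical partition values
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < clients; i++) {
            String client = "client-" + i;
            Thread t = new Thread(() -> {
                try {
                    start.await(); // wait until every client is ready
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (store.tryCreatePartition(spec, client)) {
                    winners.incrementAndGet();
                }
                // A losing client needs no retry or lock: the partition now exists.
            });
            threads.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining", e);
            }
        }
        if (!store.exists(spec)) {
            throw new IllegalStateException("partition was never created");
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("successful creates: " + race(8));
    }
}
```

Running main should print successful creates: 1 regardless of thread scheduling, because putIfAbsent admits exactly one winner per key.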

Transactions are implemented slightly differently than in traditional database systems. Each transaction has an id, and multiple transactions are grouped into a “Transaction Batch”. After connecting, a streaming client first requests a new batch of transactions. In response it receives a set of transaction ids that are part of the transaction batch. The client then consumes one transaction id at a time by initiating new transactions. The client writes one or more records per transaction with write() and either commits or aborts the current transaction before switching to the next one. Each TransactionBatch.write() invocation automatically associates the I/O attempt with the current transaction id. The user of the streaming client, or the proxy user if one is indicated, needs to have write permission on the partition or table.

Concurrency Note: I/O can be performed on multiple TransactionBatches concurrently. However, the transactions within a transaction batch must be consumed sequentially.
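The transaction-batch lifecycle and the sequential-consumption rule can be illustrated with a small in-memory model. TxnBatchModel is hypothetical and only mirrors the method names used on this page (beginNextTransaction, write, commit, abort, remainingTransactions); it tags each write with the current transaction id, consumes ids in order, and refuses to open a second transaction while one is still open. It performs no real I/O and does not reproduce the actual TransactionBatch implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a transaction batch; not the Hive class.
final class TxnBatchModel {
    private final Deque<Long> unusedIds = new ArrayDeque<>();
    private final Map<Long, List<String>> committed = new LinkedHashMap<>();
    private Long currentId;                      // null when no transaction is open
    private List<String> pending = new ArrayList<>();

    // The batch receives its set of transaction ids up front.
    TxnBatchModel(long firstId, int size) {
        for (int i = 0; i < size; i++) {
            unusedIds.addLast(firstId + i);
        }
    }

    // Transactions inside one batch are strictly sequential.
    void beginNextTransaction() {
        if (currentId != null) {
            throw new IllegalStateException("txn " + currentId + " is still open");
        }
        if (unusedIds.isEmpty()) {
            throw new IllegalStateException("batch exhausted");
        }
        currentId = unusedIds.removeFirst();
        pending = new ArrayList<>();
    }

    // Each write is implicitly associated with the current transaction id.
    void write(byte[] record) {
        requireOpen();
        pending.add(new String(record, StandardCharsets.UTF_8));
    }

    void commit() {
        requireOpen();
        committed.put(currentId, pending);
        currentId = null;
    }

    // Aborted records are discarded, but the id is still consumed.
    void abort() {
        requireOpen();
        currentId = null;
    }

    // Ids that have not been begun yet.
    int remainingTransactions() {
        return unusedIds.size();
    }

    Map<Long, List<String>> committed() {
        return committed;
    }

    private void requireOpen() {
        if (currentId == null) {
            throw new IllegalStateException("no open transaction");
        }
    }
}
```

Each model instance keeps mutable state and is meant to be driven by a single thread; two threads can each drive their own instance concurrently, which mirrors the concurrency note above.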

See HiveEndPoint in the Javadoc for more information. Generally, a user will create a HiveEndPoint for the target table and then call newConnection on it to get a StreamingConnection.


The StreamingConnection class is used to acquire batches of transactions. Once the connection has been provided by HiveEndPoint, the application will generally enter a loop where it calls fetchTransactionBatch and writes a series of transactions. When shutting down, the application should call close. See StreamingConnection in the Javadoc for more information.


TransactionBatch is used to write a series of transactions. For each transaction, the application calls beginNextTransaction, write, and then commit or abort as appropriate. See TransactionBatch in the Javadoc for details.
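The application loop described for StreamingConnection and TransactionBatch can be sketched against two hypothetical interfaces, Conn and Batch, whose method names follow the ones used on this page. The point is the call order: fetch a batch; for each transaction, begin, write, and commit; close the batch once its transactions are used up; and close the connection at the end. The interfaces, the txnsPerBatch and recordsPerTxn parameters, and the logging fake are assumptions made for illustration; the real fetchTransactionBatch also takes a RecordWriter.

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins for TransactionBatch and StreamingConnection.
interface Batch {
    int remainingTransactions();
    void beginNextTransaction();
    void write(byte[] record);
    void commit();
    void close();
}

interface Conn {
    Batch fetchTransactionBatch(int numTxns);
    void close();
}

final class StreamingLoopSketch {
    // Streams all records, putting at most recordsPerTxn records in each
    // transaction and fetching a new batch whenever the current one runs out.
    // Aborting on write failure is omitted for brevity.
    static void stream(Conn conn, Iterator<String> records, int txnsPerBatch, int recordsPerTxn) {
        if (txnsPerBatch < 1 || recordsPerTxn < 1) {
            throw new IllegalArgumentException("batch and transaction sizes must be positive");
        }
        try {
            while (records.hasNext()) {
                Batch batch = conn.fetchTransactionBatch(txnsPerBatch);
                try {
                    // Consume the batch's transactions one at a time.
                    while (records.hasNext() && batch.remainingTransactions() > 0) {
                        batch.beginNextTransaction();
                        for (int i = 0; i < recordsPerTxn && records.hasNext(); i++) {
                            batch.write(records.next().getBytes(StandardCharsets.UTF_8));
                        }
                        batch.commit();
                    }
                } finally {
                    batch.close();
                }
            }
        } finally {
            conn.close();
        }
    }

    // Fake connection that logs each call instead of talking to Hive.
    static Conn loggingConn(List<String> log) {
        return new Conn() {
            @Override
            public Batch fetchTransactionBatch(int numTxns) {
                log.add("fetch");
                int[] left = {numTxns}; // transactions not yet begun
                return new Batch() {
                    @Override public int remainingTransactions() { return left[0]; }
                    @Override public void beginNextTransaction() { left[0]--; log.add("begin"); }
                    @Override public void write(byte[] record) { log.add("write"); }
                    @Override public void commit() { log.add("commit"); }
                    @Override public void close() { log.add("closeBatch"); }
                };
            }

            @Override
            public void close() { log.add("closeConn"); }
        };
    }
}
```

With five records, two transactions per batch, and two records per transaction, the fake logs two full transactions in the first batch, one partial transaction in a second batch, and a final closeConn.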

I/O – Writing Data

These classes and interfaces provide support for writing the data to Hive within a transaction.


RecordWriter is the base interface implemented by all writers. A writer is responsible for taking a record in the form of a byte[] containing data in a known format (such as CSV) and writing it out in the format supported by Hive streaming. A RecordWriter may reorder or drop fields from the incoming record if necessary to map them to the corresponding columns in the Hive table. A streaming client instantiates an appropriate RecordWriter type and passes it to StreamingConnection.fetchTransactionBatch, which hands it to the new TransactionBatch. After that, the streaming client does not interact with the RecordWriter directly; the TransactionBatch uses and manages the RecordWriter instance to perform I/O. See RecordWriter in the Javadoc for details.

A RecordWriter has two primary functions.

  1. Modify input record: This may involve dropping fields from the input data if they don’t have corresponding table columns, adding nulls for columns that have no corresponding input field, and changing the order of incoming fields to match the order of columns in the table. This task requires an understanding of the incoming data format.
  2. Encode modified record: The encoding often involves serialization using an appropriate Hive SerDe. This task is independent of the incoming data format.
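The first function, modifying the input record, amounts to a mapping from incoming field positions to table column positions. The sketch below assumes the caller supplies a name for each incoming field (null for a field to drop) and fills any table column without a matching field with null. FieldMapper and the extra severity column in the usage are hypothetical; this is not the Hive implementation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch of the "modify input record" step of a RecordWriter.
final class FieldMapper {
    // For each table column, the index of the incoming field that feeds it, or -1.
    private final int[] sourceIndexForColumn;

    // inputFieldNames: one entry per incoming field; null marks a field to drop.
    // tableColumns: the table's column names in table order.
    FieldMapper(String[] inputFieldNames, String[] tableColumns) {
        Map<String, Integer> positions = new HashMap<>();
        for (int i = 0; i < inputFieldNames.length; i++) {
            if (inputFieldNames[i] != null) {
                // Hive column names are case-insensitive, so compare in lower case.
                positions.put(inputFieldNames[i].toLowerCase(Locale.ROOT), i);
            }
        }
        sourceIndexForColumn = new int[tableColumns.length];
        for (int c = 0; c < tableColumns.length; c++) {
            sourceIndexForColumn[c] =
                    positions.getOrDefault(tableColumns[c].toLowerCase(Locale.ROOT), -1);
        }
    }

    // Returns values in table order. Fields that match no column are never
    // copied (dropped); columns with no matching field, or whose field is
    // missing from a short record, become null.
    String[] map(String[] fields) {
        String[] out = new String[sourceIndexForColumn.length];
        for (int c = 0; c < out.length; c++) {
            int src = sourceIndexForColumn[c];
            out[c] = (src >= 0 && src < fields.length) ? fields[src] : null;
        }
        return out;
    }

    public static void main(String[] args) {
        // Incoming order is (msg, <ignored>, id); the table is (id, msg, severity).
        FieldMapper m = new FieldMapper(
                new String[] {"msg", null, "id"},
                new String[] {"id", "msg", "severity"});
        System.out.println(Arrays.toString(m.map(new String[] {"Hello streaming", "x", "1"})));
    }
}
```

Running main prints [1, Hello streaming, null]: the fields are reordered, the middle field is dropped, and the column with no input gets null. Only this step depends on the input format; encoding is handled separately.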


Class DelimitedInputWriter provides support for writing out input data in delimited formats (such as CSV). It implements only the input-format-specific task of modifying the input record. See DelimitedInputWriter in the Javadoc for details.


The input-format-agnostic task of encoding the modified record is delegated to its abstract base class, AbstractLazySimpleRecordWriter. This class provides the bulk of the RecordWriter implementation. It internally uses a LazySimpleSerDe to transform the incoming byte[] into an Object required by the underlying writer APIs.
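The division of labor between DelimitedInputWriter and its abstract base class follows the template-method pattern: the base class owns encoding, and the subclass only knows how to turn its input format into column values. The sketch below mimics that split with hypothetical classes (AbstractWriterSketch, DelimitedWriterSketch). Its encode step simply joins columns with Ctrl-A (\u0001), LazySimpleSerDe's default field delimiter, and writes a null column as \N, LazySimpleSerDe's default null marker; this stands in for real SerDe serialization and does not call Hive.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical base class: input-format agnostic encoding lives here.
abstract class AbstractWriterSketch {
    private static final char FIELD_SEP = '\u0001';  // LazySimpleSerDe default field delimiter
    private static final String NULL_MARKER = "\\N"; // LazySimpleSerDe default null format
    private final List<String> encoded = new ArrayList<>();

    // Input-format specific step, supplied by subclasses.
    protected abstract String[] toColumns(byte[] record);

    // Template method: modify the record, then encode it.
    final void write(byte[] record) {
        encoded.add(encode(toColumns(record)));
    }

    private static String encode(String[] columns) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < columns.length; i++) {
            if (i > 0) {
                sb.append(FIELD_SEP);
            }
            sb.append(columns[i] == null ? NULL_MARKER : columns[i]);
        }
        return sb.toString();
    }

    List<String> encoded() {
        return encoded;
    }
}

// Hypothetical delimited writer: splits on a literal delimiter and pads
// missing trailing fields with null (no reordering in this sketch).
final class DelimitedWriterSketch extends AbstractWriterSketch {
    private final String delimiter;
    private final int numColumns;

    DelimitedWriterSketch(String delimiter, int numColumns) {
        this.delimiter = delimiter;
        this.numColumns = numColumns;
    }

    @Override
    protected String[] toColumns(byte[] record) {
        String line = new String(record, StandardCharsets.UTF_8);
        // Pattern.quote treats the delimiter literally; -1 keeps trailing empty fields.
        String[] fields = line.split(Pattern.quote(delimiter), -1);
        String[] cols = new String[numColumns];
        for (int i = 0; i < numColumns; i++) {
            cols[i] = i < fields.length ? fields[i] : null;
        }
        return cols;
    }
}
```

Supporting another input format in this design would mean writing another toColumns, while the encoding logic stays in the base class.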


Code Block
///// Stream records in transaction batches from two threads /////

// Assumed Hive table schema:
//   create table alerts ( id int , msg string )
//        partitioned by (continent string, country string)
//        clustered by (id) into 5 buckets
//        stored as orc; // currently ORC is required for streaming

//-------   MAIN THREAD  ------- //
String dbName = "testing";
String tblName = "alerts";
ArrayList<String> partitionVals = new ArrayList<String>(2);
partitionVals.add("Asia");    // continent (illustrative value)
partitionVals.add("India");   // country (illustrative value)
String serdeClass = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
String[] fieldNames = {"id", "msg"};   // incoming fields, in the order they appear in each record

// First argument is the metastore URI (host and port not shown here)
HiveEndPoint hiveEP = new HiveEndPoint("thrift://", dbName, tblName, partitionVals);

.. spin up threads ..

//-------   Thread 1  -------//
// proxyUser = null: no impersonation; true: create the partition if it does not exist
StreamingConnection connection = hiveEP.newConnection(null, true);
DelimitedInputWriter writer =
                     new DelimitedInputWriter(fieldNames, ",", hiveEP);
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);

///// Batch 1 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();

if (txnBatch.remainingTransactions() > 0) {
  ///// Batch 1 - Second TXN
  txnBatch.beginNextTransaction();
  txnBatch.write("3,Roshan Naik".getBytes());
  txnBatch.write("4,Alan Gates".getBytes());
  txnBatch.write("5,Owen O’Malley".getBytes());
  txnBatch.commit();
}
txnBatch.close();

txnBatch = connection.fetchTransactionBatch(10, writer);

///// Batch 2 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("6,David Schorow".getBytes());
txnBatch.write("7,Sushant Sowmyan".getBytes());
txnBatch.commit();

if (txnBatch.remainingTransactions() > 0) {
  ///// Batch 2 - Second TXN
  txnBatch.beginNextTransaction();
  txnBatch.write("8,Ashutosh Chauhan".getBytes());
  txnBatch.write("9,Thejas Nair".getBytes());
  txnBatch.commit();
}
txnBatch.close();
connection.close();

//-------   Thread 2  -------//
// Each thread uses its own connection, writer, and transaction batch
StreamingConnection connection2 = hiveEP.newConnection(null, true);
DelimitedInputWriter writer2 =
                     new DelimitedInputWriter(fieldNames, ",", hiveEP);
TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(10, writer2);

///// Batch 1 - First TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("21,Venkat Ranganathan".getBytes());
txnBatch2.write("22,Bowen Zhang".getBytes());
txnBatch2.commit();

///// Batch 1 - Second TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("23,Venkatesh Seetaram".getBytes());
txnBatch2.write("24,Deepesh Khandelwal".getBytes());
txnBatch2.commit();
txnBatch2.close();

txnBatch2 = connection2.fetchTransactionBatch(10, writer2);

///// Batch 2 - First TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("26,David Schorow".getBytes());
txnBatch2.write("27,Sushant Sowmyan".getBytes());
txnBatch2.commit();
txnBatch2.close();
connection2.close();