Hive 3 Streaming API

Hive 3 Streaming API Documentation - new API available in Hive 3

Hive HCatalog Streaming API

Traditionally, adding new data into Hive required gathering a large amount of data onto HDFS and then periodically adding a new partition: essentially a “batch insertion”, since inserting new data into an existing partition was not permitted. The Hive Streaming API instead allows data to be pumped continuously into Hive. The incoming data can be continuously committed in small batches of records into an existing Hive partition or table. Once data is committed it becomes immediately visible to all Hive queries initiated subsequently.

...

  1. The following settings are required in hive-site.xml to enable ACID support for streaming (a combined sketch of these settings appears after this list):
    1. hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
    2. hive.compactor.initiator.on = true (See more important details here)
    3. hive.compactor.cleaner.on = true (From Hive 4.0.0 onwards. See more important details here)
    4. hive.compactor.worker.threads > 0 
  2. “stored as orc” must be specified during table creation. Currently, ORC is the only supported storage format.
  3. tblproperties("transactional"="true") must be set on the table during creation.
  4. The Hive table must be bucketed, but not sorted. So something like “clustered by (colName) into 10 buckets” must be specified during table creation. The number of buckets is ideally the same as the number of streaming writers.
  5. The user of the client streaming process must have the necessary permissions to write to the table or partition and to create partitions in the table.
  6. (Temporary requirements) When issuing queries on streaming tables, the client needs to set:
    1. hive.vectorized.execution.enabled to false (for Hive version < 0.14.0)
    2. hive.input.format to org.apache.hadoop.hive.ql.io.HiveInputFormat
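
For reference, here is a minimal sketch of the settings above in one place. The property values are illustrative (any hive.compactor.worker.threads value greater than zero works), and the client-side settings from item 6 are shown as set commands:

Code Block
languagetext
<!-- hive-site.xml: illustrative values for the ACID settings listed above -->
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>  <!-- Hive 4.0.0 onwards -->
  <name>hive.compactor.cleaner.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>  <!-- any value > 0 -->
</property>

-- client session settings when querying streaming tables (temporary requirements):
set hive.vectorized.execution.enabled = false;  -- only for Hive < 0.14.0
set hive.input.format = org.apache.hadoop.hive.ql.io.HiveInputFormat;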

...

Class StrictJsonWriter implements the RecordWriter interface. It accepts input records that are in strict JSON format and writes them to Hive. It converts the JSON record directly into an Object using JsonSerde, which is then passed on to the underlying AcidOutputFormat's record updater for the appropriate bucket. See Javadoc.
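
As an illustration, here is a minimal sketch of streaming one JSON record with StrictJsonWriter. The endpoint, partition values, and record contents are assumptions (the table is the alerts table from the example below), and the constructor form should be checked against the Javadoc for your release:

Code Block
languagetext
// Sketch: streaming one JSON record (endpoint and record values are illustrative)
HiveEndPoint jsonEP = new HiveEndPoint("thrift://x.y.com:9083", "testing", "alerts", partitionVals);
StreamingConnection jsonConn = jsonEP.newConnection(true);
StrictJsonWriter jsonWriter = new StrictJsonWriter(jsonEP);  // field names are taken from the table schema
TransactionBatch jsonBatch = jsonConn.fetchTransactionBatch(10, jsonWriter);

jsonBatch.beginNextTransaction();
jsonBatch.write("{\"id\":10,\"msg\":\"json streaming\"}".getBytes());
jsonBatch.commit();

jsonBatch.close();
jsonConn.close();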

StrictRegexWriter

Class StrictRegexWriter implements the RecordWriter interface. It accepts text input records together with a regex and writes them to Hive. It uses the regex to convert each text record directly into an Object via RegexSerDe, which is then passed on to the underlying AcidOutputFormat's record updater for the appropriate bucket. See Javadoc. Available in Hive 1.2.2+ and 2.3.0+.
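
As an illustration, here is a minimal sketch of streaming one text record with StrictRegexWriter. The regex, endpoint, and record contents are assumptions, and the constructor shown (regex, endpoint, HiveConf) is also an assumption; check the Javadoc for the exact signature in your release:

Code Block
languagetext
// Sketch: regex-parsed streaming (constructor form varies by release; see the Javadoc)
String regex = "([0-9]+),(.*)";  // two capture groups map to the id and msg columns
HiveEndPoint regexEP = new HiveEndPoint("thrift://x.y.com:9083", "testing", "alerts", partitionVals);
StreamingConnection regexConn = regexEP.newConnection(true);
StrictRegexWriter regexWriter = new StrictRegexWriter(regex, regexEP, null);  // null HiveConf = defaults
TransactionBatch regexBatch = regexConn.fetchTransactionBatch(10, regexWriter);

regexBatch.beginNextTransaction();
regexBatch.write("11,regex streaming".getBytes());
regexBatch.commit();

regexBatch.close();
regexConn.close();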

AbstractRecordWriter

This is a base class that contains some of the common code needed by RecordWriter objects, such as schema lookup and computing the bucket into which a record should belong.

...

Code Block
languagetext
///// Stream five records in two transactions /////
 
// Assumed HIVE table Schema:
create table alerts ( id int , msg string )
     partitioned by (continent string, country string)
     clustered by (id) into 5 buckets
     stored as orc tblproperties("transactional"="true"); // currently ORC is required for streaming
 
 
//-------   MAIN THREAD  ------- //
String dbName = "testing";
String tblName = "alerts";
ArrayList<String> partitionVals = new ArrayList<String>(2);
partitionVals.add("Asia");
partitionVals.add("India");
String serdeClass = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
String[] fieldNames = {"id", "msg"};  // column names of the alerts table, used by DelimitedInputWriter
 
HiveEndPoint hiveEP = new HiveEndPoint("thrift://x.y.com:9083", dbName, tblName, partitionVals);


.. spin up threads ..


//-------   Thread 1  -------//
StreamingConnection connection = hiveEP.newConnection(true);
DelimitedInputWriter writer =
                     new DelimitedInputWriter(fieldNames,",", hiveEP);
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);


///// Batch 1 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();


if(txnBatch.remainingTransactions() > 0) {
    ///// Batch 1 - Second TXN
    txnBatch.beginNextTransaction();
    txnBatch.write("3,Roshan Naik".getBytes());
    txnBatch.write("4,Alan Gates".getBytes());
    txnBatch.write("5,Owen O’Malley".getBytes());
    txnBatch.commit();
}
txnBatch.close();


txnBatch = connection.fetchTransactionBatch(10, writer);


///// Batch 2 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("6,David Schorow".getBytes());
txnBatch.write("7,Sushant Sowmyan".getBytes());
txnBatch.commit();


if(txnBatch.remainingTransactions() > 0) {
    ///// Batch 2 - Second TXN
    txnBatch.beginNextTransaction();
    txnBatch.write("8,Ashutosh Chauhan".getBytes());
    txnBatch.write("9,Thejas Nair".getBytes());
    txnBatch.commit();
}
txnBatch.close();


connection.close();


//-------   Thread 2  -------//


StreamingConnection connection2 = hiveEP.newConnection(true);
DelimitedInputWriter writer2 =
                     new DelimitedInputWriter(fieldNames,",", hiveEP);
TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(10, writer2);


///// Batch 1 - First TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("21,Venkat Ranganathan".getBytes());
txnBatch2.write("22,Bowen Zhang".getBytes());
txnBatch2.commit();


///// Batch 1 - Second TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("23,Venkatesh Seetaram".getBytes());
txnBatch2.write("24,Deepesh Khandelwal".getBytes());
txnBatch2.commit();


txnBatch2.close();


txnBatch2 = connection2.fetchTransactionBatch(10, writer2);


///// Batch 2 - First TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("26,David Schorow".getBytes());
txnBatch2.write("27,Sushant Sowmyan".getBytes());
txnBatch2.commit();


txnBatch2.close();
connection2.close();

...

Important: To connect using Kerberos, the 'authenticatedUser' argument to EndPoint.newConnection() should have been used to do a Kerberos login. Additionally, the 'hive.metastore.kerberos.principal' setting should be set correctly, either in hive-site.xml or in the 'conf' argument (if not null). If using hive-site.xml, its directory should be included in the classpath.


Code Block
languagetext
import org.apache.hadoop.security.UserGroupInformation;

HiveEndPoint hiveEP2 = ... ;
UserGroupInformation ugi = .. authenticateWithKerberos(principal,keytab);
StreamingConnection secureConn = hiveEP2.newConnection(true, null, ugi);

DelimitedInputWriter writer3 = new DelimitedInputWriter(fieldNames, ",", hiveEP2);

TransactionBatch txnBatch3 = secureConn.fetchTransactionBatch(10, writer3);

///// Batch 1 - First TXN – over secure connection
txnBatch3.beginNextTransaction();
txnBatch3.write("28,Eric Baldeschwieler".getBytes());
txnBatch3.write("29,Ari Zilka".getBytes());
txnBatch3.commit();

txnBatch3.close();
secureConn.close();
