Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: making note of tblproperties("transactional"="true") requirement

...

  1. The following settings are required in hive-site.xml to enable ACID support for streaming:
    1. hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
    2. hive.compactor.initiator.on = true
    3. hive.compactor.worker.threads > 0 
  2. “stored as orc” must be specified during table creation. Only ORC storage format is supported currently.
  3. tblproperties("transactional"="true") must be set on the table during creation.
  4. The Hive table must be bucketed, but not sorted. So something like “clustered by (colName) into 10 buckets” must be specified during table creation. The number of buckets is ideally the same as the number of streaming writers.
  5. The user running the client streaming process must have the necessary permissions to write to the table or partition and to create partitions in the table.
  6. (Temporary requirements) When issuing queries on streaming tables, the client needs to set
    1. hive.vectorized.execution.enabled  to  false     (for Hive version < 0.14.0)

    2. hive.input.format  to  org.apache.hadoop.hive.ql.io.HiveInputFormat

...

Code Block
languagetext
///// Stream five records in two transactions /////
 
// Assumed HIVE table Schema:
-- Target table for the streaming example below.
-- ORC is currently the only storage format supported for streaming, and the
-- table must be bucketed and created with "transactional"="true".
create table alerts ( id int , msg string )
     partitioned by (continent string, country string)
     clustered by (id) into 5 buckets
     stored as orc
     tblproperties("transactional"="true");
 
 
//-------   MAIN THREAD  ------- //
// Identify the target table and the single partition every writer streams into.
String dbName = "testing";
String tblName = "alerts";

// Partition values are listed in the order of the table's partition columns
// (continent, country).
ArrayList<String> partitionVals = new ArrayList<String>(2);
partitionVals.add("Asia");
partitionVals.add("India");

// Column names for the delimited records written by the threads below, in the
// order the fields appear in each record ("<id>,<msg>").
String[] fieldNames = {"id", "msg"};

// NOTE(review): serdeClass is not referenced anywhere else in this example — confirm whether it is still needed.
String serdeClass = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";
 
// One end point describes the metastore URI, database, table and partition; it
// is shared by all writer threads, each of which opens its own connection.
HiveEndPoint hiveEP = new HiveEndPoint("thrift://x.y.com:9083", dbName, tblName, partitionVals);


.. spin up threads ..


//-------   Thread 1  -------//
// Thread 1 opens its own connection and writer against the shared end point.
// NOTE(review): the boolean passed to newConnection presumably requests
// auto-creation of the partition — confirm against the HiveEndPoint API.
StreamingConnection connection = hiveEP.newConnection(true);
DelimitedInputWriter writer =
                     new DelimitedInputWriter(fieldNames, ",", hiveEP);
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);


///// Batch 1 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();


if (txnBatch.remainingTransactions() > 0) {
    ///// Batch 1 - Second TXN
    txnBatch.beginNextTransaction();
    txnBatch.write("3,Roshan Naik".getBytes());
    txnBatch.write("4,Alan Gates".getBytes());
    txnBatch.write("5,Owen O’Malley".getBytes());
    txnBatch.commit();
}

// Close the batch whether or not the second transaction ran. The connection
// stays open because it is reused to fetch the next batch.
txnBatch.close();


txnBatch = connection.fetchTransactionBatch(10, writer);


///// Batch 2 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("6,David Schorow".getBytes());
txnBatch.write("7,Sushant Sowmyan".getBytes());
txnBatch.commit();


if (txnBatch.remainingTransactions() > 0) {
    ///// Batch 2 - Second TXN
    txnBatch.beginNextTransaction();
    txnBatch.write("8,Ashutosh Chauhan".getBytes());
    txnBatch.write("9,Thejas Nair".getBytes());
    txnBatch.commit();
}

txnBatch.close();


// All batches for this thread are done: close the connection exactly once.
connection.close();


//-------   Thread 2  -------//


// Thread 2 uses its own connection, writer and transaction batch; nothing
// here touches Thread 1's connection/writer/txnBatch objects.
StreamingConnection connection2 = hiveEP.newConnection(true);
DelimitedInputWriter writer2 =
                     new DelimitedInputWriter(fieldNames, ",", hiveEP);
TransactionBatch txnBatch2 = connection2.fetchTransactionBatch(10, writer2);


///// Batch 1 - First TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("21,Venkat Ranganathan".getBytes());
txnBatch2.write("22,Bowen Zhang".getBytes());
txnBatch2.commit();


///// Batch 1 - Second TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("23,Venkatesh Seetaram".getBytes());
txnBatch2.write("24,Deepesh Khandelwal".getBytes());
txnBatch2.commit();


// Close the finished batch; connection2 stays open for the next batch.
txnBatch2.close();



txnBatch2 = connection2.fetchTransactionBatch(10, writer2);


///// Batch 2 - First TXN
txnBatch2.beginNextTransaction();
txnBatch2.write("26,David Schorow".getBytes());
txnBatch2.write("27,Sushant Sowmyan".getBytes());
txnBatch2.commit();


// Close the second batch, then this thread's connection exactly once.
txnBatch2.close();
connection2.close();

...