...

Traditionally, adding new data into Hive has required gathering a large amount of data onto HDFS and then periodically adding a new partition. This is essentially a “batch insertion”. Insertion of new data into an existing partition is not permitted.

Hive Streaming API allows data to be pumped continuously into Hive. The incoming data can be continuously committed in small batches of records into an existing Hive partition or table. Once data is committed it becomes immediately visible to all Hive queries initiated subsequently.
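
As a rough sketch of how this looks with the streaming API shipped in Hive 3 (the metastore URI, database, table and record contents below are placeholders, and the target table is assumed to already exist as a transactional ORC table):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StrictDelimitedInputWriter;

public class StreamingQuickstart {
  public static void main(String[] args) throws Exception {
    HiveConf hiveConf = new HiveConf();
    hiveConf.set("hive.metastore.uris", "thrift://metastore-host:9083"); // placeholder URI

    // Delimited writer; the record layout must match the table schema exactly.
    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
        .withFieldDelimiter(',')
        .build();

    // Open a streaming connection to an existing transactional ORC table.
    StreamingConnection connection = HiveStreamingConnection.newBuilder()
        .withDatabase("default")
        .withTable("alerts")             // placeholder table name
        .withAgentInfo("example-agent")  // free-form identifier for this writer process
        .withRecordWriter(writer)
        .withHiveConf(hiveConf)
        .connect();

    // Records become visible to new Hive queries as soon as the transaction commits.
    connection.beginTransaction();
    connection.write("1,hello".getBytes());
    connection.write("2,world".getBytes());
    connection.commitTransaction();

    connection.close();
  }
}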

...

  1. The following settings are required in hive-site.xml to enable ACID support for streaming:
    1. hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
    2. hive.compactor.initiator.on = true (See more important details here)
    3. hive.compactor.cleaner.on = true (From Hive 4.0.0 onwards. See more important details here)
    4. hive.compactor.worker.threads > 0 
  2. “stored as orc” must be specified during table creation. Only the ORC storage format is currently supported.
  3. tblproperties("transactional"="true") must be set on the table during creation (see the DDL sketch after this list).
  4. The user of the client streaming process must have the necessary permissions to write to the table or partition and to create partitions in the table.

  5. (Temporary requirements) When issuing queries on streaming tables, the client needs to set
    1. hive.vectorized.execution.enabled to false (for Hive versions < 0.14.0)
    2. hive.input.format to org.apache.hadoop.hive.ql.io.HiveInputFormat
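
For illustration, a table that satisfies requirements 2 and 3 above could be created over JDBC roughly as follows (the HiveServer2 URL, credentials and the table definition are placeholders, not part of the API):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateStreamingTable {
  public static void main(String[] args) throws Exception {
    // HiveServer2 JDBC endpoint; host, port and credentials are placeholders.
    try (Connection conn = DriverManager.getConnection(
             "jdbc:hive2://hs2-host:10000/default", "hive", "");
         Statement stmt = conn.createStatement()) {
      // "stored as orc" (requirement 2) and transactional=true (requirement 3).
      stmt.execute(
          "CREATE TABLE alerts (id INT, msg STRING) "
              + "PARTITIONED BY (continent STRING, country STRING) "
              + "STORED AS ORC "
              + "TBLPROPERTIES ('transactional'='true')");
    }
  }
}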

Limitations

Out of the box, the streaming API currently only provides support for streaming delimited input data (such as CSV, tab separated, etc.), JSON and regex formatted data. The record writers supported in the Hive 3.0.0 release are

...

All these record writers expect a strict schema match, meaning the schema of the records should exactly match the table schema. (Note: the writers do not perform a schema check; it is up to the clients to make sure the record schema matches the table schema.)

...

Transactions are implemented slightly differently than in traditional database systems. Each transaction has an ID, and multiple transactions are grouped into a “transaction batch”. This helps group records from multiple transactions into fewer files (rather than one file per transaction). The transaction batch size can be specified via the builder API when the Hive streaming connection is created. Transaction management is completely hidden behind the API; in most cases users do not have to worry about tuning the transaction batch size (which is an expert-level setting and might not be honored in future releases). The API also automatically rolls over to the next transaction batch on beginTransaction() invocation if the current transaction batch is exhausted. The recommendation is to leave the transaction batch size at its default value of 1 and group several thousand records together under each transaction. Since each transaction corresponds to a delta directory in the filesystem, committing transactions too often can end up creating too many small directories.
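
A sketch of where the batch size appears during connection creation (the builder method name, table name and record counts are assumptions here; as noted above, leaving the size at its default of 1 is recommended):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StrictDelimitedInputWriter;

public class TransactionBatchSketch {
  public static void main(String[] args) throws Exception {
    StrictDelimitedInputWriter writer =
        StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();

    StreamingConnection connection = HiveStreamingConnection.newBuilder()
        .withDatabase("default")
        .withTable("alerts")            // placeholder table name
        .withRecordWriter(writer)
        .withTransactionBatchSize(1)    // expert-level setting; 1 is the recommended default
        .withHiveConf(new HiveConf())
        .connect();

    // beginTransaction() hands out the next transaction and transparently rolls over
    // to a new transaction batch once the current one is exhausted.
    for (int txn = 0; txn < 3; txn++) {
      connection.beginTransaction();
      for (int i = 0; i < 10_000; i++) {           // group thousands of records per transaction
        connection.write((i + ",payload").getBytes());
      }
      connection.commitTransaction();              // with batch size 1, one delta directory per commit
    }
    connection.close();
  }
}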

...

Generally, the more records included in each transaction, the more throughput can be achieved.  It's common to commit either after a certain number of records or after a certain time interval, whichever comes first.  The latter ensures that when the event flow rate is variable, transactions don't stay open too long.  There is no practical limit on how much data can be included in a single transaction.  The only concern is the amount of data that will need to be replayed if the transaction fails.  The concept of a TransactionBatch serves to reduce the number of files (and delta directories) created by the HiveStreamingConnection API in the filesystem. Since all transactions in a given transaction batch write to the same physical file (per bucket), a partition can only be compacted up to the level of the earliest transaction of any batch which contains an open transaction.  Thus TransactionBatches should not be made excessively large.  It makes sense to include a timer to close a TransactionBatch (even if it has unused transactions) after some amount of time.
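
A minimal sketch of such a commit policy, assuming an already-open connection and arbitrary threshold values:

import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;

public final class CommitPolicy {
  private static final int MAX_RECORDS_PER_TXN = 10_000;   // assumed thresholds; tune for the event flow
  private static final long MAX_TXN_OPEN_MILLIS = 60_000L;

  /** Writes records, committing after a record count or a time interval, whichever comes first. */
  public static void stream(StreamingConnection connection, Iterable<byte[]> records)
      throws StreamingException {
    connection.beginTransaction();
    int written = 0;
    long txnStart = System.currentTimeMillis();
    for (byte[] record : records) {
      connection.write(record);
      written++;
      boolean countReached = written >= MAX_RECORDS_PER_TXN;
      boolean timeReached = System.currentTimeMillis() - txnStart >= MAX_TXN_OPEN_MILLIS;
      if (countReached || timeReached) {
        connection.commitTransaction();
        connection.beginTransaction();   // rolls over to the next transaction (and batch if needed)
        written = 0;
        txnStart = System.currentTimeMillis();
      }
    }
    if (written > 0) {
      connection.commitTransaction();    // commit the tail of the stream
    } else {
      connection.abortTransaction();     // nothing was written in the final transaction
    }
  }
}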

The HiveStreamingConnection is highly optimized for write throughput (Delta Streaming Optimizations), and as a result the delta files generated by Hive streaming ingest have many ORC features disabled (dictionary encoding, indexes, compression, etc.) to facilitate high-throughput writes. When the compactor kicks in, these delta files get rewritten into a read- and storage-optimized ORC format (with dictionary encoding, indexes and compression enabled). It is therefore recommended to configure the compactor more aggressively/frequently (refer to Compactor) to generate compacted and optimized ORC files.
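
In addition to tuning the automatic compactor, a compaction can also be requested manually for a hot partition, for example over JDBC (connection details, table and partition values below are placeholders):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class RequestCompaction {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection(
             "jdbc:hive2://hs2-host:10000/default", "hive", "");
         Statement stmt = conn.createStatement()) {
      // Queues a major compaction; the compactor worker threads rewrite the
      // partition's delta files into optimized ORC in the background.
      stmt.execute("ALTER TABLE alerts PARTITION (continent='Asia', country='India') COMPACT 'major'");
    }
  }
}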

Notes about the HiveConf Object

...

  • hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
  • hive.support.concurrency = true
  • hive.metastore.execute.setugi = true
  • hive.execution.engine = mr
  • hive.exec.dynamic.partition.mode = nonstrict
  • hive.exec.orc.delta.streaming.optimizations.enabled = true
  • hive.metastore.client.cache.enabled = false
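
If these settings need to be applied explicitly on the client side, a minimal sketch of building such a HiveConf (the metastore URI is a placeholder):

import org.apache.hadoop.hive.conf.HiveConf;

public class StreamingHiveConf {
  /** Builds a HiveConf carrying the client-side settings listed above. */
  public static HiveConf create() {
    HiveConf hiveConf = new HiveConf();
    hiveConf.set("hive.metastore.uris", "thrift://metastore-host:9083"); // placeholder
    hiveConf.set("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
    hiveConf.set("hive.support.concurrency", "true");
    hiveConf.set("hive.metastore.execute.setugi", "true");
    hiveConf.set("hive.execution.engine", "mr");
    hiveConf.set("hive.exec.dynamic.partition.mode", "nonstrict");
    hiveConf.set("hive.exec.orc.delta.streaming.optimizations.enabled", "true");
    hiveConf.set("hive.metastore.client.cache.enabled", "false");
    return hiveConf;
  }
}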

I/O – Writing Data

These classes and interfaces provide support for writing data to Hive within a transaction.

...

  1. Modify input record: This may involve dropping fields from the input data if they don’t have corresponding table columns, adding nulls in case of missing fields for certain columns, and adding __HIVE_DEFAULT_PARTITION__ if a partition column value is null or empty. Dynamically creating partitions requires understanding the incoming data format in order to extract the partition values from the last columns.
  2. Encode modified record: The encoding involves serialization using an appropriate Hive SerDe.
  3. For bucketed tables, extract the bucket column values from the record to identify the bucket where the record belongs.
  4. For partitioned tables, in dynamic partitioning mode, extract the partition column values from the last N columns of the record (where N is the number of partition columns) to identify the partition where the record belongs (see the sketch after this list).
  5. Write encoded record to Hive using the AcidOutputFormat's record updater for the appropriate bucket.
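
For example, with a delimited writer and a table partitioned by (continent, country), dynamic partitioning means each record carries its partition values as the trailing columns; a sketch (database, table and values are placeholders):

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StrictDelimitedInputWriter;

public class DynamicPartitionStreaming {
  public static void main(String[] args) throws Exception {
    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
        .withFieldDelimiter(',')
        .build();

    // No static partition values are given, so the writer derives the partition for
    // each record from its last columns and creates the partition if it is missing.
    StreamingConnection connection = HiveStreamingConnection.newBuilder()
        .withDatabase("default")
        .withTable("alerts")             // assumed: partitioned by (continent, country)
        .withRecordWriter(writer)
        .withHiveConf(new HiveConf())
        .connect();

    connection.beginTransaction();
    connection.write("1,hello,Asia,India".getBytes());   // last two fields are partition values
    connection.write("2,world,Asia,China".getBytes());
    connection.commitTransaction();
    connection.close();
  }
}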

...