...

  1. The following settings are required in hive-site.xml to enable ACID support for streaming:
    1. hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
    2. hive.compactor.initiator.on = true (See more important details here)
    3. hive.compactor.cleaner.on = true (From Hive 4.0.0 onwards. See more important details here)
    4. hive.compactor.worker.threads > 0 
  2. “stored as orc” must be specified during table creation; only the ORC storage format is currently supported (see the table creation sketch after this list).
  3. tblproperties("transactional"="true") must be set on the table during creation.
  4. The user of the client streaming process must have the necessary permissions to write to the table or partition, and to create partitions in the table.
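
As a rough illustration of requirements 2 and 3, the DDL below creates a table that streaming ingest can write to. It is a minimal sketch: the table name, columns, partitioning scheme, and HiveServer2 JDBC URL are placeholders, and your environment may need different driver setup.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateStreamingTable {
  public static void main(String[] args) throws Exception {
    // Hypothetical HiveServer2 endpoint; the hive-jdbc driver must be on the classpath.
    String url = "jdbc:hive2://localhost:10000/default";
    try (Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement()) {
      // STORED AS ORC and transactional=true are the parts required for streaming ingest;
      // the CLUSTERED BY clause and column layout are only illustrative.
      stmt.execute(
          "CREATE TABLE alerts (id INT, msg STRING) "
        + "PARTITIONED BY (continent STRING, country STRING) "
        + "CLUSTERED BY (id) INTO 5 BUCKETS "
        + "STORED AS ORC "
        + "TBLPROPERTIES ('transactional'='true')");
    }
  }
}
```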

...

The HiveStreamingConnection is highly optimized for write throughput (see Delta Streaming Optimizations), and as a result the delta files generated by Hive streaming ingest have many ORC features disabled (dictionary encoding, indexes, compression, etc.) to facilitate high-throughput writes. When the compactor kicks in, these delta files get rewritten into read- and storage-optimized ORC format (with dictionary encoding, indexes, and compression enabled). It is therefore recommended to configure the compactor more aggressively/frequently (refer to Compactor) to generate compacted and optimized ORC files.
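
One way to force the rewrite described above without waiting for the background compactor is to request a compaction manually, as sketched below. The table and partition names are placeholders; the frequency-related knobs (for example hive.compactor.check.interval and hive.compactor.delta.num.threshold) belong in the metastore's hive-site.xml rather than in client code.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class RequestCompaction {
  public static void main(String[] args) throws Exception {
    // Hypothetical HiveServer2 endpoint and table; adjust for your deployment.
    try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
         Statement stmt = conn.createStatement()) {
      // Queues a major compaction so streaming delta files are rewritten into
      // read-optimized ORC (dictionary encoding, indexes, compression).
      stmt.execute("ALTER TABLE alerts PARTITION (continent='Asia', country='India') "
          + "COMPACT 'major'");
    }
  }
}
```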

Notes about the HiveConf Object

...

  • hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
  • hive.support.concurrency = true
  • hive.metastore.execute.setugi = true
  • hive.exec.dynamic.partition.mode = nonstrict
  • hive.exec.orc.delta.streaming.optimizations.enabled = true
  • hive.metastore.client.cache.enabled = false
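
Below is a rough sketch of constructing a HiveConf to pass to HiveStreamingConnection.newBuilder().withHiveConf(...). The metastore URI is a placeholder; depending on the Hive version the streaming connection may apply the settings listed above on its own, so setting them explicitly here is only for illustration.

```java
import org.apache.hadoop.hive.conf.HiveConf;

public class StreamingConfExample {
  // Builds a HiveConf suitable for HiveStreamingConnection.newBuilder().withHiveConf(...).
  public static HiveConf buildConf() {
    HiveConf conf = new HiveConf();
    conf.set("hive.metastore.uris", "thrift://metastore-host:9083"); // placeholder URI
    // Settings listed above; plain string keys are used to avoid depending on ConfVars names.
    conf.set("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
    conf.set("hive.support.concurrency", "true");
    conf.set("hive.metastore.execute.setugi", "true");
    conf.set("hive.exec.dynamic.partition.mode", "nonstrict");
    conf.set("hive.exec.orc.delta.streaming.optimizations.enabled", "true");
    conf.set("hive.metastore.client.cache.enabled", "false");
    return conf;
  }
}
```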

I/O – Writing Data

These classes and interfaces provide support for writing the data to Hive within a transaction.

...

  1. Modify input record: This may involve dropping fields from the input data if they have no corresponding table columns, adding nulls for columns whose fields are missing, and adding __HIVE_DEFAULT_PARTITION__ if the partition column value is null or empty. Dynamically creating partitions requires an understanding of the incoming data format so that the partition values can be extracted from the last columns of each record.
  2. Encode modified record: The encoding involves serialization using an appropriate Hive SerDe.
  3. For bucketed tables, extract bucket column values from the record to identify the bucket where the record belongs.
  4. For partitioned tables, in dynamic partitioning mode, extract the partition column values from the last N columns of the record (where N is the number of partition columns) to identify the partition where the record belongs.
  5. Write encoded record to Hive using the AcidOutputFormat's record updater for the appropriate bucket.
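
As a rough end-to-end illustration of this write path, the sketch below streams delimited records into a partitioned table using dynamic partitioning, so the last columns of each record carry the partition values. The metastore URI, database/table names (default.alerts, partitioned by continent and country), and record contents are illustrative placeholders.

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StrictDelimitedInputWriter;

public class StreamingWriteExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    conf.set("hive.metastore.uris", "thrift://metastore-host:9083"); // placeholder URI

    // Record writer that parses delimited text into table columns.
    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
        .withFieldDelimiter(',')
        .build();

    // Open a streaming connection; no static partition values are given,
    // so partitions are derived from the last columns of each record.
    StreamingConnection connection = HiveStreamingConnection.newBuilder()
        .withDatabase("default")
        .withTable("alerts")
        .withAgentInfo("example-agent-1")
        .withRecordWriter(writer)
        .withHiveConf(conf)
        .connect();

    connection.beginTransaction();
    connection.write("1,val1,Asia,India".getBytes());
    connection.write("2,val2,Europe,Germany".getBytes());
    connection.commitTransaction();

    connection.beginTransaction();
    connection.write("3,val3,Asia,China".getBytes());
    connection.commitTransaction();

    connection.close();
  }
}
```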

...