Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

HDFS does not support in-place changes to files.  It also does not offer read consistency in the face of writers appending to files being read by a user.  In order to provide these features on top of HDFS we have followed the standard approach used in other data warehousing tools.  Data for the table or partition is stored in a set of base files.  New records, updates, and deletes are stored in delta files.  A new set of delta files is created for each transaction (or in the case of streaming agents such as Flume or Storm, each batch of transactions) that alters a table or partition.  At read time the reader merges the base and delta files, applying any updates and deletes as it reads.

Base and Delta

...

Occasionally these changes need to be merged into the base files.  To do this a set of threads has been added to the Hive metastore.  They determine when this compaction needs to be done, execute the compaction, and then clean up afterwards.  There are two types of compactions, minor and major.

  • Minor compaction takes a set of existing delta files and rewrites them to a single delta file per bucket.
  • Major compaction takes one or more delta files and the base file for the bucket and rewrites them into a new base file per bucket.

All compactions are done in the background and do not prevent concurrent reads and writes of the data.  After a compaction the system waits until all readers of the old files have finished and then removes the old files.

Compactions are MR jobs with name in the following form: <hostname>-compactor-<db>.<table>.<partition>

Base and Delta Directories

...

Directories

Previously all files for a partition (or a table if the table is not partitioned) lived in a single directory.  With these changes, any partitions (or tables) written with an ACID aware writer will have a directory for the base files and a directory for each set of delta files.  Here is what this may look like for an unpartitioned table "t":

Code Block
titleFilesystem Layout for Table "t"
hive> dfs -ls -R /user/hive/warehouse/t;
drwxr-xr-x   - ekoifman staff          0 2016-06-09 17:03 /user/hive/warehouse/t/base_0000022
-rw-r--r--   1 ekoifman staff        602 2016-06-09 17:03 /user/hive/warehouse/t/base_0000022/bucket_00000
drwxr-xr-x   - ekoifman staff          0 2016-06-09 17:06 /user/hive/warehouse/t/delta_0000023_0000023_0000
-rw-r--r--   1 ekoifman staff        611 2016-06-09 17:06 /user/hive/warehouse/t/delta_0000023_0000023_0000/bucket_00000
drwxr-xr-x   - ekoifman staff          0 2016-06-09 17:07 /user/hive/warehouse/t/delta_0000024_0000024_0000
-rw-r--r--   1 ekoifman staff        610 2016-06-09 17:07 /user/hive/warehouse/t/delta_0000024_0000024_0000/bucket_00000

Compactor

Compactor is a set of background processes running inside the Metastore to support ACID system.  It consists of Initiator, Worker, Cleaner, AcidHouseKeeperService and a few others.

Delta File Compaction

As operations modify the table more and more delta files are created and need to be compacted to maintain adequate performance.  There are two types of compactions, minor and major.

  • Minor compaction takes a set of existing delta files and rewrites them to a single delta file per bucket.
  • Major compaction takes one or more delta files and the base file for the bucket and rewrites them into a new base file per bucket.  Major compaction is more expensive but is more effective.

All compactions are done in the background and do not prevent concurrent reads and writes of the data.  After a compaction the system waits until all readers of the old files have finished and then removes the old files.

Initiator

This module is responsible for discovering which tables or partitions are due for compaction.  This should be enabled in a Metastore using hive.compactor.initiator.on.  There are several properties of the form *.threshold in "New Configuration Parameters for Transactions" table below that control when a compaction task is created and which type of compaction is performed.  Each compaction task handles 1 partition (or whole table if the table is unpartitioned).

Worker

Each Worker handles a single compaction task.  A compaction is a MapReduce job with name in the following form: <hostname>-compactor-<db>.<table>.<partition>.  Each worker submits the job to the cluster (via hive.compactor.job.queue if defined) and waits for the job to finish.  hive.compactor.worker.threads determines the number of Workers in each Metastore.  The total number of Workrs in the Hive Warehouse determines the maximum number of concurrent compactions.

Cleaner

This process is a process that deletes delta files after compaction and after it determines that they are no longer needed.

AcidHouseKeeperService

This process looks for transactions that have not heartbeated in hive.txn.timeout time and aborts them.  The system assumes that a client that initiated a transaction stopped heartbeating crashed and the resources it locked should be released.

Lock Manager

A new lock manager has also been added to Hive, the DbLockManager.  This lock manager stores all lock information in the metastore.  In addition all transactions are stored in the metastore.  This means that transactions and locks are durable in the face of server failure.  To avoid clients dying and leaving transaction or locks dangling, a heartbeat is sent from lock holders and transaction initiators to the metastore on a regular basis.  If a heartbeat is not received in the configured amount of time, the lock or transaction will be aborted.

...

More details on locks used by this Lock Manager.

Configuration

These Minimally, these configuration parameters must be set appropriately to turn on transaction support in Hive:

Client Side

Server Side (Metastore)

...

Configuration key

Values

Location

Notes

hive.txn.manager 

Default: org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager

Value required for transactions: org.apache.hadoop.hive.ql.lockmgr.DbTxnManager

Client/
HiveServer2

DummyTxnManager replicates pre Hive-0.13 behavior and provides no transactions.

hive.txn.timeout 

Default: 300

Client/
HiveServer2/

Metastore 

Time after which transactions are declared aborted if the client has not sent a heartbeat, in seconds. It's critical that this property has the same value for all components/services.5

hive.txn.heartbeat.threadpool.sizeDefault: 5

Client/

HiveServer2

The number of threads to use for heartbeating (as of Hive 1.3.0 and 2.0.0).
hive.timedout.txn.reaper.startDefault: 100sMetastoreTime delay of first reaper (the process which aborts timed-out transactions) run after the metastore starts (as of Hive 1.3.0). Controls AcidHouseKeeperServcie above.

hive.timedout.txn.reaper.interval

Default: 180sMetastore

Time interval describing how often the reaper (the process which aborts timed-out transactions) runs (as of Hive 1.3.0). Controls AcidHouseKeeperServcie above.

hive.txn.max.open.batch

Default: 1000

Client

Maximum number of transactions that can be fetched in one call to open_txns().1

hive.max.open.txnsDefault: 100000

HiveServer2/

 Metastore

Maximum number of open transactions. If current open transactions reach this limit, future open transaction requests will be rejected, until the number goes below the limit. (As of Hive 1.3.0 and 2.1.0.)
hive.count.open.txns.intervalDefault: 1s

HiveServer2/

 Metastore

Time in seconds between checks to count open transactions (as of Hive 1.3.0 and 2.1.0).
hive.txn.retryable.sqlex.regexDefault: "" (empty string)

HiveServer2/

 Metastore

Comma separated list of regular expression patterns for SQL state, error code, and error message of retryable SQLExceptions, that's suitable for the Hive metastore database (as of Hive 1.3.0 and 2.1.0).

For an example, see Configuration Properties.

hive.compactor.initiator.on

Default: false

Value required for transactions: true (for exactly one instance of the Thrift metastore service)

Metastore

Whether to run the initiator and cleaner threads on this metastore instance. Prior to Hive 1.3.0 it's critical that this is enabled on exactly one standalone metastore service instance (not enforced yet).

As of Hive 1.3.0 this property may be enabled on any number of standalone metastore instances.

 

hive.compactor.worker.threads

Default: 0

Value required for transactions: > 0 on at least one instance of the Thrift metastore service

Metastore

How many compactor worker threads to run on this metastore instance.2

hive.compactor.worker.timeout

Default: 86400

Metastore

Time in seconds after which a compaction job will be declared failed and the compaction re-queued.

hive.compactor.cleaner.run.intervalDefault: 5000MetastoreTime in milliseconds between runs of the cleaner thread. (Hive 0.14.0 and later.)

hive.compactor.check.interval

Default: 300

Metastore

Time in seconds between checks to see if any tables or partitions need to be compacted.3

hive.compactor.delta.num.threshold

Default: 10

Metastore

Number of delta directories in a table or partition that will trigger a minor compaction.

hive.compactor.delta.pct.threshold

Default: 0.1

Metastore

Percentage (fractional) size of the delta files relative to the base that will trigger a major compaction. 1 = 100%, so the default 0.1 = 10%.

hive.compactor.abortedtxn.threshold

Default: 1000

Metastore

Number of aborted transactions involving a given table or partition that will trigger a major compaction.

hive.compactor.max.num.delta

Default: 500MetastoreMaximum number of delta files that the compactor will attempt to handle in a single job (as of Hive 1.3.0).4

hive.compactor.job.queue

Default: "" (empty string) Metastore Used to specify name of Hadoop queue to which Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue (as of Hive 1.3.0).

...