You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 9 Next »

Overview

We already introduce metastore cache in Hive. However, the cache is not synchronized. That is, if metastore 1 changed a table/partition, metastore 2 won’t see the change immediately. There’s a background thread keep polling from the notification log and update changed entries, so the cache is eventually consistent. In this work, we want to make the cache full consistent. The idea is at read time, we will check if the cached entry is obsolete or not. However, we don’t want to penalize the read performance. We don’t want to introduce additional db call to compare the db version and cached version in order to do the check. The solution is we will use the transaction state of a query for the version check. A query will pull the transaction state of involved tables (ValidWriteIdList) from db (non-cached) anyway. So we don’t need additional db call to check the staleness of the cache.

Data structure change

The only data structure change is adding ValidWriteIdList into SharedCache.TableWrapper/SharedCache.PartitionWrapper, which represents the transaction state of the cached entry.

Note there is no db table structure change, and we don’t store extra information in db. We don’t update TBLS.WRITE_ID field as we will use db as the fact of truth. We assume db always carry the latest copy and every time we fetch from db, we will tag it with the transaction state of the query.

API changes

Adding ValidWriteIdList into metastore thrift API and RawStore interfaces, including all table/partition related read/write calls.

Hive_metastore.thrift

Adding a serialized version of ValidWriteIdList to every relevant metastore API. The following table is just for illustration. The reality is a bit more complex because of two reasons:

  1. Many APIs are using a request structure rather than taking individual parameters. So need to add ValidWriteIdList to the request structure instead
  2. Some APIs already take ValidWriteIdList to invalidate outdated transactional statistics. We don’t need to change the API signature, but will reuse the ValidWriteIdList to validate cached entries in CachedStore


For HMS write, if validWriteIdList=null, HMS won’t cache the entry at all if this is managed table, and will cache regardless of validWriteIdList if this is external table. For HMS read, if validWriteIdList=null, HMS will return null if it is managed table, and return the cached entry regardless if it is external table(with eventual consistency model).

Thrift API will remain backward compatible for external table. That is, new server can deal with old client. If the old client issue a create_table call, server side will receive the request of create_table with validWriteIdList=null, and will cache or retrieve the entry regardless. For managed table, validWriteIdList will be required and HMS server will throw an exception if validWriteIdList=null (for both read and write).

hive_metastore.thriftOld API

New API

create_table(Table tbl)

create_table(Table tbl,string validWriteIdList)

get_table(string dbname,string tbl_name)

get_table(string dbname,string tbl_name,string validWriteIdList)

RawStore

ObjectStore will not use the additional field, CachedStore will use it to put in TableWrapper/PartitionWrapper (write), or compare with cached ValidWriteIdList (read).


Old API

New API

createTable(Table tbl)

createTable(Table tbl,String validWriteIdList)

getTable(String catName,String dbName,String tableName)

getTable(String catName,String dbName,String tableName,String validWriteIdList)

Hive.java

The implementation details will be encapsulated in Hive.java. Which include:

  1. Generate new write id for every write operation involving managed tables. Since DbTxnManager cache write id for every transaction, so every query will generate at most one new write id for a single table, even if it consists of multiple Hive.java write API calls
  2. Retrieve table write id from config for every read operation if exists (for managed table, it guarantees to be there in config), and pass the write id to HMS API

Other

All other components invoking HMS API directly (bypass Hive.java) will be changed to invoke the newer HMS API. This includes HCatalog, Hive streaming, etc, and other projects using HMS client such as Impala.

For every read/write request involving table/partitions, HMS client need to pass a validWriteIdList string in addition to the existing arguments. validWriteIdList is a serialized form of ValidReaderWriteIdList (https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java#L119). Usually ValidReaderWriteIdList can be obtained from HiveTxnManager(https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java) using the following code snippet:

ValidTxnList txnIds = txnMgr.getValidTxns(); // get global transaction state

txnWriteIds = txnMgr.getValidWriteIds(txnTables, txnString); // map global transaction state to table specific write id

Use cases

Write

Hive needs to pass a ValidWriteIdList for every metastore write operation (table/partition). CachedStore will store ValidWriteIdList along with the entry in cache. Every Hive query (either DDL or DML) will retrieve a ValidWriteIdList at the beginning of the query. Let’s look at some examples.

  1. User creates a table with a “create table” statement:
    1. Before invoking metastore create_table API, Hive will allocate a write id 1 for the table
    2. Hive will invoke HMS with create_table, passing ValidWriteIdList [1:] (HWM=1, no open transactions)
    3. Metastore will write to db, and cache the table along with ValidWriteIdList [1:] into CachedStore
    4. Metastore will put the new table and ValidWriteIdList into notification log
    5. Other metastore will read notification log and update the cache
  2. User alter a table with “alter table” statement:
    1. At the beginning of the query, Hive will retrieve the global transaction state and store in config (ValidTxnList.VALID_TXNS_KEY)
    2. Hive translate ValidTxnList to ValidWriteIdList of the table [10:7,8]
    3. Hive allocate a new write id 11 for alter table, new ValidWriteIdList becomes [11:7,8]
    4. Metastore will write to db, and cache new table along with ValidWriteIdList [11:7,8] into CachedStore
    5. Metastore will put the new table and ValidWriteIdList into notification log
    6. Other metastore will read notification log and update the cache
  3. User update a table with “update” statement:
    1. At the beginning of the query, Hive will retrieve the global transaction state and store in config (ValidTxnList.VALID_TXNS_KEY)
    2. Hive translate ValidTxnList to ValidWriteIdList of the table [11:7,8]
    3. After update, Hive will invoke alter_table to update the writeid. Before that, Hive allocates a new write id 11 for alter table, new ValidWriteIdList becomes [12:7,8]
    4. Metastore will write to db, and cache new table along with ValidWriteIdList [12:7,8] into CachedStore
    5. Metastore will put the new table and ValidWriteIdList into notification log
    6. Other metastore will read notification log and update the cache

Read

Metastore read request will compare ValidWriteIdList parameter with the cached one. If there’s no transaction committed, Metastore will use cached copy. Otherwise, Metastore will retrieve the entry from ObjectStore, cache it to CachedStore along with the new ValidWriteIdList.

Here is the example for a get_table request:

  1. At the beginning of the query, Hive will retrieve the global transaction state and store in config (ValidTxnList.VALID_TXNS_KEY)
  2. Hive translate ValidTxnList to ValidWriteIdList of the table [13:7,8,12]
  3. Metastore compare ValidWriteIdList [13:7,8,12] with the cached one [12:7,8] using TxnIdUtils.checkEquivalentWriteIds, if no transaction committed between two states, Metastore return cached table entry
  4. If the cached ValidTxnList is [12:7], the comparison fails because write id 8 is committed. Metastore will fetch the table from ObjectStore

Here is another example of get_partitions_by_expr. In contrast with previous example, this API is a list query not a point lookup:

  1. At the beginning of the query, Hive will retrieve the global transaction state and store in config (ValidTxnList.VALID_TXNS_KEY)
  2. Hive translate ValidTxnList to ValidWriteIdList of the table [13:7,8,12]
  3. Metastore go through the cached partitions of the table, if any of the partition in cache does not compatible with the new ValidWriteIdList, Metastore will serve the list query from ObjectStore

Bootstrap

The use cases discussed so far are driven by a query. However, during the HMS startup, there’s a cache prewarm. HMS will fetch everything from db to cache. There is no particular query drives the process, that means we don’t have ValidWriteIdList of the query. Prewarm needs to generate ValidWriteIdList by itself. To do that, for every table, HMS will query the current global transaction state ValidTxnList (HiveTxnManager.getValidTxns), and then convert it to table specific ValidWriteIdList (HiveTxnManager.getValidWriteIds). As an optimization, we don’t have to invoke HiveTxnManager.getValidTxns per table. We can invoke it every couple of minutes. If ValidTxnList is outdated, we will get an outdated ValidWriteIdList. Next time when Hive read this entry, Metastore will fetch from the db even though it is in fact fresh. There’s no correctness issue, only impact performance in some cases. The other possibility is the entry changes after we fetches ValidWriteIdList. This is not unlikely as fetching all partitions of the table may take some time. If that happens, the cached entry is actually newer than the ValidWriteIdList. The next time Hive reads it will trigger a db read though it is not necessary. Again, there’s no correctness issue, only impact performance in some cases.

Cache update

In the previous discussion, we know if the cache is stale, metastore will serve the request from ObjectStore. However, we still need to catch up the cache with the latest change, in order to serve read requests from cache for future request. This can be done by the existing notification log based cache update mechanism. This mechanism constantly poll from notification log, and update the cache with the data entries in notification log. However, currently there is no ValidWriteIdList in notification log. We need to add ValidWriteIdList of the query to the notification log. During the update, this ValidWriteIdList will be in the cache. However, there is another scenario need to consider. If a DML consists of multiple HMS writes, all HMS writes will share the same ValidWriteIdList. Another HMS may catch up only 1 writes by the time the DML is committed. This is not acceptable since HMS cannot return partial changes to the user after the DML is committed, that defeats the purpose of a synchronized cache. To solve this problem, we will maintain a committed flag for every entry. During notification log catch up, we will not set the committed flag for a metastore write. Instead, we will wait for the commit event in the notification log. We will set the committed flag in cache until we receive the commit event from notification log. Also note currently commit event only contains txnid, we need to add ValidWriteIdList to the event. During HMS read, If we find the entry is marked uncommitted, HMS will fetch from db.

Here is a complete flow for a cache update when write happen (and illustrated in the diagram):

  1. The ValidWriteIdList of cached table is initially [13:7,8,12]
  2. Metastore 1 get a alter_table request, the new ValidWriteIdList is [14:7,8,12]. The cache entry in Metastore 1 is updated with the new ValidWriteIdList [14:7,8,12]. Metastore 1 also puts the new table along with ValidWriteIdList [14:7,8,12] to notification log. When the transaction is committed, HMS put the commit event into notification log along with ValidWriteIdList
  3. The ValidWriteIdList entry in Metastore 2 is still [13:7,8,12]. A new query try to fetch the table with the latest write id [14:7,8,12]. It is not compatible with the cached version, so Metastore will fetch the table from ObjectSore
  4. The cache update thread in Metastore 2 will read the alter_table event from notification log, update the cache with [14:7,8,12] which is in the notification log, however, this entry is marked as uncommitted
  5. A read for the entry on Metastore 2 will fetch from db since the entry is marked as uncommitted
  6. The cache update thread will further read commit event from notification log, mark the cache committed
  7. The next read from Metastore 2 will server from cache


External Table

Write id is not valid for external tables. And Hive won’t fetch ValidWriteIdList if the query source is external tables. Without ValidWriteIdList, HMS won’t able to check the staleness of the cache. To solve this problem, we will use the original eventually consistent model for external tables. That is, if the table is external table, Hive will pass null ValidWriteIdList to metastore API/CachedStore. Metastore cache won’t store ValidWriteIdList alongside the entry. When reading, CachedStore always retrieve the current copy. The original notification update will update the metadata of external tables, so we can eventually get updates from external changes.

  • No labels