Note: This document is work in progress.

Contributors (alphabetical): Vandana Ayyalasomayajula, Francis Liu, Andreas Neumann, Thomas Weise

Objective

The objective of providing revision management capability to HBase tables is to preserve the functional programming paradigm for the grid. The map/reduce paradigm had proven efficient for the big data and it will be really useful if map-reduce programs could have data from HBase tables as their input. Presently, that is not possible as the data present in HBase is not suitable for repeatable reads. This is due to the fact that HBase assigns its own timestamps to store various revisions of user data. So, it is a good idea to have a centralized component ( revision manager) which could replace the timestamps with more meaningful IDs that end-users could tag on for achieving repeatable reads.

Introduction

When a grid jobs reads a file from HDFS, it can be sure that this file will not change. This ensures repeatability of computation: If a task (say, a mapper) fails, and the job tracker retries that task, it will see the exact same state of that file as the previous attempt. Similarly, if a workflow depend on the consistency of reads across two jobs, the immutability of HDFS provides that. With the introduction of mutable tables, this invariant no longer holds, but map/reduce jobs or workflows still require repeatability of the reads. Therefore we will implement immutable snapshots of a mutable table.

Design

The natural way of implementing a snapshot is to fix a point in time (when the snapshot is taken), and read only what has been written before that time. HBase associates a time stamp with every value that is written to a table, and allows readers to use a filter that is bounded by time stamp limit. This gives us exactly what we need, if we ensure that all operations on a table are ordered by time. That means, after a snapshot was taken, no writes can be made with a time stamp before that snapshot.

The primary way of writing to HBase is in batch (and we can consider single writes as very small batches). We will assign a time stamp to each batch of writes, and use that time stamp for every write that part of this batch (note that this means there is not logical time ordering within a batch). The table manager will assign time stamp to every batch, and maintain for each table the largest time stamp that has been assigned. It also maintains the highest time stamp of batches that have finished (been "committed"), and when a snapshot is taken, it can return the largest time stamp such that all batches with an earlier or equal time stamp have finished. That means, a reader will only see batches that have completed, and it can repeatedly read that snapshot, because all batches with an earlier time stamp have finished - thus all writes that happen after the snapshot was taken have a larger time stamp.

For example, in the following situation, jobs A and B write a new value for x. Job C takes a snapshot after A has finished, but before B has finished. Therefore, C will see the value written by A, but it will not see the value written by B, even if the read happens after B has finished:

Thus, a job can take a snapshot of the table, and for that snapshot, all reads are guaranteed to be repeatable. Note that in order to guarantee repeatability of reads, the snapshot con only contains transactions for which it is known that all earlier transactions (with an earlier time stamp) has completed. Otherwise an earlier transaction could overwrite a value that has already been read, as illustrated in the following example. Suppose job A starts a transaction before job B starts its transactions (thus A has an earlier time stamp), but A writes a value for x after B has written a value for x and committed its transaction. If job C takes a snapshot between the two writes, it will first see B's value, but when it repeatedly reads x, it will see the updated value from A:

Therefore, a snapshot cannot contain transactions that are committed, but started after any uncommitted transaction. However, this means that a very long-running job can delay the visibility the writes of a short-running job significantly, as illustrated in this example: long-running job A starts a transaction before job B, which commits its transaction after very short time. Even though the two jobs write different columns and different jobs, B's writes are only available for snapshots after job A has finished its transaction.

How can we fix this? The idea is that different column families are often written by different applications. For instance, if a table contains news articles, then the articles themselves would be written to one column family by the news feed ingestion process, whereas a grid job that computes duplicate clusters and assigns each article to one cluster would write to a different column family, independently of each other. Suppose the clustering algorithm is very expensive and takes a long time to complete, but feeds ingestion is a short process that is repeated frequently. We don't want to block the (visibility of) ingestion of new articles with the long-running process.

If every transaction, upon opening, declares which column families it will write, then we can ensure repeatability at the column family level. A snapshot is then not a single time stamp, but a vector of time stamps, one for each column family. Now reconsider the previous example: Job C can see the writes from job B before A has finished its transaction, because its snapshot reflects that A and B write to different column families.

But what if the long-running job and the short-running job write to the same column family? In that case, a job that needs to see the latest version of every column and does not depend on repeatable reads can read without a snapshot. It will then see all writes that have been made to the table. Optionally, we could allow a snapshot to exclude column families for which repeatable reads are not important, such that a reader can see the latest writes for those column families while relying on repeatable reads for other families. However, this seems to be a very advanced use case, and it is questionable whether it should be implemented until a strong use case emerges.

Implementation

Definitions

  • Table Snapshot - A Snapshot (of a table) is defined as the state of the table at a particular point in time. The state being defined by the list of (column, revision) pairs. Snapshots could be created by the on the fly by requesting the revision manager to take a snapshot of a table.
  • Transaction - The term "transaction" is used to indicate a job which writes a batch of data with a particular revision number.

A zookeeper based revision management library has been implemented for creating snapshots for HBase tables. The revision manager performs the following functions:

  • It assigns a unique monotonically increasing revision number (scope: table) for every write transaction.
  • It maintains the currently running, aborted transactions of HBase tables.
  • It provides APIs for users to take a latest snapshot of a HBase Table or for a valid revision number.

The revision management library is integrated with the HBase storage driver for HCatalog. A table could be read by the user in three different modes. The users can specify an additional parameter which indicates the maximum number of revisions to retrieve. So if the user wants to retrieve more than one revision of data, then they can read data in any of the modes described below along with specifying the maximum number of revisions to read.

Consider an example of a table "hbase-table" has three columns C1, C2, C3. The picture below shows us the history of data written for a particular row (say with row key "foo").

        
 

  • The user specifies nothing specific to read. In this mode, the latest snapshot of the table will be taken by default. This snapshot would be used to read the table.
    Example:
    User specification : Read table "Cars".
    Output : (Z,Z,P).
  • The user specifies a snapshot. In this mode, the specified snapshot will be used to read the table.
    Example:
    mysnapshot = (C1,2), (C2,2),(C3,2)User specification : Read table "Cars".
    Output : (P,Z,R).

Now if a user wants to read more than one revision, he can specify a snapshot and the maximum number of revisions he wishes to read.

mysnapshot = (C1,5), (C2,5),(C3,5)
User specification : Read table "Cars" using mysnapshot, maximum number of revisions = 3.
Output : (Z,Z,P), (Q,R,Y), (P,Z,R).

Integration with Map-Reduce

Users can either explicitly specify a snapshot or one will be created automatically (latest snapshot of a table) as input for the job. The output data of a map-reduce program will be assigned a revision by the revision manager. If the program completes successfully, the revision information will be available with the "OutputJobInfo". To capture the state of the table after MR job completion, the users can ask the revision manager to take a snapshot. They could also use the revision information in the "OutputJobInfo", to create a snapshot and use it later.

Integration with Pig

Users can use the default behavior (latest snapshot created every time an MR job is launched). Features such as explicitly specifying snapshots is future work.

Revision Assignment

The revision manager maintains all the revision related data of a table in the zookeeper. The important data maintained in the zookeeper are:

  • Revision counter
  • The revisions of currently running transactions.
  • The revisions of aborted transactions.

During the initialization process, the HBase output storage driver obtains a revision from the revision manager. The revision manager, then adds this transaction to the list of currently running transactions. When the job completes successfully, then the transaction is removed from the "current running" list. If the job is aborted for some reason, transaction is added to the list of aborted transactions.

When a job (or user) requests the revision manager to take a latest snapshot, the list of currently running transactions is consulted. The lowest revision number minus 1 , among the list is used in the snapshot.

Revision Manager as Service

Design Goals

  • Keep revision manager independent of HCatalog (e.g. Pig or other component could use RM outside HCatalog to access HBase data)
  • HCatalog meta store server (through Hive meta hook) will eventually interact with revision manager for authorization checks and to synchronize related state (on drop table etc., currently this is handled client side)
  • Revision Manager as active component, it needs to manage transaction expiration (“active” does not imply any particular implementation choice, e.g. thread usage)
  • Security: Access control in revision manager will be delegated to HBase (use ACLs of corresponding tables and standard Hadoop Kerberos or delegation token authentication).

With ZooKeeper based revision manager

  • HBase source for authorization (logically revision data ACL should be same as table ACL)
  • ZooKeeper ACLs will restrict access to revision data to the RM service principal
  • RM service runs everything as service principal, not as authorized client

Transaction Expiration

  • Potentially many current/expired transactions
  • Integrate expiration inline/lazily as part of begin/abort/snapshot
  • We decided not to use a timer based thread option - at this time benefits don’t justify added complexity
  • beginTransaction performs actual expiration with revision data storage update (interval ~30s) - need to read modify and write open revisions at that point anyways
  • Skip expired transactions from active list in createSnapshot (cleanup only happens as side effect of subsequent beginTransaction calls)
  • Limitation: Will initially only handle revision data and don’t delete corresponding data from HBase (no “rollback”)

Service implementation

  • Implement service as HBase coprocessor endpoint
  • Provides access to HBase ACL and leverages HBase container for security and high availability
  • Authorization ACL easy within coprocessor. Has access to HBase ACL info and can use it to authorize client access w/o relying on ACLs in ZooKeeper (difficult to maintain).
  • Consider using HBase instead of ZK for revision meta data storage

UML diagram for ZooKeeper based implementation

The following picture shows the class diagram for the zookeeper based revision manager.

APIs

The APIs provided by the revision manager interface are:

    /**
     * Initialize the revision manager.
     */
    public void initialize(Properties properties);

    /**
     * Opens the revision manager.
     *
     * @throws IOException
     */
    public void open() throws IOException;

    /**
     * Closes the revision manager.
     *
     * @throws IOException
     */
    public void close() throws IOException;

    /**
     * Start the write transaction.
     *
     * @param table The name of table involved in the transaction.
     * @param families The column families of table in which data will be written by the transaction.
     * @return An instance of Transaction
     * @throws IOException
     */
    public Transaction beginWriteTransaction(String table, List<String> families)
            throws IOException;

    /**
     * Start the write transaction.
     *
     * @param table The name of table involved in the transaction.
     * @param families The column families of table in which data will be written by the transaction.
     * @param keepAlive The duration ( in milliseconds) after which the transaction will expire.
     * @return An instance of Transaction
     * @throws IOException
     */
    public Transaction beginWriteTransaction(String table,
            List<String> families, long keepAlive) throws IOException;

    /**
     * Commit the write transaction.
     *
     * @param transaction An instance of Transaction to be committed.
     * @throws IOException
     */
    public void commitWriteTransaction(Transaction transaction)
            throws IOException;

    /**
     * Abort the write transaction.
     *
     * @param transaction An instance of Transaction to be aborted.
     * @throws IOException
     */
    public void abortWriteTransaction(Transaction transaction)
            throws IOException;

    /**
     * Create the latest snapshot of the table.
     *
     * @param tableName The name of the table to create a snapshot.
     * @return An instance of TableSnapshot.
     * @throws IOException
     */
    public TableSnapshot createSnapshot(String tableName) throws IOException;

    /**
     * Create the snapshot of the table using the revision number.
     *
     * @param tableName The name of the table to create a snapshot.
     * @param revision The revision number to be used.
     * @return An instance of TableSnapshot.
     * @throws IOException
     */
    public TableSnapshot createSnapshot(String tableName, long revision)
            throws IOException;

    /**
     * Extends the expiration of a transaction by the time indicated by keep alive.
     *
     * @param transaction An instance of Transaction.
     * @throws IOException
     */
    public void keepAlive(Transaction transaction) throws IOException;


 

  • No labels