Note: This document is work in progress.

Contributors (alphabetical): Vandana Ayyalasomayajula, Francis Liu, Andreas Neumann, Thomas Weise

Objective

The objective of providing revision management for HBase tables is to preserve the functional programming paradigm on the grid. The map-reduce paradigm has proven efficient for big data, and it would be very useful if map-reduce programs could take data from HBase tables as their input. Presently that is not possible, because the data in HBase is not suitable for repeatable reads: HBase assigns its own timestamps to store the various revisions of user data. It is therefore a good idea to have a centralized component (the revision manager) that replaces the timestamps with more meaningful IDs, which end users can refer to in order to achieve repeatable reads.

...

But what if the long-running job and the short-running job write to the same column family? In that case, a job that needs to see the latest version of every column and does not depend on repeatable reads can read without a snapshot. It will then see all writes that have been made to the table. Optionally, we could allow a snapshot to exclude column families for which repeatable reads are not important, such that a reader can see the latest writes for those column families while relying on repeatable reads for other families. However, this seems to be a very advanced use case, and it is questionable whether it should be implemented until a strong use case emerges.

Implementation

Definitions

...

  • Table Snapshot - A snapshot (of a table) is defined as the state of the table at a particular point in time, the state being defined by the list of (column, revision) pairs. Snapshots can be created on the fly by requesting the revision manager to take a snapshot of a table.

...

  • It assigns a unique, monotonically increasing revision number (scope: table) to every write transaction.
  • It maintains the lists of currently running and aborted transactions of HBase tables.
  • It provides APIs for users to take a snapshot of an HBase table, either the latest one or one for a valid revision number.

...

Consider an example in which a table "Cars" has three columns C1, C2, C3. The picture below shows the history of data written for a particular row (say with row key "foo").

 <insert image>

  • The user specifies nothing specific to read. In this mode, the latest snapshot of the table will be taken by default. This snapshot would be used to read the table.
    Example:
    User specification : Read table "Cars".
    Output : (Z,Z,P).

...

If a user wants to read more than one revision, they can specify a snapshot and the maximum number of revisions they wish to read.

Example:
mysnapshot = (C1,5), (C2,5),(C3,5)
User specification : Read table "Cars" using mysnapshot, maximum number of revisions = 3.
Output : (Z,Z,P), (Q,R,Y), (P,Z,R).
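
The snapshot-based read described above can be sketched as follows. This is only an illustration under stated assumptions, not the actual reader: the class `SnapshotReadSketch`, its `read` method, and the sample history are hypothetical (the history from the picture is not reproduced here). For each column in the snapshot, the sketch returns up to `maxRevisions` values whose revision does not exceed the snapshot revision of that column, newest first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: not part of the revision manager API.
class SnapshotReadSketch {

    /**
     * Returns, per column, the values visible under the snapshot, newest first.
     *
     * @param history column -> (revision -> value) for a single row
     * @param snapshot column -> highest revision the reader may see
     * @param maxRevisions maximum number of revisions to return per column
     */
    static Map<String, List<String>> read(Map<String, NavigableMap<Long, String>> history,
            Map<String, Long> snapshot, int maxRevisions) {
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, Long> entry : snapshot.entrySet()) {
            List<String> values = new ArrayList<>();
            NavigableMap<Long, String> cells = history.get(entry.getKey());
            if (cells != null) {
                // headMap(rev, true) keeps revisions <= rev; descendingMap() puts the newest first.
                for (String value : cells.headMap(entry.getValue(), true).descendingMap().values()) {
                    if (values.size() == maxRevisions) {
                        break;
                    }
                    values.add(value);
                }
            }
            result.put(entry.getKey(), values);
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up history for one row: column C1 was written at revisions 1, 3 and 7.
        NavigableMap<Long, String> c1 = new TreeMap<>();
        c1.put(1L, "A");
        c1.put(3L, "B");
        c1.put(7L, "C");
        Map<String, NavigableMap<Long, String>> history = new TreeMap<>();
        history.put("C1", c1);

        Map<String, Long> snapshot = new TreeMap<>();
        snapshot.put("C1", 5L);

        // Revision 7 is newer than the snapshot and is therefore not visible.
        System.out.println(read(history, snapshot, 3)); // prints {C1=[B, A]}
    }
}
```

Because the snapshot pins a revision per column, running the same read again returns the same values even if newer revisions have been written in the meantime.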

Integration with Map-Reduce

Users can either explicitly specify a snapshot in their map-reduce job specification, or one will be created automatically (the latest snapshot of the table) and used as input for the job. The output data of a map-reduce program is assigned a revision by the revision manager. If the program completes successfully, the revision information is available in the "OutputJobInfo". To capture the state of the table after the MR job completes, users can ask the revision manager to take a snapshot. They can also use the revision information in the "OutputJobInfo" to create a snapshot and use it later.

Integration with Pig

Pig users can use the default behavior (a latest snapshot is created every time an MR job is launched). Features such as explicitly specifying snapshots are future work.

Revision Assignment

...

When a job (or user) requests the revision manager to take the latest snapshot, the list of currently running transactions is consulted. The lowest revision number in that list, minus 1, is used in the snapshot.
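
The rule above can be sketched in a few lines. The class and method names are hypothetical, and the fallback for an empty list of running transactions (use the highest revision assigned so far) is an assumption, since the text above does not state that case. Aborted transactions are ignored in this sketch.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

// Hypothetical sketch: not part of the revision manager API.
class LatestSnapshotSketch {

    /**
     * @param runningRevisions revisions of write transactions that are still running
     * @param currentRevision highest revision assigned so far (assumed fallback)
     * @return the revision a latest snapshot would use
     */
    static long latestSnapshotRevision(Collection<Long> runningRevisions, long currentRevision) {
        if (runningRevisions.isEmpty()) {
            // Assumption: with no writer in flight, the highest assigned revision is used.
            return currentRevision;
        }
        // Everything below the oldest running transaction is safe to read.
        return Collections.min(runningRevisions) - 1;
    }

    public static void main(String[] args) {
        System.out.println(latestSnapshotRevision(Arrays.asList(7L, 5L, 9L), 9L)); // prints 4
        System.out.println(latestSnapshotRevision(Collections.<Long>emptyList(), 9L)); // prints 9
    }
}
```

With running transactions at revisions 5, 7 and 9, the snapshot uses revision 4, so a reader never sees the partial output of a writer that is still in flight.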

Revision Manager as Service

Design Goals

  • Keep revision manager independent of HCatalog (e.g. Pig or other component could use RM outside HCatalog to access HBase data)
  • HCatalog meta store server (through Hive meta hook) will eventually interact with revision manager for authorization checks and to synchronize related state (on drop table etc., currently this is handled client side)
  • Revision Manager as active component, it needs to manage transaction expiration (“active” does not imply any particular implementation choice, e.g. thread usage)
  • Security: Access control in revision manager will be delegated to HBase (use ACLs of corresponding tables and standard Hadoop Kerberos or delegation token authentication).

With ZooKeeper based revision manager

  • HBase source for authorization (logically revision data ACL should be same as table ACL)
  • ZooKeeper ACLs will restrict access to revision data to the RM service principal
  • RM service runs everything as service principal, not as authorized client

Transaction Expiration

  • Potentially many current/expired transactions
  • Integrate expiration inline/lazily as part of begin/abort/snapshot
  • We decided not to use a timer based thread option - at this time benefits don’t justify added complexity
  • beginTransaction performs actual expiration with revision data storage update (interval ~30s) - need to read modify and write open revisions at that point anyways
  • Skip expired transactions from active list in createSnapshot (cleanup only happens as side effect of subsequent beginTransaction calls)
  • Limitation: Will initially only handle revision data and will not delete the corresponding data from HBase (no “rollback”)
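
The lazy expiration scheme in the list above might look roughly like the following sketch. It is an in-memory stand-in with hypothetical names (`LazyExpirationSketch`, `openCount`), and the clock is passed in explicitly to keep the example deterministic. The sketch simply drops expired transactions and does not model the aborted list or the revision data storage; how expired transactions are recorded is not stated in the list above.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: in-memory stand-in for the revision data storage.
class LazyExpirationSketch {
    // revision -> expiry time in milliseconds, ordered by revision
    private final TreeMap<Long, Long> openTransactions = new TreeMap<>();
    private long currentRevision = 0;

    /** Removes expired transactions as a side effect, then opens a new one. */
    long beginTransaction(long nowMillis, long keepAliveMillis) {
        Iterator<Map.Entry<Long, Long>> it = openTransactions.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() <= nowMillis) {
                it.remove(); // cleanup happens only here, never in createSnapshot
            }
        }
        long revision = ++currentRevision;
        openTransactions.put(revision, nowMillis + keepAliveMillis);
        return revision;
    }

    /** Skips expired transactions without removing them. */
    long createSnapshot(long nowMillis) {
        for (Map.Entry<Long, Long> entry : openTransactions.entrySet()) {
            if (entry.getValue() > nowMillis) {
                return entry.getKey() - 1; // lowest still-active revision minus 1
            }
        }
        return currentRevision; // assumption: no active writer, use the highest revision
    }

    int openCount() {
        return openTransactions.size();
    }

    public static void main(String[] args) {
        LazyExpirationSketch rm = new LazyExpirationSketch();
        rm.beginTransaction(0L, 1000L);    // revision 1, expires at t=1000
        rm.beginTransaction(500L, 10000L); // revision 2, expires at t=10500
        System.out.println(rm.createSnapshot(600L));  // prints 0
        System.out.println(rm.createSnapshot(2000L)); // prints 1
        System.out.println(rm.openCount());           // prints 2
        rm.beginTransaction(2000L, 1000L); // revision 3, removes expired revision 1
        System.out.println(rm.openCount());           // prints 2
    }
}
```

The design choice reflected here is that no timer thread is needed: `createSnapshot` stays read-only and cheap, while the write path that already has to update the open revisions also pays for the cleanup.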

Service implementation

  • Implement service as HBase coprocessor endpoint
  • Provides access to HBase ACL and leverages HBase container for security and high availability
  • Authorization ACL easy within coprocessor. Has access to HBase ACL info and can use it to authorize client access w/o relying on ACLs in ZooKeeper (difficult to maintain).
  • Consider using HBase instead of ZK for revision meta data storage

UML diagram for ZooKeeper based implementation

The following picture shows the class diagram for the ZooKeeper based revision manager.

 <insert image>

APIs

The APIs provided by the revision manager interface are:

    /**
     * Initialize the revision manager.
     */
    public void initialize(Properties properties);

    /**
     * Opens the revision manager.
     *
     * @throws IOException
     */
    public void open() throws IOException;

    /**
     * Closes the revision manager.
     *
     * @throws IOException
     */
    public void close() throws IOException;

    /**
     * Start the write transaction.
     *
     * @param table The name of table involved in the transaction.
     * @param families The column families of table in which data will be written by the transaction.
     * @return An instance of Transaction
     * @throws IOException
     */
    public Transaction beginWriteTransaction(String table, List<String> families)
            throws IOException;

    /**
     * Start the write transaction.
     *
     * @param table The name of table involved in the transaction.
     * @param families The column families of table in which data will be written by the transaction.
     * @param keepAlive The duration (in milliseconds) after which the transaction will expire.
     * @return An instance of Transaction
     * @throws IOException
     */
    public Transaction beginWriteTransaction(String table,
            List<String> families, long keepAlive) throws IOException;

    /**
     * Commit the write transaction.
     *
     * @param transaction An instance of Transaction to be committed.
     * @throws IOException
     */
    public void commitWriteTransaction(Transaction transaction)
            throws IOException;

    /**
     * Abort the write transaction.
     *
     * @param transaction An instance of Transaction to be aborted.
     * @throws IOException
     */
    public void abortWriteTransaction(Transaction transaction)
            throws IOException;

    /**
     * Create the latest snapshot of the table.
     *
     * @param tableName The name of the table to create a snapshot.
     * @return An instance of TableSnapshot.
     * @throws IOException
     */
    public TableSnapshot createSnapshot(String tableName) throws IOException;

    /**
     * Create the snapshot of the table using the revision number.
     *
     * @param tableName The name of the table to create a snapshot.
     * @param revision The revision number to be used.
     * @return An instance of TableSnapshot.
     * @throws IOException
     */
    public TableSnapshot createSnapshot(String tableName, long revision)
            throws IOException;

    /**
     * Extends the expiration of a transaction by the time indicated by keep alive.
     *
     * @param transaction An instance of Transaction.
     * @throws IOException
     */
    public void keepAlive(Transaction transaction) throws IOException;
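
A write job would typically wrap its writes between the begin and commit calls and abort on failure. The sketch below shows that pattern under stated assumptions: `Transaction` and `WriteTransactions` are trimmed-down stand-ins for the types named in the interface above (their real fields and the remaining methods are omitted), while `RecordingRevisionManager`, `WriteJobSketch`, and the column family name `cf1` are hypothetical and exist only so that the example compiles and runs on its own.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Trimmed-down stand-ins (assumptions) for the types named in the interface above.
class Transaction {
    final long revision;

    Transaction(long revision) {
        this.revision = revision;
    }
}

interface WriteTransactions {
    Transaction beginWriteTransaction(String table, List<String> families) throws IOException;

    void commitWriteTransaction(Transaction transaction) throws IOException;

    void abortWriteTransaction(Transaction transaction) throws IOException;
}

// Hypothetical in-memory stub that only records the calls it receives.
class RecordingRevisionManager implements WriteTransactions {
    final List<String> log = new ArrayList<>();
    private long nextRevision = 1;

    public Transaction beginWriteTransaction(String table, List<String> families) {
        Transaction txn = new Transaction(nextRevision++);
        log.add("begin " + txn.revision);
        return txn;
    }

    public void commitWriteTransaction(Transaction transaction) {
        log.add("commit " + transaction.revision);
    }

    public void abortWriteTransaction(Transaction transaction) {
        log.add("abort " + transaction.revision);
    }
}

class WriteJobSketch {
    /** Runs the write inside a transaction and aborts the transaction if anything fails. */
    static void runWrite(WriteTransactions rm, String table, List<String> families, Runnable write)
            throws IOException {
        Transaction txn = rm.beginWriteTransaction(table, families);
        boolean committed = false;
        try {
            write.run(); // the caller's writes, tagged with txn.revision, would go here
            rm.commitWriteTransaction(txn);
            committed = true;
        } finally {
            if (!committed) {
                rm.abortWriteTransaction(txn);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        RecordingRevisionManager rm = new RecordingRevisionManager();
        runWrite(rm, "Cars", Arrays.asList("cf1"), () -> { });
        try {
            runWrite(rm, "Cars", Arrays.asList("cf1"), () -> {
                throw new IllegalStateException("write failed");
            });
        } catch (IllegalStateException expected) {
            // the failed write leaves its transaction aborted
        }
        System.out.println(rm.log); // prints [begin 1, commit 1, begin 2, abort 2]
    }
}
```

A long-running writer would additionally call `keepAlive` periodically so that its transaction does not expire before the commit.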

