Overview

With the popularity of key-value stores such as HBase and Cassandra, the Hadoop MR APIs need to be extended with a generic framework for random access, analogous to what InputFormat and OutputFormat provide for streaming data. This spares developers from writing their own glue code to take advantage of random access in MR jobs.

Design

This framework introduces two new classes: RandomAccess and RandomAccessCommitter. An HBase implementation is used as an example.

RandomAccess

public abstract class RandomAccess<KEY,INPUT,OUTPUT> {

    /**
     * Add another random access configuration into the configuration instance
     * @param alias used to reference a random access resource
     * @param clazz class of random access implementation
     * @param properties configuration which will be passed to RandomAccess.initialize()
     * @param conf configuration instance which will be configured with the resource
     * @throws IllegalArgumentException if alias, clazz, properties or conf is null
     */
    public static void addRandomAccess(String alias, Class<?> clazz, Map<String,String> properties, Configuration conf) {
    }

    /**
     * Get a random access instance identified by the alias
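     * @param alias used to reference a random access resource
     * @param keyClazz class of the object used as a key
     * @param inputClazz class retrieved when reading data
     * @param outputClazz class used to write out data with
     * @param conf configuration instance that holds the random access configuration
     * @return the random access instance identified by the alias
     * @throws IllegalArgumentException if alias does not exist, or if conf is null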
     */
    public static <KEY,INPUT,OUTPUT> RandomAccess<KEY,INPUT,OUTPUT> getRandomAccess(String alias,
            Class<KEY> keyClazz, Class<INPUT> inputClazz, Class<OUTPUT> outputClazz, Configuration conf) {
        return null;
    }

    /**
     * Called to initialize the instance. If the implementation also implements
     * {@link org.apache.hadoop.conf.Configurable}, this is called after setConf.
     * @param properties key-value pairs used to configure a specific implementation
     * @throws IOException
     */
    public abstract void initialize(Map<String,String> properties) throws IOException;

    /**
     * Retrieve a row.
     * @param key of row to retrieve
     * @return
     * @throws IllegalArgumentException if key is null.
     */
    public abstract INPUT get(KEY key) throws IOException;

    /**
     * Update a row or insert if it does not exist.
     * @param key
     * @param value
     * @throws IOException
     * @throws IllegalArgumentException if value is null, or if key is null when a key is required
     */
    public abstract void put(KEY key, OUTPUT value) throws IOException;

    /**
     * Gets the committer for the RandomAccess instance
     * @param context
     * @return RandomAccessCommitter instance or null if none exists
     * @throws IOException
     */
    public abstract RandomAccessCommitter getCommitter(JobContext context) throws IOException;
}

RandomAccess is the class that exposes the random access operations, and it is the class users will interact with most. Implementations must subclass it. RandomAccess also exposes two static methods: getRandomAccess retrieves a RandomAccess instance, and addRandomAccess adds a RandomAccess configuration to a Configuration object.
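To illustrate what a subclass has to provide, here is a minimal sketch of an implementation backed by an in-memory map rather than a real store. It is not part of the proposal: the RandomAccess declared in the block is a trimmed stand-in (the static helpers and getCommitter are left out so the code compiles without Hadoop), and InMemoryRandomAccess and the "key.prefix" property are hypothetical names chosen for the example.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Trimmed stand-in for the abstract class above: the static helpers and
// getCommitter are omitted so the sketch compiles without Hadoop.
abstract class RandomAccess<KEY, INPUT, OUTPUT> {
    public abstract void initialize(Map<String, String> properties) throws IOException;
    public abstract INPUT get(KEY key) throws IOException;
    public abstract void put(KEY key, OUTPUT value) throws IOException;
}

// Hypothetical implementation that keeps rows in a map instead of a real store.
class InMemoryRandomAccess extends RandomAccess<String, String, String> {
    private final Map<String, String> rows = new ConcurrentHashMap<>();
    private String prefix = "";

    @Override
    public void initialize(Map<String, String> properties) throws IOException {
        // "key.prefix" is an illustrative property name, not one defined by the proposal.
        prefix = properties.getOrDefault("key.prefix", "");
    }

    @Override
    public String get(String key) throws IOException {
        if (key == null) {
            throw new IllegalArgumentException("key is null");
        }
        return rows.get(prefix + key); // null when the row does not exist
    }

    @Override
    public void put(String key, String value) throws IOException {
        if (key == null || value == null) {
            throw new IllegalArgumentException("key or value is null");
        }
        rows.put(prefix + key, value); // inserts the row or overwrites it
    }
}
```

A store-backed implementation such as the HBase one would follow the same shape, opening its connection in initialize and translating get and put into calls on the store's client.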

RandomAccessCommitter

/**
 * Provides users a way to perform setup and teardown procedures
 * for an MR Job. The methods provide the same contract as their
 * streaming counterpart {@link org.apache.hadoop.mapreduce.OutputCommitter}
 */
public abstract class RandomAccessCommitter {

    public abstract void setupJob(JobContext context) throws IOException;

    public abstract void commitJob(JobContext context) throws IOException;

    public abstract void abortJob(JobContext context) throws IOException;

}

Similar to OutputCommitter, RandomAccessCommitter gives RandomAccess implementors a way to run the setup and teardown procedures needed by the underlying storage system. It will piggyback on an OutputCommitter wrapper so that it uses the same MR job hooks. Wrapper implementations are needed for both mapreduce and mapred; for now only the mapreduce implementation is planned.
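The wrapper itself is not specified here, so the following is only a sketch of how the piggybacking could work. JobContext and OutputCommitter are simplified stand-ins declared in the block so that it compiles without Hadoop, RandomAccessOutputCommitterWrapper is a hypothetical name, and the call order (the job's own committer first, then each RandomAccessCommitter) is an assumption.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Stand-ins so the sketch compiles without Hadoop. The real
// org.apache.hadoop.mapreduce.OutputCommitter also has task-level methods,
// and its abortJob takes a JobStatus.State argument.
class JobContext {}

abstract class OutputCommitter {
    public abstract void setupJob(JobContext context) throws IOException;
    public abstract void commitJob(JobContext context) throws IOException;
    public abstract void abortJob(JobContext context) throws IOException;
}

abstract class RandomAccessCommitter {
    public abstract void setupJob(JobContext context) throws IOException;
    public abstract void commitJob(JobContext context) throws IOException;
    public abstract void abortJob(JobContext context) throws IOException;
}

// Hypothetical wrapper: forwards each job-level hook to the job's own
// committer and then to every registered RandomAccessCommitter.
class RandomAccessOutputCommitterWrapper extends OutputCommitter {
    private final OutputCommitter wrapped;
    private final List<RandomAccessCommitter> committers;

    RandomAccessOutputCommitterWrapper(OutputCommitter wrapped,
                                       List<RandomAccessCommitter> committers) {
        this.wrapped = wrapped;
        this.committers = new ArrayList<>(committers); // defensive copy
    }

    @Override
    public void setupJob(JobContext context) throws IOException {
        wrapped.setupJob(context);
        for (RandomAccessCommitter committer : committers) {
            committer.setupJob(context);
        }
    }

    @Override
    public void commitJob(JobContext context) throws IOException {
        wrapped.commitJob(context);
        for (RandomAccessCommitter committer : committers) {
            committer.commitJob(context);
        }
    }

    @Override
    public void abortJob(JobContext context) throws IOException {
        wrapped.abortJob(context);
        for (RandomAccessCommitter committer : committers) {
            committer.abortJob(context);
        }
    }
}
```

In this sketch an IOException from any committer stops the remaining calls. Whether abortJob should instead keep going so that every store gets a chance to clean up is a design decision the proposal leaves open.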

Usage

In the job client code, a RandomAccess configuration is declared:

Job job = new Job(new JobConf());
....
job.setOutputFormatClass(TextOutputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("/foo"));

Map<String,String> properties = new HashMap<String,String>();
HBaseRandomAccess.setTable("tableFoo", properties);
HBaseRandomAccess.setScanColumns("my_family:my_col", properties);
RandomAccess.addRandomAccess("myTable", HBaseRandomAccess.class,
                properties, job.getConfiguration());

job.waitForCompletion(true);
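How addRandomAccess records this declaration in the Configuration is not specified. Since a Hadoop Configuration is essentially a set of string key-value pairs, one possibility is to flatten the alias, implementation class and properties into prefixed keys. The sketch below assumes that approach: a plain Map stands in for Configuration, and RandomAccessConfig and the "randomaccess." key layout are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a plain Map stands in for org.apache.hadoop.conf.Configuration,
// and the "randomaccess." key layout is an assumption, not part of the proposal.
// Aliases are assumed not to contain dots.
final class RandomAccessConfig {
    private static final String PREFIX = "randomaccess.";

    private RandomAccessConfig() {}

    // Records the implementation class and its properties under the alias.
    static void add(String alias, Class<?> clazz,
                    Map<String, String> properties, Map<String, String> conf) {
        if (alias == null || clazz == null || properties == null || conf == null) {
            throw new IllegalArgumentException("alias, clazz, properties and conf must be non-null");
        }
        conf.put(PREFIX + alias + ".class", clazz.getName());
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            conf.put(PREFIX + alias + ".property." + entry.getKey(), entry.getValue());
        }
    }

    // Returns the implementation class name recorded for the alias.
    static String implementationClass(String alias, Map<String, String> conf) {
        if (conf == null) {
            throw new IllegalArgumentException("conf is null");
        }
        String name = conf.get(PREFIX + alias + ".class");
        if (name == null) {
            throw new IllegalArgumentException("unknown alias: " + alias);
        }
        return name;
    }

    // Rebuilds the property map that would be handed to initialize().
    static Map<String, String> properties(String alias, Map<String, String> conf) {
        String propertyPrefix = PREFIX + alias + ".property.";
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> entry : conf.entrySet()) {
            if (entry.getKey().startsWith(propertyPrefix)) {
                result.put(entry.getKey().substring(propertyPrefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

Under this layout, getRandomAccess on the task side could look up the class name for the alias, instantiate it reflectively, and pass the rebuilt property map to initialize.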

In the Mapper code, a RandomAccess implementation is retrieved:

    public static class SampleMap extends Mapper<LongWritable,Text,LongWritable,Text> {
        private RandomAccess<byte[],Result,Put> access;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Look up the instance declared under the "myTable" alias in the job client
            access = RandomAccess.getRandomAccess("myTable", byte[].class, Result.class, Put.class, context.getConfiguration());
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Placeholder row key; a real job would derive it from the input record
            byte[] row = new byte[]{};
            Result result = access.get(row);
            access.put(row, new Put(row));
        }
    }