
Input and Output Interfaces

Set Up

No HCatalog-specific setup is required for the HCatInputFormat and HCatOutputFormat interfaces.

...

You can use the getTableSchema method to determine the table schema for a specified input table.

No Format

  /**
   * Set the input to use for the Job. This queries the metadata server with
   * the specified partition predicates, gets the matching partitions, puts
   * the information in the conf object. The inputInfo object is updated with
   * information needed in the client context
   * @param job the job object
   * @param inputJobInfo the input info for table to read
   * @throws IOException the exception in communicating with the metadata server
   */
  public static void setInput(Job job,
      InputJobInfo inputJobInfo) throws IOException;

  /**
   * Set the schema for the HCatRecord data returned by HCatInputFormat.
   * @param job the job object
   * @param hcatSchema the schema to use as the consolidated schema
   */
  public static void setOutputSchema(Job job, HCatSchema hcatSchema)
    throws IOException;

  /**
   * Get the HCatTable schema for the table specified in the HCatInputFormat.setInput
   * call on the specified job context. This information is available only after
   * HCatInputFormat.setInput has been called for a JobContext.
   * @param context the context
   * @return the table schema
   * @throws IOException if HCatInputFormat.setInput has not been called
   *                     for the current context
   */
  public static HCatSchema getTableSchema(JobContext context)
    throws IOException;
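
For reference, a minimal read-side driver sketch (the variables dbName and inputTableName are placeholders, not part of the API) that wires these calls together might look like this:

No Format

  // Tell HCatalog which table to read; a null filter reads all partitions.
  HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));
  // The table schema is available only after setInput has been called.
  HCatSchema tableSchema = HCatInputFormat.getTableSchema(job);
  // Optionally set the (possibly reduced) schema for the HCatRecords handed to the mappers.
  HCatInputFormat.setOutputSchema(job, tableSchema);
  job.setInputFormatClass(HCatInputFormat.class);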

...

The first call on the HCatOutputFormat must be setOutput; any other call will throw an exception saying the output format is not initialized. The schema for the data being written out is specified by the setSchema method. You must call this method, providing the schema of the data you are writing. If your data has the same schema as the table schema, you can use HCatOutputFormat.getTableSchema() to get the table schema and then pass that along to setSchema().

No Format

  /**
   * Set the information about the output to write for the job. This queries the metadata
   * server to find the StorageHandler to use for the table. It throws an error if the
   * partition is already published.
   * @param job the job object
   * @param outputJobInfo the table output information for the job
   * @throws IOException the exception in communicating with the metadata server
   */
  @SuppressWarnings("unchecked")
  public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException;

  /**
   * Set the schema for the data being written out to the partition. The
   * table schema is used by default for the partition if this is not called.
   * @param job the job object
   * @param schema the schema for the data
   * @throws IOException
   */
  public static void setSchema(final Job job, final HCatSchema schema) throws IOException;

  /**
   * Get the table schema for the table specified in the HCatOutputFormat.setOutput call
   * on the specified job context.
   * @param context the context
   * @return the table schema
   * @throws IOException if HCatOutputFormat.setOutput has not been called
   *                     for the passed context
   */
  public static HCatSchema getTableSchema(JobContext context) throws IOException;
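
To illustrate the call order described above, a short write-side sketch (again with placeholder variable names) could be:

No Format

  // setOutput must be the first HCatOutputFormat call on the job.
  HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
  // If the data being written matches the table layout, reuse the table schema.
  HCatSchema schema = HCatOutputFormat.getTableSchema(job);
  HCatOutputFormat.setSchema(job, schema);
  job.setOutputFormatClass(HCatOutputFormat.class);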

...

Your MapReduce program needs to be told where the Thrift server is. The easiest way to do this is to pass the location as an argument to your Java program. You need to pass the Hive and HCatalog jars to MapReduce as well, via the -libjars argument.

No Format

export HADOOP_HOME=<path_to_hadoop_install>
export HCAT_HOME=<path_to_hcat_install>
export HIVE_HOME=<path_to_hive_install>
export LIB_JARS=$HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar,\
$HIVE_HOME/lib/hive-metastore-0.10.0.jar,\
$HIVE_HOME/lib/libthrift-0.7.0.jar,\
$HIVE_HOME/lib/hive-exec-0.10.0.jar,\
$HIVE_HOME/lib/libfb303-0.7.0.jar,\
$HIVE_HOME/lib/jdo2-api-2.3-ec.jar,\
$HIVE_HOME/lib/slf4j-api-1.6.1.jar

export HADOOP_CLASSPATH=$HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar:\
$HIVE_HOME/lib/hive-metastore-0.10.0.jar:\
$HIVE_HOME/lib/libthrift-0.7.0.jar:\
$HIVE_HOME/lib/hive-exec-0.10.0.jar:\
$HIVE_HOME/lib/libfb303-0.7.0.jar:\
$HIVE_HOME/lib/jdo2-api-2.3-ec.jar:\
$HIVE_HOME/conf:$HADOOP_HOME/conf:\
$HIVE_HOME/lib/slf4j-api-1.6.1.jar

$HADOOP_HOME/bin/hadoop --config $HADOOP_HOME/conf jar <path_to_jar> \
  <main_class> -libjars $LIB_JARS <program_arguments>
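
How the program consumes the Thrift server location is up to you. One possibility (a sketch only; the argument position is an assumption, though hive.metastore.uris is the standard Hive property for the Thrift metastore location) is to copy it from a program argument into the job configuration before making any HCatInputFormat or HCatOutputFormat calls:

No Format

  // Hypothetical convention: the last program argument is the metastore URI,
  // for example thrift://metastorehost:9083.
  String serverUri = args[args.length - 1];
  conf.set("hive.metastore.uris", serverUri);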

...

Instead, you can optimize job submission by referencing the libjars from HDFS locations. By doing this, Hadoop will reuse the entries already in the distributed cache rather than copying them from the client for every job.

No Format

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp

export LIB_JARS=hdfs:///tmp/hcatalog-core-0.5.0.jar,\
hdfs:///tmp/hive-metastore-0.10.0.jar,\
hdfs:///tmp/libthrift-0.7.0.jar,\
hdfs:///tmp/hive-exec-0.10.0.jar,\
hdfs:///tmp/libfb303-0.7.0.jar,\
hdfs:///tmp/jdo2-api-2.3-ec.jar,\
hdfs:///tmp/slf4j-api-1.6.1.jar

# (Other statements remain the same.)

...

The following very simple MapReduce program reads data from one table, which it assumes has an integer in the second column ("column 1"), and counts how many instances of each distinct value it finds. That is, it does the equivalent of "select col1, count(*) from $table group by col1;".

For example, if the values in the second column are {1,1,1,3,3,5}, the program will produce this output of values and counts:

1, 3
3, 2
5, 1

No Format

public class GroupByAge extends Configured implements Tool {

    public static class Map extends
            Mapper<WritableComparable, HCatRecord, IntWritable, IntWritable> {

        int age;

        @Override
        protected void map(
                WritableComparable key,
                HCatRecord value,
                org.apache.hadoop.mapreduce.Mapper<WritableComparable, HCatRecord,
                        IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            age = (Integer) value.get(1);
            context.write(new IntWritable(age), new IntWritable(1));
        }
    }

    public static class Reduce extends Reducer<IntWritable, IntWritable,
    WritableComparable, HCatRecord> {


      @Override
      protected void reduce(
              IntWritable key,
              java.lang.Iterable<IntWritable> values,
              org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
                      WritableComparable, HCatRecord>.Context context)
              throws IOException, InterruptedException {
          int sum = 0;
          Iterator<IntWritable> iter = values.iterator();
          while (iter.hasNext()) {
              sum++;
              iter.next();
          }
          HCatRecord record = new DefaultHCatRecord(2);
          record.set(0, key.get());
          record.set(1, sum);

          context.write(null, record);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        String inputTableName = args[0];
        String outputTableName = args[1];
        String dbName = null;

        Job job = new Job(conf, "GroupByAge");
        // initialize HCatInputFormat
        HCatInputFormat.setInput(job, InputJobInfo.create(dbName,
                inputTableName, null));

        job.setInputFormatClass(HCatInputFormat.class);
        job.setJarByClass(GroupByAge.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        // initialize HCatOutputFormat
        HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName,
                outputTableName, null));
        HCatSchema s = HCatOutputFormat.getTableSchema(job);
        System.err.println("INFO: output schema explicitly set for writing:"
                + s);
        HCatOutputFormat.setSchema(job, s);
        job.setOutputFormatClass(HCatOutputFormat.class);
        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new GroupByAge(), args);
        System.exit(exitCode);
    }
}

...

Assume, for example, you have a web_logs table that is partitioned on the column "ds". You could select one partition of the table by changing

No Format

HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));

to

No Format

HCatInputFormat.setInput(job,
                         InputJobInfo.create(dbName, inputTableName, "ds=\"20110924\""));

...

To write to a single partition, you can change the above example to pass a Map of key-value pairs that describes all of the partition keys and values for that partition. In our example web_logs table, there is only one partition column (ds), so our Map will have only one entry. Change

No Format

HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));

to

No Format

Map<String, String> partitions = new HashMap<String, String>(1);
partitions.put("ds", "20110924");
HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, partitions));

To write multiple partitions simultaneously you can leave the Map null, but all of the partitioning columns must be present in the data you are writing.
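
For example (a sketch only, assuming the output table's columns are (col1, cnt) plus the partition column ds), the driver passes a null partition map and each record carries its own value for ds:

No Format

  // Driver: a null partition map means the partitions are taken from the data.
  HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));

  // Reducer: every record must include a value for each partition column.
  HCatRecord record = new DefaultHCatRecord(3);
  record.set(0, key.get());
  record.set(1, sum);
  record.set(2, "20110924");   // value of the partition column ds for this record
  context.write(null, record);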


Panel

Navigation Links

Previous: Load and Store Interfaces
Next: Reader and Writer Interfaces

General: HCatalog Manual | WebHCat Manual | Hive Wiki Home | Hive Project Site
Old version of this document (HCatalog 0.5.0): Input and Output Interfaces