Loading data in Pig with HCatLoader

Let's start by examining a simple Pig script with the following attributes:

  • Loads data through HCatalog.
  • Filters data by partition fields (part_dt) and non-partition fields (type).
  • Projects a single field from the records.

data = load 'db.table' using org.apache.hcatalog.pig.HCatLoader();

data = filter data by
    part_dt >= '20120725T000000Z' and
    part_dt < '20120726T000000Z' and
    type == 'LLAMA';

data = foreach data generate type;

dump data;

A number of existing Pig optimizations are supported to efficiently load data through HCatalog, both in the frontend and backend.

In the Pig Frontend

The first interaction with HCatLoader happens while Pig is parsing the query and generating the logical plan. Pig calls HCatLoader.getSchema, which causes HCatalog to query the HiveMetaStore for the table schema. That schema is then used for all records in the table. This is important to understand because each partition can have its own schema, but only fields in the table schema are actually usable.

Once the logical query plan is complete, Pig compiles and optimizes the plan. During optimization, Pig calls HCatLoader.getPartitionKeys which queries the HiveMetaStore for the table partition keys, and HCatLoader.setPartitionFilter to set the filter that will be used later on when querying the HiveMetaStore for input partitions.

Let's examine the above filter statement, which includes both partition and non-partition fields. Since Pig knows the table is partitioned by part_dt (but not type), the following partition filter is used: ((part_dt >= '20120725T000000Z') and (part_dt < '20120726T000000Z'))
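The split described above can be illustrated with a small toy model. This is a Python sketch, not Pig's or HCatalog's actual Java code: split_filter and render are hypothetical helpers, and the model assumes the filter is a plain conjunction of simple comparisons.

```python
# Toy model: each condition of an AND-only filter is (field, operator, value).
def split_filter(conditions, partition_keys):
    """Separate conditions on partition keys from all other conditions."""
    pushed = [c for c in conditions if c[0] in partition_keys]
    remaining = [c for c in conditions if c[0] not in partition_keys]
    return pushed, remaining


def render(conditions):
    """Render conditions in the parenthesized style of the partition filter."""
    parts = [f"({field} {op} '{value}')" for field, op, value in conditions]
    return "(" + " and ".join(parts) + ")"


conditions = [
    ("part_dt", ">=", "20120725T000000Z"),
    ("part_dt", "<", "20120726T000000Z"),
    ("type", "==", "LLAMA"),
]

# part_dt is the only partition key, as in the example table.
pushed, remaining = split_filter(conditions, partition_keys={"part_dt"})
print(render(pushed))
print(remaining)
```

In this model the condition on type is left over; it is assumed to stay in the Pig plan and be evaluated against each loaded record.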

Let's also briefly examine how Pig loaders work. At the most basic level a Pig loader implements LoadFunc, providing methods to set the input location, get records, etc. Additional features may be provided by implementing additional interfaces; in this context the interesting interfaces implemented by HCatLoader are LoadMetadata (providing Pig with access to the schema, partition keys, and partition filtering functionality) and LoadPushDown, which provides projection pushdown support.

During query compilation and optimization, Pig calls HCatBaseLoader.pushProjection, which passes the list of required fields into the loader. Since the above query only makes use of the non-partition field type, that single field is added to the required fields list and pushed into the loader. Later, when constructing Pig tuples, the loader can use the required fields list to read the data more efficiently.
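As a rough illustration of projection pushdown, the following Python sketch models a loader that remembers which fields are required and builds tuples from only those fields. ToyLoader, its methods, and the id field are hypothetical; they only mirror the idea, not the real LoadPushDown interface.

```python
class ToyLoader:
    """Hypothetical loader that supports a pushed-down projection."""

    def __init__(self, table_fields):
        self.table_fields = table_fields
        # Without a pushed projection, every field is required.
        self.required = list(range(len(table_fields)))

    def push_projection(self, required_fields):
        """Remember the positions of the fields the query actually uses."""
        self.required = [self.table_fields.index(name) for name in required_fields]

    def to_tuple(self, record):
        """Build the output tuple from the required fields only."""
        return tuple(record[i] for i in self.required)


# The field names besides type and part_dt are assumptions for illustration.
loader = ToyLoader(["id", "type", "part_dt"])
loader.push_projection(["type"])
print(loader.to_tuple([7, "LLAMA", "20120725T010000Z"]))
```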

After Pig has completed compiling and optimizing the query execution plan, it calls HCatLoader.setLocation to set the db_name.table_name to load data from. This causes HCatLoader to query the HiveMetaStore for the partitions that require processing (making sure to use the partition filter we discussed above). These partitions are passed to HCatInputFormat.setInput which serializes them into the job configuration for access by the Pig backend, and HCatInputFormat.setOutputSchema serializes the output schema in the job configuration, which will be used when projecting records in the backend.
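The hand-off through the job configuration can be sketched as follows. This is a toy Python model under stated assumptions: TOY_METASTORE, the toy.input.partitions key, the partition locations, and both functions are hypothetical, and JSON stands in for whatever serialization HCatalog actually uses.

```python
import json

# Hypothetical stand-in for the metastore: table name -> list of partitions.
TOY_METASTORE = {
    "db.table": [
        {"part_dt": "20120724T000000Z", "location": "/toy/part_dt=20120724T000000Z"},
        {"part_dt": "20120725T000000Z", "location": "/toy/part_dt=20120725T000000Z"},
        {"part_dt": "20120725T120000Z", "location": "/toy/part_dt=20120725T120000Z"},
        {"part_dt": "20120726T000000Z", "location": "/toy/part_dt=20120726T000000Z"},
    ]
}


def partition_filter(partition):
    """The partition filter from the example; fixed-width timestamps compare as strings."""
    return "20120725T000000Z" <= partition["part_dt"] < "20120726T000000Z"


def set_location_frontend(table, job_conf):
    """Frontend: look up matching partitions and serialize them into the job config."""
    selected = [p for p in TOY_METASTORE[table] if partition_filter(p)]
    job_conf["toy.input.partitions"] = json.dumps(selected)


def set_location_backend(job_conf):
    """Backend: restore the partitions without contacting the metastore again."""
    return json.loads(job_conf["toy.input.partitions"])


job_conf = {}
set_location_frontend("db.table", job_conf)
restored = set_location_backend(job_conf)
print([p["part_dt"] for p in restored])
```

The point of the design is that the metastore is queried once, on the frontend; everything the backend needs travels with the job configuration.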

And finally Pig begins to execute the query.

In the Pig Backend

The Pig backend has two main parts: a client that manages launching MapReduce jobs on the cluster, and the jobs themselves.

When launching a MapReduce job, Pig calls HCatLoader.setLocation with the db_name.table_name to load data from. This time, however, instead of querying the HiveMetaStore for partition information, the loader largely restores the data previously serialized into the job configuration and calls HCatBaseInputFormat.setOutputSchema for use later when projecting records.

Now let's get the input splits, which represent the actual data to load. HCatBaseInputFormat.getSplits is called, which deserializes the partitions previously stored in the job configuration on the frontend, and for each one creates the actual input format, gets the actual splits, and wraps each one as an HCatSplit that contains the underlying split, as well as partition information necessary to later deserialize the records it returns.
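The wrapping step can be pictured with a small Python model. All names here (ToySplit, ToyWrappedSplit, underlying_splits, get_splits) are hypothetical, and the fixed-size chunking is only an assumption standing in for whatever the underlying input format does.

```python
from dataclasses import dataclass


@dataclass
class ToySplit:
    """Stands in for a split produced by the underlying input format."""
    path: str
    start: int
    length: int


@dataclass
class ToyWrappedSplit:
    """Stands in for HCatSplit: the underlying split plus partition information."""
    base_split: ToySplit
    partition_info: dict


def underlying_splits(partition, split_size):
    """Pretend input format: cut the partition's data into fixed-size chunks."""
    splits = []
    start = 0
    while start < partition["file_length"]:
        length = min(split_size, partition["file_length"] - start)
        splits.append(ToySplit(partition["location"], start, length))
        start += length
    return splits


def get_splits(partitions, split_size=128):
    """Wrap every underlying split together with its partition's information."""
    wrapped = []
    for partition in partitions:
        for split in underlying_splits(partition, split_size):
            wrapped.append(ToyWrappedSplit(split, partition))
    return wrapped


partitions = [
    {"part_dt": "20120725T000000Z", "location": "/toy/a", "file_length": 300},
    {"part_dt": "20120725T120000Z", "location": "/toy/b", "file_length": 100},
]
splits = get_splits(partitions)
print(len(splits))
```

Because each wrapped split carries its own partition information, a task that receives it has what it needs to deserialize the records without any further lookups.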

At this point Pig actually launches a MapReduce job on the cluster.

On the cluster, HCatBaseInputFormat.createRecordReader is called with an HCatSplit, the wrapper we created earlier that contains an actual input split and the partition information needed to deserialize its records. An HCatRecordReader that contains a storage handler is returned to the framework; the storage handler contains the information necessary to read data from the underlying storage and convert it into usable records.

With the RecordReader initialized, it's time to get some actual records! Pig calls HCatBaseLoader.getNext, which gets an HCatRecord from the HCatRecordReader we just initialized, converts it to a Pig tuple, and hands it off to Pig for processing.

Let's explore how HCatRecords are created. First, HCatRecordReader.nextKeyValue is called to get the actual record from the wrapped input format we created earlier. The record is first deserialized with the SerDe defined for the partition, and wrapped in a LazyHCatRecord, which delays further deserialization until required. Using the output schema set earlier, we create an HCatRecord with just the necessary fields. Finally, the HCatRecord is converted into a Pig tuple and handed off to Pig for processing.
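The idea of deferring work until a field is requested can be sketched in a few lines of Python. LazyRecord and project are hypothetical and much simpler than LazyHCatRecord: the model assumes a tab-delimited text record and merely postpones splitting it until the first field access.

```python
class LazyRecord:
    """Hypothetical lazy record: parsing is deferred until a field is read."""

    def __init__(self, raw_line, field_names, delimiter="\t"):
        self._raw = raw_line
        self._names = field_names
        self._delimiter = delimiter
        self._fields = None
        self.parse_count = 0  # how many times the raw line was parsed

    def get(self, name):
        if self._fields is None:
            # Parse once, on first access, and cache the result.
            self._fields = self._raw.split(self._delimiter)
            self.parse_count += 1
        return self._fields[self._names.index(name)]


def project(record, output_schema):
    """Build a tuple containing only the fields named in the output schema."""
    return tuple(record.get(name) for name in output_schema)


# The id field is an assumption for illustration.
record = LazyRecord("7\tLLAMA\t20120725T010000Z", ["id", "type", "part_dt"])
parses_before = record.parse_count
projected = project(record, ["type"])
print(parses_before, projected, record.parse_count)
```

A record that is never read costs nothing to parse in this model, which is the benefit the lazy wrapper is meant to provide.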