Loading data in Pig with HCatLoader
Let's start by examining a simple Pig script with the following attributes:
- Loads data through HCatalog.
- Filters data by partition fields (part_dt) and non-partition fields (type).
- Projects a single field from the records.
```pig
data = load 'db.table' using org.apache.hcatalog.pig.HCatLoader();
data = filter data by part_dt >= '20120725T000000Z'
    and part_dt < '20120726T000000Z'
    and type == 'LLAMA';
data = foreach data generate type;
dump data;
```
A number of existing Pig optimizations are supported when loading data through HCatalog, both in the frontend and in the backend.
In the Pig Frontend
The first interaction with HCatLoader happens while Pig is parsing the query and generating the logical plan. HCatLoader.getSchema is called, which causes HCatalog to query the HiveMetaStore for the table schema; that schema is then used for all records in the table. This is important to understand because each partition can have its own schema, but only fields in the table schema are actually usable.
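To make that concrete, here is a minimal sketch (illustrative names and types only, not HCatalog internals) of why the table schema is the boundary: a field that exists only in an individual partition never reaches the records a query sees.

```java
// Illustrative sketch only: the table schema, not the partition schema,
// bounds which fields a query can use. All names here are hypothetical.
import java.util.*;

class SchemaSketch {
    static Map<String, Object> projectToTableSchema(List<String> tableSchema,
                                                    Map<String, Object> partitionRecord) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (String field : tableSchema) {
            // Missing in this partition? Surface a null. Extra partition-only
            // columns are simply never asked for.
            result.put(field, partitionRecord.get(field));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tableSchema = Arrays.asList("part_dt", "type", "msg");
        Map<String, Object> oldPartitionRecord = new HashMap<>();
        oldPartitionRecord.put("part_dt", "20120725T000000Z");
        oldPartitionRecord.put("type", "LLAMA");
        oldPartitionRecord.put("legacy_col", 42);  // not in the table schema, so unusable

        System.out.println(projectToTableSchema(tableSchema, oldPartitionRecord));
        // {part_dt=20120725T000000Z, type=LLAMA, msg=null}
    }
}
```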
Once the logical query plan is complete, Pig compiles and optimizes the plan. During optimization, Pig calls HCatLoader.getPartitionKeys, which queries the HiveMetaStore for the table's partition keys, and HCatLoader.setPartitionFilter, which sets the filter that will be used later when querying the HiveMetaStore for input partitions.
Let's examine the above filter statement, which includes both partition and non-partition fields. Since Pig knows the table is partitioned by part_dt (but not by type), the following partition filter is used:

((part_dt >= '20120725T000000Z') and (part_dt < '20120726T000000Z'))
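As a rough illustration of how such a filter is split (this is not Pig's actual planner code, and the clause representation below is a stand-in), partition-filter extraction amounts to keeping only the AND-ed clauses that reference partition columns, while clauses like type == 'LLAMA' are left to be evaluated normally:

```java
// Hypothetical sketch of partition-filter extraction: keep only the AND-ed
// clauses that mention a partition column; the rest stay in the regular filter.
import java.util.*;
import java.util.stream.Collectors;

class PartitionFilterSketch {
    public static void main(String[] args) {
        Set<String> partitionKeys = Set.of("part_dt");
        // The query's filter, already broken into AND-ed clauses.
        List<String> clauses = List.of(
            "part_dt >= '20120725T000000Z'",
            "part_dt < '20120726T000000Z'",
            "type == 'LLAMA'");

        List<String> partitionClauses = clauses.stream()
            .filter(c -> partitionKeys.stream().anyMatch(c::contains))
            .collect(Collectors.toList());

        // Sent to the HiveMetaStore to prune partitions:
        System.out.println("((" + String.join(") and (", partitionClauses) + "))");
        // ((part_dt >= '20120725T000000Z') and (part_dt < '20120726T000000Z'))
    }
}
```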
Let's also briefly examine how Pig loaders work. At the most basic level, a Pig loader implements LoadFunc, providing methods to set the input location, get records, and so on. Additional features may be provided by implementing additional interfaces; in this context, the interesting interfaces implemented by HCatLoader are LoadMetadata, which provides Pig with access to the schema, partition keys, and partition filtering functionality, and LoadPushDown, which provides projection pushdown support.
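Paraphrased very loosely, the division of labor between those interfaces looks like the sketch below. These are simplified stand-in signatures; the authoritative contracts are Pig's LoadFunc, LoadMetadata, and LoadPushDown javadocs.

```java
// Simplified, paraphrased shape of the Pig loader contracts HCatLoader builds on.
import java.io.IOException;
import java.util.List;

interface BasicLoaderSketch {
    void setLocation(String location) throws IOException;  // where to read from (LoadFunc)
    Object getNext() throws IOException;                    // next record as a tuple, or null at end (LoadFunc)
}

interface MetadataLoaderSketch extends BasicLoaderSketch {
    List<String> getSchema(String location) throws IOException;          // table schema (LoadMetadata)
    List<String> getPartitionKeys(String location) throws IOException;   // e.g. ["part_dt"] (LoadMetadata)
    void setPartitionFilter(String partitionFilter) throws IOException;  // prune partitions (LoadMetadata)
}

interface PushDownLoaderSketch extends BasicLoaderSketch {
    // Pig pushes in the fields the query actually needs (here, just "type") (LoadPushDown)
    void pushProjection(List<String> requiredFields) throws IOException;
}
```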
During query compilation and optimization, Pig calls HCatBaseLoader.pushProjection, passing the list of required fields into the loader. Since the above query only makes use of the non-partition field type, that single field is added to the required fields list and pushed into the loader. Later, when constructing Pig tuples, the loader can use the required fields list to read the data as efficiently as possible.
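A toy sketch of what that buys at read time (illustrative names, not HCatLoader's code): only the requested columns are materialized into the outgoing row, and anything the query never asks for need not be deserialized at all.

```java
// Illustrative only: build an output row from the pushed-down required fields,
// so columns the query never touches are never materialized.
import java.util.*;

class ProjectionPushdownSketch {
    static List<Object> buildProjectedRow(Map<String, Object> rawRecord,
                                          List<String> requiredFields) {
        List<Object> row = new ArrayList<>(requiredFields.size());
        for (String field : requiredFields) {
            row.add(rawRecord.get(field));   // only copy what was requested
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> raw = Map.of(
            "part_dt", "20120725T000000Z", "type", "LLAMA", "msg", "hello");
        System.out.println(buildProjectedRow(raw, List.of("type")));  // [LLAMA]
    }
}
```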
After Pig has completed compiling and optimizing the query execution plan, it calls HCatLoader.setLocation to set the db_name.table_name to load data from. This causes HCatLoader to query the HiveMetaStore for the partitions that require processing (making sure to use the partition filter we discussed above). These partitions are passed to HCatInputFormat.setInput, which serializes them into the job configuration for access by the Pig backend, and HCatInputFormat.setOutputSchema serializes the output schema into the job configuration, where it will be used when projecting records in the backend.
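A minimal sketch of that hand-off, assuming hypothetical property names (the real keys and encoding belong to HCatInputFormat and are not reproduced here): the frontend writes what it learned from the metastore into the job configuration, and the backend later reads it back without another metastore round trip.

```java
// Minimal sketch of the frontend -> backend hand-off through the job configuration.
// The property names and the comma-joined encoding are made up for illustration.
import org.apache.hadoop.conf.Configuration;
import java.util.Arrays;
import java.util.List;

class JobConfHandoffSketch {
    static final String PARTITIONS_KEY = "example.hcat.input.partitions";  // hypothetical key
    static final String OUTPUT_SCHEMA_KEY = "example.hcat.output.schema";  // hypothetical key

    public static void main(String[] args) {
        Configuration conf = new Configuration(false);

        // Frontend: after pruning with the partition filter, record what to read.
        List<String> partitions = Arrays.asList("part_dt=20120725T000000Z");
        conf.set(PARTITIONS_KEY, String.join(",", partitions));
        conf.set(OUTPUT_SCHEMA_KEY, "type");   // only the projected field

        // Backend: restore the same information without another metastore call.
        System.out.println("partitions = " + conf.get(PARTITIONS_KEY));
        System.out.println("output schema = " + conf.get(OUTPUT_SCHEMA_KEY));
    }
}
```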
And finally Pig begins to execute the query.
In the Pig Backend
The Pig backend has two main parts: a client that manages launching MapReduce jobs on the cluster, and the jobs themselves.
When launching a MapReduce job, Pig calls HCatLoader.setLocation with the db_name.table_name to load data from. This time, however, instead of querying the HiveMetaStore for partition information, we largely restore the data previously serialized into the job configuration, and call HCatBaseInputFormat.setOutputSchema for use later when projecting records.
Now let's get the input splits, which represent the actual data to load. HCatBaseInputFormat.getSplits is called, which deserializes the partitions previously stored in the job configuration on the frontend and, for each one, creates the actual input format, gets the actual splits, and wraps each one as an HCatSplit that contains the underlying split as well as the partition information necessary to later deserialize the records it returns.
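That wrapping step follows a familiar pattern; a simplified stand-in for it (not the real HCatSplit class, which plugs into Hadoop's InputSplit machinery) might look like:

```java
// Simplified stand-in for the wrapping step in getSplits: each underlying split
// is paired with the partition metadata needed to deserialize its records later.
class WrappedSplitSketch {
    private final Object underlyingSplit;   // e.g. a split produced by the partition's real input format
    private final String partitionSchema;   // this partition's own schema
    private final String serdeClassName;    // how to deserialize this partition's records

    WrappedSplitSketch(Object underlyingSplit, String partitionSchema, String serdeClassName) {
        this.underlyingSplit = underlyingSplit;
        this.partitionSchema = partitionSchema;
        this.serdeClassName = serdeClassName;
    }

    Object getUnderlyingSplit() { return underlyingSplit; }
    String getPartitionSchema() { return partitionSchema; }
    String getSerdeClassName()  { return serdeClassName; }
}
```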
At this point Pig actually launches a MapReduce job on the cluster.
On the cluster, HCatBaseInputFormat.createRecordReader is called with an HCatSplit, the wrapper we created earlier that contains an actual input split and the partition information needed to deserialize its records. An HCatRecordReader that contains a storage handler is returned to the framework; the storage handler contains the information necessary to read data from the underlying storage and convert it into usable records.
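In spirit, the reader delegates: raw values come from the wrapped split's own reader, and partition-specific deserialization (the storage handler and SerDe in the real code) turns them into records. A simplified stand-in, not the real HCatRecordReader:

```java
// Illustrative stand-in for the reader's division of labor.
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;

class DelegatingReaderSketch {
    private final Iterator<byte[]> underlyingReader;                    // stand-in for the wrapped RecordReader
    private final Function<byte[], Map<String, Object>> deserializer;   // stand-in for the partition's SerDe

    DelegatingReaderSketch(Iterator<byte[]> underlyingReader,
                           Function<byte[], Map<String, Object>> deserializer) {
        this.underlyingReader = underlyingReader;
        this.deserializer = deserializer;
    }

    /** Returns the next deserialized record, or null when the split is exhausted. */
    Map<String, Object> next() {
        return underlyingReader.hasNext() ? deserializer.apply(underlyingReader.next()) : null;
    }
}
```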
With the RecordReader initialized, it's time to get some actual records! Pig calls HCatBaseLoader.getNext, which gets an HCatRecord from the HCatRecordReader we just initialized, converts it to a Pig tuple, and hands it off to Pig for processing.
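The conversion itself can be pictured roughly as below, using Pig's TupleFactory; the List<Object> parameter is a simplified stand-in for an HCatRecord, and the real code also maps Hive data types to their Pig equivalents.

```java
// Rough sketch of converting a record's fields into a Pig tuple.
import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

class RecordToTupleSketch {
    private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();

    static Tuple toTuple(List<Object> recordFields) throws ExecException {
        Tuple t = TUPLE_FACTORY.newTuple(recordFields.size());
        for (int i = 0; i < recordFields.size(); i++) {
            t.set(i, recordFields.get(i));   // real code also converts Hive types to Pig types
        }
        return t;
    }
}
```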
Let's explore how HCatRecords are created. First, HCatRecordReader.nextKeyValue is called to get the actual record from the wrapped input format we created earlier. The record is deserialized with the SerDe defined for the partition and wrapped in a LazyHCatRecord, which delays further deserialization until required. Using the output schema set earlier, we create an HCatRecord with just the necessary fields. Finally, the HCatRecord is converted into a Pig tuple and handed off to Pig for processing.
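To close, here is a toy illustration of the "delay deserialization until required" idea (not LazyHCatRecord's actual implementation): hold the raw record plus a decoder, decode only when a field is first requested, and materialize just the fields named by the output schema.

```java
// Toy illustration of lazy deserialization: hold the raw record and only decode
// when a field is actually requested, then project per the output schema.
import java.util.*;
import java.util.function.Function;

class LazyRecordSketch {
    private final byte[] rawRecord;
    private final Function<byte[], Map<String, Object>> decoder;  // stand-in for the partition's SerDe
    private Map<String, Object> decoded;                          // filled in on first access

    LazyRecordSketch(byte[] rawRecord, Function<byte[], Map<String, Object>> decoder) {
        this.rawRecord = rawRecord;
        this.decoder = decoder;
    }

    Object get(String field) {
        if (decoded == null) {
            decoded = decoder.apply(rawRecord);   // deserialization happens only when needed
        }
        return decoded.get(field);
    }

    // Materialize just the fields named by the output schema set on the frontend.
    List<Object> project(List<String> outputSchema) {
        List<Object> out = new ArrayList<>();
        for (String field : outputSchema) {
            out.add(get(field));
        }
        return out;
    }
}
```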