...
Components of Sqoop using MR
SqoopMapper
- The current semantics is:By default sqoop job is a map only job. It does not utilize the reducer by default, unless
# Extractors | # Loaders | Outcome |
---|---|---|
Default | Default | Map only job with 10 map tasks |
Number X | Default | Map only job with X map tasks |
Number X | Number Y | Map-reduce job with X map tasks and Y reduce tasks |
Default | Number Y | Map-reduce job with 10 map tasks and Y reduce tasks |
The purpose have been to provide ability to user to throttle both number of loader and extractors in an independent way (e.g. have different number of loaders then extractors) and to have default values that won't run reduce phase if not necessary.
Passing data into the sqoop job ( via the mapper)
There is various information such as the job configs, driver configs, schema of the data read and the schema of the data written required by the Extractor and Loader that has to be passed via the SqoopMapper. It is currently passed securely like this via the credential store or via the configuration
Code Block |
---|
org.apache.hadoop.mapreduce.job.getCredentials().addSecretKey(SCHEMA_FROM_KEY,jsonSchema.getBytes()); org.apache.hadoop.mapreduce.job.getCredentials().addSecretKey(SCHEMA_TO_KEY, jsonSchema.getBytes()) org.apache.hadoop.mapreduce.job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes()); org.apache.hadoop.mapreduce.job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, obj.getClass().getName()); |
SqoopMapper
- Creates the ExtractorContext from the data stored in the configuration and credential store to pass to the connectors extract API
Creates the SqoopSplit that holds the partition information for the data to be extracted
- Post extract call, records the Hadoop counters related to Extraction logic
- Passing data out of Mapper : DistributedCache can be used if we need to write any information from the extractor back to the sqoop repository
Sqoop Writable(Comparable)
- Having a Writable class is required by Hadoop framework - we are using the current one as a wrapper
...
- for IntermediateDataFormat. Read more on IDF here
- We're not using a concrete implementation such as Text, so that we don't have to convert all records to String to transfer data between mappers and reducers.
Passing data into the sqoop job ( via the mapper)
SqoopInputFormat
SqoopSplit
SqoopNullOutputFormat
Passing data out of the sqoop job ( via the outputFormat)
SqoopReducer
...
- SqoopWritable delegates a lot of its functionality to the IntermediateDataFormat implementation used in the sqoop job, for instance the compareTo method on the Writable can used any custom logic of the underlying IDF for sorting records Extracted and then eventually used to write in the Load phase
SqoopSplit
An InputSplit describes a unit of work that comprises a single map task in a MapReduce program, SqoopSplit extends InputSplit
- Instantiates a custom Partition class to be used for splitting the input, in our case it is the data Extracted in the extract phase
- Delegates to the Partition object to read and write data
SqoopInputFormat
InputFormat defines: How these data in FROM are split up and read. Provides a factory for RecordReader objects that read the file
SqoopInputFormat is a custom implementation of the MR InputFormat class. SqoopRecordReader is the custom implementation of the RecordReader. The InputSplit has defined a slice of work, but does not describe how to access it. The SqoopRecordReader class actually loads the data from its source and converts it into (key, value) pairs suitable for reading by the Mapper. The SqoopRecordReader is invoked repeatedly on the input until the entire SqoopSplit has been consumed. Each invocation of the SqoopRecordReader leads to another call to the run() method of the Mapper.
Code Block |
---|
public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
...} |
SqoopNullOutputFormat
SqoopReducer
SqoopOutputFormatLoadExecutor
Few related tickets proposed for enhancement
https://issues.apache.org/jira/browse/SQOOP-1601
https://issues.apache.org/jira/browse/SQOOP-1603