
...

  • Has a handle to the concrete execution engine, which in our case is org.apache.hadoop.mapred.JobClient
  • Initialize API to set up the submission engine 
  • Submit API is blocking when using the Hadoop local runner, returning a boolean for success or failure of the submission, and asynchronous when non-local. In the async case, the Update API is used subsequently to track the progress of the job submission
  • Update API can be invoked to query the status of the running job and update the job submission record, which holds the history of a Sqoop job across multiple runs
  • Stop API to abort a running job
  • Destroy API mirrors Initialize and cleans up the submission engine on exit (see the sketch after this list)
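
To make the lifecycle above concrete, here is a minimal, self-contained sketch of how a caller might drive these five calls. The interface and class names are illustrative placeholders, not the actual Sqoop submission engine API.

Code Block
// Illustrative placeholder interface -- not the real Sqoop submission engine API.
interface SubmissionEngineLifecycle {
  void initialize(java.util.Properties conf); // set up the submission engine (e.g. create a JobClient)
  boolean submit(String jobId);               // blocking for the local runner, async otherwise
  void update(String jobId);                  // refresh the job submission record with the current status
  void stop(String jobId);                    // abort a running job
  void destroy();                             // mirror of initialize: clean up on exit
}

class SubmissionLifecycleDriver {
  static void run(SubmissionEngineLifecycle engine, String jobId) {
    engine.initialize(new java.util.Properties());
    try {
      if (engine.submit(jobId)) {
        // For an async (non-local) submission, update() would be polled until the job
        // reaches a terminal state; a single call is shown here for brevity.
        engine.update(jobId);
      }
    } finally {
      engine.destroy();
    }
  }
}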

MR ExecutionEngine

  • Has a handle to the JobRequest object populated by the JobManager
  • PrepareJob API to set up the necessary information required by org.apache.hadoop.mapred.JobClient in our case

...

Components of Sqoop using MR

We want to read records from FROM and write them to TO in Sqoop, and we want to do this in parallel, so we use the MR engine. We spawn as many map tasks as indicated by numExtractors (a job config) and as many reduce tasks as indicated by numLoaders (a job config). This way we can read records/messages/rows in parallel and write them in parallel.

By default a Sqoop job is a map-only job. It does not utilize the reducer unless the number of loaders is explicitly configured to differ from the number of extractors (see the throttling discussion below).
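
A hedged sketch of how these two knobs typically translate into MapReduce settings: map-side parallelism comes from the number of input splits produced for the FROM side, while the reduce-side count is set explicitly on the job. The property key below is hypothetical; only the Hadoop Job calls are real.

Code Block
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

class ParallelismSketch {
  static Job configure(Configuration conf, int numExtractors, int numLoaders) throws Exception {
    Job job = Job.getInstance(conf, "sqoop-style job");
    // The FROM-side partitioner is asked for (at most) numExtractors partitions; each
    // partition becomes an InputSplit and therefore a map task (an extractor).
    job.getConfiguration().setInt("example.max.extractors", numExtractors); // hypothetical key
    if (numLoaders > 0 && numLoaders != numExtractors) {
      // Only then is the extra shuffle worth it: loaders run as reduce tasks.
      job.setNumReduceTasks(numLoaders);
    } else {
      job.setNumReduceTasks(0); // map-only job: loading happens inside the output format
    }
    return job;
  }
}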

...

Code Block
// job below is an instance of org.apache.hadoop.mapreduce.Job
job.getCredentials().addSecretKey(SCHEMA_FROM_KEY, jsonSchema.getBytes());
job.getCredentials().addSecretKey(SCHEMA_TO_KEY, jsonSchema.getBytes());
job.getCredentials().addSecretKey(MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY, ConfigUtils.toJson(obj).getBytes());
job.getConfiguration().set(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, obj.getClass().getName());
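
For context, a hedged sketch of the read side: inside a running task the same values can be pulled back out through the standard Hadoop context APIs (the constant names are the ones used above).

Code Block
// Inside the mapper/reducer, with context being the Hadoop task context:
byte[] fromSchemaJson = context.getCredentials().getSecretKey(SCHEMA_FROM_KEY);
byte[] fromLinkConfigJson = context.getCredentials().getSecretKey(MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY);
String fromLinkConfigClass = context.getConfiguration().get(MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK);
// The JSON payloads are then deserialized back into the connector's config objects
// before the extract call (deserialization helper not shown).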

...

SqoopMapper

 

  1. Creates the ExtractorContext from the data stored in the configuration and credential store to pass to the connector's extract API (a simplified sketch of the whole flow follows this list)
  2. Creates the SqoopSplit that holds the partition information for the data to be extracted
  3. After the extract call, records the Hadoop counters related to the extraction logic
  4. Passing data out of the Mapper: DistributedCache can be used if we need to write any information from the extractor back to the Sqoop repository
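
A simplified sketch of that flow follows. This is not the actual SqoopMapper source; the extractor, fromLinkConfig, fromJobConfig fields, the buildExtractorContext helper and the counter name are assumptions used for illustration.

Code Block
@Override
public void run(Context context) throws IOException, InterruptedException {
  // 1. + 2. Build the ExtractorContext from configuration/credentials; the SqoopSplit
  //         assigned to this map task carries the Partition to extract.
  SqoopSplit split = (SqoopSplit) context.getInputSplit();
  ExtractorContext extractorContext = buildExtractorContext(context); // assumed helper
  // extractor, fromLinkConfig and fromJobConfig are assumed to have been created
  // from the job configuration earlier.
  extractor.extract(extractorContext, fromLinkConfig, fromJobConfig, split.getPartition());
  // 3. Record extraction metrics as Hadoop counters once the extract call returns.
  context.getCounter("SqoopCounters", "ROWS_READ").increment(extractor.getRowsRead());
}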

 

Sqoop Writable(Comparable)

...

  1. Having a Writable class is required by the Hadoop framework - we are using the current one as a wrapper for IntermediateDataFormat. Read more on IDF here
  2. We're not using a concrete implementation such as Text, so that we don't have to convert all records to String to transfer data between mappers and reducers.
  3. SqoopWritable delegates a lot of its functionality to the IntermediateDataFormat implementation used in the Sqoop job; for instance, the compareTo method on the Writable can use any custom logic of the underlying IDF for sorting the extracted records, which are then eventually written in the load phase (see the sketch after this list)
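
A hedged, simplified sketch of that delegation pattern. The IDF method names used here (write, read, getCSVTextData) are assumptions about the IntermediateDataFormat contract, and the real class obtains its IDF instance from the job configuration rather than a constructor.

Code Block
// Simplified sketch of how a SqoopWritable-style wrapper can delegate to the IDF.
public class SqoopWritableSketch implements WritableComparable<SqoopWritableSketch> {
  private final IntermediateDataFormat<?> dataFormat; // wrapped IDF holding the current record

  SqoopWritableSketch(IntermediateDataFormat<?> dataFormat) { this.dataFormat = dataFormat; }

  @Override
  public void write(DataOutput out) throws IOException {
    dataFormat.write(out);   // assumed IDF method: serialize the current record
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    dataFormat.read(in);     // assumed IDF method: deserialize into the IDF
  }

  @Override
  public int compareTo(SqoopWritableSketch other) {
    // Any custom sort order of the underlying IDF applies to the shuffle between
    // the extract (map) and load (reduce) phases.
    return dataFormat.getCSVTextData().compareTo(other.dataFormat.getCSVTextData());
  }
}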

 

SqoopSplit

 

  1. An InputSplit describes a unit of work that comprises a single map task in a MapReduce program; SqoopSplit extends InputSplit
  2. Instantiates a custom Partition class to be used for splitting the input - in our case, the data extracted in the extract phase
  3. Delegates to the Partition object to read and write data (see the sketch after this list)
  4. It is the key type of the SqoopInputFormat
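
A hedged sketch of the delegation described in points 2 and 3: the split serializes the Partition class name and then lets the Partition object (de)serialize itself. This is not the real SqoopSplit source, and the Partition read/write method names are assumptions.

Code Block
// Simplified sketch: SqoopSplit-style delegation to a connector Partition.
public class SqoopSplitSketch extends InputSplit implements Writable {
  private Partition partition;   // the connector-specific partition for this map task

  public Partition getPartition() { return partition; }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(partition.getClass().getName()); // record which Partition class to re-create
    partition.write(out);                         // let the partition serialize its own fields
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    String className = in.readUTF();
    try {
      partition = (Partition) Class.forName(className).newInstance(); // instantiate the custom Partition
    } catch (ReflectiveOperationException e) {
      throw new IOException("Cannot instantiate partition class " + className, e);
    }
    partition.readFields(in);                     // let the partition deserialize its own fields
  }

  @Override
  public long getLength() { return 0; }           // length/locations are not meaningful here

  @Override
  public String[] getLocations() { return new String[0]; }
}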

 

SqoopInputFormat

 

InputFormat defines how the data in FROM is split up and read, and provides a factory for the RecordReader objects that do the actual reading.

SqoopInputFormat is a custom implementation of the MR InputFormat class.  SqoopRecordReader is the custom implementation of the RecordReader.  The InputSplit has defined a slice of work, but does not describe how to access it. The SqoopRecordReader class actually loads the data from its source and converts it into (key, value) pairs suitable for reading by the Mapper.  The SqoopRecordReader is invoked repeatedly on the input until the entire SqoopSplit has been consumed. Each invocation of the SqoopRecordReader leads to another call to the run() method of the Mapper.

Code Block
public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {
...} 
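
A hedged sketch of the two responsibilities this class has as an InputFormat: produce one SqoopSplit per partition returned by the FROM connector's partitioner, and hand out a record reader. The partitioner call, the SqoopSplit setter and the SqoopRecordReader constructor are assumptions; only the InputFormat contract itself is the real Hadoop API.

Code Block
// Simplified sketch of the InputFormat contract (not the actual SqoopInputFormat source).
public class SqoopInputFormatSketch extends InputFormat<SqoopSplit, NullWritable> {

  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<InputSplit> splits = new ArrayList<>();
    // Ask the FROM connector's partitioner for up to numExtractors partitions
    // (callFromPartitioner is an assumed helper standing in for that invocation).
    for (Partition partition : callFromPartitioner(context)) {
      SqoopSplit split = new SqoopSplit();
      split.setPartition(partition);   // assumed setter: each partition becomes one map task
      splits.add(split);
    }
    return splits;
  }

  @Override
  public RecordReader<SqoopSplit, NullWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    // The record reader hands the split itself to the mapper exactly once.
    return new SqoopRecordReader();
  }

  private List<Partition> callFromPartitioner(JobContext context) {
    return Collections.emptyList();    // placeholder for the real partitioner call
  }
}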

SqoopNullOutputFormat

The (key, value) pairs provided by the mapper are passed on to the Loader for the TO part. The way they are written is governed by the OutputFormat. SqoopNullOutputFormat extends the OutputFormat class and is similar in spirit to Hadoop's NullOutputFormat: it generates no output files on HDFS, since HDFS is not always the destination of a Sqoop job. Instead, this custom class delegates writing to the Loader specified in the Sqoop job, relying on the SqoopOutputFormatLoadExecutor to pass the data to the Loader via the SqoopRecordWriter. Much like how SqoopInputFormat actually reads individual records through its SqoopRecordReader implementation, SqoopNullOutputFormat is a factory for SqoopRecordWriter objects; these write the individual records to the final destination (in our case the TO part of the Sqoop job). Notice that the key of SqoopNullOutputFormat is actually the SqoopWritable that the SqoopRecordWriter uses.

 

Code Block
public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {
...}
 
 private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
    @Override
    public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
      free.acquire();
      checkIfConsumerThrew();
      // NOTE: this is the place where data written from SqoopMapper writable is available to the SqoopOutputFormat
      toDataFormat.setCSVTextData(key.toString());
      filled.release();
    }

SqoopDestroyerOutputCommitter is a custom OutputCommitter that provides hooks for final cleanup, or in some cases one-time operations we want to invoke when the Sqoop job finishes, i.e. either fails or succeeds (see the sketch below).
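
A hedged sketch of where such hooks live in the Hadoop OutputCommitter API; the call into the connector destroyers is represented by an assumed helper, and this is not the actual SqoopDestroyerOutputCommitter source.

Code Block
import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Simplified sketch of an output committer with end-of-job hooks.
public class DestroyerOutputCommitterSketch extends OutputCommitter {

  @Override
  public void commitJob(JobContext context) throws IOException {
    super.commitJob(context);
    invokeDestroyer(context, true);    // one-time cleanup on success
  }

  @Override
  public void abortJob(JobContext context, JobStatus.State state) throws IOException {
    super.abortJob(context, state);
    invokeDestroyer(context, false);   // one-time cleanup on failure
  }

  private void invokeDestroyer(JobContext context, boolean success) {
    // Run the connector destroyers for both the FROM and TO sides here (assumed helper).
  }

  // Task-level hooks are no-ops in this sketch: the real work happens at job level.
  @Override public void setupJob(JobContext context) { }
  @Override public void setupTask(TaskAttemptContext context) { }
  @Override public boolean needsTaskCommit(TaskAttemptContext context) { return false; }
  @Override public void commitTask(TaskAttemptContext context) { }
  @Override public void abortTask(TaskAttemptContext context) { }
}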

SqoopReducer 

Extends the Reducer API and at this point only runs the progressService. It is invoked only when the numLoaders driver config is > 0. Its primary use case is throttling.

Code Block
public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWritable, NullWritable> {
..
      progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
} 


Why do we have the ability to run a reduce phase, and why is it part of throttling?

 

The original idea was that you want to throttle the FROM and TO sides independently. For example, if I'm exporting data from HBase to a relational database, I might want to have one extractor (= mapper) per HBase region - but the number of regions will very likely be greater than the number of concurrent transactions I want to run against my database, so I might want to specify a different number of loaders to throttle that down. But having a reduce phase means serializing all the data and transferring it across the network, so we do not run the reduce phase unless the user explicitly sets a different number of loaders than extractors.

 

SqoopOutputFormatLoadExecutor and SqoopOutputFormatDataReader 

  1. The LoaderContext is set up in the ConsumerThread.run(..) method. 
  2. The Loader's load method is invoked, passing the SqoopOutputFormatDataReader and the LoaderContext
  3. The load method invokes the SqoopOutputFormatDataReader to read the records handed over by the SqoopRecordWriter associated with the SqoopNullOutputFormat
  4. The SqoopOutputFormatLoadExecutor uses the ConsumerThread to parallelize the extraction and loading process, in addition to parallelizing the extract-only part using the numExtractors configured. More details are explained in SQOOP-1938
     

    TL;DR: Parallelize reads and writes rather than have them be sequential.

    Most of the threading magic is there for a pretty simple reason - each mapper does I/O in two places: one is the write to HDFS, the other is the read from the DB (that was at the time; extended to the new FROM/TO architecture, you still have two I/O sides). With linear read-write code you are essentially not reading anything while the write is happening, which is pretty inefficient - you could easily read while the write is happening by parallelizing the reads and writes, which is what is being done. In addition, there is also some extra processing/handling that the output format does, which costs time and CPU - time during which you could instead be reading from the DB.
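
The pattern in play is a classic two-semaphore hand-off between the writer (producer) and the loader's reader (consumer), which is exactly what the free.acquire()/filled.release() pair in the SqoopRecordWriter above hints at. The following self-contained sketch shows the idea in isolation; it is not the actual SqoopOutputFormatLoadExecutor code.

Code Block
import java.util.concurrent.Semaphore;

// Two-semaphore hand-off: the producer (record writer) may only fill the shared slot
// when it is free; the consumer (loader's data reader) may only read it once it is filled.
class HandOffSketch {
  private final Semaphore free = new Semaphore(1);    // slot is initially free
  private final Semaphore filled = new Semaphore(0);  // nothing to read yet
  private volatile String slot;                       // stands in for the IDF-held record

  // Called from the map task side (compare SqoopRecordWriter.write above).
  void produce(String record) throws InterruptedException {
    free.acquire();
    slot = record;
    filled.release();
  }

  // Called from the consumer thread that runs the Loader.
  String consume() throws InterruptedException {
    filled.acquire();
    String record = slot;
    free.release();
    return record;
  }
}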


 

A few related tickets proposed for enhancement

...