
...

  • Has a handle to the concrete execution engine, which is org.apache.hadoop.mapred.JobClient in our case
  • Initialize API sets up the submission engine
  • Submit API is blocking when using the Hadoop local runner, returning a boolean for success or failure of the submission, and asynchronous otherwise. In the async case, the Update API is used subsequently to track the progress of the job submission
  • Update API can be invoked to query the status of the running job and to update the job submission record that holds the history of a Sqoop job across multiple runs
  • Stop API aborts a running job
  • Destroy API mirrors Initialize and cleans up the submission engine on exit (a sketch of this lifecycle follows this list)
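
Taken together, these calls form a simple lifecycle. The sketch below is illustrative only; all names and signatures are assumptions, not the actual org.apache.sqoop interfaces:

Code Block
// Hedged sketch of the submission-engine lifecycle described above.
// All names and signatures here are illustrative assumptions.
public interface SubmissionEngineSketch {

  // Set up the engine, e.g. obtain the org.apache.hadoop.mapred.JobClient handle.
  void initialize();

  // Blocking with the Hadoop local runner (returns success/failure of the
  // submission); asynchronous against a real cluster, in which case update()
  // is used afterwards to track progress.
  boolean submit(Object jobRequest);

  // Query the running job and refresh the job submission record that keeps
  // per-run history for a Sqoop job.
  void update(String externalJobId);

  // Abort a running job.
  void stop(String externalJobId);

  // Mirror of initialize(): clean up the engine on exit.
  void destroy();
}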

MR ExecutionEngine

  • Has a handle to the JobRequest object populated by the JobManager
  • PrepareJob API sets up the information required by the execution engine, org.apache.hadoop.mapred.JobClient in our case (see the sketch below)
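
For illustration, prepareJob might wire the Sqoop MR classes described later on this page into a Hadoop Job. This is a hedged sketch, not the actual execution-engine code; imports for the Sqoop classes are omitted and the method shape is an assumption:

Code Block
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;

// Hedged sketch: how an MR execution engine could translate a Sqoop job
// request into Hadoop job settings, using the classes covered below.
public class MRExecutionEngineSketch {
  public Job prepareJob(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf, "sqoop-job");
    job.setInputFormatClass(SqoopInputFormat.class);        // splits and reads the FROM side
    job.setMapperClass(SqoopMapper.class);                  // drives the Extractor
    job.setOutputFormatClass(SqoopNullOutputFormat.class);  // delegates writing to the Loader
    job.setOutputKeyClass(SqoopWritable.class);
    job.setOutputValueClass(NullWritable.class);
    return job;
  }
}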

...

Components of Sqoop using MR

In Sqoop we want to read records from FROM and write them to TO, and we want to do this in parallel, so we use the MR engine. We spawn as many map tasks as the numExtractors job config indicates, and as many reduce tasks as the numLoaders job config indicates. This way we can read records/messages/rows in parallel and write them in parallel.

By default a Sqoop job is a map-only job: it does not utilize the reducer unless a different number of loaders is explicitly configured (see the throttling discussion and the sketch below).
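
As a concrete illustration (the numbers and the setNumReduceTasks call are assumptions for the sketch, not actual engine code): with numExtractors = 8 and numLoaders = 4, eight map tasks each drive one Extractor, and four reduce tasks each drive one Loader.

Code Block
// Hedged sketch of the task-count mapping: extractors become map tasks
// (via the number of input splits), loaders become reduce tasks.
int numExtractors = 8;                 // -> 8 input splits -> 8 map tasks / Extractors
int numLoaders = 4;                    // -> 4 reduce tasks / Loaders
job.setNumReduceTasks(numLoaders);     // 0 would keep the job map-only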

...

SqoopInputFormat

The InputFormat defines how the data in FROM is split up and read, and provides a factory for the RecordReader objects that read each split.
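
A hedged sketch of the shape of such an InputFormat follows. The delegation to a connector-provided partitioner reflects the description above, while all names, helpers, and method bodies are illustrative assumptions:

Code Block
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hedged sketch of SqoopInputFormat's role: ask the FROM connector how to
// partition the data, then act as a factory for per-split record readers.
public class SqoopInputFormatSketch extends InputFormat<Object, NullWritable> {

  @Override
  public List<InputSplit> getSplits(JobContext context) {
    // Each partition of FROM becomes one input split, i.e. one map task
    // and therefore one Extractor instance.
    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (Object partition : partitionFromSide(context)) {  // hypothetical helper
      splits.add(asInputSplit(partition));                 // hypothetical helper
    }
    return splits;
  }

  @Override
  public RecordReader<Object, NullWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    // Factory method: one reader per split, mirroring SqoopRecordReader.
    return newSqoopRecordReader();                         // hypothetical helper
  }

  // Stubs for the sketch only.
  private List<Object> partitionFromSide(JobContext context) {
    return new ArrayList<Object>();
  }
  private InputSplit asInputSplit(Object partition) {
    throw new UnsupportedOperationException("stub for the sketch");
  }
  private RecordReader<Object, NullWritable> newSqoopRecordReader() {
    throw new UnsupportedOperationException("stub for the sketch");
  }
}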

...

The (key, value) pairs provided by the mapper are passed on to the Loader for the TO part. The way they are written is governed by the OutputFormat. SqoopNullOutputFormat extends the OutputFormat class and, like Hadoop's NullOutputFormat, generates no output files on HDFS, since HDFS is not always the destination in Sqoop. Instead, this custom class delegates writing to the Loader specified in the Sqoop job: it relies on the SqoopOutputFormatLoadExecutor to pass the data to the Loader via the SqoopRecordWriter. Much like how the SqoopInputFormat reads individual records through the SqoopRecordReader implementation, the SqoopNullOutputFormat class is a factory for SqoopRecordWriter objects; these are used to write the individual records to the final destination, i.e., the TO part of the Sqoop job. Notice that the key to the SqoopNullOutputFormat is actually the SqoopWritable that the SqoopRecordWriter uses.

 

Code Block
public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {
  ...
}

private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {
  @Override
  public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
    free.acquire();           // wait until the consumer (Loader) has drained the shared slot
    checkIfConsumerThrew();   // surface any exception raised on the consumer thread
    // NOTE: this is the place where data written from the SqoopMapper becomes
    // available to the SqoopOutputFormat (and from there to the Loader)
    toDataFormat.setCSVTextData(key.toString());
    filled.release();         // signal the consumer that data is available
  }
}

SqoopDestroyerOutputCommitter is a custom OutputCommitter that provides hooks for final cleanup and for the one-time operations we want to invoke when the Sqoop job finishes, i.e., when it either fails or succeeds.
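
A hedged sketch of such hooks follows: run destroyer logic exactly once at job end, on both the success and failure paths. The method bodies and the invokeDestroyers helper are assumptions for illustration, not the actual Sqoop code:

Code Block
import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hedged sketch of a destroyer-style committer: one-time cleanup hooks
// that fire when the job finishes, whether it succeeded or failed.
public class SqoopDestroyerOutputCommitterSketch extends OutputCommitter {

  @Override
  public void commitJob(JobContext context) throws IOException {
    super.commitJob(context);
    invokeDestroyers(context, true);   // one-time cleanup on success
  }

  @Override
  public void abortJob(JobContext context, JobStatus.State state) throws IOException {
    super.abortJob(context, state);
    invokeDestroyers(context, false);  // one-time cleanup on failure
  }

  // Hypothetical helper: would run the FROM and TO side destroyers.
  private void invokeDestroyers(JobContext context, boolean success) { }

  // Per-task hooks are no-ops in this sketch.
  @Override public void setupJob(JobContext context) { }
  @Override public void setupTask(TaskAttemptContext context) { }
  @Override public boolean needsTaskCommit(TaskAttemptContext context) { return false; }
  @Override public void commitTask(TaskAttemptContext context) { }
  @Override public void abortTask(TaskAttemptContext context) { }
}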

SqoopReducer 

Extends the Reducer API and at this point only runs the progressService. It is invoked only when the numLoaders driver config is > 0 (see above). Its primary use case is throttling; how invoking a reducer helps with that is explained in the "Why do we have the ability to run a reduce phase" section below.

Code Block
public class SqoopReducer extends Reducer<SqoopWritable, NullWritable, SqoopWritable, NullWritable> {
  ...
  // Report progress every two minutes so the framework does not time out long-running loads.
  progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
  ...
}

SqoopOutputFormatLoadExecutor and SqoopOutputFormatDataReader

...


Why do we have the ability to run a reduce phase, and why is it part of throttling?

 

The original idea was that you want to throttle the FROM and TO sides independently. For example, if I am exporting data from HBase to a relational database, I might want one extractor (= mapper) per HBase region, but the number of regions will very likely be larger than the number of concurrent write transactions I want against my database, so I might want to specify a smaller number of loaders to throttle that down. Having a reduce phase, however, means serializing all the data and transferring it across the network, so we do not run the reduce phase unless the user explicitly sets a number of loaders different from the number of extractors.
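
In job-setup terms the rule might look like the following sketch (variable names and placement are assumptions, not the actual code):

Code Block
// Hedged sketch of the throttling decision described above: stay map-only
// unless the user explicitly asked for a different number of loaders,
// because a reduce phase costs a full serialize-and-shuffle over the network.
if (numLoaders > 0 && numLoaders != numExtractors) {
  job.setNumReduceTasks(numLoaders);   // SqoopReducer throttles the TO side
} else {
  job.setNumReduceTasks(0);            // map-only: the Loader runs in the map task
}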

 

SqoopOutputFormatLoadExecutor and SqoopOutputFormatDataReader 

  1. The LoaderContext is set up in the ConsumerThread.run(..) method.
  2. The Loader's load method is invoked, passing the SqoopOutputFormatDataReader and the LoaderContext.
  3. The load method invokes the SqoopOutputFormatDataReader to read the records written by the SqoopRecordWriter associated with the SqoopNullOutputFormat.
  4. The SqoopOutputFormatLoadExecutor uses the ConsumerThread to parallelize the extracting and loading process, in addition to parallelizing the extract-only part using the configured numExtractors. More details are explained in SQOOP-1938.

     TL;DR: Parallelize reads and writes rather than have them be sequential.

    Most of the threading magic exists for a pretty simple reason: each mapper does I/O in two places, one being the write to the destination (HDFS at the time; extended to the new FROM/TO architecture, there are still two I/O paths) and the other the read from the DB. With linear read-then-write code you are essentially not reading anything while the write is happening, which is quite inefficient; you could easily read while the write is happening by parallelizing the reads and writes, which is what is being done. In addition, the output format does some extra processing and handling that costs time and CPU, during which you could rather be reading from the DB.
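
The free/filled semaphores seen in the SqoopRecordWriter excerpt above implement this hand-off. The following is a minimal, self-contained sketch of that pattern; the class, the String slot standing in for the intermediate data format, and the method names are illustrative assumptions:

Code Block
import java.util.concurrent.Semaphore;

// Hedged sketch of the producer/consumer hand-off in
// SqoopOutputFormatLoadExecutor: the record writer (producer) and the
// Loader's data reader (consumer) alternate over a single shared slot,
// so extraction and loading overlap instead of running sequentially.
public class LoadExecutorHandOffSketch {
  private final Semaphore free = new Semaphore(1);    // slot is empty
  private final Semaphore filled = new Semaphore(0);  // slot holds data
  private volatile String slot;                       // stands in for the intermediate data format

  // Called from the record writer on the mapper side.
  public void write(String record) throws InterruptedException {
    free.acquire();     // wait until the consumer has drained the slot
    slot = record;
    filled.release();   // signal the consumer that data is available
  }

  // Called from the data reader inside the consumer (Loader) thread.
  public String read() throws InterruptedException {
    filled.acquire();   // wait for the producer to publish a record
    String record = slot;
    free.release();     // hand the slot back to the producer
    return record;
  }
}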


 

A few related tickets have been proposed for enhancements

...