...

  • Connectors have FROM and TO parts. A sqoop job represents a data transfer between FROM and TO. The IDF API defines how the data is represented as it flows between the FROM and TO via sqoop.  
  • Connectors represent different data sources, and each data source can have its own custom/native format. For instance, MongoDB might use JSON as its optimal native format, HDFS can use plain CSV text, and S3 can use its own custom format. Every data source has one thing in common: it is a collection of rows, and each row is a collection of fields/columns. Most, if not all, data sources have a strict schema that tells what each field's type is.  

The IDF API provides 3 main ways to represent data as it flows within sqoop:

  1. Native format - each row in the data source is a native object; for instance, in JSONIDF an entire row and its fields will be represented as a JSON object, and in AvroIDF an entire row and its fields will be represented as an Avro record
  2. CSV text format - each row and its fields are represented as CSV text
  3. Object array format - each field in the row is an element in the object array; hence a row in the data source is represented as an object array
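As a sketch, one two-column row (id, name) could look like this in each of the three representations. The exact CSV quoting rules and the JSON shape below are illustrative only, not Sqoop's precise encoding (which lives in SqoopIDFUtils):

```java
import java.util.Arrays;

// Illustrative only: one row of a table with columns (id INT, name TEXT)
// shown in the three IDF representations described above.
public class IdfFormatsSketch {

    // Object array format: one element per field, using Java object types
    static Object[] objectArrayRow() {
        return new Object[] { 1L, "Alice" };
    }

    // CSV text format: the same row encoded as a single CSV line
    // (single-quoting of text fields here is an assumption for illustration)
    static String csvTextRow() {
        return "1,'Alice'";
    }

    // "Native" format: whatever the data source uses natively; for a
    // hypothetical JSON IDF this could be a JSON string
    static String nativeJsonRow() {
        return "{\"id\":1,\"name\":\"Alice\"}";
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(objectArrayRow())); // [1, Alice]
        System.out.println(csvTextRow());                      // 1,'Alice'
        System.out.println(nativeJsonRow());                   // {"id":1,"name":"Alice"}
    }
}
```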

 

  • The IDF encapsulates the native format and the schema associated with each field.
  • Before we look at the IDF API, it is important to be aware of two other low-level APIs that sqoop defines for reading and writing data between the FROM and TO data sources:
          
          DataReader

 

/**
 * An intermediate layer for passing data from the execution engine
 * to the ETL engine.
 */
public abstract class DataReader {
  /**
   * Read data from the execution engine as an object array.
   * @return - array of objects with each column represented as an object
   * @throws Exception
   */
  public abstract Object[] readArrayRecord() throws Exception;
  /**
   * Read data from execution engine as text - as a CSV record.
   * @return - CSV formatted data.
   * @throws Exception
   */
  public abstract String readTextRecord() throws Exception;
  /**
   * Read data from execution engine as a native format.
   * @return - the content in the native format of the intermediate data
   * format being used.
   * @throws Exception
   */
  public abstract Object readContent() throws Exception;
}
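A loader-style consumer of this API might look like the following sketch. The DataReader abstract class is the one shown above; InMemoryReader and countRows are hypothetical stand-ins added for illustration, not part of Sqoop:

```java
import java.util.Iterator;
import java.util.List;

// DataReader API as defined above
abstract class DataReader {
    public abstract Object[] readArrayRecord() throws Exception;
    public abstract String readTextRecord() throws Exception;
    public abstract Object readContent() throws Exception;
}

// Hypothetical reader backed by an in-memory list of rows
class InMemoryReader extends DataReader {
    private final Iterator<Object[]> rows;

    InMemoryReader(List<Object[]> rows) { this.rows = rows.iterator(); }

    @Override public Object[] readArrayRecord() {
        return rows.hasNext() ? rows.next() : null; // null signals end of data
    }

    @Override public String readTextRecord() throws Exception {
        Object[] row = readArrayRecord();
        if (row == null) return null;
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < row.length; i++) {
            if (i > 0) sb.append(',');
            sb.append(row[i]);
        }
        return sb.toString();
    }

    @Override public Object readContent() {
        return readArrayRecord(); // this toy reader's "native" format is Object[]
    }
}

public class DataReaderSketch {
    // A loader would pull rows until the reader returns null
    public static int countRows(DataReader reader) throws Exception {
        int n = 0;
        while (reader.readArrayRecord() != null) n++;
        return n;
    }

    public static void main(String[] args) throws Exception {
        List<Object[]> data = List.of(new Object[]{1, "a"}, new Object[]{2, "b"});
        System.out.println(countRows(new InMemoryReader(data))); // 2
    }
}
```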

   

          DataWriter

/**
 * An intermediate layer for passing data from the ETL framework
 * to the MR framework.
 */
public abstract class DataWriter {
  /**
   * Write an array of objects into the execution framework
   * @param array - data to be written
   */
  public abstract void writeArrayRecord(Object[] array);
  /**
   * Write data into execution framework as text. The Intermediate Data Format
   * may choose to convert the data to another format based on how the data
   * format is implemented
   * @param text - data represented as CSV text.
   */
  public abstract void writeStringRecord(String text);
  /**
   * Write data in the intermediate data format's native format.
   * @param obj - data to be written
   */
  public abstract void writeRecord(Object obj);
}
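On the producing side, an extractor-style component pushes rows through one of the three write methods. The DataWriter abstract class is the one shown above; CollectingWriter and extractTwoRows are hypothetical, for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

// DataWriter API as defined above
abstract class DataWriter {
    public abstract void writeArrayRecord(Object[] array);
    public abstract void writeStringRecord(String text);
    public abstract void writeRecord(Object obj);
}

// Hypothetical writer that simply collects records in memory
class CollectingWriter extends DataWriter {
    final List<Object> records = new ArrayList<>();

    @Override public void writeArrayRecord(Object[] array) { records.add(array); }
    @Override public void writeStringRecord(String text)   { records.add(text); }
    @Override public void writeRecord(Object obj)          { records.add(obj); }
}

public class DataWriterSketch {
    // An extractor calls whichever write method matches the format its
    // IDF implementation uses; here one row per format for illustration
    public static void extractTwoRows(DataWriter writer) {
        writer.writeArrayRecord(new Object[] { 1, "a" }); // object array format
        writer.writeStringRecord("2,'b'");                // CSV text format
    }

    public static void main(String[] args) {
        CollectingWriter w = new CollectingWriter();
        extractTwoRows(w);
        System.out.println(w.records.size()); // 2
    }
}
```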

 

 

  • The IDF API is primarily shaped by the above low-level read/write APIs; hence it dictates that each custom implementation support the same 3 formats listed above: the native format, the CSV text format, and the object array format.

 

IntermediateDataFormat API

public abstract class IntermediateDataFormat<T> {

  protected volatile T data;

  protected Schema schema;

  /**
   * Get one row of data.
   *
   * @return - One row of data, represented in the internal/native format of
   *         the intermediate data format implementation.
   */
  public T getData() {
    return data;
  }

  /**
   * Set one row of data. If validate is set to true, the data is validated
   * against the schema.
   *
   * @param obj - A single row of data to be moved.
   */
  public void setData(T obj) {
    this.data = obj;
  }

  /**
   * Get one row of data as CSV text. Use {@link #SqoopIDFUtils} for reading and writing
   * into the sqoop specified CSV text format for each {@link #ColumnType} field in the row.
   * Why a "native" internal format and then return CSV text too?
   * Imagine a connector that moves data from a system that stores data as a
   * serialization format called FooFormat. If I also need the data to be
   * written into HDFS as FooFormat, the additional cycles burnt in converting
   * the FooFormat to text and back is useless - so using the sqoop specified
   * CSV text format saves those extra cycles.
   * <p/>
   * Most fast access mechanisms, like mysqldump or pgsqldump, write the data
   * out as CSV, and most often the source data is also represented as CSV
   * - so having minimal CSV support is mandated for all IDFs, so we can easily
   * read the data out as text and write it as text.
   * <p/>
   * @return - String representing the data in CSV text format.
   */
  public abstract String getCSVTextData();

  /**
   * Set one row of data as CSV text.
   *
   * @param csvText - A single row of data to be moved.
   */
  public abstract void setCSVTextData(String csvText);

  /**
   * Get one row of data as an Object array. Sqoop uses a defined object
   * representation for each column type. For instance, org.joda.time to
   * represent a date. Use {@link #SqoopIDFUtils} for reading and writing into
   * the sqoop specified object format for each {@link #ColumnType} field in the row.
   * <p/>
   * If FROM and TO schemas exist, we will use SchemaMatcher to get the data
   * according to the "TO" schema.
   *
   * @return - the data represented as an Object array
   */
  public abstract Object[] getObjectData();

  /**
   * Set one row of data as an Object array.
   * It also should construct the data representation
   * that the IDF represents so that the object is ready to
   * consume when getData is invoked. Custom implementations
   * will override this method to convert from object array
   * to the data format.
   */
  public abstract void setObjectData(Object[] data);

  /**
   * Set the schema for serializing/de-serializing data.
   *
   * @param schema - the schema used for serializing/de-serializing data
   */
  public void setSchema(Schema schema) {
    if (schema == null) {
      // TODO(SQOOP-1956): throw an exception since working without a schema is dangerous
      return;
    }
    this.schema = schema;
    // ...
  }

  /**
   * Serialize the fields of this object to <code>out</code>.
   *
   * @param out <code>DataOutput</code> to serialize this object into.
   * @throws IOException
   */
  public abstract void write(DataOutput out) throws IOException;

  /**
   * Deserialize the fields of this object from <code>in</code>.
   *
   * <p>For efficiency, implementations should attempt to re-use storage in the
   * existing object where possible.</p>
   *
   * @param in <code>DataInput</code> to deserialize this object from.
   * @throws IOException
   */
  public abstract void read(DataInput in) throws IOException;

  /**
   * Provide the external jars that the IDF depends on.
   *
   * @return set of jars
   */
  public Set<String> getJars() {
    return new HashSet<String>();
  }

  @Override
  public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((data == null) ? 0 : data.hashCode());
    result = prime * result + ((schema == null) ? 0 : schema.hashCode());
    return result;
  }
}

NOTE: The CSV Text format and the Object Array format are custom to Sqoop and the details of this format for every supported column/field type in the schema are described below.
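To make the contract concrete, here is a toy custom IDF written against a simplified subset of the API above (no schema handling, serialization, or the sqoop CSV encoding rules). SimpleIDF and ToyCsvIDF are hypothetical names, and the naive comma-split encoding is illustrative only:

```java
// Simplified subset of the IntermediateDataFormat<T> contract above
abstract class SimpleIDF<T> {
    protected T data;
    public T getData() { return data; }
    public void setData(T obj) { this.data = obj; }
    public abstract String getCSVTextData();
    public abstract void setCSVTextData(String csvText);
    public abstract Object[] getObjectData();
    public abstract void setObjectData(Object[] fields);
}

// Toy IDF whose "native" format is a CSV String, in the spirit of
// CSVIntermediateDataFormat; real implementations must follow the sqoop
// CSV rules (quoting, escaping, per-ColumnType encoding)
class ToyCsvIDF extends SimpleIDF<String> {
    @Override public String getCSVTextData() { return data; }
    @Override public void setCSVTextData(String csvText) { this.data = csvText; }

    // Naive comma split: ignores quoting/escaping, illustration only
    @Override public Object[] getObjectData() { return data.split(","); }

    @Override public void setObjectData(Object[] fields) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) sb.append(',');
            sb.append(fields[i]);
        }
        this.data = sb.toString();
    }
}

public class IdfSketch {
    public static void main(String[] args) {
        ToyCsvIDF idf = new ToyCsvIDF();
        idf.setObjectData(new Object[] { "1", "Alice" }); // object array in
        System.out.println(idf.getCSVTextData());         // 1,Alice
        System.out.println(idf.getObjectData().length);   // 2
    }
}
```

Note how the three representations are interconvertible: whichever one the extractor writes, the loader can read any of the three through the same IDF instance.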

...

  • The choice of CSV text and object array as the mandated formats for the Sqoop IDF is influenced by the Sqoop 1 design. It favors some traditional fast-dump databases, but there is no real benchmark proving how optimal it is versus Avro or other formats for representing the data.
  • Using an intermediate format might lead to discrepancies in some specific column types; for instance, using JODA to represent date-time objects gives only 3-digit precision, whereas the SQL timestamp from JDBC sources supports 6-digit precision.
  • More importantly, the SqoopConnector API has a getIDF..() method that ties a connector to a specific intermediate format for all supported directions (i.e. both FROM and TO at this point). This means the connector on the FROM side has to provide this format and on the TO side has to expect it. 
  • Each IDF implementation supports the 3 formats described above, so each connector can potentially use any one of them, and which one is not obvious at all when a connector proclaims to use a particular IDF implementation such as CSVIntermediateDataFormat. For instance, GenericJDBCConnector says it uses CSVIntermediateDataFormat but chooses to write object arrays in its extractor and read object arrays in its loader, so it is not obvious which underlying format it will actually read and write. On the other hand, HDFSConnector also says it uses CSVIntermediateDataFormat but uses only the CSV text format in its extractor and loader at this point (this may change in the future). 
  • A connector should arguably be able to handle multiple IDFs and expose the supported IDFs per direction; that is not possible today. For instance, a sqoop job should be able to dynamically choose the IDF for HDFSConnector when it is used in the TO direction: the job could say "use the Avro IDF on the TO side" and hence load all the data into HDFS in Avro format, in which case the HDFS loader would use the readContent API of the SqoopOutputFormatDataReader. But today HDFSConnector can only say it uses CSVIntermediateDataFormat, and data loaded into HDFS needs conversion from CSV to Avro as a separate step.
  • Requiring every IDF implementation to support a CSV text equivalent is overkill. If we mandate CSV and object array as the 2 formats, we should have made the IDF not merely an API but a standard implementation that could be further extended. Imagine having to write a JSONIDF or an AvroIDF and still having to replicate the same logic that the default/degenerate CSVIntermediateDataFormat provides. 

...
