
Associated  JIRA ticket : SQOOP-1350 and its sub tickets for more granular discussion points


The following details are relevant post the 1.99.4 release, i.e. from 1.99.5 onwards.

Background

Intermediate Data Format (IDF)

  • Connectors have FROM and TO parts. A Sqoop job represents a data transfer between FROM and TO. The IDF API defines how the data is represented as it flows between the FROM and TO via Sqoop.


  • Connectors represent different data sources, and each data source can have a custom/native format that it uses. For instance, MongoDB might use JSON as its optimal native format, HDFS can use plain CSV text, and S3 can use its own custom format. In simple words, every data source has one thing in common: it is a collection of rows, and each row is a collection of fields/columns. Most, if not all, data sources have a strict schema that tells what each field type is.


  • The IDF API provides 3 main ways to represent the data that flows within Sqoop. The IDF encapsulates the native format and the schema associated with each field.
  • Before we understand the IDF API, it is important to be aware of the two other low-level APIs that Sqoop defines for reading and writing data between the FROM and TO data sources:
          
          DataReader


 

Code Block
languagejava
titleDataReader
collapsetrue
/**
 * An intermediate layer for passing data from the execution engine
 * to the ETL engine.
 */
public abstract class DataReader {
  /**
   * Read data from the execution engine as an object array.
   *
   * @return - array of objects with each column represented as an object
   * @throws Exception
   */
  public abstract Object[] readArrayRecord() throws Exception;
  /**
   * Read from execution engine as text - as a CSV record.
   *
   * @return - CSV formatted data.
   * @throws Exception
   */
  public abstract String readTextRecord() throws Exception;
  /**
   * Read data from execution engine as a native format.
   *
   * @return - the content in the native format of the intermediate data
   *         format being used.
   * @throws Exception
   */
  public abstract Object readContent() throws Exception;
}

   

               DataWriter

Code Block
languagejava
titleDataWriter
collapsetrue
/**
 * An intermediate layer for passing data from the ETL framework
 * to the MR framework.
 */
public abstract class DataWriter {
  /**
   * Write an array of objects into the execution framework
   *
   * @param array - data to be written
   */
  public abstract void writeArrayRecord(Object[] array);
  /**
   * Write data into execution framework as text. The Intermediate Data Format
   * may choose to convert the data to another format based on how the data
   * format is implemented
   *
   * @param text - data represented as CSV text.
   */
  public abstract void writeStringRecord(String text);
  /**
   * Write data in the intermediate data format's native format.
   *
   * @param obj - data to be written
   */
  public abstract void writeRecord(Object obj);
}
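To make the flow concrete, here is a minimal, hypothetical sketch (not actual Sqoop code) of how the FROM side pushes rows through a DataWriter-style call while the TO side pulls them through a DataReader-style call. The queue-backed class below is invented purely for illustration; real implementations are provided by the execution engine.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class EtlIoSketch {
  // Invented in-memory stand-in that combines the shapes of Sqoop's abstract
  // DataWriter and DataReader, backed by a shared queue of object-array rows.
  static class QueueBackedIo {
    private final Queue<Object[]> rows = new ArrayDeque<>();

    // Mirrors DataWriter.writeArrayRecord(Object[])
    public void writeArrayRecord(Object[] array) {
      rows.add(array);
    }

    // Mirrors DataReader.readArrayRecord(); returns null when exhausted
    public Object[] readArrayRecord() {
      return rows.poll();
    }
  }

  public static void main(String[] args) {
    QueueBackedIo io = new QueueBackedIo();
    // FROM side (extractor) writes a row as an object array...
    io.writeArrayRecord(new Object[] {1L, "hello"});
    // ...TO side (loader) reads it back in the same shape.
    Object[] row = io.readArrayRecord();
    System.out.println(row[0] + "," + row[1]); // prints "1,hello"
  }
}
```

The same pairing exists for the text (CSV) and native-content forms: writeStringRecord/readTextRecord and writeRecord/readContent.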

  • The IDF API is primarily influenced by the above low-level APIs for reading and writing data, and hence dictates that each custom implementation support the following 3 formats:
  1. Native format - each row in the data source is a native object; for instance, in a JSON IDF, an entire row and its fields would be represented as a JSON object, and in an Avro IDF, an entire row and its fields would be represented as an Avro record
  2. CSV text format - each row and its fields are represented as CSV text
  3. Object Array format - each field in the row is an element in the object array; hence a row in the data source is represented as an object array
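As a rough illustration (not Sqoop's actual encoder), the same row rendered in the three formats might look like this; the toCsvText helper below is a simplified stand-in for the real CSV rules specified later in this page.

```java
public class ThreeFormatsDemo {
  // Simplified stand-in for the Sqoop CSV rules: text fields in single
  // quotes, numbers bare, nulls as the literal NULL. The real rules
  // (escaping, charsets, etc.) are given in the 1.99.5 spec table.
  static String toCsvText(Object[] row) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < row.length; i++) {
      if (i > 0) sb.append(',');
      Object f = row[i];
      if (f == null) sb.append("NULL");
      else if (f instanceof String) sb.append('\'').append(f).append('\'');
      else sb.append(f);
    }
    return sb.toString();
  }

  public static void main(String[] args) {
    // Object Array format: one element per field.
    Object[] objectArray = new Object[] {10L, "jane", null};
    // CSV text format for the same row.
    System.out.println(toCsvText(objectArray)); // prints "10,'jane',NULL"
    // The native format is whatever the IDF implementation chooses:
    // e.g. this same CSV string for CSVIntermediateDataFormat, or a
    // JSON object for a JSON-based IDF.
  }
}
```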

 

Code Block
languagejava
titleIntermediateDataFormat API
collapsetrue
public abstract class IntermediateDataFormat<T> {

  protected volatile T data;

  /**
   * Get one row of data.
   *
   * @return - One row of data, represented in the internal/native format of
   *         the intermediate data format implementation.
   */
  public T getData() {
    return data;
  }
  /**
   * Set one row of data. If validate is set to true, the data is validated
   * against the schema.
   *
   * @param obj - A single row of data to be moved.
   */
  public void setData(T obj) {
    this.data = obj;
  }
  /**
   * Get one row of data as CSV text. Use {@link #SqoopIDFUtils} for reading and writing
   * into the sqoop specified CSV text format for each {@link #ColumnType} field in the row
   * Why a "native" internal format and then return CSV text too?
   * Imagine a connector that moves data from a system that stores data as a
   * serialization format called FooFormat. If I also need the data to be
   * written into HDFS as FooFormat, the additional cycles burnt in converting
   * the FooFormat to text and back is useless - so using the sqoop specified
   * CSV text format saves those extra cycles
   * <p/>
   * Most fast access mechanisms, like mysqldump or pgsqldump write the data
   * out as CSV, and most often the source data is also represented as CSV
   * - so having a minimal CSV support is mandated for all IDF, so we can easily read the
   * data out as text and write as text.
   * <p/>
   * @return - String representing the data in CSV text format.
   */
  public abstract String getCSVTextData();
  /**
   * Set one row of data as CSV text.
   */
  public abstract void setCSVTextData(String csvText);
  /**
   * Get one row of data as an Object array.
   * Sqoop uses defined object representation for each column type. For
   * instance org.joda.time to represent date.
   * Use {@link #SqoopIDFUtils} for reading and writing into the sqoop
   * specified object format for each {@link #ColumnType} field in the row
   * </p>
   * @return - String representing the data as an Object array
   * If FROM and TO schema exist, we will use SchemaMatcher to get the data according to "TO" schema
   */
  public abstract Object[] getObjectData();
  /**
   * Set one row of data as an Object array.
   * It also should construct the data representation
   * that the IDF represents so that the object is ready to
   * consume when getData is invoked. Custom implementations
   * will override this method to convert from object array
   * to the data format
   */
  public abstract void setObjectData(Object[] data);
  /**
   * Set the schema for serializing/de-serializing data.
   *
   * @param schema - the schema used for serializing/de-serializing data
   */
  public void setSchema(Schema schema) {
    if (schema == null) {
      // TODO(SQOOP-1956): throw an exception since working without a schema is dangerous
      return;
    }
    this.schema = schema;
  }
  /**
   * Serialize the fields of this object to <code>out</code>.
   *
   * @param out <code>DataOuput</code> to serialize this object into.
   * @throws IOException
   */
  public abstract void write(DataOutput out) throws IOException;
  /**
   * Deserialize the fields of this object from <code>in</code>.
   *
   * <p>For efficiency, implementations should attempt to re-use storage in the
   * existing object where possible.</p>
   *
   * @param in <code>DataInput</code> to deseriablize this object from.
   * @throws IOException
   */
  public abstract void read(DataInput in) throws IOException;
  /**
   * Provide the external jars that the IDF depends on
   * @return set of jars
   */
  public Set<String> getJars() {
    return new HashSet<String>();
  }
  @Override
  public int hashCode() {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((data == null) ? 0 : data.hashCode());
    result = prime * result + ((schema == null) ? 0 : schema.hashCode());
    return result;
  }
  

NOTE: The CSV Text format and the Object Array format are custom to Sqoop and the details of this format for every supported column/field type in the schema are described below.

Schema ( a.k.a Row ) 

Schema represents a row of fields that are transferred between FROM and TO. Hence schema holds the list of column/ field types for that row. 

Code Block
languagejava
titleSchema
collapsetrue
/**
 * Schema represents the data fields that are transferred between {@link #From} and {@link #To}
 */
public class Schema {
  /**
   * Name of the schema, usually a table name.
   */
  private String name;
  /**
   * Optional note.
   */
  private String note;
  /**
   * Creation date.
   */
  private Date creationDate;
  /**
   * Columns associated with the schema.
   */
  private List<Column> columns;
  /**
   * Helper set for quick column name lookups.
   */
  private Set<String> columNames;
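A hedged sketch of how those fields cooperate: the helper set mirrors the column list so that name lookups are O(1) while the list preserves column order. The class below is an invented miniature for illustration only, not the real org.apache.sqoop.schema.Schema.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MiniSchema {
  private final String name;              // usually a table name
  private final List<String> columnNames = new ArrayList<>(); // preserves order
  // Helper set for quick column name lookups, as in Sqoop's Schema.
  private final Set<String> nameSet = new HashSet<>();

  public MiniSchema(String name) {
    this.name = name;
  }

  public MiniSchema addColumn(String columnName) {
    if (!nameSet.add(columnName)) {
      throw new IllegalArgumentException("Duplicate column: " + columnName);
    }
    columnNames.add(columnName);
    return this;
  }

  public boolean hasColumn(String columnName) {
    return nameSet.contains(columnName); // O(1) via the helper set
  }

  public String getName() {
    return name;
  }
}
```

Usage: new MiniSchema("employees").addColumn("id").addColumn("name") builds a two-column row description.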

Column & ColumnType ( a.k.a Row Fields )

Column is an abstraction to represent a field in a row. There are custom classes for subtypes such as String, Number, Date, Map, and Array. A Column has attributes that provide metadata about the column data, such as: is the field nullable? If it is a String, what is its max size? If it is a DateTime, does it support a timezone? If it is a Map, what are the types of the key and the value? If it is an Array, what is the type of its elements? If it is an Enum, what are its supported options?

Code Block
languagejava
titleColumn
collapsetrue
/**
 * Base class for all the supported types in the Sqoop {@link #Schema}
 */
public abstract class Column {
  /**
   * Name of the column. It is optional
   */
  String name;
  /**
   * Whether the column value can be empty/null
   */
  Boolean nullable;
  /**
   * By default a column is nullable
   */

ColumnType is a handy enum that represents all the field types Sqoop supports. Note there is an umbrella UNKNOWN type for fields that Sqoop does not support.

Code Block
languagejava
titleColumnType
collapsetrue
/**
 * All {@link #Column} types supported by Sqoop.
 */
public enum ColumnType {
  ARRAY,
  BINARY,
  BIT,
  DATE,
  DATE_TIME,
  DECIMAL,
  ENUM,
  FIXED_POINT,
  FLOATING_POINT,
  MAP,
  SET,
  TEXT,
  TIME,
  UNKNOWN,
  ;
}

Design goals for IDF

There are a few prior documents that depict the design goals, but they are not crystal clear. Refer to this doc for some context on the research done prior to defining the IDF API. It explains some of the goals of using the CSV and Object Array formats. Some of the design influence comes from its predecessor, Sqoop 1.

  1. Support data transfer across connectors using an "internal" in-memory data representation
  2. CSV is a common format in many databases, and hence Sqoop's design goals primarily optimize for such data sources. But it is unclear how much of a performance gain CSV text actually provides.
    The following javadoc comment, pulled from the code, explains the choice of CSV.

    Why a "native" internal format and then return CSV text too?
    Imagine a connector that moves data from a system that stores data as a
    serialization format called FooFormat. If I also need the data to be
    written into HDFS as FooFormat, the additional cycles burnt in converting
    the FooFormat to text and back is useless - so using the sqoop specified
    CSV text format saves those extra cycles
    <p/>
    Most fast access mechanisms, like mysqldump or pgsqldump write the data
    out as CSV, and most often the source data is also represented as CSV
    - so having a minimal CSV support is mandated for all IDF, so we can easily read the
    data out as text and write as text.

  3. https://issues.apache.org/jira/browse/SQOOP-1811 has discussions that detail the current IDF API design goals 

 

Warning

The following is the spec as of 1.99.5. Please do not edit this section directly in the future; if the format spec changes in a future release, add a new section to highlight what changed.

1.99.5 Sqoop CSV and Object Format in IDF

Column Type: NULL value in the field
  CSV Format: public static final String NULL_FIELD = "NULL";
  Object Format: java null

Column Type: ARRAY
  CSV Format: Encoded as a String (and hence enclosed in single quotes); inside is a JSON encoding of the top-level array elements (hence the entire value is enclosed in a [] pair). Nested values are not JSON encoded. A few examples:
    • Array of FixedPoint: '[1,2,3]'
    • Array of Text: '["A","B","C"]'
    • Array of Objects of type FixedPoint: '["[11, 12]","[14, 15]"]'
    • Array of Objects of type Text: '["[A, B]","[X, Y]"]'
  Refer to https://issues.apache.org/jira/browse/SQOOP-1771 for more details
  Object Format: java Object[]

Column Type: BINARY
  CSV Format: byte array enclosed in quotes and encoded with the ISO-8859-1 charset
  Object Format: java byte[]

Column Type: BIT
  CSV Format: true, TRUE, 1 / false, FALSE, 0 (not encoded in quotes); unsupported values should throw an exception
  Object Format: java boolean

Column Type: DATE
  CSV Format: YYYY-MM-DD (no time)
  Object Format: org.joda.time.LocalDate

Column Type: DATE_TIME
  CSV Format: YYYY-MM-DD HH:MM:SS[.ZZZ][+/-XX] (fraction and timezone are optional). Refer to https://issues.apache.org/jira/browse/SQOOP-1846 for more details
  Object Format: org.joda.time.DateTime or org.joda.time.LocalDateTime (depends on the timezone attribute)

Column Type: DECIMAL
  CSV Format: BigDecimal (not encoded in quotes)
  Object Format: java BigDecimal; scale and precision fields are handled via https://issues.apache.org/jira/browse/SQOOP-2027

Column Type: ENUM
  CSV Format: same as TEXT
  Object Format: java String

Column Type: FIXED_POINT
  CSV Format: Integer or Long (not encoded in quotes)
  Object Format: java Integer or java Long (depends on the byteSize and signed attributes); see https://issues.apache.org/jira/browse/SQOOP-2022

Column Type: FLOATING_POINT
  CSV Format: Float or Double (not encoded in quotes)
  Object Format: java Double or java Float (depends on the byteSize attribute); see https://issues.apache.org/jira/browse/SQOOP-2022

Column Type: MAP
  CSV Format: Encoded as a String (and hence enclosed in single quotes); inside is a JSON encoding of the map (hence the entire value is enclosed in a {} pair). Nested values are also encoded as JSON. Examples:
    • Map<Number, Number>: '{1:20}'
    • Map<String, String>: '{"testKey":"testValue"}'
  Refer to https://issues.apache.org/jira/browse/SQOOP-1771 for more details
  Object Format: java.util.Map<Object, Object>

Column Type: SET
  CSV Format: same as ARRAY
  Object Format: java Object[]

Column Type: TEXT
  CSV Format: The entire string is enclosed in single quotes, and all bytes are printed as they are, with the exception of the following bytes:
    Byte | Encoded as
    0x5C | \\
    0x27 | \'
    0x22 | \"
    0x1A | \Z
    0x0D | \r
    0x0A | \n
    0x00 | \0
  Object Format: java String

Column Type: TIME
  CSV Format: HH:MM:SS[.ZZZ] (fraction is optional); 3-digit millisecond support only for time
  Object Format: org.joda.time.LocalTime (no timezone)

Column Type: UNKNOWN
  CSV Format: same as BINARY
  Object Format: java byte[] (same as BINARY)
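The TEXT escaping and BIT parsing rules above can be sketched directly in code. The two helpers below are an illustrative re-implementation of those rules, not SqoopIDFUtils itself; encodeText follows the byte-escape table, and parseBit accepts exactly the listed BIT literals.

```java
public class CsvSpecSketch {
  // Escape a TEXT value per the table (backslash, quotes, SUB, CR, LF, NUL),
  // then enclose the whole string in single quotes.
  static String encodeText(String s) {
    StringBuilder sb = new StringBuilder("'");
    for (char c : s.toCharArray()) {
      switch (c) {
        case '\\':     sb.append("\\\\"); break; // 0x5C
        case '\'':     sb.append("\\'");  break; // 0x27
        case '"':      sb.append("\\\""); break; // 0x22
        case '\u001A': sb.append("\\Z");  break; // 0x1A
        case '\r':     sb.append("\\r");  break; // 0x0D
        case '\n':     sb.append("\\n");  break; // 0x0A
        default:
          if (c == 0) sb.append("\\0");          // 0x00
          else sb.append(c);
      }
    }
    return sb.append('\'').toString();
  }

  // BIT accepts true/TRUE/1 and false/FALSE/0; anything else throws.
  static boolean parseBit(String s) {
    if (s.equals("true") || s.equals("TRUE") || s.equals("1")) return true;
    if (s.equals("false") || s.equals("FALSE") || s.equals("0")) return false;
    throw new IllegalArgumentException("Unsupported BIT value: " + s);
  }
}
```

For example, encodeText("a'b\nc") yields 'a\'b\nc' with the embedded quote and newline escaped.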


Custom Implementations of IDF


CSVIntermediateDataFormat

Relevant JIRAs: SQOOP-555 and SQOOP-1350

It is one of the sample implementations of the IDF API.

CSV IDF piggybacks on the Sqoop CSV format, and its native format is that CSV format represented as a String. Naming aside, it could aptly be called DefaultIntermediateDataFormat, since it defaults to the degenerate case of CSV.

See the implementation class in the connector-sdk package for more details

NOTE: It may not be obvious, but the current IDF design expects every new implementation to expose the CSV and Object Array formats in addition to its native format.
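To illustrate that NOTE, any new IDF must keep all three views consistent. The skeleton below is a toy, not a real Sqoop class: its "native" format is simply a String[], with the CSV and object-array views derived from it. A real implementation would also honor the schema and apply the full CSV escaping rules.

```java
import java.util.Arrays;

public class ToyIdf {
  // Toy "native" format: one String per field.
  private String[] data;

  public void setData(String[] nativeRow) {
    this.data = nativeRow;
  }

  public String[] getData() {
    return data;
  }

  // Object-array view: here each field is just the raw String.
  public Object[] getObjectData() {
    return Arrays.copyOf(data, data.length, Object[].class);
  }

  // CSV view: naive join; a real IDF must apply the Sqoop CSV escaping rules.
  public String getCSVTextData() {
    return String.join(",", data);
  }

  public void setCSVTextData(String csv) {
    this.data = csv.split(",", -1); // -1 keeps trailing empty fields
  }
}
```

Whichever view a connector writes through, the other two must stay readable: that round-trip guarantee is the core contract of an IDF.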


JSONIntermediateDataFormat

Relevant JIRA: SQOOP-1901

Avro Intermediate Data Format

SqoopIDFUtils 

It is a utility class in Sqoop that aids connectors in encoding data into the expected CSV and object formats, and in parsing CSV strings back into the prescribed object format.

No Format
 https://issues.apache.org/jira/browse/SQOOP-1813

 


Food for Thought?

(Some of the points below are serious shortcomings of the current design as it exists.)

  • The choice of CSV text and Object Array as the mandated formats for the Sqoop IDF is influenced by the Sqoop 1 design. It favors some traditional fast-dump databases, but there is no real benchmark to prove how optimal it is versus using Avro or other formats for representing the data.
  • Using an intermediate format might lead to discrepancies in some specific column types. For instance, using Joda-Time to represent date-time objects only gives 3-digit precision, whereas the SQL timestamp from JDBC sources supports 6-digit precision.
  • More importantly, the SqoopConnector API has a getIDF..() method that ties a connector to a specific intermediate format for all the supported directions (i.e. both FROM and TO at this point). This means the connector has to provide this format on the FROM side and expect this format on the TO side.
  • There are 3 different formats, as described above, in each IDF implementation, so each connector can potentially support only one of these formats, and that is not obvious at all when a connector proclaims to use a particular implementation of IDF such as CSVIntermediateDataFormat. For instance, GenericJDBCConnector says it uses CSVIntermediateDataFormat but chooses to write object arrays in its extractor and read object arrays in its loader; hence it is not obvious which format it will actually read and write underneath. On the other hand, HDFSConnector also says it uses CSVIntermediateDataFormat but uses only the CSV text format in its extractor and loader at this point. This may change in the future.
  • A connector possibly should be able to handle multiple IDFs, and expose the supported IDFs per direction. It is not possible today, For instance a sqoop job should be able to dynamically choose the IDF for HDFSConnector when used in the TO direction. The job might be able to say, use AVRO IDF for the TO side and hence load all my data into HDFS in avro format. This means when doing the load, the HDFS will use the readContent API of the  SqoopOutputFormatDataReader. But today HDFS can only say it uses CSVIntermediateDataFormat and the data loaded into HDFS will need conversion from CSV to Avro as a separate step.