Status

Current state: Under Discussion

Discussion thread: here

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: TBD

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As of now, Flink relies on the flink-formats libraries to support various external formats such as Avro, ORC, CSV, etc. In general, supporting each format involves two tasks:
a) Serialization and Deserialization of the format;
b) Conversion between the external format and the Flink SQL Rows (data type conversions).

This FLIP proposes to improve the data type conversion support to facilitate the development of format libraries.

In a nutshell, data type conversion requires converting an external record object to / from a Flink Row, where the external object follows a format with its own data types. For example, the record objects in Avro can be GenericRecord or SpecificRecord, and the fields in the Avro records follow the Avro type system. The goal of data type conversion is to convert between such external type systems and the Flink SQL type system.

The data type conversion is required in the following two scenarios:

Scenario 1: In the source / sink connectors.

Scenario 2: When converting between DataStream and Table.

Current status

The Flink type system consists of LogicalType and DataType. The LogicalType is the type with semantic meaning recognized by the SQL framework, while a DataType describes the data structure hosting the logical type. There are only four DataType subclasses at this point: AtomicDataType (for primitive types), CollectionDataType (for lists / arrays), FieldsDataType (for structured types), and KeyValueDataType (for K-V maps).

To support data type conversion, as of now, Flink has the following design:

  1. Each of the LogicalTypes has a corresponding "internal data class" to host the data. For example, StringData is the internal data class for the VARCHAR logical type.
  2. Each LogicalType defines a built-in set of data classes that can be converted to / from its internal data class. For example, a LogicalType of VARCHAR whose internal data class is StringData can be converted from / to String or byte[]. Each of the supported conversions has a built-in converter registered in the DataStructureConverters class (see the sketch after this list).
  3. Custom conversions are supported via the logical type of StructuredType which allows users to convert a POJO object from / to a Row.

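For example, a built-in conversion from step 2 can be declared directly on a DataType via the existing bridgedTo() method. The following is a minimal sketch using the DataTypes API; the class name is illustrative:

BuiltInConversionExample
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class BuiltInConversionExample {

    public static void main(String[] args) {
        // VARCHAR with its default external conversion class java.lang.String.
        DataType defaultType = DataTypes.STRING();
        // The same logical type, bridged to the built-in alternative byte[].
        DataType bytesType = DataTypes.STRING().bridgedTo(byte[].class);

        System.out.println(defaultType.getConversionClass()); // class java.lang.String
        System.out.println(bytesType.getConversionClass());   // class [B (i.e. byte[])
    }
}
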
For conversion scenario 1, in Sources, a format library needs to convert the fields in external objects (e.g. Avro SpecificRecord) to the internal data class (e.g. RowData) according to the defined Table schema, and it will do the opposite in Sinks. Because the external type systems are not supported by the built-in converters defined in DataStructureConverters, the format implementations have to do the conversion on their own. For example, the Avro format relies on its own implementation in RowDataToAvroConverters and AvroToRowDataConverters. Note that these converters have nothing to do with the DataStructureConverters class, which only contains the converters for built-in data type conversions, even though all these converters are converting between internal / external data classes.

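To illustrate the hand-written conversion code in scenario 1, a simplified sketch of such a converter is shown below. This is not the actual Avro format code; it assumes a record schema with a single string field named "name":

AvroRecordToRowData (sketch)
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

/** Simplified, hand-written field conversion as format libraries do it today. */
public class AvroRecordToRowData {

    /** Converts an Avro record with a single string field "name" to a RowData. */
    public static RowData convert(GenericRecord record) {
        GenericRowData row = new GenericRowData(1);
        // The Flink internal class for VARCHAR is StringData, so the format
        // has to bridge the Avro field value to it by itself.
        row.setField(0, StringData.fromString(record.get("name").toString()));
        return row;
    }
}
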
For conversion scenario 2, only built-in conversions can be used. That means when converting between Table and DataStream, the conversion can only be done using the built-in converters.

Issues

In the effort to enhance the Avro format support (FLIP-358), two main issues came up with the above design.

  1. For conversion scenario 2, the built-in conversions are not sufficient.
    1. The built-in conversion classes are not sufficient. For example, in Avro, a decimal type can be defined as follows:
      {"name": "type_decimal_fixed", "type": {"name": "Fixed2", "size": 2, "type": "fixed", "logicalType": "decimal", "precision": 4, "scale": 2}}
      The external class of this field is Fixed2, which is not supported by the built-in conversions. Using StructuredType, which corresponds to a FieldsDataType, for this custom external class is also hacky because the field is actually an atomic type.
      Similarly, an enum type in Avro can be defined as follows:
      {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}}
      The external class of this type is GenericData.EnumSymbol for Avro generic records, or the enum class Colors for Avro specific records. Neither of them is supported by the built-in conversions.
    2. The requirement of StructuredType conversion is too demanding. Theoretically speaking, a StructuredType can be converted to any POJO object. However, this POJO requirement may not always be met. For example, Avro GenericData.Record is not a POJO class.
  2. Heterogeneity in the conversion code paths. If we dive a little deeper into the two conversion scenarios, there are the following four cases:
    1. Table Source. A typical implementation of a Table source is to read bytes from external sources, deserialize the bytes to external objects (GenericRecords / SpecificRecords for Avro), and then convert the external objects to Flink rows. This conversion is up to the format developer to implement.
    2. Table Sink. A typical implementation of a Table sink is to convert Flink rows to external objects (GenericRecords / SpecificRecords for Avro), then write the external objects with external libraries (GenericDatumWriter / SpecificDatumWriter). This conversion is up to the format developer to implement.
    3. Converting from DataStream to Table. A DataType is derived from the TypeInformation of the DataStream, and the conversion class in the DataType is used for the conversion. This only works well if the TypeInformation of the DataStream is a CompositeType.
    4. Converting from Table To DataStream. By default a Table will be converted to a DataStream<Row>. Users can provide a DataType so the table is converted to a DataStream of the conversion class of the DataType.

As of now, the implementations of the above four conversion cases are somewhat related but differ from each other, which makes the API quite confusing. Ideally, there should be just one internal / external converter interface that is used everywhere.

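For reference, conversion cases 3 and 4 look roughly like the following with the current API. This is a minimal sketch; only built-in conversion classes such as Row can be requested today:

StreamTableConversionExample
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamTableConversionExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Case 3: DataStream -> Table. The DataType is derived from the
        // TypeInformation of the stream elements (String here).
        DataStream<String> names = env.fromElements("alice", "bob");
        Table table = tableEnv.fromDataStream(names);

        // Case 4: Table -> DataStream. By default the result is a DataStream<Row>;
        // only built-in conversion classes can be requested.
        DataStream<Row> rows = tableEnv.toDataStream(table);
    }
}
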
This FLIP proposes to do the following to better support the internal / external conversion.

  1. Create a public interface of DataTypeConverter and use it in all the data type conversion scenarios.
  2. Allow users to define custom conversion classes and the corresponding converter for each LogicalType.

Public Interfaces

  1. Introduce a new interface of org.apache.flink.table.types.conversion.DataTypeConverter.

DataTypeConverter
package org.apache.flink.table.types.conversion;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/**
 * An interface that converts between the internal Flink data structure and the conversion class of a DataType.
 */
@PublicEvolving
public interface DataTypeConverter<I, E> extends Serializable {

   /**
    * Initializes the converter with the given class loader before any conversion is performed.
    */
   void open(ClassLoader classLoader);

   /**
    * Converts to internal data structure.
    *
    * <p>Note: Parameter must not be null. Output must not be null.
    */
   I toInternal(E external);

   /**
    * Converts to external data structure.
    *
    * <p>Note: Parameter must not be null. Output must not be null.
    */
   E toExternal(I internal);
}

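As an illustration, a format library could implement the proposed interface to bridge the Avro enum case from above. The following is a hypothetical sketch; the class name and the schema handling are illustrative and not part of this proposal:

AvroEnumConverter (example)
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.conversion.DataTypeConverter;

/** Hypothetical converter between StringData and Avro GenericData.EnumSymbol. */
public class AvroEnumConverter implements DataTypeConverter<StringData, GenericData.EnumSymbol> {

    // Keep the Avro schema as a JSON string so the converter is trivially
    // Serializable; parse it in open().
    private final String schemaString;
    private transient Schema schema;

    public AvroEnumConverter(String schemaString) {
        this.schemaString = schemaString;
    }

    @Override
    public void open(ClassLoader classLoader) {
        this.schema = new Schema.Parser().parse(schemaString);
    }

    @Override
    public StringData toInternal(GenericData.EnumSymbol external) {
        return StringData.fromString(external.toString());
    }

    @Override
    public GenericData.EnumSymbol toExternal(StringData internal) {
        return new GenericData.EnumSymbol(schema, internal.toString());
    }
}
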
2. Add a withCustomConversion() method to the LogicalType.

LogicalType
public abstract class LogicalType implements Serializable {

    ...

    /**
     * Add a custom conversion to this type. Custom conversions are used to convert between
     * an external class and the internal representation of this logical type.
     *
     * @param clazz the external class for conversion.
     * @param customConverter the converter for the conversion.
     * @return a new LogicalType instance with the custom conversion added.
     */
    public final LogicalType withCustomConversion(
            Class<?> clazz, DataTypeConverter<Object, Object> customConverter) {
        Preconditions.checkNotNull(customConverter,
                "The custom converter should not be null.");
        // Make a copy to ensure the immutability of LogicalType.
        LogicalType copy = copy();
        copy.customConversions.put(clazz, customConverter);
        return copy;
    }

    /**
     * Add custom conversions to this type. Custom conversions are used to convert between
     * an external class and the internal representation of this logical type.
     *
     * @param conversions a map of external class to converter.
     * @return a new LogicalType instance with the custom conversions added.
     */
    public final LogicalType withCustomConversions(
            Map<Class<?>, DataTypeConverter<Object, Object>> conversions) {
        // make a copy to ensure the immutability of LogicalType
        LogicalType copy = copy();
        conversions.forEach((clazz, converter) -> {
            if (converter != null) {
                copy.customConversions.put(clazz, converter);
            }
        });
        return copy;
    }

    ...

}

3. Add the following new method to the AbstractDataType interface.

AbstractDataType
public interface AbstractDataType<T extends AbstractDataType<T>> {
    ...

    /**
     * Adds a hint that data should be represented using the given class when entering or leaving
     * the table ecosystem.
     *
     * <p>A supported conversion class depends on the logical type and its nullability property.
     *
     * <p>Please see the implementation of {@link LogicalType#supportsInputConversion(Class)},
     * {@link LogicalType#supportsOutputConversion(Class)}, or the documentation for more
     * information about supported conversions.
     *
     * @param newConversionClass the class to convert the data to.
     * @param converter the converter to use for the conversion.
     * @return a new, reconfigured data type instance
     */
    default T bridgedTo(Class<?> newConversionClass, DataTypeConverter<Object, Object> converter) {
        throw new UnsupportedOperationException(
                "bridgedTo(Class<?>, DataTypeConverter) is not implemented");
    }
   
    ...
}

This method will be implemented by AtomicDataType, CollectionDataType, FieldsDataType and KeyValueDataType.

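A DataType carrying a custom conversion could then be declared as follows. This is a sketch against the proposed API, reusing the hypothetical AvroEnumConverter shown earlier:

CustomConversionExample
import org.apache.avro.generic.GenericData;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.conversion.DataTypeConverter;

public class CustomConversionExample {

    @SuppressWarnings({"unchecked", "rawtypes"})
    public static DataType enumStringType(String avroEnumSchema) {
        // The hypothetical AvroEnumConverter from the example above.
        DataTypeConverter<Object, Object> converter =
                (DataTypeConverter) new AvroEnumConverter(avroEnumSchema);
        // A VARCHAR whose external class is GenericData.EnumSymbol, converted by
        // the user-supplied converter instead of a built-in one.
        return DataTypes.STRING().bridgedTo(GenericData.EnumSymbol.class, converter);
    }
}
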
Proposed Changes

With the above public interface changes, DataTypeConverter will be the only interface responsible for the data type conversion for all the conversion scenarios.

We will change some of the implementations accordingly:

  • The org.apache.flink.table.data.conversion.DataStructureConverter class will be removed and replaced by the DataTypeConverter interface.
    • Ideally, these built-in converters should also be exposed to the end users, so they can be leveraged in Sources and Sinks (conversion scenario 1). However, currently these converters rely on internal classes in the runtime package. We need to remove that dependency before we can expose them.
  • The supported conversion class check will honor the user-defined conversions.

The guidance for format library developers becomes the following:

  • Define a set of DataTypeConverters that can convert between internal and external data classes.
    • For conversion scenario 1, directly call the toInternal() / toExternal() methods of the converters.
    • For conversion scenario 2, construct a Schema or DataType with either built-in or custom data type conversions declared, and invoke StreamTableEnvironment.toDataStream(Table table, AbstractDataType<?> targetDataType) or StreamTableEnvironment.fromDataStream(DataStream<T> dataStream, Schema schema), as sketched below.

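For instance, with a DataType that carries a custom conversion (such as the one built by the hypothetical CustomConversionExample above), conversion scenario 2 could look like this. This is a sketch; the field name "color" is illustrative:

CustomConversionUsage (example)
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class CustomConversionUsage {

    public static void example(StreamTableEnvironment tableEnv, Table table, DataStream<Row> stream) {
        // A column type whose external class is GenericData.EnumSymbol (see above).
        // The Avro enum schema JSON is omitted for brevity.
        DataType colorType = CustomConversionExample.enumStringType("{...}");

        // Table -> DataStream: the "color" field of the produced rows holds
        // GenericData.EnumSymbol objects created by the custom converter.
        DataStream<Row> toStream =
                tableEnv.toDataStream(table, DataTypes.ROW(DataTypes.FIELD("color", colorType)));

        // DataStream -> Table: the schema declares the custom conversion, so the
        // "color" field of the incoming rows may hold GenericData.EnumSymbol objects.
        Table fromStream =
                tableEnv.fromDataStream(stream, Schema.newBuilder().column("color", colorType).build());
    }
}
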
There is one issue that we will not address as part of this FLIP, but as a follow-up step:

  • Expose the built-in implementations of DataTypeConverter. Currently, the built-in DataStructureConverters are in the flink-table-runtime package. We may need to extract them to an SDK package so users don’t need to depend on flink-table-runtime at authoring time.

Compatibility, Deprecation, and Migration Plan

This FLIP only has incremental changes, and thus is fully backwards compatible.

Test Plan

This FLIP can be tested with the FLIP-358 implementation.

Rejected Alternatives

N/A