Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As of now, Flink relies on the flink-formats libraries to support various external formats such as Avro, ORC, and CSV. In general, supporting each format involves two tasks:
a) serialization and deserialization of the format;
b) conversion between the external format and Flink SQL rows (data type conversions).

...

Scenario 2: When converting between DataStream and Table.

Current status

The Flink type system consists of LogicalType and DataType. A LogicalType is a type with semantic meaning that is recognized by the SQL framework, while a DataType describes the physical data structure hosting that logical type. There are only four DataTypes at this point: AtomicDataType (for primitive types), CollectionDataType (for lists / arrays), FieldsDataType (for structured types), and KeyValueDataType (for key-value maps).
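
For illustration, the four flavors can be created via the existing DataTypes factory (a minimal sketch using the current public API):

Code Block
languagejava
titleThe four DataTypes
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// AtomicDataType: primitive / atomic types.
DataType atomic = DataTypes.INT();
// CollectionDataType: lists / arrays.
DataType collection = DataTypes.ARRAY(DataTypes.STRING());
// FieldsDataType: structured types such as rows.
DataType fields = DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT()));
// KeyValueDataType: key-value maps.
DataType keyValue = DataTypes.MAP(DataTypes.STRING(), DataTypes.INT());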

...

For conversion scenario 2, only built-in conversions can be used: when converting between a Table and a DataStream, each logical type can only be converted via its built-in converters.
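
For example, a round trip between Table and DataStream today always goes through one of the built-in conversion classes (a minimal sketch; Row is the default conversion class):

Code Block
languagejava
titleBuilt-in conversion between Table and DataStream
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Table table = tableEnv.fromValues(1, 2, 3);

// The physical representation is limited to the built-in conversion
// classes of each logical type; Row is used by default.
DataStream<Row> stream = tableEnv.toDataStream(table);
Table back = tableEnv.fromDataStream(stream);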

Issues

While enhancing the Avro format support (FLIP-358), two main issues came up with the above design.

...

  1. Create a public DataTypeConverter interface and use it in all data type conversion scenarios.
  2. Allow users to define custom conversion classes and the corresponding converter for each LogicalType.

Public Interfaces

  1. Introduce a new interface, org.apache.flink.table.types.conversion.DataTypeConverter.



Code Block
languagejava
titleDataTypeConverter
package org.apache.flink.table.types.conversion;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/**
 * An interface that converts between the internal Flink data structure and the conversion class
 * of a DataType.
 */
@PublicEvolving
public interface DataTypeConverter<I, E> extends Serializable {

   /**
    * Initializes the converter, e.g. for loading classes via the given user code class loader,
    * before any conversion methods are called.
    */
   void open(ClassLoader classLoader);

   /**
    * Converts to internal data structure.
    *
    * <p>Note: Parameter must not be null. Output must not be null.
    */
   I toInternal(E external);

   /**
    * Converts to external data structure.
    *
    * <p>Note: Parameter must not be null. Output must not be null.
    */
   E toExternal(I internal);
}
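
For illustration, below is a minimal sketch of a user-defined converter. The UuidStringConverter class and its use case are hypothetical, assuming java.util.UUID as the external class and StringData as the internal representation of a VARCHAR type:

Code Block
languagejava
titleUuidStringConverter (hypothetical example)
import org.apache.flink.table.data.StringData;

import java.util.UUID;

/** Hypothetical converter between java.util.UUID and the internal StringData. */
public class UuidStringConverter implements DataTypeConverter<StringData, UUID> {

    @Override
    public void open(ClassLoader classLoader) {
        // No runtime resources are needed for this converter.
    }

    @Override
    public StringData toInternal(UUID external) {
        // Store UUIDs internally in their canonical string form.
        return StringData.fromString(external.toString());
    }

    @Override
    public UUID toExternal(StringData internal) {
        return UUID.fromString(internal.toString());
    }
}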

2. Add withCustomConversion() and withCustomConversions() methods to LogicalType.

Code Block
languagejava
titleLogicalType
public abstract class LogicalType implements Serializable {

    ...

    /**
     * Add a custom conversion to this type. Custom conversions are used to convert between
     * an external class and the internal representation of this logical type.
     *
     * @param clazz the external class for conversion.
     * @param customConverter the converter for the conversion.
     * @return a new LogicalType instance with the custom conversion added.
     */
    public final LogicalType withCustomConversion(
            Class<?> clazz, DataTypeConverter<Object, Object> customConverter) {
        Preconditions.checkNotNull(customConverter,
                "The custom converter should not be null.");
        // Make a copy to ensure the immutability of LogicalType.
        LogicalType copy = copy();
        copy.customConversions.put(clazz, customConverter);
        return copy;
    }

    /**
     * Add custom conversions to this type. Custom conversions are used to convert between
     * an external class and the internal representation of this logical type.
     *
     * @param conversions a map of external class to converter.
     * @return a new LogicalType instance with the custom conversions added.
     */
    public final LogicalType withCustomConversions(
            Map<Class<?>, DataTypeConverter<Object, Object>> conversions) {
        // Make a copy to ensure the immutability of LogicalType.
        LogicalType copy = copy();
        conversions.forEach((clazz, converter) -> {
            if (converter != null) {
                copy.customConversions.put(clazz, converter);
            }
        });
        return copy;
    }

    ...

}
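
For example, the hypothetical UuidStringConverter from the sketch above could be attached to a VARCHAR type as follows (the unchecked cast is needed because the conversions map stores converters as DataTypeConverter<Object, Object>):

Code Block
languagejava
titleAttaching a custom conversion (hypothetical example)
// Illustrative usage of the proposed API.
@SuppressWarnings("unchecked")
DataTypeConverter<Object, Object> converter =
        (DataTypeConverter<Object, Object>) (DataTypeConverter<?, ?>) new UuidStringConverter();

LogicalType uuidVarchar =
        new VarCharType(36).withCustomConversion(java.util.UUID.class, converter);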

3. Add the following new method to the AbstractDataType interface.

Code Block
languagejava
titleAbstractDataType
public interface AbstractDataType<T extends AbstractDataType<T>> {
    ...

    /**
     * Adds a hint that data should be represented using the given class when entering or leaving
     * the table ecosystem.
     *
     * <p>A supported conversion class depends on the logical type and its nullability property.
     *
     * <p>Please see the implementation of {@link LogicalType#supportsInputConversion(Class)},
     * {@link LogicalType#supportsOutputConversion(Class)}, or the documentation for more
     * information about supported conversions.
     *
     * @param newConversionClass the class to convert the data to.
     * @param converter the converter to use for the conversion.
     * @return a new, reconfigured data type instance
     */
    default T bridgedTo(Class<?> newConversionClass, DataTypeConverter<Object, Object> converter) {
        throw new UnsupportedOperationException(
                "bridgedTo(Class<?>, DataTypeConverter) is not implemented");
    }
   
    ...
}

This method will be implemented by AtomicDataType, CollectionDataType, FieldsDataType, and KeyValueDataType.
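
As an illustration, continuing the hypothetical UUID example from above, the new method could be used when declaring a DataType:

Code Block
languagejava
titlebridgedTo with a custom converter (hypothetical example)
// Illustrative usage of the proposed API: VARCHAR data enters and leaves
// the table ecosystem as java.util.UUID, using the custom converter.
DataType uuidType = DataTypes.VARCHAR(36).bridgedTo(java.util.UUID.class, converter);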

Proposed Changes

With the above public interface changes, DataTypeConverter becomes the single interface responsible for data type conversion in all conversion scenarios.

...

  • Expose the built-in implementations of DataTypeConverter. Currently, the built-in DataStructureConverters live in the flink-table-runtime package. We may need to extract them into an SDK package so that users don't need to depend on flink-table-runtime at authoring time.

Compatibility, Deprecation, and Migration Plan

This FLIP contains only incremental changes and is therefore fully backwards compatible.

Test Plan

This FLIP can be tested with the FLIP-358 implementation.

Rejected Alternatives

N/A