Status

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the flink-avro component provides support for reading and writing records in Apache Avro format in various connectors (Kafka, FileSystem, etc.). However, at this point the component has a few issues, including the following:

  1. API stability ambiguity. Right now, only three classes in the entire flink-avro component are annotated with @PublicEvolving. For the rest of the classes, it is unclear whether they are public API or not.
  2. Broken BulkFormat support. Currently, flink-avro only supports reading from an Avro file into Flink RowData for Flink SQL, and even that is broken because the generated reader schema may be incompatible with the writer schema in the Avro file header.
  3. Limited support for SpecificRecord. Currently, SpecificRecord is only supported in the DataStream API. Use cases such as conversion between a Flink SQL table and a DataStream<SpecificRecord> are not supported.


Due to the missing stability annotations, it is unclear whether some of the classes are intended to be used by users. Assuming all the classes are exposed, the following table summarizes the currently supported use cases in a more systematic way:

| Use case                                        | Record type    | Stream                                    | Batch (FileSource)        |
|-------------------------------------------------|----------------|-------------------------------------------|---------------------------|
| Source, SQL / Table                             | SpecificRecord | N/A                                       | N/A                       |
| Source, SQL / Table                             | GenericRecord  | Case 1 (Y)                                | Case 7 (Fix in this FLIP) |
| Source, DataStream                              | SpecificRecord | Case 2 (Y)                                | Case 8 (Separate FLIP)*   |
| Source, DataStream                              | GenericRecord  | Case 3 (Y)                                | Case 9 (Separate FLIP)*   |
| Sink, SQL / Table                               | SpecificRecord | N/A                                       | N/A                       |
| Sink, SQL / Table                               | GenericRecord  | Case 4 (Y)                                | Case 10 (Y)               |
| Sink, DataStream                                | SpecificRecord | Case 5 (Y)                                | Case 11 (Y)               |
| Sink, DataStream                                | GenericRecord  | Case 6 (Y)                                | Case 12 (Y)               |
| DataStream of Avro records to Table conversion  | SpecificRecord | Case 13 (semi-working, fix in this FLIP)  |                           |
| DataStream of Avro records to Table conversion  | GenericRecord  | Case 14 (Separate FLIP)**                 |                           |
| Table to DataStream of Avro records conversion  | SpecificRecord | Case 15 (semi-working, fix in this FLIP)  |                           |
| Table to DataStream of Avro records conversion  | GenericRecord  | Case 16 (Separate FLIP)**                 |                           |

* Support for use cases 8 and 9 is a purely new feature, so we will cover it in a separate FLIP.

** Support for use cases 14 and 16 already exists, but as of now the entire GenericRecord can only be treated as a single field of RawType. Ideally, we should be able to support GenericRecord with a structured type. Since this change is an improvement to an existing feature, we will do it in a separate FLIP.

For the streaming cases, flink-avro just needs to provide a SerializationSchema and a DeserializationSchema for the Avro records to the Source and Sink implementations. These are already available.
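
As an illustration (a sketch only, not part of this proposal), the existing streaming source path can be used as follows with a Kafka source; the topic, bootstrap servers, and the Avro-generated class MyRecord are hypothetical placeholders.

Streaming source example (sketch)
	{
		...
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The deserialization schema for SpecificRecords is already provided by flink-avro.
        KafkaSource<MyRecord> source = KafkaSource.<MyRecord>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("my-topic")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(AvroDeserializationSchema.forSpecific(MyRecord.class))
                .build();

        DataStream<MyRecord> dataStream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "avro-kafka-source");
		...
	}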

For the batch cases, the BulkFormat for DataStream is currently missing, and the BulkFormat for SQL needs to be fixed.

For the Table and DataStream conversion cases, currently there are a few internal implementation issues that need to be solved.

This FLIP proposes to do the following:

  1. Add the missing stability annotations to the classes in flink-avro.
  2. Officially support / fix all the use cases in the above table to make them work out of the box, except for those marked as N/A.
    1. This FLIP only fixes the broken use cases 7, 13, and 15.
    2. Detailed proposals for supporting cases 8 and 9 will be in a separate FLIP.
    3. Improvement proposals for use cases 14 and 16 will be covered in a separate FLIP.

Public Interfaces

Existing API stability annotation cleanup

  1. Mark the following interfaces as PublicEvolving. All the rest of the classes will be marked as Internal.
    • o.a.f.formats.avro.AvroDeserializationSchema
    • o.a.f.formats.avro.AvroSerializationSchema
    • o.a.f.formats.avro.AvroRowDataDeserializationSchema
    • o.a.f.formats.avro.AvroRowDataSerializationSchema
    • o.a.f.formats.avro.RegistryAvroDeserializationSchema
    • o.a.f.formats.avro.RegistryAvroSerializationSchema
    • o.a.f.formats.avro.AvroFormatOptions
    • o.a.f.formats.avro.AvroWriterFactory
    • o.a.f.formats.avro.AvroWriters
  2. The following two interfaces should probably be marked as Public for now and Deprecated once we deprecate the InputFormat / OutputFormat.
    • o.a.f.formats.avro.AvroInputFormat
    • o.a.f.formats.avro.AvroOutputFormat

New public classes

AvroSchemaUtils
/**
 * A user-facing public util class to help convert {@link org.apache.avro.Schema Avro Schemas}
 * to Flink {@link DataType} and {@link Schema Table Schema}.
 */
@PublicEvolving
public class AvroSchemaUtils {

    /** private constructor for util class to prevent instantiation. */
    private AvroSchemaUtils() {}

    /**
     * Converts the given Avro schema string to a Flink {@link DataType}. This is useful
     * when a user wants to convert a Table to a DataStream.
     *
     * @param avroSchemaString Avro schema string
     * @return the DataType converted from the Avro schema string.
     * @see org.apache.flink.table.api.bridge.java.StreamTableEnvironment#toDataStream(Table, AbstractDataType)
     */
    public static DataType convertToDataType(String avroSchemaString) {
        return AvroSchemaConverter.convertToDataType(avroSchemaString);
    }

    /**
     * Converts the given Avro schema to a Flink {@link DataType}. This is useful
     * when a user wants to convert a Table to a DataStream.
     *
     * @param schema Avro schema
     * @return the DataType converted from the Avro schema.
     * @see org.apache.flink.table.api.bridge.java.StreamTableEnvironment#toDataStream(Table, AbstractDataType)
     */
    public static DataType convertToDataType(org.apache.avro.Schema schema) {
        return AvroSchemaConverter.convertToDataType(schema);
    }

    /**
     * Converts the given Avro schema string to a Flink
     * {@link org.apache.flink.table.api.Schema Table Schema}. This is useful when a user wants
     * to convert a DataStream into a Table.
     *
     * @param avroSchemaString Avro schema string.
     * @see org.apache.flink.table.api.bridge.java.StreamTableEnvironment#fromDataStream(DataStream, Schema)
     * @return the Table Schema converted from the Avro schema string.
     */
    public static Schema convertToTableSchema(String avroSchemaString) {
        return AvroSchemaConverter.convertToTableSchema(avroSchemaString);
    }

    /**
     * Converts the given Avro schema to a Flink
     * {@link org.apache.flink.table.api.Schema Table Schema}. This is useful when a user wants
     * to convert a DataStream into a Table.
     *
     * @param schema Avro schema.
     * @see org.apache.flink.table.api.bridge.java.StreamTableEnvironment#fromDataStream(DataStream, Schema)
     * @return the Table Schema converted from the Avro schema.
     */
    public static Schema convertToTableSchema(org.apache.avro.Schema schema) {
        return AvroSchemaConverter.convertToTableSchema(schema);
    }
}
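
For example, both the Table schema and the DataType needed for the Table / DataStream conversion can be derived from a single Avro schema definition (a short sketch; the schema string is a placeholder):

AvroSchemaUtils usage example (sketch)
        String avroSchemaString = "...";
        // Table schema for StreamTableEnvironment#fromDataStream(DataStream, Schema).
        Schema tableSchema = AvroSchemaUtils.convertToTableSchema(avroSchemaString);
        // DataType for StreamTableEnvironment#toDataStream(Table, AbstractDataType).
        DataType dataType = AvroSchemaUtils.convertToDataType(avroSchemaString);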

New Avro Format Option

AvroFormatOptions
    public static final ConfigOption<String> AVRO_SCHEMA =
            ConfigOptions.key("schema")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("An optional Avro reader schema string. By default, Flink table"
                            + " will generate an Avro reader schema for a table according to the defined"
                            + " Flink table schema. However, the generated schema has some limitations."
                            + " For example, it may not have the wanted default value for a field."
                            + " In that case, users can provide the intended Avro reader schema using "
                            + " this config to help generate a more accurate reader schema.");

Behavior change

Add Enum as a supported conversion class for the logical type of VARCHAR.

Currently, the Enum schema in Avro is mapped to the VARCHAR type. This causes problems when users try to convert a Table into a DataStream of specific records, because the conversion simply tries to cast a String into an Enum, which does not work.

The following new method will be added to the DataTypes class to create a String type backed by an enumeration class.

DataTypes
@PublicEvolving
public final class DataTypes {
	...

    /**
     * Data type of a variable-length character string representing an {@link Enum} class.
     *
     * @param clazz the {@link Enum} class.
     */
    public static DataType ENUM_STRING(Class<? extends Enum<?>> clazz) {
        return new AtomicDataType(new VarCharType(Integer.MAX_VALUE), clazz);
    }
	...
}

And a StringEnumConverter will be added to support the conversion from String to an Enum.
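
To illustrate the intended effect (a sketch, not final API usage; it assumes a StreamTableEnvironment tEnv and a Table table with a matching schema): a VARCHAR column backed by an Avro enum could declare its conversion class via ENUM_STRING, so that the StringEnumConverter can turn the String value back into the enum during the Table-to-DataStream conversion. The Color enum and the field names below are hypothetical.

Enum conversion example (sketch)
        // "Color" is a hypothetical Avro-generated enum; "color" is a column backed by it.
        DataType rowType =
                DataTypes.ROW(
                        DataTypes.FIELD("id", DataTypes.BIGINT()),
                        // A VARCHAR type whose conversion class is the enum class.
                        DataTypes.FIELD("color", DataTypes.ENUM_STRING(Color.class)));

        DataStream<Row> dataStream = tEnv.toDataStream(table, rowType);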

Proposed Changes

In addition to the public interface changes above, we propose the following changes:

  1. Map Avro Record type to StructuredType instead of RowType.
    1. This allows the implementation class information to be carried over, which is required when converting RowData to SpecificRecords (see the sketch after this list).
  2. Add support for RawType to/from RowData conversion
    1. Currently, a union schema with more than one concrete type is mapped to the logical type of RawType. However, the RawType is not handled by the AvroToRowDataConverters and RowDataToAvroConverters.
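
As a conceptual illustration of change 1 above (proposed behavior, not an existing API guarantee; MyRecord is a hypothetical Avro-generated class): converting an Avro record schema would yield a DataType whose logical type is a StructuredType carrying the SpecificRecord class, which is what allows toDataStream(...) to produce MyRecord instances instead of plain Rows.

StructuredType mapping illustration (sketch)
        org.apache.avro.Schema avroSchema = MyRecord.getClassSchema();
        DataType dataType = AvroSchemaUtils.convertToDataType(avroSchema);
        // Proposed: the logical type is a StructuredType whose implementation class is
        // MyRecord, rather than a plain RowType.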

With the above changes, the user code for the above cases will be as follows:

User code example
	{
		...
		final String avroSchemaString = "...";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Get table schema.
        Schema tableSchema = AvroSchemaUtils.convertToTableSchema(avroSchemaString);
        // Set connector options.
        Map<String, String> connectorOptions = new HashMap<>();
        connectorOptions.put(FactoryUtil.CONNECTOR.key(), "SOME_CONNECTOR");
        connectorOptions.put("format", "avro");
        connectorOptions.put("avro.schema", avroSchemaString);
        // Set additional options for the connector
        connectorOptions.put("additional.options", "...");

        // Create catalog table.
        CatalogTable table = CatalogTable.of(
                Schema.newBuilder().fromSchema(tableSchema).build(),
                "Products Catalog Table",
                Collections.emptyList(),
                connectorOptions);

        // register the catalog table
        Catalog catalog = tEnv
                .getCatalog(tEnv.getCurrentCatalog())
                .orElseThrow(() -> new RuntimeException("Catalog does not exist."));
        catalog.createTable(
                new ObjectPath(tEnv.getCurrentDatabase(), "MyTable"),
                table,
                true);

        // Read the records into t1.
        Table t1 = tEnv.sqlQuery("SELECT * FROM MyTable");

        // Convert t1 to DataStream of MyRecord.
        DataStream<MyRecord> dataStream = tEnv.toDataStream(t1, AvroSchemaUtils.convertToDataType(avroSchemaString));

        // Convert DataStream of MyRecord to t2.
        Table t2 = tEnv.fromDataStream(dataStream, AvroSchemaUtils.convertToTableSchema(avroSchemaString));
		...
	}


Compatibility, Deprecation, and Migration Plan

  • The change proposed in this FLIP is fully backwards compatible.

Test Plan

The following tests should be added to verify the change:

  1. Create an Avro schema that covers the most commonly seen constructs in Avro schemas, including enum, union, default values, etc.
  2. Use the Avro schema created in step 1 to test the following cases:
    1. Use FileSource to read with BulkFormat into a Flink Table t1
    2. Convert table t1 into a DataStream of SpecificRecord.
    3. Convert a DataStream of SpecificRecord into a Table.

Rejected Alternatives

N/A