Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https://lists.apache.org/thread/m67s4qfrh660lktpq7yqf9docvvf5o9l
Vote thread: https://lists.apache.org/thread/2xmcxs67xxzwool554fglrnklyvw348h
JIRA


Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink supports different serializers, such as those for Java basic types, Pojo, and Kryo. When developing Flink jobs, users can register custom data types and serializers through StreamExecutionEnvironment and ExecutionConfig, and can also configure Pojo and Kryo serialized data classes through three configuration options: pipeline.default-kryo-serializers, pipeline.registered-kryo-types, and pipeline.registered-pojo-types. Flink jobs look up and create the corresponding serializer for each data type based on the data types and registered serializers. This approach has the following issues:

1) Users register data types and serializers in code, so they need to modify the code when upgrading the job version; this cannot be achieved simply through configuration. As mentioned in the document [1], we'd like to unify job-relevant settings into the configuration file.

2) The serializer configuration options are not on par with the API. Three options are provided to configure data types and serializers, but custom serializers are not supported. At the same time, there are priority issues among the three options: if users configure the same data type in all of them, conflicts arise.

3) Kryo serializer compatibility issues. Users can create custom data types in Flink jobs, and Flink will choose the corresponding serializer. If no serializer is found, the Kryo serializer is used as the fallback. Using the Kryo serializer has the following issues:

a) Different Kryo versions have compatibility issues, which makes old and new versions of a job incompatible when Flink upgrades the JVM or the Kryo version.

b) Schema evolution of state is not supported: when columns are added to or removed from the state, a new job using the Kryo serializer cannot recover from the original state.

Flink already has options to turn the Kryo serializer on/off and to configure Kryo/Pojo serializers. Building on that, this FLIP would like to improve serialization configuration and usage in Flink:

1) Configuration for customized serializers. Flink supports the @TypeInfo annotation and TypeInfoFactory to create customized serializers for user-defined data; refer to doc [2]. This FLIP proposes to configure customized serializers in the configuration so that users can update them for jobs without modifying any code.

2) Provide a uniform configuration for Kryo, Pojo, and customized serializers. Flink currently configures Kryo and Pojo serializers in different options; these can be combined with customized serializers in one option, which simplifies usage and fixes the priority issues when the same data type appears in different options.

3) Add more built-in serializers in Flink, such as for List, Map, and Set. Flink has many serializers for basic data types, but when users use composite data types such as List<String> or Map<String, Integer>, Flink can only fall back to Kryo or user-defined serializers. More built-in serializers should be added for these types to improve performance.

Public Interface

The following methods in ExecutionConfig, all annotated @Public, will be deprecated and removed in flink-2.0:

org.apache.flink.api.common.ExecutionConfig#registerKryoType(Class<?> type)

org.apache.flink.api.common.ExecutionConfig#registerPojoType(Class<?> type)

org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)

org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, T serializer)

org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer( Class<?> type, T serializer)

org.apache.flink.api.common.ExecutionConfig#getRegisteredKryoTypes()

org.apache.flink.api.common.ExecutionConfig#getRegisteredPojoTypes()

org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializerClasses()

org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializers()

org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializerClasses()

org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializers()

org.apache.flink.api.common.ExecutionConfig#enableForceAvro()

org.apache.flink.api.common.ExecutionConfig#enableForceKryo()

org.apache.flink.api.common.ExecutionConfig#enableGenericTypes()

org.apache.flink.api.common.ExecutionConfig#disableForceAvro()

org.apache.flink.api.common.ExecutionConfig#disableForceKryo()

org.apache.flink.api.common.ExecutionConfig#disableGenericTypes()

org.apache.flink.api.common.ExecutionConfig#hasGenericTypesDisabled()

org.apache.flink.api.common.ExecutionConfig#isForceAvroEnabled()

org.apache.flink.api.common.ExecutionConfig#isForceKryoEnabled()

The following methods in StreamExecutionEnvironment, all annotated @Public, will be deprecated and removed in flink-1.20:

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerType(Class<?> type)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, T serializer)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, T serializer)

The following method in TypeInformation and its subclasses, annotated @PublicEvolving, will be deprecated and removed in the next release:

org.apache.flink.api.common.typeinfo.TypeInformation#createSerializer(ExecutionConfig config)

After enableForceAvro, enableForceKryo, and enableGenericTypes are deprecated and removed, users can configure pipeline.force-avro, pipeline.force-kryo, and pipeline.generic-types in the job configuration as follows.

Configuration configuration = new Configuration();
configuration.set(PipelineOptions.GENERIC_TYPES, false);
configuration.set(PipelineOptions.FORCE_KRYO, true);
configuration.set(PipelineOptions.FORCE_AVRO, true);

StreamExecutionEnvironment env = new StreamExecutionEnvironment(configuration);
// Build flink job with env.

Flink currently has the options pipeline.default-kryo-serializers, pipeline.registered-kryo-types, and pipeline.registered-pojo-types for Kryo and Pojo serializers; they will be deprecated and removed in flink-2.0. This FLIP introduces a new option pipeline.serialization-config that covers Kryo, Pojo, and customized serializers.

@PublicEvolving
public class PipelineOptions {

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_DEFAULT_SERIALIZERS =
        key("pipeline.default-kryo-serializers")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of pairs of class names and Kryo serializers class names to be used"
                                                + " as Kryo default serializers")
                                .linebreak()
                                .linebreak()
                                .text("Example:")
                                .linebreak()
                                .add(
                                        TextElement.code(
                                                "class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;"
                                                        + " class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"))
                                .build());

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_REGISTERED_CLASSES =
        key("pipeline.registered-kryo-types")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                + " sure that only tags are written.")
                                .build());

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> POJO_REGISTERED_CLASSES =
        key("pipeline.registered-pojo-types")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                + " sure that only tags are written.")
                                .build());
 
    public static final ConfigOption<List<String>> SERIALIZATION_CONFIG =
        key("pipeline.serialization-config")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "List of pairs of class names and serializer config to be used. There is `type` fiend in"
                                                 + " the serializer config with a value 'pojo', 'kryo' or 'typeinfo' and each `type` has its own"
                                                 + " configuration.")
                                .linebreak()
                                .linebreak()
                                .text("Example:")
                                .linebreak()
                                .add(
                                        TextElement.code(
                                                "[org.example.ExampleClass1: {type: pojo},"
                                                + " org.example.ExampleClass2: {type: kryo},"
                                                + " org.example.ExampleClass3: {type: kryo, kryo-type: default, class: org.example.Class3KryoSerializer},"
                                                + " org.example.ExampleClass4: {type: kryo, kryo-type: registered, class: org.example.Class4KryoSerializer},"
                                                + " org.example.ExampleClass5: {type: typeinfo, class: org.example.Class5TypeInfoFactory}]"))
                                .build());
    
}

Each entry in pipeline.serialization-config is a key-value pair: the key is the data class name and the value is the serializer configuration for that class. The serializer configuration fields are as follows.

1) type: the serializer type, which can be pojo, kryo, or typeinfo. If the value of type is pojo, or kryo without kryo-type, the data type will use the Pojo or Kryo serializer directly.

2) kryo-type: the registration type for the Kryo serializer, which can be default or registered. When kryo-type is default, the given serializer is added as a default Kryo serializer for the data type; when kryo-type is registered, the data type and its serializer are registered with Kryo. When the field kryo-type exists, there must also be a class field that configures the specific serializer class name.

3) class: the serializer class name for kryo or typeinfo. For a Kryo serializer it must be a subclass of com.esotericsoftware.kryo.Serializer, and for typeinfo it must be a subclass of org.apache.flink.api.common.typeinfo.TypeInfoFactory.
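For illustration, here is a minimal sketch of setting the new option programmatically through the proposed SERIALIZATION_CONFIG constant; all org.example class names are placeholders.

Configuration configuration = new Configuration();
configuration.set(
        PipelineOptions.SERIALIZATION_CONFIG,
        Arrays.asList(
                "org.example.PojoDataClass: {type: pojo}",
                "org.example.KryoDataClass: {type: kryo, kryo-type: registered, class: org.example.KryoSerializerClass}",
                "org.example.TypeDataClass: {type: typeinfo, class: org.example.TypeDataInfoFactory}"));

StreamExecutionEnvironment env = new StreamExecutionEnvironment(configuration);
// Build the Flink job with env; serializers are resolved from the configuration above.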

Currently, when Flink initializes the Kryo serializer, it automatically detects whether flink-avro is in the job classpath: if it is not, a dummy Avro serializer is created and registered with Kryo; otherwise, the Avro serializer is loaded from flink-avro and registered with Kryo. We introduce a new option pipeline.force-kryo-avro with a default value of false; Flink will detect the flink-avro module and register the Avro serializer with Kryo only when the option is set to true.

@PublicEvolving
public class PipelineOptions {
    public static final ConfigOption<Boolean> FORCE_KRYO_AVRO =
        key("pipeline.force-kryo-avro")
                .booleanType()
                .defaultValue(false)
                .withDescription(Description.builder()
                    .text(
                            "Force register avro classes in kryo serializer.")
                    .linebreak()
                    .linebreak()
                    .text(
                            "Important: Make sure to include the %s module.",
                            code("flink-avro"))
                    .build());
}
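As a usage sketch (assuming the FORCE_KRYO_AVRO constant above), a job that still needs Avro classes registered with Kryo opts in explicitly and keeps flink-avro on its classpath.

Configuration configuration = new Configuration();
// Register the Avro serializers with Kryo again; this requires flink-avro in the job classpath.
configuration.set(PipelineOptions.FORCE_KRYO_AVRO, true);

StreamExecutionEnvironment env = new StreamExecutionEnvironment(configuration);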

This FLIP introduces SerializerConfig to decouple the serializers from ExecutionConfig; it can be created from a Configuration and provides the serializer-related methods. TypeExtractor and TypeInformation will use SerializerConfig instead of ExecutionConfig.

/**
 * A config that defines the serializer behavior in a Flink job; it manages the registered types and serializers.
 * The config is created from the job configuration and used by Flink to create serializers for data types.
 */
@PublicEvolving
public final class SerializerConfig implements Serializable {

    /** Create serializer config instance from configuration. */
    public SerializerConfig(Configuration configuration) {
        ...;
    }
    
    /** Returns the registered types with their Kryo Serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
            getRegisteredTypesWithKryoSerializerClasses();
    
    /** Returns the registered Kryo Serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
            getDefaultKryoSerializerClasses();
    
    /** Returns the registered Kryo types. */
    public LinkedHashSet<Class<?>> getRegisteredKryoTypes();
    
    /** Returns the registered POJO types. */
    public LinkedHashSet<Class<?>> getRegisteredPojoTypes();
    
    /** Returns the registered type info factories. */
    public Map<Class<?>, Class<? extends TypeInfoFactory<?>>> getRegisteredTypeInfoFactories();
    
    /**
     * Checks whether generic types are supported. Generic types are types that go through Kryo
     * during serialization.
     *
     * <p>Generic types are enabled by default.
     */
    public boolean hasGenericTypesDisabled();
    
    /** Returns whether Apache Avro is the serializer for POJOs. */
    public boolean isForceAvroEnabled();
    
    /** Returns whether Kryo is the serializer for POJOs. */
    public boolean isForceKryoEnabled();
}

The relevant method in TypeInformation, annotated @PublicEvolving, becomes:

org.apache.flink.api.common.typeinfo.TypeInformation#createSerializer(SerializerConfig config)
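A minimal sketch of the intended call path, assuming the SerializerConfig constructor and the createSerializer(SerializerConfig) overload proposed above; MyPojo is a placeholder data class.

Configuration configuration = new Configuration();
SerializerConfig serializerConfig = new SerializerConfig(configuration);

// TypeInformation consumes SerializerConfig instead of ExecutionConfig.
TypeInformation<MyPojo> typeInfo = TypeInformation.of(MyPojo.class);
TypeSerializer<MyPojo> serializer = typeInfo.createSerializer(serializerConfig);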

Currently, the option pipeline.generic-types is used to enable/disable the Kryo serializer; its default value is true, which means Flink will use Kryo as the fallback serializer. This causes Kryo to be used by default to serialize some user-defined data without the user being aware of it, which can lead to incompatibility between the old and new versions of the job state during version upgrades. To avoid this issue, in Flink-2.0 the default value of pipeline.generic-types can be changed to false to prevent the use of the Kryo serializer without the user's knowledge.

Proposed Changes

When users create a Flink job, TypeExtractor chooses an appropriate serializer for each data type and creates its TypeInformation. After the data types and serializer configurations above are supported, Flink follows the priority below when it needs to create a serializer and TypeInformation for a data type.

The data types and serializers in the configuration file have the highest priority. For data types that are not in the configuration, Flink checks, in turn, the TypeInfo annotation, basic types, Tuple, composite types, Pojo, and generic types, and creates the corresponding serializers. Besides the current built-in serializers, this FLIP will add more serializers for composite data types such as List, Map, and Set. Flink already has TypeInformation for these data types, such as MapTypeInfo, ListTypeInfo, and MultisetTypeInfo; however, TypeExtractor cannot recognize the composite data types and use these serializers. We would like to support them in this FLIP, as sketched below.
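For illustration, the sketch below (assuming a DataStream<String> named source) shows how composite types are handled today versus after this FLIP.

// Today: an explicit type hint is needed to use ListTypeInfo; otherwise the
// extracted type falls back to Kryo as a generic type.
DataStream<List<String>> listStream =
        source.map(value -> Collections.singletonList(value))
              .returns(Types.LIST(Types.STRING));

// After this FLIP: the TypeExtractor is expected to resolve List/Map/Set to
// ListTypeInfo/MapTypeInfo/MultisetTypeInfo automatically, so the hint becomes unnecessary.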

Flink uses AvroUtils to manage Avro serializers. When flink-avro is in the job classpath, it creates the org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils utility class; otherwise, it uses the built-in org.apache.flink.api.java.typeutils.AvroUtils$DefaultAvroUtils. AvroUtils has the following methods, which are used in different places.

1) void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type)

The Serializers class calls this method to register Avro data types with the Kryo serializer through ExecutionConfig#registerTypeWithKryoSerializer and ExecutionConfig#addDefaultKryoSerializer. The uppermost entry point for this method is ExecutionEnvironment, which is used by the DataSet API to create jobs. Since DataSet will no longer be used and will be removed in the future, this method in AvroUtils can be marked as Deprecated and removed in flink-2.0.

2) void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations)

KryoSerializer registers a serializer for GenericData.Array in its Kryo registrations as follows:

a) If flink-avro exists in the job classpath, it automatically creates AvroKryoSerializerUtils and registers GenericData.Array and its serializer with the Kryo serializer.

b) If flink-avro is not in the job classpath, a DefaultAvroUtils is created and a dummy serializer is registered. As mentioned above, we can add a pipeline.force-kryo-avro option with a default value of false. Only when the option is set to true will Flink look up the flink-avro module and register the Avro serializer in KryoSerializer.

3) TypeSerializer<T> createAvroSerializer(Class<T> type)

The default value of the Flink job option pipeline.force-avro  is false. When users set it to true, they need to add flink-avro  to the job classpath. Then, Pojo will use Avro serializers, and PojoTypeInfo  will create an AvroSerializer  instance for the data type. This part of the processing can remain unchanged.

4) TypeInformation<T> createAvroTypeInfo(Class<T> type)

When a Flink job creates a serializer for a data type which is a subclass of org.apache.avro.specific.SpecificRecordBase, it needs to ensure that flink-avro  is in the job classpath. This part of the processing can remain unchanged.

Compatibility, Deprecation, and Migration Plan

1) Usage Of Fallback Kryo Serializer

Since the Kryo serializer has issues with schema evolution and version upgrades, in Flink-2.0 we will disable the Kryo fallback serializer by default. If users do not turn it on manually, the Flink job will throw an exception reminding them to either set pipeline.generic-types to true or develop a custom serializer and add it to the configuration, as sketched below.
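A migration sketch of the two choices, following the option keys proposed above (class names are placeholders): either re-enable the Kryo fallback or configure an explicit serializer for the affected type.

Configuration configuration = new Configuration();

// Option A: explicitly re-enable the Kryo fallback for jobs that still rely on generic types.
configuration.set(PipelineOptions.GENERIC_TYPES, true);

// Option B: configure an explicit serializer for the affected type instead of falling back to Kryo.
configuration.set(
        PipelineOptions.SERIALIZATION_CONFIG,
        Collections.singletonList(
                "org.example.GenericDataClass: {type: typeinfo, class: org.example.GenericDataTypeInfoFactory}"));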

2) Existing Serializer Registration and Configuration

The serialization-related methods in StreamExecutionEnvironment and ExecutionConfig will be deprecated and removed in flink-2.0. The newly added SerializerConfig is compatible with the current interface of ExecutionConfig. Existing Flink jobs that set data types and serializers can migrate as follows for compatibility with flink-2.0.

a) StreamExecutionEnvironment#registerType(Class<?> type), ExecutionConfig#registerKryoType(Class<?> type), and ExecutionConfig#registerPojoType(Class<?> type)

Configure the specified data types to use the Pojo and Kryo serializers as follows:

pipeline.serialization-config: org.example.PojoDataClass: {type: pojo}, org.example.KryoDataClass: {type: kryo}

b) StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) and ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)

Configure the specified data types to use the Kryo  serializer class as follows:

pipeline.serialization-config: org.example.KryoDataClass: {type: kryo, kryo-type: registered, class: org.example.KryoSerializerClass}

c) ExecutionConfig#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

Configure the specified data types to use default Kryo serializer classes as follows:

pipeline.serialization-config: org.example.KryoDataClass: {type: kryo, kryo-type: default, class: org.example.KryoSerializerClass}

d) For data types that are annotated with @TypeInfo and use a TypeInfoFactory:

pipeline.serialization-config: org.example.TypeDataClass: {type: typeinfo, class: org.example.TypeDataInfoFactory}

e) StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, T serializer) and ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, T serializer)

We do not want to support these methods in flink-2.0 . Users can construct the specified serializer class and specify the Kryo Serializer class as described above.

f) Configurations that specify data types and serializer classes using pipeline.default-kryo-serializers, pipeline.registered-kryo-types, and pipeline.registered-pojo-types can be rewritten in the new pipeline.serialization-config format.

Test Plan

UT & E2E

Rejected Alternatives

NONE


[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278464992
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory