Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https://lists.apache.org/thread/m67s4qfrh660lktpq7yqf9docvvf5o9l
Vote thread: https://lists.apache.org/thread/2xmcxs67xxzwool554fglrnklyvw348h
JIRA: FLINK-34037
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink supports different serializers, such as those for Java basic types, Pojo and Kryo. When developing Flink jobs, users can register custom data types and serializers through StreamExecutionEnvironment and ExecutionConfig. They can also configure Pojo and Kryo serialized data classes through three configuration options: pipeline.default-kryo-serializers, pipeline.registered-kryo-types and pipeline.registered-pojo-types. Flink jobs look up and create the corresponding serializer for each data type based on the data types and the registered serializers. This approach has the following main issues:

1) Users register data types and serializers in code, so they have to modify the code when they upgrade the job version. This cannot be done through configuration alone. As mentioned in the document [1], we'd like to unify job-related settings in the configuration file.

2) The serializer configuration options currently offer fewer capabilities than the API. Three options are provided to configure data types and their serializers, but custom serializers are not supported. There are also priority issues among the three options: if users configure the same data type in all of them, the serialization semantics conflict.

3) Kryo serializer compatibility issues. Users can create custom data types in Flink jobs, and Flink chooses the corresponding serializer for them. If no serializer is found, the Kryo serializer is used. Using the Kryo serializer has the following issues:

    a) Different versions of Kryo may be incompatible with each other. As a result, the old and new versions of a job can become incompatible when Flink upgrades the JVM or the Kryo version.

    b) State schema evolution: when columns are added to or removed from the state data, the new job cannot recover from the original state if it uses the Kryo serializer.

...

1) Configuration for customized serializers. Flink supports the @TypeInfo annotation and TypeInfoFactory for creating customized serializers for user-defined data types (see doc [2]). This FLIP proposes configuring customized serializers in the configuration, so users can update them for jobs without modifying any code.

2) Provide uniform configuration for Kryo, Pojo and customized serializers. Flink currently configures serializers for Kryo and Pojo in separate options. These can be combined with customized serializers in a single option, which simplifies usage and fixes the priority issues that arise when the same data type is configured in different options.

3) Add more built-in serializers to Flink, such as List, Map and Set. Flink has many serializers for basic data types. However, when users use composite data types such as List<String> or Map<String, Integer>, Flink can only use Kryo or user-defined serializers for them. More built-in serializers should be added for these types to improve performance.
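To make item 3) more concrete, here is a minimal JDK-only sketch of what "recognizing" a composite type involves. It reads a field's generic type through java.lang.reflect and resolves the element types recursively. This is not Flink's TypeExtractor. The class names, the set of basic types and the LIST/MAP/SET labels are hypothetical placeholders for the ListTypeInfo/MapTypeInfo-style TypeInformation that Flink would create.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sample class whose fields exercise the composite types discussed above. */
final class SampleRecord {
    List<String> names;
    Map<String, Integer> counts;
    Set<Long> ids;
    Map<String, List<Integer>> index;
    Object payload;
}

/**
 * Hypothetical sketch (not Flink's TypeExtractor): classifies a declared type by walking its
 * generic parameters. List, Map and Set are recognized as composite types whose element types
 * are resolved recursively instead of falling back to a generic (Kryo) type.
 */
final class CompositeTypeSketch {

    private static final Set<Class<?>> BASIC_TYPES =
            Set.of(String.class, Integer.class, Long.class, Double.class, Boolean.class);

    static String describe(Type type) {
        if (type instanceof ParameterizedType) {
            ParameterizedType parameterized = (ParameterizedType) type;
            Type raw = parameterized.getRawType();
            Type[] args = parameterized.getActualTypeArguments();
            if (raw == List.class) {
                return "LIST[" + describe(args[0]) + "]";
            }
            if (raw == Set.class) {
                return "SET[" + describe(args[0]) + "]";
            }
            if (raw == Map.class) {
                return "MAP[" + describe(args[0]) + ", " + describe(args[1]) + "]";
            }
        } else if (type instanceof Class && BASIC_TYPES.contains(type)) {
            return "BASIC(" + ((Class<?>) type).getSimpleName() + ")";
        }
        // Everything else (raw collections, wildcards, arbitrary classes) stays generic here.
        return "GENERIC(" + type.getTypeName() + ")";
    }

    /** Convenience wrapper that describes a declared field of the given class. */
    static String describeField(Class<?> owner, String fieldName) {
        try {
            return describe(owner.getDeclaredField(fieldName).getGenericType());
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No field " + fieldName + " in " + owner.getName(), e);
        }
    }
}
```

For example, describeField(SampleRecord.class, "index") yields MAP[BASIC(String), LIST[BASIC(Integer)]]. Nested composites therefore resolve without touching Kryo, while the Object field stays GENERIC.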

...

The following methods in ExecutionConfig will be deprecated and removed in flink-2.0.

Method

Annotation

org.apache.flink.api.common.ExecutionConfig#registerKryoType(Class<?> type)

@Public

org.apache.flink.api.common.ExecutionConfig#registerPojoType(Class<?> type)

org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)

org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, T serializer)

org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer( Class<?> type, T serializer)

org.apache.flink.api.common.ExecutionConfig#getRegisteredKryoTypes()

org.apache.flink.api.common.ExecutionConfig#getRegisteredPojoTypes()

org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializerClasses()

org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializers()

org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializerClasses()

org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializers()

org.apache.flink.api.common.ExecutionConfig#enableForceAvro()

org.apache.flink.api.common.ExecutionConfig#enableForceKryo()

org.apache.flink.api.common.ExecutionConfig#enableGenericTypes()

org.apache.flink.api.common.ExecutionConfig#setForceAvro(boolean forceAvro)

org.apache.flink.api.common.ExecutionConfig#setForceKryo(boolean forceKryo)

org.apache.flink.api.common.ExecutionConfig#disableForceAvro()

org.apache.flink.api.common.ExecutionConfig#disableForceKryo()

org.apache.flink.api.common.ExecutionConfig#disableGenericTypes()

org.apache.flink.api.common.ExecutionConfig#hasGenericTypesDisabled()

org.apache.flink.api.common.ExecutionConfig#isForceAvroEnabled()

org.apache.flink.api.common.ExecutionConfig#isForceKryoEnabled()

The following methods in StreamExecutionEnvironment will be deprecated and removed in flink-2.0.

Method

Annotation

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerType(Class<?> type)

@Public

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, T serializer)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, T serializer)

The following method in TypeInformation and its subclasses will be deprecated and removed in flink-2.0.

Method

Annotation

org.apache.flink.api.common.typeinfo.TypeInformation#createSerializer(ExecutionConfig config)

@PublicEvolving

After enableForceAvro, enableForceKryo and enableGenericTypes are deprecated and removed, users can configure pipeline.force-avro, pipeline.force-kryo and pipeline.generic-types in the job configuration instead.
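The migration for these three switches can be sketched as a lookup from each deprecated ExecutionConfig call to the option key and value that replaces it. This is an illustration only: a plain Map stands in for Flink's Configuration, and DeprecatedCallMigration is a hypothetical helper, not part of Flink.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper: maps each deprecated ExecutionConfig call to the configuration key/value
 * that replaces it. A plain Map stands in for Flink's Configuration.
 */
final class DeprecatedCallMigration {

    private static final Map<String, String[]> REPLACEMENTS = new LinkedHashMap<>();

    static {
        REPLACEMENTS.put("enableForceAvro()", new String[] {"pipeline.force-avro", "true"});
        REPLACEMENTS.put("disableForceAvro()", new String[] {"pipeline.force-avro", "false"});
        REPLACEMENTS.put("enableForceKryo()", new String[] {"pipeline.force-kryo", "true"});
        REPLACEMENTS.put("disableForceKryo()", new String[] {"pipeline.force-kryo", "false"});
        REPLACEMENTS.put("enableGenericTypes()", new String[] {"pipeline.generic-types", "true"});
        REPLACEMENTS.put("disableGenericTypes()", new String[] {"pipeline.generic-types", "false"});
    }

    /** Returns the options equivalent to applying the given deprecated calls in order. */
    static Map<String, String> toOptions(String... deprecatedCalls) {
        Map<String, String> options = new LinkedHashMap<>();
        for (String call : deprecatedCalls) {
            String[] keyValue = REPLACEMENTS.get(call);
            if (keyValue == null) {
                throw new IllegalArgumentException("No option replacement known for " + call);
            }
            // A later call overrides an earlier one, just like calling the setters in sequence.
            options.put(keyValue[0], keyValue[1]);
        }
        return options;
    }
}
```

For instance, toOptions("enableForceKryo()", "disableGenericTypes()") produces pipeline.force-kryo=true and pipeline.generic-types=false. These are the entries a user would put in the job configuration instead.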

...

Code Block
import static org.apache.flink.configuration.ConfigOptions.key;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;

import java.util.List;

@PublicEvolving
public class PipelineOptions {

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_DEFAULT_SERIALIZERS =
            key("pipeline.default-kryo-serializers")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Semicolon separated list of pairs of class names and Kryo serializers class names to be used"
                                                    + " as Kryo default serializers")
                                    .linebreak()
                                    .linebreak()
                                    .text("Example:")
                                    .linebreak()
                                    .add(
                                            TextElement.code(
                                                    "class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;"
                                                            + " class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"))
                                    .build());

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_REGISTERED_CLASSES =
            key("pipeline.registered-kryo-types")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                    + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                    + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                    + " sure that only tags are written.")
                                    .build());

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> POJO_REGISTERED_CLASSES =
            key("pipeline.registered-pojo-types")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                    + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                    + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                    + " sure that only tags are written.")
                                    .build());

    public static final ConfigOption<List<String>> SERIALIZATION_CONFIG =
            key("pipeline.serialization-config")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "List of pairs of class names and serializer config to be used. There is a `type` field in"
                                                    + " the serializer config with a value 'pojo', 'kryo' or 'typeinfo' and each `type` has its own"
                                                    + " configuration.")
                                    .linebreak()
                                    .linebreak()
                                    .text("Example:")
                                    .linebreak()
                                    .add(
                                            TextElement.code(
                                                    "[org.example.ExampleClass1: {type: pojo},"
                                                            + " org.example.ExampleClass2: {type: kryo},"
                                                            + " org.example.ExampleClass3: {type: kryo, kryo-type: default, class: org.example.Class3KryoSerializer},"
                                                            + " org.example.ExampleClass4: {type: kryo, kryo-type: registered, class: org.example.Class4KryoSerializer},"
                                                            + " org.example.ExampleClass5: {type: typeinfo, class: org.example.Class5TypeInfoFactory}]"))
                                    .build());
}

Each entry of pipeline.serialization-config is a key-value pair. The key is the data class name, and the value is the serializer config for that class. The serializer config supports the following fields.

1) type: the serializer type, which can be pojo, kryo or typeinfo. If type is pojo, or kryo without a kryo-type field, the data type uses the Pojo or Kryo serializer directly.

2) kryo-type: how the Kryo serializer is set up, which can be default or registered. When kryo-type is default, the configured serializer is used as a Kryo default serializer for the data type. When kryo-type is registered, the data type and its serializer are registered with the Kryo serializer. When the kryo-type field exists, a class field must also be present to configure the specific serializer class name.

3) class: the serializer class name for kryo or typeinfo. For kryo, it should be a subclass of com.esotericsoftware.kryo.Serializer. For typeinfo, it should be a subclass of org.apache.flink.api.common.typeinfo.TypeInfoFactory.
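The rules above can be checked mechanically. The following is a minimal sketch that assumes a naive parser for a single "{key: value, ...}" entry in place of real YAML-based parsing. SerializerEntry and its error messages are hypothetical. Requiring a class field for typeinfo and allowing kryo-type only with type kryo are inferred from rules 2) and 3), not stated verbatim.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of validating one pipeline.serialization-config entry against the rules
 * above. The naive "{key: value, ...}" parser stands in for a real YAML parser.
 */
final class SerializerEntry {
    private static final Set<String> TYPES = Set.of("pojo", "kryo", "typeinfo");
    private static final Set<String> KRYO_TYPES = Set.of("default", "registered");

    final String dataClass;
    final String type;            // pojo, kryo or typeinfo
    final String kryoType;        // default, registered, or null when absent
    final String serializerClass; // value of the "class" field, or null when absent

    private SerializerEntry(String dataClass, String type, String kryoType, String serializerClass) {
        this.dataClass = dataClass;
        this.type = type;
        this.kryoType = kryoType;
        this.serializerClass = serializerClass;
    }

    static SerializerEntry parse(String dataClass, String value) {
        String body = value.trim();
        if (!body.startsWith("{") || !body.endsWith("}")) {
            throw new IllegalArgumentException("Expected {key: value, ...} for " + dataClass);
        }
        Map<String, String> fields = new HashMap<>();
        for (String pair : body.substring(1, body.length() - 1).split(",")) {
            int colon = pair.indexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Malformed field '" + pair.trim() + "' for " + dataClass);
            }
            fields.put(pair.substring(0, colon).trim(), pair.substring(colon + 1).trim());
        }
        String type = fields.get("type");
        String kryoType = fields.get("kryo-type");
        String clazz = fields.get("class");

        // Rule 1): type is mandatory and limited to three values.
        if (type == null || !TYPES.contains(type)) {
            throw new IllegalArgumentException("type must be pojo, kryo or typeinfo for " + dataClass);
        }
        // Rule 2): kryo-type only makes sense for kryo and always needs a class.
        if (kryoType != null) {
            if (!type.equals("kryo") || !KRYO_TYPES.contains(kryoType)) {
                throw new IllegalArgumentException("kryo-type must be default or registered with type kryo for " + dataClass);
            }
            if (clazz == null) {
                throw new IllegalArgumentException("kryo-type requires a class field for " + dataClass);
            }
        }
        // Rule 3): a typeinfo entry needs its TypeInfoFactory class.
        if (type.equals("typeinfo") && clazz == null) {
            throw new IllegalArgumentException("type typeinfo requires a class field for " + dataClass);
        }
        return new SerializerEntry(dataClass, type, kryoType, clazz);
    }
}
```

With this sketch, the ExampleClass3 entry from the option description parses into type kryo, kryo-type default and class org.example.Class3KryoSerializer. An entry such as {type: kryo, kryo-type: registered} without a class is rejected.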

Currently, when Flink initializes the Kryo serializer, it automatically detects whether flink-avro is on the job classpath. If it is not, a dummy Avro serializer is created and registered with Kryo; otherwise, the Avro serializer is loaded from flink-avro and registered with Kryo. We introduce a new option, pipeline.force-kryo-avro, with a default value of false. Flink will detect the flink-avro module and register the Avro serializer with Kryo only when the option is set to true.

Code Block
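import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.code;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.description.Description;

@PublicEvolving
public class PipelineOptions {
    public static final ConfigOption<Boolean> FORCE_KRYO_AVRO =
            key("pipeline.force-kryo-avro")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            Description.builder()
                                    .text("Force register avro classes in kryo serializer.")
                                    .linebreak()
                                    .linebreak()
                                    .text(
                                            "Important: Make sure to include the %s module.",
                                            code("flink-avro"))
                                    .build());
}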

...

This FLIP introduces SerializerConfig to decouple the serializer configuration from ExecutionConfig. It is created from a Configuration and provides the accessors that serializers need. TypeExtractor and TypeInformation will use SerializerConfig instead of ExecutionConfig.

Code Block
import com.esotericsoftware.kryo.Serializer;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.configuration.Configuration;

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;

/**
 * A config that defines the behavior of serializers in a Flink job; it manages the registered
 * types and serializers. The config is created from the job configuration and used by Flink to
 * create the serializer for each data type.
 */
@PublicEvolving
public final class SerializerConfig implements Serializable {
    private SerializerConfig() { }

    /** Creates a serializer config instance from the configuration. */
    public static SerializerConfig create(Configuration configuration) {
        ...;
    }

    // Only the signatures of the proposed accessors are listed; their bodies are omitted.

    /** Returns the registered types with their Kryo serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
            getRegisteredTypesWithKryoSerializerClasses();

    /** Returns the default Kryo serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
            getDefaultKryoSerializerClasses();

    /** Returns the registered Kryo types. */
    public LinkedHashSet<Class<?>> getRegisteredKryoTypes();

    /** Returns the registered POJO types. */
    public LinkedHashSet<Class<?>> getRegisteredPojoTypes();

    /** Returns the registered type info factories. */
    public Map<Class<?>, Class<? extends TypeInfoFactory<?>>> getRegisteredTypeInfoFactories();

    /**
     * Checks whether generic types are supported. Generic types are types that go through Kryo
     * during serialization.
     *
     * <p>Generic types are enabled by default and can be disabled with pipeline.generic-types.
     */
    public boolean hasGenericTypesDisabled();

    /** Returns whether Apache Avro is the serializer for POJOs. */
    public boolean isForceAvroEnabled();

    /** Returns whether Kryo is the serializer for POJOs. */
    public boolean isForceKryoEnabled();
}
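The following sketch shows one way SerializerConfig.create(...) could fill the accessors above from pipeline.serialization-config entries. It is an assumption, not the proposed implementation. Class names stay strings instead of Class objects to avoid class loading. The kryo-type mapping follows rule 2): default presumably corresponds to the old addDefaultKryoSerializer, and registered to registerTypeWithKryoSerializer. SerializerConfigSketch is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;

/**
 * Hypothetical sketch: sorts already-parsed pipeline.serialization-config entries into the
 * collections exposed by SerializerConfig's accessors. Class names are kept as strings here;
 * the real SerializerConfig holds Class objects.
 */
final class SerializerConfigSketch {
    final LinkedHashSet<String> registeredPojoTypes = new LinkedHashSet<>();
    final LinkedHashSet<String> registeredKryoTypes = new LinkedHashSet<>();
    final LinkedHashMap<String, String> defaultKryoSerializerClasses = new LinkedHashMap<>();
    final LinkedHashMap<String, String> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
    final LinkedHashMap<String, String> registeredTypeInfoFactories = new LinkedHashMap<>();

    /** Each key is a data class name; each value holds the type, kryo-type and class fields. */
    static SerializerConfigSketch fromEntries(Map<String, Map<String, String>> entries) {
        SerializerConfigSketch config = new SerializerConfigSketch();
        for (Map.Entry<String, Map<String, String>> entry : entries.entrySet()) {
            String dataClass = entry.getKey();
            Map<String, String> fields = entry.getValue();
            String type = fields.get("type");
            String kryoType = fields.get("kryo-type");
            String serializer = fields.get("class");
            if ("pojo".equals(type)) {
                config.registeredPojoTypes.add(dataClass);
            } else if ("kryo".equals(type) && kryoType == null) {
                config.registeredKryoTypes.add(dataClass);
            } else if ("kryo".equals(type) && "default".equals(kryoType)) {
                config.defaultKryoSerializerClasses.put(dataClass, serializer);
            } else if ("kryo".equals(type) && "registered".equals(kryoType)) {
                config.registeredTypesWithKryoSerializerClasses.put(dataClass, serializer);
            } else if ("typeinfo".equals(type)) {
                config.registeredTypeInfoFactories.put(dataClass, serializer);
            } else {
                throw new IllegalArgumentException("Unsupported serializer config for " + dataClass + ": " + fields);
            }
        }
        return config;
    }
}
```

Because every data class appears as a single key in the option, each class lands in exactly one collection. This is how the unified option avoids the priority conflicts described in the motivation.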

The corresponding new method in TypeInformation, which takes a SerializerConfig:

Method

Annotation

org.apache.flink.api.common.typeinfo.TypeInformation#createSerializer(SerializerConfig config)

@PublicEvolving

Currently, the option pipeline.generic-types enables or disables the Kryo serializer. Its default value is true, which means Flink uses Kryo as the fallback serializer. As a result, Kryo may be used by default to serialize some user-defined data types without the user being aware of it. This can make the new and old versions of the job state incompatible during version upgrades. To avoid this issue, the default value of pipeline.generic-types can be changed to false in Flink 2.0, which prevents the Kryo serializer from being used without the user's knowledge.
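The effect of flipping the default can be sketched as the final fallback step of serializer selection. GenericTypeFallback is a hypothetical helper, not Flink code. A Map stands in for the job configuration, and throwing UnsupportedOperationException is an assumption about how the failure would surface.

```java
import java.util.Map;

/**
 * Hypothetical sketch of the last step of serializer selection: a type that matched no other
 * rule is handed to Kryo only if pipeline.generic-types is enabled.
 */
final class GenericTypeFallback {
    /** The default proposed for Flink 2.0 in this FLIP (the current default is true). */
    static final boolean PROPOSED_GENERIC_TYPES_DEFAULT = false;

    static String fallbackSerializerFor(Class<?> type, Map<String, String> options) {
        boolean genericTypesEnabled = Boolean.parseBoolean(
                options.getOrDefault(
                        "pipeline.generic-types", String.valueOf(PROPOSED_GENERIC_TYPES_DEFAULT)));
        if (!genericTypesEnabled) {
            // Fail fast instead of silently writing Kryo-serialized state.
            throw new UnsupportedOperationException(
                    type.getName() + " would fall back to Kryo, but pipeline.generic-types is false;"
                            + " configure a serializer for it in pipeline.serialization-config.");
        }
        return "KryoSerializer";
    }
}
```

Under the proposed default, a job that never mentions pipeline.generic-types fails at this point for an unrecognized type. Setting the option to true restores today's silent Kryo fallback.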

...

When users create a Flink job, TypeExtractor chooses an appropriate serializer for each data type and creates its TypeInformation. Once the data types and serializer configurations above are supported, Flink follows the priority below whenever it needs to create a serializer and TypeInformation for a data type.

Diagram: serializer priority

The data types and serializers in the configuration file have the highest priority. For data types that are not in the configuration, Flink tries, in order, the @TypeInfo annotation, basic data types, Tuple, composite data types, Pojo and generic data types, and creates the serializer from the first match. Besides the current built-in serializers, this FLIP will add serializers for composite data types such as List, Map and Set. Flink already has TypeInformation for these data types, such as MapTypeInfo, ListTypeInfo and MultisetTypeInfo. However, TypeExtractor cannot recognize the composite data types and use these serializers, and we would like to support this in this FLIP.
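The priority order described above can be sketched as a chain of checks that returns the first matching rule. Several assumptions apply. AnnotatedTypeInfo and TupleMarker are hypothetical stand-ins for Flink's @TypeInfo annotation and Tuple classes. The configuration is reduced to a set of class names. The Pojo check is reduced to "has a public no-argument constructor", which is much looser than Flink's real POJO rules.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for Flink's @TypeInfo annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface AnnotatedTypeInfo {}

/** Hypothetical marker standing in for Flink's Tuple classes. */
interface TupleMarker {}

@AnnotatedTypeInfo
final class AnnotatedEvent {
    public AnnotatedEvent() {}
}

final class PairTuple implements TupleMarker {
    public PairTuple() {}
}

final class SamplePojo {
    public SamplePojo() {}
}

final class NoDefaultCtor {
    NoDefaultCtor(int value) {}
}

/** Hypothetical sketch: walks the proposed priority order and returns the first matching rule. */
final class SerializerPriority {
    private static final Set<Class<?>> BASIC_TYPES =
            Set.of(String.class, Integer.class, Long.class, Double.class, Boolean.class);

    static String resolve(Class<?> type, Set<String> configuredClasses) {
        if (configuredClasses.contains(type.getName())) {
            return "CONFIGURATION"; // pipeline.serialization-config always wins
        }
        if (type.isAnnotationPresent(AnnotatedTypeInfo.class)) {
            return "TYPE_INFO_ANNOTATION";
        }
        if (BASIC_TYPES.contains(type)) {
            return "BASIC";
        }
        if (TupleMarker.class.isAssignableFrom(type)) {
            return "TUPLE";
        }
        if (List.class.isAssignableFrom(type) || Map.class.isAssignableFrom(type) || Set.class.isAssignableFrom(type)) {
            return "COMPOSITE";
        }
        if (hasPublicNoArgConstructor(type)) {
            return "POJO"; // simplified: Flink's real POJO rules are stricter
        }
        return "GENERIC"; // would go to Kryo, subject to pipeline.generic-types
    }

    private static boolean hasPublicNoArgConstructor(Class<?> type) {
        try {
            type.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

The ordering matters. SamplePojo resolves to CONFIGURATION once its name is configured and to POJO otherwise. AnnotatedEvent also has a public constructor, but the annotation check runs first.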

...

The Serializers class calls this method to register Avro data types with the Kryo serializer through the ExecutionConfig#registerTypeWithKryoSerializer and ExecutionConfig#addDefaultKryoSerializer methods. The uppermost entry point for this method is in ExecutionEnvironment, which the DataSet API uses to create jobs. Since DataSet will no longer be used and will be removed in the future, this method in AvroUtils can be marked as Deprecated and removed in flink-2.0.

...

KryoSerializer registers a serializer for Avro's GenericData.Array with Kryo as follows:

a) If flink-avro is on the job classpath, it automatically creates AvroKryoSerializerUtils and registers GenericData.Array and its serializer with the Kryo serializer.

b) If flink-avro is not on the job classpath, a DefaultAvroUtils is created and a dummy serializer is registered. As mentioned above, we add a pipeline.force-kryo-avro option with a default value of false. Only when the option is set to true will Flink look up the flink-avro module and register the Avro serializer in KryoSerializer.
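The proposed registration decision can be sketched as follows. KryoAvroRegistration is hypothetical. The proposal does not spell out two cases: whether anything Avro-related is registered when the option is false, and what happens when the option is true but flink-avro is missing. This sketch assumes "nothing" for the first case and today's dummy-serializer fallback for the second.

```java
/**
 * Hypothetical sketch (not Flink's KryoSerializer) of the proposed pipeline.force-kryo-avro
 * decision for registering Avro-related serializers with Kryo.
 */
final class KryoAvroRegistration {

    /** Probes the classpath for a class without initializing it. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, KryoAvroRegistration.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /**
     * Returns which Avro-related registration Kryo would receive.
     *
     * @param forceKryoAvro value of pipeline.force-kryo-avro (default false)
     * @param flinkAvroOnClasspath whether flink-avro was found on the job classpath
     */
    static String avroRegistration(boolean forceKryoAvro, boolean flinkAvroOnClasspath) {
        if (!forceKryoAvro) {
            // Proposed default: flink-avro is not looked up at all (assumed: nothing registered).
            return "NOT_REGISTERED";
        }
        // Assumption: a missing flink-avro keeps today's dummy-serializer fallback.
        return flinkAvroOnClasspath ? "AVRO_SERIALIZER" : "DUMMY_SERIALIZER";
    }
}
```

A caller would combine both methods, for example avroRegistration(forceKryoAvro, isOnClasspath(someFlinkAvroClassName)). The probed class name is left to the caller here rather than hard-coding a flink-avro class.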

...

The default value of the Flink job option pipeline.force-avro is false. When users set it to true, they need to add flink-avro to the job classpath. Pojo data then uses the Avro serializer, and PojoTypeInfo creates an AvroSerializer instance for the data type. This part of the processing can remain unchanged.
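For completeness, the unchanged Pojo path can be sketched as a small decision function. PojoSerializerChoice is hypothetical, and its string results only name the serializer that would be created. Two behaviors are assumptions of this sketch: pipeline.force-kryo is checked before pipeline.force-avro, and the function fails when flink-avro is missing.

```java
import java.util.Map;

/**
 * Hypothetical sketch of how a PojoTypeInfo-like class could pick its serializer from the
 * pipeline.force-kryo and pipeline.force-avro options. Results are serializer names only.
 */
final class PojoSerializerChoice {

    static String serializerFor(Map<String, String> options, boolean flinkAvroOnClasspath) {
        boolean forceKryo = Boolean.parseBoolean(options.getOrDefault("pipeline.force-kryo", "false"));
        boolean forceAvro = Boolean.parseBoolean(options.getOrDefault("pipeline.force-avro", "false"));
        if (forceKryo) {
            return "KryoSerializer"; // assumption: force-kryo is checked first
        }
        if (forceAvro) {
            if (!flinkAvroOnClasspath) {
                throw new IllegalStateException(
                        "pipeline.force-avro is true, so flink-avro must be on the job classpath");
            }
            return "AvroSerializer";
        }
        return "PojoSerializer";
    }
}
```

With neither option set, Pojo types keep the PojoSerializer. Turning on pipeline.force-avro switches them to Avro only when flink-avro is present.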

...