
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread:
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA:
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink supports different serializers, such as those for Java basic types, Pojo and Kryo. When developing Flink jobs, users can register custom data types and serializers through StreamExecutionEnvironment and ExecutionConfig, and can also configure the data classes to be serialized with Pojo and Kryo through three configuration options: pipeline.default-kryo-serializers, pipeline.registered-kryo-types and pipeline.registered-pojo-types. A Flink job looks up and creates the serializer for each data type based on the data types and the registered serializers. This approach has the following main problems:

1) Users register data types and serializers in hard-coded logic, so they need to modify the code when upgrading the job version; this cannot be achieved simply through configuration. As mentioned in the document [1], we would like to unify job-related settings into the configuration file.

2) Currently, the serializer configuration options are not on par with the API. The three configuration options can configure data types and their serializers, but they do not support custom serializers. In addition, there is no defined priority among the three options: if users configure the same data type in more than one of them, the serialization semantics conflict.

3) Kryo serializer compatibility issues. When users use custom data types in Flink jobs, Flink chooses a corresponding serializer, and if none is found, it falls back to the Kryo serializer. Using the Kryo serializer has the following problems:

    a) Different versions of Kryo may be incompatible, so old and new versions of a job may become incompatible when Flink upgrades the JVM or the Kryo version.

    b) Kryo does not support schema evolution of state: if columns are added to or removed from the state data, the new job cannot restore from the original state when it uses the Kryo serializer.

Flink already has options to enable or disable the Kryo serializer and to configure Kryo and Pojo serializers. Based on these, this FLIP would like to improve serialization configuration and usage in Flink as follows:

1) Configuration for customized serializers. Flink supports the @TypeInfo annotation and TypeInfoFactory to create customized type information for user-defined data types, see doc [2]. This FLIP proposes to configure customized serializers in the configuration, so that users can update them for jobs without modifying any code.

2) Provide a uniform configuration for Kryo, Pojo and customized serializers. Flink currently configures Kryo and Pojo serializers in different options. Combining them with customized serializers in one option simplifies usage and fixes the priority issues when the same data type appears in different options.

3) Add more built-in serializers to Flink, such as for List, Map and Set. Flink has many serializers for basic data types, but for composite data types such as List<String> or Map<String, Integer>, Flink can only use Kryo or user-defined serializers. Adding built-in serializers for them can improve performance.

Public Interface

The following methods in ExecutionConfig (annotation: @Public) will be deprecated and removed in Flink 2.0:

org.apache.flink.api.common.ExecutionConfig#registerKryoType(Class<?> type)
org.apache.flink.api.common.ExecutionConfig#registerPojoType(Class<?> type)
org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)
org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, T serializer)
org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, T serializer)
org.apache.flink.api.common.ExecutionConfig#getRegisteredKryoTypes()
org.apache.flink.api.common.ExecutionConfig#getRegisteredPojoTypes()
org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializerClasses()
org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializers()
org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializerClasses()
org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializers()
org.apache.flink.api.common.ExecutionConfig#enableForceAvro()
org.apache.flink.api.common.ExecutionConfig#enableForceKryo()
org.apache.flink.api.common.ExecutionConfig#enableGenericTypes()
org.apache.flink.api.common.ExecutionConfig#setForceAvro(boolean forceAvro)
org.apache.flink.api.common.ExecutionConfig#setForceKryo(boolean forceKryo)
org.apache.flink.api.common.ExecutionConfig#disableForceAvro()
org.apache.flink.api.common.ExecutionConfig#disableForceKryo()
org.apache.flink.api.common.ExecutionConfig#disableGenericTypes()
org.apache.flink.api.common.ExecutionConfig#hasGenericTypesDisabled()
org.apache.flink.api.common.ExecutionConfig#isForceAvroEnabled()
org.apache.flink.api.common.ExecutionConfig#isForceKryoEnabled()

The following methods in StreamExecutionEnvironment (annotation: @Public) will be deprecated and removed in Flink 2.0:

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerType(Class<?> type)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, T serializer)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, T serializer)

The following method in TypeInformation (annotation: @PublicEvolving) will be deprecated and removed in Flink 2.0:

org.apache.flink.api.common.typeinfo.TypeInformation#createSerializer(ExecutionConfig config)

After enableForceAvro, enableForceKryo and enableGenericTypes are deprecated and removed, users can set pipeline.force-avro, pipeline.force-kryo and pipeline.generic-types in the configuration instead, as follows:

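import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration configuration = new Configuration();
// Replaces ExecutionConfig#disableGenericTypes(): no Kryo fallback for generic types.
configuration.set(PipelineOptions.GENERIC_TYPES, false);
// Replaces ExecutionConfig#enableForceKryo(): use Kryo for POJO types.
configuration.set(PipelineOptions.FORCE_KRYO, true);
// Replaces ExecutionConfig#enableForceAvro(): use Avro for POJO types (requires flink-avro).
configuration.set(PipelineOptions.FORCE_AVRO, true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
// Build the Flink job with env.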

Flink currently has the options pipeline.default-kryo-serializers, pipeline.registered-kryo-types and pipeline.registered-pojo-types for Kryo and Pojo serializers; they will be deprecated and removed in Flink 2.0. This FLIP introduces a new option, pipeline.serialization-config, for Kryo, Pojo and customized serializers alike.

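import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;

import java.util.List;

import static org.apache.flink.configuration.ConfigOptions.key;

@PublicEvolving
public class PipelineOptions {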

    /** This option will be removed in Flink 2.0; use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_DEFAULT_SERIALIZERS =
        key("pipeline.default-kryo-serializers")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of pairs of class names and Kryo serializers class names to be used"
                                                + " as Kryo default serializers")
                                .linebreak()
                                .linebreak()
                                .text("Example:")
                                .linebreak()
                                .add(
                                        TextElement.code(
                                                "class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;"
                                                        + " class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"))
                                .build());

    /** This option will be removed in Flink 2.0; use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_REGISTERED_CLASSES =
        key("pipeline.registered-kryo-types")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                + " sure that only tags are written.")
                                .build());

    /** This option will be removed in Flink 2.0; use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> POJO_REGISTERED_CLASSES =
        key("pipeline.registered-pojo-types")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                + " sure that only tags are written.")
                                .build());
 
    /** Serializer configuration for Pojo, Kryo and customized serializers, keyed by data class name. */
    public static final ConfigOption<List<String>> SERIALIZATION_CONFIG =
        key("pipeline.serialization-config")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of pairs of class names and serializer configs to be used. There is a `type` field in"
                                                + " the serializer config with a value 'pojo', 'kryo' or 'typeinfo', and each `type` has its own"
                                                + " configuration.")
                                .linebreak()
                                .linebreak()
                                .text("Example:")
                                .linebreak()
                                .add(
                                        TextElement.code(
                                                "org.example.ExampleClass1: {type: pojo};"
                                                + " org.example.ExampleClass2: {type: kryo};"
                                                + " org.example.ExampleClass3: {type: kryo, kryo-type: default, class: org.example.Class3KryoSerializer};"
                                                + " org.example.ExampleClass4: {type: kryo, kryo-type: registered, class: org.example.Class4KryoSerializer};"
                                                + " org.example.ExampleClass5: {type: typeinfo, class: org.example.Class5TypeInfoFactory}"))
                                .build());
    
}

Each entry of pipeline.serialization-config is a key-value pair: the key is the data class name and the value is the serializer configuration for that class. The serializer configuration has the following fields:

1) type: the serializer type, which is pojo, kryo or typeinfo. If type is pojo, or kryo without a kryo-type field, the data type uses the Pojo or Kryo serializer directly.

2) kryo-type: the kind of Kryo registration, which is default or registered. If kryo-type is default, the configured class is added to Kryo as the default serializer for the data type; if kryo-type is registered, the data type is registered with Kryo together with the configured serializer class. When the kryo-type field is present, the class field must also be set to the specific serializer class name.

3) class: the class name of the Kryo serializer or of the type info factory. For kryo, it must be a subclass of com.esotericsoftware.kryo.Serializer; for typeinfo, it must be a subclass of org.apache.flink.api.common.typeinfo.TypeInfoFactory.
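To make the field rules above concrete, here is a minimal Java sketch that parses a pipeline.serialization-config value and validates each entry. It is not Flink's implementation: the class SerializationConfigSketch and its methods are hypothetical, and the rule that a typeinfo entry must also carry a class field is an assumption inferred from the description of class.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch, not Flink code: parses a pipeline.serialization-config value
 * and checks the rules for the type, kryo-type and class fields.
 */
final class SerializationConfigSketch {

    /** One parsed entry: the data class name and its serializer config fields. */
    record Entry(String dataClass, Map<String, String> fields) {}

    private static final Set<String> TYPES = Set.of("pojo", "kryo", "typeinfo");
    private static final Set<String> KRYO_TYPES = Set.of("default", "registered");

    /** Splits the option value on ';' (the list separator) and parses each entry. */
    static List<Entry> parse(String optionValue) {
        List<Entry> entries = new ArrayList<>();
        for (String raw : optionValue.split(";")) {
            if (!raw.isBlank()) {
                entries.add(parseEntry(raw.trim()));
            }
        }
        return entries;
    }

    /** Parses e.g. "org.example.Foo: {type: kryo, kryo-type: default, class: org.example.FooSerializer}". */
    static Entry parseEntry(String entry) {
        int colon = entry.indexOf(':');
        String body = colon < 0 ? "" : entry.substring(colon + 1).trim();
        if (!body.startsWith("{") || !body.endsWith("}")) {
            throw new IllegalArgumentException("Expected '<class>: {...}' but got: " + entry);
        }
        String dataClass = entry.substring(0, colon).trim();
        Map<String, String> fields = new LinkedHashMap<>();
        for (String field : body.substring(1, body.length() - 1).split(",")) {
            int sep = field.indexOf(':');
            if (sep < 0) {
                throw new IllegalArgumentException("Malformed field '" + field.trim() + "' in: " + entry);
            }
            fields.put(field.substring(0, sep).trim(), field.substring(sep + 1).trim());
        }
        validate(dataClass, fields);
        return new Entry(dataClass, fields);
    }

    /** Enforces the rules described for the type, kryo-type and class fields. */
    private static void validate(String dataClass, Map<String, String> fields) {
        String type = fields.get("type");
        if (type == null || !TYPES.contains(type)) {
            throw new IllegalArgumentException(dataClass + ": 'type' must be pojo, kryo or typeinfo");
        }
        String kryoType = fields.get("kryo-type");
        if (kryoType != null) {
            if (!type.equals("kryo") || !KRYO_TYPES.contains(kryoType)) {
                throw new IllegalArgumentException(dataClass + ": invalid kryo-type '" + kryoType + "'");
            }
            if (!fields.containsKey("class")) {
                throw new IllegalArgumentException(dataClass + ": kryo-type requires a class field");
            }
        }
        // Assumption: a typeinfo entry also needs the TypeInfoFactory class name.
        if (type.equals("typeinfo") && !fields.containsKey("class")) {
            throw new IllegalArgumentException(dataClass + ": typeinfo requires a class field");
        }
    }
}
```

Splitting on ';' and ',' only works because class names contain neither character; a production parser would need to handle whitespace and quoting more carefully.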

Currently, when Flink initializes the Kryo serializer, it automatically detects whether flink-avro is in the job classpath: if it is not, an Avro dummy serializer is created and registered with Kryo; otherwise, the Avro serializer is loaded from flink-avro and registered with Kryo. We introduce a new option, pipeline.force-kryo-avro, with the default value false; Flink detects the flink-avro module and registers the Avro serializer with Kryo only when the option is set to true.

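import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.description.Description;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.code;

@PublicEvolving
public class PipelineOptions {
    /** Whether to register the Avro serializers from flink-avro with Kryo. */
    public static final ConfigOption<Boolean> FORCE_KRYO_AVRO =
        key("pipeline.force-kryo-avro")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        Description.builder()
                                .text("Force register avro classes in kryo serializer.")
                                .linebreak()
                                .linebreak()
                                .text(
                                        "Important: Make sure to include the %s module.",
                                        code("flink-avro"))
                                .build());
}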
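A minimal sketch of the decision that pipeline.force-kryo-avro controls, assuming that the default (false) behaves like the current path without flink-avro and that a missing flink-avro module should fail fast when the option is true. KryoAvroRegistrationSketch and its enum are hypothetical; only the utility class name AvroKryoSerializerUtils comes from this FLIP.

```java
/**
 * Hypothetical sketch, not Flink code: how pipeline.force-kryo-avro could decide
 * which Avro serializer is registered with Kryo.
 */
final class KryoAvroRegistrationSketch {

    /** Utility class from flink-avro named in this FLIP; only its presence is checked. */
    static final String AVRO_KRYO_UTILS =
            "org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils";

    enum AvroRegistration { DUMMY_SERIALIZER, FLINK_AVRO_SERIALIZER }

    static AvroRegistration choose(boolean forceKryoAvro, ClassLoader classLoader) {
        if (!forceKryoAvro) {
            // Default (false): the classpath is not probed and the dummy serializer is used.
            return AvroRegistration.DUMMY_SERIALIZER;
        }
        try {
            // initialize = false: only check that flink-avro is present, without running static initializers.
            Class.forName(AVRO_KRYO_UTILS, false, classLoader);
            return AvroRegistration.FLINK_AVRO_SERIALIZER;
        } catch (ClassNotFoundException e) {
            // Assumption: fail fast rather than silently falling back to the dummy serializer.
            throw new IllegalStateException(
                    "pipeline.force-kryo-avro is true but flink-avro is not on the classpath", e);
        }
    }
}
```

Compared with the current automatic detection, the classpath is only probed when the user opts in, so adding flink-avro to a job no longer changes Kryo registrations implicitly.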

This FLIP introduces SerializerConfig to decouple serializer settings from ExecutionConfig. A SerializerConfig is created from a Configuration and provides the methods that serializers need. TypeExtractor and TypeInformation will use SerializerConfig instead of ExecutionConfig.

/**
 * A config that defines the behavior of serializers in a Flink job; it manages the registered types and serializers.
 * The config is created from the job configuration and used by Flink to create serializers for data types.
 */
@PublicEvolving
public final class SerializerConfig implements Serializable {
    private SerializerConfig() { }
    
    /** Create serializer config instance from configuration. */
    public static SerializerConfig create(Configuration configuration) {
        ...;
    }
    
    /** Returns the registered types with their Kryo Serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
            getRegisteredTypesWithKryoSerializerClasses();
    
    /** Returns the types with their default Kryo serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
            getDefaultKryoSerializerClasses();
    
    /** Returns the registered Kryo types. */
    public LinkedHashSet<Class<?>> getKryoTypes();
    
    /** Returns the registered POJO types. */
    public LinkedHashSet<Class<?>> getPojoTypes();
    
    /** Returns the registered type info factories. */
    public LinkedHashMap<Class<?>, Class<? extends TypeInfoFactory<?>>> getTypeInfoFactories();
    
    /**
     * Checks whether generic types are disabled. Generic types are types that go through Kryo
     * during serialization.
     *
     * <p>Generic types are enabled by default.
     *
     * @see org.apache.flink.configuration.PipelineOptions#GENERIC_TYPES
     */
    public boolean hasGenericTypesDisabled();
    
    /** Returns whether Apache Avro is used as the serializer for POJOs. */
    public boolean isForceAvroEnabled();
    
    /** Returns whether Kryo is used as the serializer for POJOs. */
    public boolean isForceKryoEnabled();
}

The corresponding new method in TypeInformation (annotation: @PublicEvolving):

org.apache.flink.api.common.typeinfo.TypeInformation#createSerializer(SerializerConfig config)

Currently, the option pipeline.generic-types is used to enable or disable the Kryo serializer. Its default value is true, which means Flink uses Kryo as the fallback serializer. As a result, Kryo is used by default to serialize some user-defined data types without users being aware of it, which can make job state incompatible between old and new versions during upgrades. To avoid this, the default value of pipeline.generic-types can be changed to false in Flink 2.0 to prevent the Kryo serializer from being used without the user's knowledge.

Proposed Changes

When a user creates a Flink job, TypeExtractor chooses an appropriate serializer for each data type and creates its TypeInformation. With the data type and serializer configuration above, Flink follows this priority when it creates a serializer and TypeInformation for a data type:

Data types and serializers in the configuration file have the highest priority. For data types that are not configured, Flink checks, in order, the TypeInfo annotation, basic data types, Tuple types, composite data types, Pojo types and generic data types, and creates the serializer for the first match. Besides the current built-in serializers, this FLIP will add serializers for composite data types such as List, Map and Set. Flink already has TypeInformation for these data types, such as MapTypeInfo, ListTypeInfo and MultisetTypeInfo; however, TypeExtractor cannot recognize these composite data types and therefore does not use these serializers. We would like to support them in this FLIP.
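The lookup order above can be sketched in plain Java as follows. This is a simplified, hypothetical model rather than TypeExtractor: TypeInfoLike stands in for Flink's @TypeInfo annotation, Tuple types are left out because they need Flink classes, and the POJO rule is reduced to "public class with a public no-arg constructor". The last step also shows the intended effect of setting pipeline.generic-types to false.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Modifier;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for Flink's @TypeInfo annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface TypeInfoLike {}

/** Hypothetical sketch of the proposed serializer lookup priority, not Flink's TypeExtractor. */
final class SerializerPrioritySketch {

    enum Source { CONFIGURATION, TYPE_INFO_ANNOTATION, BASIC, COMPOSITE, POJO, GENERIC_KRYO }

    private static final Set<Class<?>> BASIC = Set.of(
            String.class, Boolean.class, Byte.class, Short.class, Integer.class,
            Long.class, Float.class, Double.class, Character.class);

    static Source resolve(Class<?> type, Map<String, String> serializationConfig, boolean genericTypesEnabled) {
        // 1) Entries from pipeline.serialization-config win over everything else.
        if (serializationConfig.containsKey(type.getName())) {
            return Source.CONFIGURATION;
        }
        // 2) Types carrying a type-info annotation.
        if (type.isAnnotationPresent(TypeInfoLike.class)) {
            return Source.TYPE_INFO_ANNOTATION;
        }
        // 3) Basic types (Flink Tuple types are omitted in this sketch).
        if (type.isPrimitive() || BASIC.contains(type)) {
            return Source.BASIC;
        }
        // 4) Composite types that this FLIP proposes to support with built-in serializers.
        if (List.class.isAssignableFrom(type) || Map.class.isAssignableFrom(type)
                || Set.class.isAssignableFrom(type)) {
            return Source.COMPOSITE;
        }
        // 5) Simplified POJO rule: public class with a public no-arg constructor.
        if (Modifier.isPublic(type.getModifiers()) && hasPublicNoArgConstructor(type)) {
            return Source.POJO;
        }
        // 6) Kryo fallback, allowed only when pipeline.generic-types is true.
        if (!genericTypesEnabled) {
            throw new UnsupportedOperationException(
                    "Type " + type.getName() + " would fall back to Kryo, but generic types are disabled");
        }
        return Source.GENERIC_KRYO;
    }

    private static boolean hasPublicNoArgConstructor(Class<?> type) {
        try {
            type.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

Checking composite types before the POJO rule matters here: ArrayList is public with a public no-arg constructor, so this simplified check would otherwise classify it as a POJO.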

Flink uses AvroUtils to manage Avro serializers. When flink-avro is in the job classpath, Flink instantiates the org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils utility class; otherwise, it uses the built-in org.apache.flink.api.java.typeutils.AvroUtils$DefaultAvroUtils. AvroUtils has the following methods, which are used in different places.

1) void addAvroSerializersIfRequired(ExecutionConfig reg, Class<?> type)

The Serializers class calls this method to register Avro data types with the Kryo serializer through the ExecutionConfig#registerTypeWithKryoSerializer and ExecutionConfig#addDefaultKryoSerializer methods. The top-level entry point for this method is ExecutionEnvironment, which is mainly used by the DataSet API to create jobs. Since the DataSet API will no longer be used and will be removed in the future, this method in AvroUtils can be marked as @Deprecated and removed in Flink 2.0.

2) void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations)

KryoSerializer calls this method to register a serializer for Avro's GenericData.Array in its Kryo registrations, as follows:

a) If flink-avro is in the job classpath, Flink automatically creates AvroKryoSerializerUtils and registers GenericData.Array and its serializer with the Kryo serializer.

b) When flink-avro is not in the job classpath, a DefaultAvroUtils is created and a dummy serializer is registered. As mentioned above, we add the pipeline.force-kryo-avro option with the default value false. Only when the option is set to true will Flink look up the flink-avro module and register the Avro serializer in KryoSerializer.

3) TypeSerializer<T> createAvroSerializer(Class<T> type)

The default value of the Flink job option pipeline.force-avro is false. When users set it to true, they need to add flink-avro to the job classpath. Then, Pojo data will use Avro serializers, and PojoTypeInfo will create an AvroSerializer instance for the data type. This part of the processing can remain unchanged.

4) TypeInformation<T> createAvroTypeInfo(Class<T> type)

When a Flink job creates a serializer for a data type that is a subclass of org.apache.avro.specific.SpecificRecordBase, it needs to ensure that flink-avro is in the job classpath. This part of the processing can remain unchanged.

Compatibility, Deprecation, and Migration Plan

1) Usage of the Fallback Kryo Serializer

Since the Kryo serializer has issues with schema evolution and version upgrades, Flink 2.0 will disable the Kryo serializer by default. If users do not enable it explicitly, a Flink job that needs Kryo will throw an exception reminding them to either set pipeline.generic-types to true or develop a custom serializer and add it to the configuration.

2) Existing Serializer Registration and Configuration

The serialization-related methods in StreamExecutionEnvironment and ExecutionConfig will be deprecated and removed in Flink 2.0. The newly added SerializerConfig is compatible with the current interface of ExecutionConfig. Existing Flink jobs that register data types and serializers can migrate as follows to be compatible with Flink 2.0.

a) StreamExecutionEnvironment#registerType(Class<?> type), ExecutionConfig#registerKryoType(Class<?> type) and ExecutionConfig#registerPojoType(Class<?> type)

Configure the specified data types to use the Pojo and Kryo serializers as follows:

pipeline.serialization-config: org.example.PojoDataClass: {type: pojo}; org.example.KryoDataClass: {type: kryo}

b) StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) and ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)

Configure the specified data types to use the Kryo serializer class as follows:

pipeline.serialization-config: org.example.KryoDataClass: {type: kryo, kryo-type: registered, class: org.example.KryoSerializerClass}

c) ExecutionConfig#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

Configure the specified data types to use default Kryo serializer classes as follows:

pipeline.serialization-config: org.example.KryoDataClass: {type: kryo, kryo-type: default, class: org.example.KryoSerializerClass}

d) For data types annotated with @TypeInfo that use a TypeInfoFactory:

pipeline.serialization-config: org.example.TypeDataClass: {type: typeinfo, class: org.example.TypeDataInfoFactory}

e) StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, T serializer) and ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, T serializer)

These methods, which take serializer instances, will not be supported in Flink 2.0. Users can instead implement the serializer as a class and configure the Kryo serializer class as described above.

f) For data types and serializer classes configured with pipeline.default-kryo-serializers, pipeline.registered-kryo-types and pipeline.registered-pojo-types, users can reconfigure them in the new pipeline.serialization-config format.
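For option-based setups, this rewrite can be automated. The following helper is hypothetical and not part of Flink: it converts the values of the three deprecated options into one pipeline.serialization-config value, mapping them as in items a) and c). Rejecting a type that appears in more than one old option is an assumption, chosen because the old options had no defined priority.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical migration helper, not part of Flink: rewrites the values of the three
 * deprecated options into a single pipeline.serialization-config value.
 */
final class SerializationConfigMigrationSketch {

    static String migrate(String registeredPojoTypes, String registeredKryoTypes, String defaultKryoSerializers) {
        // LinkedHashMap keeps the declaration order of the old options in the output.
        Map<String, String> entries = new LinkedHashMap<>();
        for (String type : split(registeredPojoTypes)) {
            put(entries, type, "{type: pojo}");
        }
        for (String type : split(registeredKryoTypes)) {
            put(entries, type, "{type: kryo}");
        }
        // Old format per entry: class:org.example.Foo,serializer:org.example.FooSerializer
        for (String pair : split(defaultKryoSerializers)) {
            String clazz = null;
            String serializer = null;
            for (String part : pair.split(",")) {
                String[] kv = part.trim().split(":", 2);
                if (kv.length == 2 && kv[0].trim().equals("class")) {
                    clazz = kv[1].trim();
                } else if (kv.length == 2 && kv[0].trim().equals("serializer")) {
                    serializer = kv[1].trim();
                }
            }
            if (clazz == null || serializer == null) {
                throw new IllegalArgumentException("Malformed default Kryo serializer entry: " + pair);
            }
            put(entries, clazz, "{type: kryo, kryo-type: default, class: " + serializer + "}");
        }
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, String> e : entries.entrySet()) {
            if (out.length() > 0) {
                out.append("; ");
            }
            out.append(e.getKey()).append(": ").append(e.getValue());
        }
        return out.toString();
    }

    // Assumption: a type found in more than one old option is reported instead of silently
    // picking a winner, since each data class gets exactly one serializer config in the new option.
    private static void put(Map<String, String> entries, String type, String config) {
        if (entries.putIfAbsent(type, config) != null) {
            throw new IllegalArgumentException("Type configured in more than one option: " + type);
        }
    }

    // Splits a semicolon-separated option value and ignores blanks; null means "not set".
    private static String[] split(String value) {
        if (value == null) {
            return new String[0];
        }
        return Arrays.stream(value.split(";"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .toArray(String[]::new);
    }
}
```

For example, migrating pipeline.registered-pojo-types=org.example.A together with pipeline.default-kryo-serializers=class:org.example.C,serializer:org.example.CSerializer yields one semicolon-separated value with a pojo entry for org.example.A and a default kryo entry for org.example.C.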

Test Plan

UT & E2E

Rejected Alternatives

NONE


[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278464992
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory
