Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread:
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA:
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink supports several kinds of serializers, such as those for Java basic types, Pojo, and Kryo. When developing Flink jobs, users can register custom data types and serializers through StreamExecutionEnvironment and ExecutionConfig, and can also configure the classes serialized by Pojo and Kryo through three configuration options: pipeline.default-kryo-serializers, pipeline.registered-kryo-types, and pipeline.registered-pojo-types. Flink jobs look up and create the corresponding serializer for each data type based on the data types and the registered serializers. This approach has the following main problems:

...

Code Block
@PublicEvolving
public class PipelineOptions {

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_DEFAULT_SERIALIZERS =
        key("pipeline.default-kryo-serializers")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of pairs of class names and Kryo serializers class names to be used"
                                                + " as Kryo default serializers")
                                .linebreak()
                                .linebreak()
                                .text("Example:")
                                .linebreak()
                                .add(
                                        TextElement.code(
                                                "class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;"
                                                        + " class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"))
                                .build());

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_REGISTERED_CLASSES =
        key("pipeline.registered-kryo-types")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                + " sure that only tags are written.")
                                .build());

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> POJO_REGISTERED_CLASSES =
        key("pipeline.registered-pojo-types")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                + " sure that only tags are written.")
                                .build());
 
    public static final ConfigOption<List<String>> SERIALIZATION_CONFIG =
        key("pipeline.serialization-config")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of pairs of class names and serializer config to be used. There is a `type` field in"
                                                 + " the serializer config with a value 'pojo', 'kryo' or 'typeinfo' and each `type` has its own"
                                                 + " configuration.")
                                .linebreak()
                                .linebreak()
                                .text("Example:")
                                .linebreak()
                                .add(
                                        TextElement.code(
                                                "org.example.ExampleClass1: {type: pojo};"
                                                + " org.example.ExampleClass2: {type: kryo};"
                                                + " org.example.ExampleClass3: {type: kryo, kryo-type: default, class: org.example.Class3KryoSerializer};"
                                                + " org.example.ExampleClass4: {type: kryo, kryo-type: registered, class: org.example.Class4KryoSerializer};"
                                                + " org.example.ExampleClass5: {type: typeinfo, class: org.example.Class5TypeInfoFactory}"))
                                .build());
    
}
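
To make the value format of `pipeline.serialization-config` easier to picture, the sketch below parses the example string from the option description into a map from class name to its serializer settings. It is only an illustration built from that example: the class `SerializationConfigSketch` and its `parse` method are hypothetical, use nothing but the JDK, and are not part of Flink or of this proposal. How Flink itself would parse and validate the option is not specified here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of Flink: parses the example value format of the new option. */
public class SerializationConfigSketch {

    /** Parses "className: {key: value, ...}; ..." into an insertion-ordered map. */
    public static Map<String, Map<String, String>> parse(String value) {
        Map<String, Map<String, String>> result = new LinkedHashMap<>();
        for (String entry : value.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing semicolon
            }
            // Class names contain no ':', so the first one separates name and config.
            int colon = trimmed.indexOf(':');
            int open = trimmed.indexOf('{');
            int close = trimmed.lastIndexOf('}');
            if (colon < 0 || open < colon || close < open) {
                throw new IllegalArgumentException("Malformed entry: " + trimmed);
            }
            String className = trimmed.substring(0, colon).trim();
            Map<String, String> config = new LinkedHashMap<>();
            for (String pair : trimmed.substring(open + 1, close).split(",")) {
                // Split on the first ':' only, so values may not be cut short.
                String[] keyValue = pair.split(":", 2);
                if (keyValue.length != 2) {
                    throw new IllegalArgumentException("Malformed pair: " + pair);
                }
                config.put(keyValue[0].trim(), keyValue[1].trim());
            }
            result.put(className, config);
        }
        return result;
    }

    public static void main(String[] args) {
        String value = "org.example.ExampleClass1: {type: pojo};"
                + " org.example.ExampleClass3: {type: kryo, kryo-type: default,"
                + " class: org.example.Class3KryoSerializer}";
        parse(value).forEach((cls, cfg) -> System.out.println(cls + " -> " + cfg));
    }
}
```

Each entry carries a `type` key, and the remaining keys (`kryo-type`, `class`) depend on that type, which matches the option description above.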

...

This FLIP introduces SerializerConfig to decouple serializers in Flink from ExecutionConfig. It can be created from Configuration and provides the methods that serializers need. TypeExtractor and TypeInformation will use SerializerConfig instead of ExecutionConfig.

Code Block
/**
 * A config that defines the behavior of serializers in a Flink job; it manages the registered types and serializers.
 * The config is created from the job configuration and used by Flink to create the serializer for each data type.
 */
@PublicEvolving
public final class SerializerConfig implements Serializable {
    private SerializerConfig() { }
    
    /** Create serializer config instance from configuration. */
    public static SerializerConfig create(Configuration configuration) {
        ...;
    }
    
    /** Returns the registered types with their Kryo Serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
            getRegisteredTypesWithKryoSerializerClasses();
    
    /** Returns the registered default Kryo Serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
            getDefaultKryoSerializerClasses();
    
    /** Returns the registered Kryo types. */
    public LinkedHashSet<Class<?>> getKryoTypes();
    
    /** Returns the registered POJO types. */
    public LinkedHashSet<Class<?>> getPojoTypes();
    
    /** Returns the registered type info factories. */
    public LinkedHashMap<Class<?>, Class<TypeInfoFactory<?>>> getTypeInfoFactories();
    
    /**
     * Checks whether generic types are supported. Generic types are types that go through Kryo
     * during serialization.
     *
     * <p>Generic types are enabled by default.
     *
     * @see #enableGenericTypes()
     * @see #disableGenericTypes()
     */
    public boolean hasGenericTypesDisabled();
    
    /** Returns whether Apache Avro is the serializer for POJOs. */
    public boolean isForceAvroEnabled();
    
    /** Returns whether Kryo is the serializer for POJOs. */
    public boolean isForceKryoEnabled();
}

...

Currently, the option pipeline.generic-types is used to enable or disable the Kryo serializer. Its default value is true, which means Flink uses Kryo as the fallback serializer. As a result, Kryo is used by default to serialize some user-defined data types without the user being aware of it, which can make the job state incompatible between old and new versions during upgrades. To avoid this issue, the default value of pipeline.generic-types can be changed to false in Flink 2.0, which prevents the Kryo serializer from being used without the user's knowledge.
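
The difference between the two defaults can be pictured with a small toy model. The sketch below is not Flink code: `GenericTypesFallbackSketch`, `chooseSerializer`, and the set of "explicitly handled" types are hypothetical names introduced only for illustration, and the real serializer lookup in Flink is more involved. It only shows the idea that, with the fallback enabled, an unregistered type silently ends up on Kryo, while with the fallback disabled the same type fails fast.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model, not Flink code: shows the effect of disabling the generic (Kryo) fallback. */
public class GenericTypesFallbackSketch {

    /** Returns the serializer family this toy model would pick for the given type. */
    public static String chooseSerializer(
            Class<?> type, Set<Class<?>> explicitlyHandledTypes, boolean genericTypesEnabled) {
        if (explicitlyHandledTypes.contains(type)) {
            // Assumption: the type has a dedicated (non-Kryo) serializer.
            return "dedicated";
        }
        if (genericTypesEnabled) {
            // Silent fallback: the user may never notice that Kryo is in use.
            return "kryo";
        }
        // With the fallback disabled, the problem surfaces immediately.
        throw new UnsupportedOperationException(
                "Generic types are disabled and " + type.getName()
                        + " has no dedicated serializer");
    }

    public static void main(String[] args) {
        Set<Class<?>> handled = new HashSet<>();
        handled.add(String.class);

        System.out.println(chooseSerializer(String.class, handled, true));
        System.out.println(chooseSerializer(StringBuilder.class, handled, true));
        try {
            chooseSerializer(StringBuilder.class, handled, false);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast trades a little setup effort for visibility: the user has to declare a serializer for the type, so state compatibility no longer depends on an implicit choice.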

...