Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Discussion threadhttps://lists.apache.org/thread/m67s4qfrh660lktpq7yqf9docvvf5o9l
Vote threadhttps://lists.apache.org/thread/2xmcxs67xxzwool554fglrnklyvw348h
JIRA

Jira
serverASF JIRA
columnIdsissuekey,summary,issuetype,created,updated,duedate,assignee,reporter,priority,status,resolution
columnskey,summary,type,created,updated,due,assignee,reporter,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-34037

Release<Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The following methods in ExecutionConfig will be deprecated and be removed in flink-2.0

()org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializerClassesExecutionConfig#getDefaultKryoSerializersExecutionConfig#enableForceAvroExecutionConfig#enableForceKryoExecutionConfig#enableGenericTypesExecutionConfig#setForceAvroboolean forceAvroExecutionConfig#setForceKryoboolean forceKryo

Method

Annotation

org.apache.flink.api.common.ExecutionConfig#registerKryoType(Class<?> type)

@Public

org.apache.flink.api.common.ExecutionConfig#registerPojoType(Class<?> type)

org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)

org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, T serializer)

org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer( Class<?> type, T serializer)

org.apache.flink.api.common.ExecutionConfig#getRegisteredKryoTypes()

org.apache.flink.api.common.ExecutionConfig#getRegisteredPojoTypes()

org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializerClasses

org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializers()

()

org.apache.flink.api.common.

ExecutionConfig#getRegisteredTypesWithKryoSerializers()

org.apache.flink.api.common.

ExecutionConfig#getDefaultKryoSerializerClasses()

org.apache.flink.api.common.

ExecutionConfig#getDefaultKryoSerializers()

org.apache.flink.api.common.

ExecutionConfig#enableForceAvro()

org.apache.flink.api.common.

ExecutionConfig#enableForceKryo(

)

org.apache.flink.api.common.

ExecutionConfig#enableGenericTypes(

)

org.apache.flink.api.common.ExecutionConfig#disableForceAvro()

org.apache.flink.api.common.ExecutionConfig#disableForceKryo()

org.apache.flink.api.common.ExecutionConfig#disableGenericTypes()

org.apache.flink.api.common.ExecutionConfig#hasGenericTypesDisabled()

org.apache.flink.api.common.ExecutionConfig#isForceAvroEnabled()

org.apache.flink.api.common.ExecutionConfig#isForceKryoEnabled()

The following methods in StreamExecutionEnvironment will be deprecated and removed in flink-21.020

Method

Annotation

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerType(Class<?> type)

@Public

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, T serializer)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, T serializer)

The following method in TypeInformation and it's sub-classes will be deprecated and removed in flink-2next release.0

Method

Annotation

org.apache.flink.api.common.typeinfo#createSerializer(ExecutionConfig config)

@PublicEvolving

...

Code Block
@PublicEvolving
public class PipelineOptions {

    /** This options will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_DEFAULT_SERIALIZERS =
        key("pipeline.default-kryo-serializers")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of pairs of class names and Kryo serializers class names to be used"
                                                + " as Kryo default serializers")
                                .linebreak()
                                .linebreak()
                                .text("Example:")
                                .linebreak()
                                .add(
                                        TextElement.code(
                                                "class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;"
                                                        + " class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"))
                                .build());

    /** This options will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_REGISTERED_CLASSES =
        key("pipeline.registered-kryo-types")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                + " sure that only tags are written.")
                                .build());

    /** This options will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> POJO_REGISTERED_CLASSES =
        key("pipeline.registered-pojo-types")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                + " sure that only tags are written.")
                                .build());
 
    public static final ConfigOption<List<String>> SERIALIZATION_CONFIG =
        key("pipeline.serialization-config")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated listList of pairs of class names and serializer config to be used. There is `type` fiend in"
                                                 + " the serializer config with a value 'pojo', 'kryo' or 'typeinfo' and each `type` has its own"
                                                 + " configuration.")
                                .linebreak()
                                .linebreak()
                                .text("Example:")
                                .linebreak()
                                .add(
                                        TextElement.code(
                                                "[org.example.ExampleClass1: {type: pojo};,"
                                                + " org.example.ExampleClass2: {type: kryo};,"
                                                + " org.example.ExampleClass3: {type: kryo, kryo-type: default, class: org.example.Class3KryoSerializer};,"
                                                + " org.example.ExampleClass4: {type: kryo, kryo-type: registered, class: org.example.Class4KryoSerializer};,"
                                                + " org.example.ExampleClass5: {type: typeinfo, class: org.example.Class5TypeInfoFactory}]"))
                                .build());
    
}

The value for pipeline.serialization-config is a key-value pair, there will be data class name for the key and the serializers of the data for the value. The configuration for serialization is as follows.

...

Code Block
/** 
 * A config to define the behavior for serializer in flink job, it manages the registered type and serializers.
 * The config is created from job configuration and used by flink to create serializer for data type.
 **/
@PublicEvolving
public final class SerializerConfig implements Serializable {
    private SerializerConfig() { }
    
    /** Create serializer config instance from configuration. */
    public static SerializerConfig create(Configuration configuration) {
        ...;
    }
    
    /** Returns the registered types with their Kryo Serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
            getRegisteredTypesWithKryoSerializerClasses();
    
    /** Returns the registered Kryo Serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
            getDefaultKryoSerializerClasses();
    
    /** Returns the registered Kryo types. */
    public LinkedHashSet<Class<?>> getKryoTypesgetRegisteredKryoTypes();
    
    /** Returns the registered POJO types. */
    public LinkedHashSet<Class<?>> getPojoTypesgetRegisteredPojoTypes();
    
    /** Returns the registered type info factories. */
    public LinkedHashMap<Class<Map<Class<?>, Class<? extends Class<TypeInfoFactory<TypeInfoFactory<?>>> getTypeInfoFactoriesgetRegisteredTypeInfoFactories();
    
    /**
     * Checks whether generic types are supported. Generic types are types that go through Kryo
     * during serialization.
     *
     * <p>Generic types are enabled by default.
     *
     * @see #enableGenericTypes()
     * @see #disableGenericTypes()
     */
    public boolean hasGenericTypesDisabled();
    
    /** Returns whether the Apache Avro is the serializer for POJOs. */
    public boolean isForceAvroEnabled();
    
    /** Returns whether kryo is the serialzier for POJOs. */
    public boolean isForceKryoEnabled();
}

...