Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The following methods in ExecutionConfig will be deprecated and be removed in flink-2.0

ExecutionConfig#setForceAvro(boolean forceAvro)org.apache.flink.api.common.

Method

Annotation

org.apache.flink.api.common.ExecutionConfig#registerKryoType(Class<?> type)

@Public

org.apache.flink.api.common.ExecutionConfig#registerPojoType(Class<?> type)

org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)

org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, T serializer)

org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer( Class<?> type, T serializer)

org.apache.flink.api.common.ExecutionConfig#getRegisteredKryoTypes()

org.apache.flink.api.common.ExecutionConfig#getRegisteredPojoTypes()

org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializerClasses()

org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializers()

org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializerClasses()

org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializers()

org.apache.flink.api.common.ExecutionConfig#enableForceAvro()

org.apache.flink.api.common.ExecutionConfig#enableForceKryo()

org.apache.flink.api.common.ExecutionConfig#enableGenericTypes()

org.apache.flink.api.common.

org.apache.flink.api.common.ExecutionConfig#setForceKryo(boolean forceKryo)

ExecutionConfig#disableForceAvro()

org.apache.flink.api.common.ExecutionConfig#disableForceKryo()

org.apache.flink.api.common.ExecutionConfig#disableGenericTypes()

org.apache.flink.api.common.ExecutionConfig#hasGenericTypesDisabled()

org.apache.flink.api.common.ExecutionConfig#isForceAvroEnabled()

org.apache.flink.api.common.ExecutionConfig#isForceKryoEnabled()

The following methods in StreamExecutionEnvironment will be deprecated and removed in flink-21.020

Method

Annotation

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerType(Class<?> type)

@Public

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, T serializer)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, T serializer)

The following method in TypeInformation and it's sub-classes will be deprecated and removed in flink-2next release.0

Method

Annotation

org.apache.flink.api.common.typeinfo#createSerializer(ExecutionConfig config)

@PublicEvolving

...

Code Block
@PublicEvolving
public class PipelineOptions {

    /** This options will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_DEFAULT_SERIALIZERS =
        key("pipeline.default-kryo-serializers")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of pairs of class names and Kryo serializers class names to be used"
                                                + " as Kryo default serializers")
                                .linebreak()
                                .linebreak()
                                .text("Example:")
                                .linebreak()
                                .add(
                                        TextElement.code(
                                                "class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;"
                                                        + " class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"))
                                .build());

    /** This options will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_REGISTERED_CLASSES =
        key("pipeline.registered-kryo-types")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                + " sure that only tags are written.")
                                .build());

    /** This options will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> POJO_REGISTERED_CLASSES =
        key("pipeline.registered-pojo-types")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                + " sure that only tags are written.")
                                .build());
 
    public static final ConfigOption<List<String>> SERIALIZATION_CONFIG =
        key("pipeline.serialization-config")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        Description.builder()
                                .text(
                                        "Semicolon separated listList of pairs of class names and serializer config to be used. There is `type` fiend in"
                                                 + " the serializer config with a value 'pojo', 'kryo' or 'typeinfo' and each `type` has its own"
                                                 + " configuration.")
                                .linebreak()
                                .linebreak()
                                .text("Example:")
                                .linebreak()
                                .add(
                                        TextElement.code(
                                                "[org.example.ExampleClass1: {type: pojo};,"
                                                + " org.example.ExampleClass2: {type: kryo};,"
                                                + " org.example.ExampleClass3: {type: kryo, kryo-type: default, class: org.example.Class3KryoSerializer};,"
                                                + " org.example.ExampleClass4: {type: kryo, kryo-type: registered, class: org.example.Class4KryoSerializer};,"
                                                + " org.example.ExampleClass5: {type: typeinfo, class: org.example.Class5TypeInfoFactory}]"))
                                .build());
    
}

The value for pipeline.serialization-config is a key-value pair, there will be data class name for the key and the serializers of the data for the value. The configuration for serialization is as follows.

...

Code Block
/** 
 * A config to define the behavior for serializer in flink job, it manages the registered type and serializers.
 * The config is created from job configuration and used by flink to create serializer for data type.
 **/
@PublicEvolving
public final class SerializerConfig implements Serializable {

    /** Create serializer config instance from configuration. */
    public SerializerConfig(Configuration configuration) {
        ...;
    }
    
    /** Returns the registered types with their Kryo Serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
            getRegisteredTypesWithKryoSerializerClasses();
    
    /** Returns the registered Kryo Serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>
            getDefaultKryoSerializerClasses();
    
    /** Returns the registered Kryo types. */
    public LinkedHashSet<Class<?>> getKryoTypesgetRegisteredKryoTypes();
    
    /** Returns the registered POJO types. */
    public LinkedHashSet<Class<?>> getPojoTypesgetRegisteredPojoTypes();
    
    /** Returns the registered type info factories. */
    public LinkedHashMap<Class<Map<Class<?>, Class<? extends Class<TypeInfoFactory<TypeInfoFactory<?>>> getTypeInfoFactoriesgetRegisteredTypeInfoFactories();
    
    /**
     * Checks whether generic types are supported. Generic types are types that go through Kryo
     * during serialization.
     *
     * <p>Generic types are enabled by default.
     *
     * @see #enableGenericTypes()
     * @see #disableGenericTypes()
     */
    public boolean hasGenericTypesDisabled();
    
    /** Returns whether the Apache Avro is the serializer for POJOs. */
    public boolean isForceAvroEnabled();
    
    /** Returns whether kryo is the serialzier for POJOs. */
    public boolean isForceKryoEnabled();
}

...