
Motivation

Flink supports different serializers, such as those for Java basic types, Pojo, and Kryo. When developing Flink jobs, users can register custom data types and serializers through StreamExecutionEnvironment and ExecutionConfig. They can also configure Pojo and Kryo serialized data classes through three configuration options: pipeline.default.kryo-serializers, pipeline.registered-kryo-types, and pipeline.registered-pojo-types. Flink jobs look up and create the corresponding serializer for each data type based on the data types and the registered serializers. This approach has the following main problems:

1) Users register data types and serializers in hard-coded calls, so they have to modify the code when upgrading a job version; this cannot be done simply through configuration. As mentioned in the document [1], we'd like to unify job-related settings in the configuration file.

2) Serializer configuration capabilities are currently not on par with the API. The three configuration options can configure data types and their serializers, but customized serializers are not supported. There is also a priority issue among the three options: if users configure the same data type in more than one of them, the serialization semantics conflict.

3) Kryo serializer compatibility issues. When users use custom data types in Flink jobs, Flink chooses a corresponding serializer; if none is found, it falls back to the Kryo serializer. Using the Kryo serializer has the following problems:

    a) Different versions of Kryo may be incompatible with each other, so old and new versions of a job can become incompatible when Flink upgrades the JVM or the Kryo version.

    b) State schema evolution: when columns are added to or removed from the state data, a new job that uses the Kryo serializer cannot recover from the original state.

Flink already has options to turn the Kryo serializer on or off and to configure Kryo/Pojo serializers. Building on these, this FLIP would like to improve serialization configuration and usage in Flink as follows:

1) Configuration for customized serializers. Flink supports the @TypeInfo annotation and TypeInfoFactory for creating customized serializers for user-defined data types (see doc [2]). This FLIP proposes configuring customized serializers in the configuration, so that users can update them for jobs without modifying any code.

2) Provide uniform configuration for Kryo, Pojo and customized serializers. Flink currently configures Kryo and Pojo serializers in different options. These can be combined with customized serializers into a single option, which simplifies usage and fixes the priority issue that arises when the same data type appears in different options.
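To illustrate why a single option removes the priority problem, here is a minimal sketch, assuming a hypothetical entry format "<type>:<kind>[:<serializer-class>]" separated by ";". The format, the class name UnifiedSerializerConfig and the kinds POJO/KRYO/CUSTOM are all illustrative assumptions, not the option name or syntax defined by this FLIP. Because every data type may appear only once, there is no cross-option precedence to resolve.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: one option maps each data type to exactly one serializer choice.
// The entry format "<type>:<kind>[:<serializer-class>]" is illustrative only.
final class UnifiedSerializerConfig {

    enum Kind { POJO, KRYO, CUSTOM }

    record Entry(Kind kind, String serializerClass) {}

    static Map<String, Entry> parse(String option) {
        Map<String, Entry> result = new LinkedHashMap<>();
        for (String raw : option.split(";")) {
            String item = raw.trim();
            if (item.isEmpty()) {
                continue; // tolerate trailing separators
            }
            String[] parts = item.split(":");
            if (parts.length < 2 || parts.length > 3) {
                throw new IllegalArgumentException("Malformed entry: " + item);
            }
            String type = parts[0].trim();
            // Kind.valueOf throws IllegalArgumentException for an unknown kind.
            Kind kind = Kind.valueOf(parts[1].trim().toUpperCase(Locale.ROOT));
            String serializer = parts.length == 3 ? parts[2].trim() : null;
            if (kind == Kind.CUSTOM && serializer == null) {
                throw new IllegalArgumentException("Custom entry needs a serializer class: " + item);
            }
            // Each type may appear only once, so no priority rules between options are needed.
            if (result.putIfAbsent(type, new Entry(kind, serializer)) != null) {
                throw new IllegalArgumentException("Duplicate type: " + type);
            }
        }
        return result;
    }
}
```

Rejecting a duplicate type when the option is parsed, instead of silently preferring one entry, surfaces the conflict to the user before the job runs.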

3) Add more built-in serializers in Flink, such as for List, Map and Set. Flink has many serializers for basic data types, but for composite data types such as List<String> or Map<String, Integer>, Flink can only use Kryo or user-defined serializers. Adding built-in serializers for these types would improve performance.
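As a rough illustration of what a dedicated composite-type serializer could look like, here is a minimal sketch of a List<String> codec. It is a hypothetical class (StringListCodec), not Flink's TypeSerializer API: it omits null handling, snapshots and compatibility checks that a real Flink serializer needs. The point it shows is that a serializer which knows the element type statically only has to write the element count and each element's bytes, whereas a generic fallback generally has to record type information alongside the data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a dedicated List<String> serializer (no nulls, no versioning).
final class StringListCodec {

    static void serialize(List<String> list, DataOutputStream out) throws IOException {
        out.writeInt(list.size());          // element count
        for (String s : list) {
            byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            out.writeInt(bytes.length);     // byte length of this element
            out.write(bytes);               // UTF-8 payload, no type tags needed
        }
    }

    static List<String> deserialize(DataInputStream in) throws IOException {
        int size = in.readInt();
        List<String> list = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            byte[] bytes = new byte[in.readInt()];
            in.readFully(bytes);
            list.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return list;
    }

    static byte[] toBytes(List<String> list) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            serialize(list, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory buffer
        }
        return buffer.toByteArray();
    }

    static List<String> fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return deserialize(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For ["a", "bc"] this layout takes 4 bytes for the count plus 4 + 1 and 4 + 2 bytes for the elements, 15 bytes in total.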

Public Interface

The following @Public methods in ExecutionConfig will be deprecated and removed in Flink 2.0:

  • org.apache.flink.api.common.ExecutionConfig#registerKryoType(Class<?> type)
  • org.apache.flink.api.common.ExecutionConfig#registerPojoType(Class<?> type)
  • org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)
  • org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, T serializer)
  • org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
  • org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, T serializer)
  • org.apache.flink.api.common.ExecutionConfig#getRegisteredKryoTypes()
  • org.apache.flink.api.common.ExecutionConfig#getRegisteredPojoTypes()
  • org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializerClasses()
  • org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializers()
  • org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializerClasses()
  • org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializers()
  • org.apache.flink.api.common.ExecutionConfig#enableForceAvro()
  • org.apache.flink.api.common.ExecutionConfig#enableForceKryo()
  • org.apache.flink.api.common.ExecutionConfig#enableGenericTypes()
  • org.apache.flink.api.common.ExecutionConfig#setForceAvro(boolean forceAvro)
  • org.apache.flink.api.common.ExecutionConfig#setForceKryo(boolean forceKryo)
  • org.apache.flink.api.common.ExecutionConfig#disableForceAvro()
  • org.apache.flink.api.common.ExecutionConfig#disableForceKryo()
  • org.apache.flink.api.common.ExecutionConfig#disableGenericTypes()
  • org.apache.flink.api.common.ExecutionConfig#hasGenericTypesDisabled()
  • org.apache.flink.api.common.ExecutionConfig#isForceAvroEnabled()
  • org.apache.flink.api.common.ExecutionConfig#isForceKryoEnabled()

The following @Public methods in StreamExecutionEnvironment will be deprecated and removed in Flink 2.0:

  • org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerType(Class<?> type)
  • org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)
  • org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, T serializer)
  • org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass)
  • org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, T serializer)

The following @PublicEvolving method in TypeInformation will be deprecated and removed in Flink 2.0:

  • org.apache.flink.api.common.typeinfo.TypeInformation#createSerializer(ExecutionConfig config)

Once enableForceAvro, enableForceKryo and enableGenericTypes are deprecated and removed, users can set pipeline.force-avro, pipeline.force-kryo and pipeline.generic-types in the configuration instead, as follows:

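import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Configuration-based replacements for disableGenericTypes(), enableForceKryo() and enableForceAvro().
Configuration configuration = new Configuration();
configuration.set(PipelineOptions.GENERIC_TYPES, false);
configuration.set(PipelineOptions.FORCE_KRYO, true);
configuration.set(PipelineOptions.FORCE_AVRO, true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
// Build the Flink job with env.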






[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278464992
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory