Motivation
Flink supports several kinds of serializers, such as those for Java basic types, Pojo, and Kryo. When developing Flink jobs, users can register custom data types and serializers through StreamExecutionEnvironment and ExecutionConfig, and can also configure the data classes serialized by Pojo and Kryo through three configuration options: pipeline.default.kryo-serializers, pipeline.registered-kryo-types, and pipeline.registered-pojo-types. Flink looks up and creates the corresponding serializer for each data type based on the data types and registered serializers. This approach has the following main problems:
1) Users register data types and serializers in hard-coded form, so they need to modify the code when upgrading a job version; the change cannot be made through configuration alone. As mentioned in the document [1], we'd like to unify job-relevant settings into the configuration file.
2) The serializer configuration capabilities are currently not on par with the API. The three configuration options can configure data types and their corresponding serializers, but custom serializers are not supported. There is also a priority issue among the three options: if users configure the same data type in more than one of them, the serialization semantics conflict.
3) Kryo serializer compatibility issues. When users use custom data types in Flink jobs, Flink chooses the corresponding serializer; if no serializer is found, it falls back to the Kryo serializer. Using the Kryo serializer has the following problems:
a) Different versions of Kryo may be incompatible with each other, so the old and new versions of a job may be incompatible when Flink upgrades the JVM or Kryo version.
b) Schema evolution in state is a problem: after columns are added to or removed from the state data, the new job cannot recover from the original state when it uses the Kryo serializer.
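The conflict described in problem 2 can be illustrated with a small, self-contained sketch. It assumes each of the three options has already been parsed into a set of type names; the class `SerializerOptionConflicts`, the method `findConflicts`, and the `com.example.*` type names are hypothetical and are not part of Flink. It does not reproduce how Flink parses or prioritizes these options; it only shows how the same type can end up claimed by more than one option.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical helper for illustration only; not part of Flink. */
final class SerializerOptionConflicts {

    /**
     * Returns, for every type name that appears under more than one option,
     * the option keys that mention it (in the iteration order of the input map).
     */
    static Map<String, List<String>> findConflicts(Map<String, Set<String>> typesByOption) {
        Map<String, List<String>> optionsByType = new TreeMap<>();
        for (Map.Entry<String, Set<String>> entry : typesByOption.entrySet()) {
            for (String type : entry.getValue()) {
                optionsByType.computeIfAbsent(type, k -> new ArrayList<>()).add(entry.getKey());
            }
        }
        // Keep only the types that are configured in at least two options.
        optionsByType.values().removeIf(options -> options.size() < 2);
        return optionsByType;
    }

    public static void main(String[] args) {
        // Assumed, already-parsed view of the three options named in this FLIP.
        Map<String, Set<String>> typesByOption = new LinkedHashMap<>();
        typesByOption.put("pipeline.default.kryo-serializers", Set.of("com.example.Order"));
        typesByOption.put("pipeline.registered-kryo-types", Set.of("com.example.Customer"));
        typesByOption.put("pipeline.registered-pojo-types", Set.of("com.example.Order"));

        // com.example.Order is reported because two options claim it.
        System.out.println(findConflicts(typesByOption));
    }
}
```

With a single unified option there would be one entry per type, so this kind of cross-option check would not be needed.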
Flink has options to turn the Kryo serializer on or off and to configure Kryo / Pojo serializers. Building on that, this FLIP would like to improve serialization configuration and usage in Flink as follows:
1) Configuration for customized serializers. Flink supports the @TypeInfo annotation and TypeInfoFactory to create customized serializers for user-defined data types; refer to doc [2]. This FLIP proposes to make customized serializers configurable, so that users can update them for jobs without modifying any code.
2) Provide uniform configuration for Kryo, Pojo and customized serializers. Flink currently configures serializers for Kryo and Pojo in different options. They can be combined with customized serializers in one option, which simplifies usage and fixes the priority issue for the same data type in different options.
3) Add more built-in serializers in Flink, such as for List, Map and Set. Flink has many serializers for basic data types, but when users use composite data types such as List<String> or Map<String, Integer>, Flink can only use Kryo or user-defined serializers for them. More built-in serializers can be added to Flink for these types to improve performance.
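To make item 3 concrete, the following is a minimal sketch of what a dedicated encoding for List<String> can look like: the element count followed by each element. It uses only the JDK and does not use Flink's serializer API; the class name `StringListCodec` and its methods are hypothetical, and the built-in serializers this FLIP proposes may be designed differently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; this is not Flink's serializer API. */
final class StringListCodec {

    /** Writes the element count, then each element in order. Elements must be non-null. */
    static byte[] serialize(List<String> values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(values.size());
            for (String value : values) {
                out.writeUTF(value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the element count, then that many elements. */
    static List<String> deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int size = in.readInt();
            List<String> values = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                values.add(in.readUTF());
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> original = List.of("a", "b", "c");
        byte[] encoded = serialize(original);
        // Round trip: the decoded list should equal the original.
        System.out.println(deserialize(encoded).equals(original));
    }
}
```

Because the element type is known up front, an encoding like this does not need to write class information for each element, which is one reason a type-specific serializer can be cheaper than a generic fallback.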
Public Interface
Deprecated Methods
The following methods in ExecutionConfig will be deprecated and removed in flink-2.0.
Method | Annotation |
org.apache.flink.api.common.ExecutionConfig#registerKryoType(Class<?> type) | @Public |
org.apache.flink.api.common.ExecutionConfig#registerPojoType(Class<?> type) | |
org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) | |
org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, T serializer) | |
org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) | |
org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, T serializer) | |
org.apache.flink.api.common.ExecutionConfig#getRegisteredKryoTypes() | |
org.apache.flink.api.common.ExecutionConfig#getRegisteredPojoTypes() | |
org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializerClasses() | |
org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializers() | |
org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializerClasses() | |
org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializers() | |
org.apache.flink.api.common.ExecutionConfig#enableForceAvro() | |
org.apache.flink.api.common.ExecutionConfig#enableForceKryo() | |
org.apache.flink.api.common.ExecutionConfig#enableGenericTypes() | |
org.apache.flink.api.common.ExecutionConfig#setForceAvro(boolean forceAvro) | |
org.apache.flink.api.common.ExecutionConfig#setForceKryo(boolean forceKryo) | |
org.apache.flink.api.common.ExecutionConfig#disableForceAvro() | |
org.apache.flink.api.common.ExecutionConfig#disableForceKryo() | |
org.apache.flink.api.common.ExecutionConfig#disableGenericTypes() | |
org.apache.flink.api.common.ExecutionConfig#hasGenericTypesDisabled() | |
org.apache.flink.api.common.ExecutionConfig#isForceAvroEnabled() | |
org.apache.flink.api.common.ExecutionConfig#isForceKryoEnabled() |
The following methods in StreamExecutionEnvironment will be deprecated and removed in flink-2.0.
Method | Annotation |
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerType(Class<?> type) | @Public |
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) | |
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, T serializer) | |
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) | |
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, T serializer) |
The following method in TypeInformation will be deprecated and removed in flink-2.0.
Method | Annotation |
org.apache.flink.api.common.typeinfo.TypeInformation#createSerializer(ExecutionConfig config) | @PublicEvolving |
After enableForceAvro, enableForceKryo and enableGenericTypes are deprecated and removed, users can set pipeline.force-avro, pipeline.force-kryo and pipeline.generic-types in the configuration as follows.
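import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration configuration = new Configuration();
configuration.set(PipelineOptions.GENERIC_TYPES, false); // pipeline.generic-types
configuration.set(PipelineOptions.FORCE_KRYO, true); // pipeline.force-kryo
configuration.set(PipelineOptions.FORCE_AVRO, true); // pipeline.force-avro
StreamExecutionEnvironment env = new StreamExecutionEnvironment(configuration);
// Build flink job with env.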
Serialization Options
Introduce SerializerConfig
Disable Kryo By Default
[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=278464992
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory