Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
Discussion thread | https://lists.apache.org/thread/m67s4qfrh660lktpq7yqf9docvvf5o9l |
---|---|
Vote thread | https://lists.apache.org/thread/2xmcxs67xxzwool554fglrnklyvw348h |
JIRA | |
Release | <Flink Version> |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Flink supports different serializers, such as those for Java basic types, Pojo, and Kryo. When developing Flink jobs, users can register custom data types and serializers through StreamExecutionEnvironment and ExecutionConfig. They can also configure Pojo and Kryo serialized data classes through three configuration options: pipeline.default-kryo-serializers, pipeline.registered-kryo-types, and pipeline.registered-pojo-types. Flink jobs look up and create the corresponding serializer for each data type based on the data types and the registered serializers. This approach has the following main issues:
1) Users register data types and serializers in hard-coded job logic, so they must modify the code when upgrading a job version; this cannot be achieved through configuration alone. As mentioned in document [1], we'd like to unify job-related settings in the configuration file.
2) Currently, the serializer configuration capabilities are not on par with the API. Three options are provided to configure data types and their corresponding serializers, but custom serializers are not supported. There are also priority issues among the three options: if users configure the same data type in all of them, the serialization semantics conflict.
3) Kryo serializer compatibility issues. Users can create custom data types in Flink jobs, and Flink chooses the corresponding serializer. If no serializer is found, the Kryo serializer is used. Using the Kryo serializer has the following issues:
a) Different versions of Kryo may be incompatible with each other. As a result, old and new versions of a job can become incompatible when Flink upgrades the JVM or the Kryo version.
b) Schema evolution issues in state: when columns are added to or removed from the state data, the new job cannot recover from the original state if it uses the Kryo serializer.
...
1) Configuration for customized serializers. Flink supports the @TypeInfo annotation and TypeInfoFactory to create customized serializers for user-defined data types (see doc [2]). This FLIP proposes configuring customized serializers in the configuration, so users can update them for jobs without modifying any code.
2) Provide a uniform configuration for Kryo, Pojo and customized serializers. Flink currently configures serializers for Kryo and Pojo in separate options. These can be combined with customized serializers in one option, which simplifies usage and fixes the priority issues that arise when the same data type appears in different options.
3) Add more built-in serializers to Flink, such as for List, Map and Set. Flink has many serializers for basic data types. However, when users use composite data types such as List<String> or Map<String, Integer>, Flink can only use Kryo or user-defined serializers for them. More built-in serializers should be added to Flink for these types to improve performance.
...
The following methods in ExecutionConfig will be deprecated and removed in flink-2.0:
Method | Annotation |
---|---|
org.apache.flink.api.common.ExecutionConfig#registerKryoType(Class<?> type) | @Public |
org.apache.flink.api.common.ExecutionConfig#registerPojoType(Class<?> type) | @Public |
org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) | @Public |
org.apache.flink.api.common.ExecutionConfig#registerTypeWithKryoSerializer(Class<?> type, T serializer) | @Public |
org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) | @Public |
org.apache.flink.api.common.ExecutionConfig#addDefaultKryoSerializer(Class<?> type, T serializer) | @Public |
org.apache.flink.api.common.ExecutionConfig#getRegisteredKryoTypes() | @Public |
org.apache.flink.api.common.ExecutionConfig#getRegisteredPojoTypes() | @Public |
org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializerClasses() | @Public |
org.apache.flink.api.common.ExecutionConfig#getRegisteredTypesWithKryoSerializers() | @Public |
org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializerClasses() | @Public |
org.apache.flink.api.common.ExecutionConfig#getDefaultKryoSerializers() | @Public |
org.apache.flink.api.common.ExecutionConfig#enableForceAvro() | @Public |
org.apache.flink.api.common.ExecutionConfig#enableForceKryo() | @Public |
org.apache.flink.api.common.ExecutionConfig#enableGenericTypes() | @Public |
org.apache.flink.api.common.ExecutionConfig#setForceAvro(boolean forceAvro) | @Public |
org.apache.flink.api.common.ExecutionConfig#disableForceAvro() | @Public |
org.apache.flink.api.common.ExecutionConfig#disableForceKryo() | @Public |
org.apache.flink.api.common.ExecutionConfig#disableGenericTypes() | @Public |
org.apache.flink.api.common.ExecutionConfig#hasGenericTypesDisabled() | @Public |
org.apache.flink.api.common.ExecutionConfig#isForceAvroEnabled() | @Public |
org.apache.flink.api.common.ExecutionConfig#isForceKryoEnabled() | @Public |
The following methods in StreamExecutionEnvironment will be deprecated and removed in flink-2.0:
Method | Annotation |
---|---|
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerType(Class<?> type) | @Public |
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass) | @Public |
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#registerTypeWithKryoSerializer(Class<?> type, T serializer) | @Public |
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass) | @Public |
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addDefaultKryoSerializer(Class<?> type, T serializer) | @Public |
The following method in TypeInformation and its subclasses will be deprecated and removed in flink-2.0:
Method | Annotation |
---|---|
org.apache.flink.api.common.typeinfo.TypeInformation#createSerializer(ExecutionConfig config) | @PublicEvolving |
After enableForceAvro, enableForceKryo and enableGenericTypes are deprecated and removed, users can configure pipeline.force-avro, pipeline.force-kryo and pipeline.generic-types in the job configuration as follows.
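As a migration aid, the mapping from the deprecated ExecutionConfig toggles to these options can be sketched as follows. The class LegacyToggleMigration and its method are hypothetical. Only the option keys come from this document, and the `key: value` rendering assumes a YAML-style configuration file.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical migration helper: maps deprecated ExecutionConfig toggles to the
 * configuration lines that replace them.
 */
final class LegacyToggleMigration {

    // Deprecated method name -> equivalent "option-key: value" line.
    private static final Map<String, String> REPLACEMENTS = new LinkedHashMap<>();

    static {
        REPLACEMENTS.put("enableForceAvro", "pipeline.force-avro: true");
        REPLACEMENTS.put("disableForceAvro", "pipeline.force-avro: false");
        REPLACEMENTS.put("enableForceKryo", "pipeline.force-kryo: true");
        REPLACEMENTS.put("disableForceKryo", "pipeline.force-kryo: false");
        REPLACEMENTS.put("enableGenericTypes", "pipeline.generic-types: true");
        REPLACEMENTS.put("disableGenericTypes", "pipeline.generic-types: false");
    }

    private LegacyToggleMigration() {}

    /** Returns the configuration lines equivalent to a sequence of legacy calls. */
    static List<String> toConfigLines(List<String> legacyCalls) {
        // Keyed by option so that a later call overrides an earlier one, like repeated setter calls.
        Map<String, String> byKey = new LinkedHashMap<>();
        for (String call : legacyCalls) {
            String line = REPLACEMENTS.get(call);
            if (line == null) {
                throw new IllegalArgumentException("No option replacement known for: " + call);
            }
            byKey.put(line.substring(0, line.indexOf(':')), line);
        }
        return List.copyOf(byKey.values());
    }
}
```

For example, the calls enableForceAvro(), disableGenericTypes() and disableForceAvro() collapse to `pipeline.force-avro: false` and `pipeline.generic-types: false`.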
...
```java
import static org.apache.flink.configuration.ConfigOptions.key;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;

import java.util.List;

@PublicEvolving
public class PipelineOptions {

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_DEFAULT_SERIALIZERS =
            key("pipeline.default-kryo-serializers")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Semicolon separated list of pairs of class names and Kryo serializers class names to be used"
                                                    + " as Kryo default serializers")
                                    .linebreak()
                                    .linebreak()
                                    .text("Example:")
                                    .linebreak()
                                    .add(
                                            TextElement.code(
                                                    "class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1;"
                                                            + " class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2"))
                                    .build());

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> KRYO_REGISTERED_CLASSES =
            key("pipeline.registered-kryo-types")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                    + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                    + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                    + " sure that only tags are written.")
                                    .build());

    /** This option will be removed in flink-2.0, use `pipeline.serialization-config` instead. */
    @Deprecated
    public static final ConfigOption<List<String>> POJO_REGISTERED_CLASSES =
            key("pipeline.registered-pojo-types")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "Semicolon separated list of types to be registered with the serialization stack. If the type"
                                                    + " is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the"
                                                    + " type ends up being serialized with Kryo, then it will be registered at Kryo to make"
                                                    + " sure that only tags are written.")
                                    .build());

    public static final ConfigOption<List<String>> SERIALIZATION_CONFIG =
            key("pipeline.serialization-config")
                    .stringType()
                    .asList()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text(
                                            "List of pairs of class names and serializer config to be used. There is a `type` field in"
                                                    + " the serializer config with a value 'pojo', 'kryo' or 'typeinfo', and each `type` has its own"
                                                    + " configuration.")
                                    .linebreak()
                                    .linebreak()
                                    .text("Example:")
                                    .linebreak()
                                    .add(
                                            TextElement.code(
                                                    "[org.example.ExampleClass1: {type: pojo},"
                                                            + " org.example.ExampleClass2: {type: kryo},"
                                                            + " org.example.ExampleClass3: {type: kryo, kryo-type: default, class: org.example.Class3KryoSerializer},"
                                                            + " org.example.ExampleClass4: {type: kryo, kryo-type: registered, class: org.example.Class4KryoSerializer},"
                                                            + " org.example.ExampleClass5: {type: typeinfo, class: org.example.Class5TypeInfoFactory}]"))
                                    .build());
}
```
The value of pipeline.serialization-config is a list of key-value pairs. Each key is a data class name, and each value is the serializer configuration for that class. The serializer configuration has the following fields.
1) type: the serializer type, which can be pojo, kryo or typeinfo. If type is pojo, or kryo without a kryo-type field, the data type uses the POJO or Kryo serializer directly.
2) kryo-type: the kind of Kryo serializer registration, which can be default or registered. When kryo-type is default, the configured serializer is used as the default Kryo serializer for the data type. When kryo-type is registered, the data type and its serializer are registered with the Kryo serializer. When the kryo-type field exists, a class field must also be present to configure the specific serializer class name.
3) class: the serializer class name for kryo or typeinfo. For kryo, it should be a subclass of com.esotericsoftware.kryo.Serializer; for typeinfo, it should be a subclass of org.apache.flink.api.common.typeinfo.TypeInfoFactory.
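The field rules above can be checked mechanically. The following is a minimal sketch of a parser and validator for a single entry such as `org.example.ExampleClass3: {type: kryo, kryo-type: default, class: org.example.Class3KryoSerializer}`. The class SerializationConfigEntry is hypothetical; it assumes each list element is a string of the form `<class>: {<field>: <value>, ...}` and that values never contain commas. It does not represent Flink's actual parsing code. Requiring class for typeinfo entries is also an assumption.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser/validator for one pipeline.serialization-config entry. */
final class SerializationConfigEntry {
    final String dataClass;
    final Map<String, String> fields;

    private SerializationConfigEntry(String dataClass, Map<String, String> fields) {
        this.dataClass = dataClass;
        this.fields = Collections.unmodifiableMap(fields);
    }

    static SerializationConfigEntry parse(String entry) {
        int colon = entry.indexOf(':');
        int open = entry.indexOf('{');
        int close = entry.lastIndexOf('}');
        if (colon < 0 || open < colon || close < open) {
            throw new IllegalArgumentException("Expected '<class>: {<fields>}' but got: " + entry);
        }
        String dataClass = entry.substring(0, colon).trim();
        Map<String, String> fields = new LinkedHashMap<>();
        String body = entry.substring(open + 1, close).trim();
        // Naive split on ',': assumes field values never contain a comma (true for class names).
        for (String pair : body.split(",")) {
            int sep = pair.indexOf(':');
            if (sep < 0) {
                throw new IllegalArgumentException("Malformed field '" + pair.trim() + "' in: " + entry);
            }
            fields.put(pair.substring(0, sep).trim(), pair.substring(sep + 1).trim());
        }
        validate(dataClass, fields);
        return new SerializationConfigEntry(dataClass, fields);
    }

    /** Applies the rules for the type, kryo-type and class fields. */
    private static void validate(String dataClass, Map<String, String> fields) {
        String type = fields.get("type");
        String kryoType = fields.get("kryo-type");
        if (type == null) {
            throw new IllegalArgumentException(dataClass + ": missing field 'type'");
        }
        switch (type) {
            case "pojo":
                break;
            case "kryo":
                if (kryoType != null) {
                    if (!kryoType.equals("default") && !kryoType.equals("registered")) {
                        throw new IllegalArgumentException(dataClass + ": unknown kryo-type '" + kryoType + "'");
                    }
                    if (!fields.containsKey("class")) {
                        throw new IllegalArgumentException(dataClass + ": kryo-type requires a 'class' field");
                    }
                }
                break;
            case "typeinfo":
                // Assumption: a typeinfo entry needs a TypeInfoFactory class to be useful.
                if (!fields.containsKey("class")) {
                    throw new IllegalArgumentException(dataClass + ": typeinfo requires a 'class' field");
                }
                break;
            default:
                throw new IllegalArgumentException(dataClass + ": unknown type '" + type + "'");
        }
    }
}
```

Rejecting a kryo-type without a class at parse time lets a misconfigured job fail before any serializer is created.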
Currently, when Flink initializes the Kryo serializer, it automatically detects whether flink-avro is in the job classpath. If it is not, a dummy Avro serializer is created and registered with Kryo; otherwise, the Avro serializer is loaded from flink-avro and registered with Kryo. We introduce a new option, pipeline.force-kryo-avro, with a default value of false. Flink will detect the flink-avro module and register the Avro serializer with Kryo only when the option is set to true.
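The gating described above can be sketched as a small decision function. Everything here is hypothetical and shown for illustration only: the class KryoAvroRegistration and its Action enum, and the probed class name org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils (assumed to be the flink-avro utility class). The text does not specify what happens when the option is false, or when it is true but flink-avro is missing. The sketch assumes the classpath is not probed at all in the first case, and that today's dummy-serializer fallback is kept in the second.

```java
/** Hypothetical sketch of how pipeline.force-kryo-avro could gate Avro registration in Kryo. */
final class KryoAvroRegistration {

    /** What the Kryo serializer setup would do for Avro types. */
    enum Action { SKIP, REGISTER_AVRO_SERIALIZER, REGISTER_DUMMY_SERIALIZER }

    // Assumed name of the flink-avro utility class that is probed on the classpath.
    static final String AVRO_UTILS_CLASS = "org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils";

    private KryoAvroRegistration() {}

    static Action decide(boolean forceKryoAvro, ClassLoader classLoader) {
        if (!forceKryoAvro) {
            // Default (false): do not probe the classpath at all (assumption).
            return Action.SKIP;
        }
        if (isOnClasspath(AVRO_UTILS_CLASS, classLoader)) {
            return Action.REGISTER_AVRO_SERIALIZER;
        }
        // Assumption: keep the current dummy-serializer fallback when flink-avro is missing.
        return Action.REGISTER_DUMMY_SERIALIZER;
    }

    /** Checks for a class without initializing it. */
    static boolean isOnClasspath(String className, ClassLoader classLoader) {
        try {
            Class.forName(className, false, classLoader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

Loading with `initialize = false` keeps the probe side-effect free, so enabling the option only costs a class lookup.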
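```java
import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.code;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.description.Description;

@PublicEvolving
public class PipelineOptions {

    public static final ConfigOption<Boolean> FORCE_KRYO_AVRO =
            key("pipeline.force-kryo-avro")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            Description.builder()
                                    .text("Force register avro classes in kryo serializer.")
                                    .linebreak()
                                    .linebreak()
                                    .text(
                                            "Important: Make sure to include the %s module.",
                                            code("flink-avro"))
                                    .build());
}
```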
...
This FLIP introduces SerializerConfig to decouple serializers in Flink from ExecutionConfig. It can be created from a Configuration and provides the methods that serializers need. TypeExtractor and TypeInformation will use SerializerConfig instead of ExecutionConfig.
```java
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

import com.esotericsoftware.kryo.Serializer;

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;

/**
 * A config that defines the behavior of serializers in a Flink job; it manages the registered
 * types and serializers. The config is created from the job configuration and used by Flink to
 * create serializers for data types.
 */
@PublicEvolving
public final class SerializerConfig implements Serializable {

    private SerializerConfig() {}

    /** Creates a serializer config instance from the configuration. */
    public static SerializerConfig create(Configuration configuration) {
        ...;
    }

    /** Returns the registered types with their Kryo Serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> getRegisteredTypesWithKryoSerializerClasses();

    /** Returns the registered default Kryo Serializer classes. */
    public LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> getDefaultKryoSerializerClasses();

    /** Returns the registered Kryo types. */
    public LinkedHashSet<Class<?>> getRegisteredKryoTypes();

    /** Returns the registered POJO types. */
    public LinkedHashSet<Class<?>> getRegisteredPojoTypes();

    /** Returns the registered type info factories. */
    public Map<Class<?>, Class<? extends TypeInfoFactory<?>>> getRegisteredTypeInfoFactories();

    /**
     * Checks whether generic types are disabled. Generic types are types that go through Kryo
     * during serialization.
     *
     * <p>Generic types are enabled by default.
     *
     * @see PipelineOptions#GENERIC_TYPES
     */
    public boolean hasGenericTypesDisabled();

    /** Returns whether Apache Avro is the serializer for POJOs. */
    public boolean isForceAvroEnabled();

    /** Returns whether Kryo is the serializer for POJOs. */
    public boolean isForceKryoEnabled();
}
```
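The following sketch shows one way create(Configuration) could sort pipeline.serialization-config entries into the registries that SerializerConfig exposes. The class SerializerRegistries is hypothetical. To run without Flink or Kryo, it keeps class names as strings rather than loading Class objects, and it takes entries that have already been parsed into a field map per data class. Mapping "kryo without kryo-type" to the registered Kryo types is an assumption drawn from the field description of pipeline.serialization-config.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, string-based sketch of how entries could be sorted into SerializerConfig's registries. */
final class SerializerRegistries {
    final Set<String> registeredPojoTypes = new LinkedHashSet<>();
    final Set<String> registeredKryoTypes = new LinkedHashSet<>();
    final Map<String, String> defaultKryoSerializerClasses = new LinkedHashMap<>();
    final Map<String, String> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
    final Map<String, String> registeredTypeInfoFactories = new LinkedHashMap<>();

    private SerializerRegistries() {}

    /** entries: data class name -> parsed fields (type, kryo-type, class). */
    static SerializerRegistries from(Map<String, Map<String, String>> entries) {
        SerializerRegistries r = new SerializerRegistries();
        for (Map.Entry<String, Map<String, String>> e : entries.entrySet()) {
            String dataClass = e.getKey();
            Map<String, String> f = e.getValue();
            String type = f.get("type");
            String kryoType = f.get("kryo-type");
            String serializerClass = f.get("class");
            if ("pojo".equals(type)) {
                r.registeredPojoTypes.add(dataClass);
            } else if ("kryo".equals(type) && kryoType == null) {
                r.registeredKryoTypes.add(dataClass); // assumption: plain Kryo registration
            } else if ("kryo".equals(type) && "default".equals(kryoType)) {
                r.defaultKryoSerializerClasses.put(dataClass, serializerClass);
            } else if ("kryo".equals(type) && "registered".equals(kryoType)) {
                r.registeredTypesWithKryoSerializerClasses.put(dataClass, serializerClass);
            } else if ("typeinfo".equals(type)) {
                r.registeredTypeInfoFactories.put(dataClass, serializerClass);
            } else {
                throw new IllegalArgumentException("Unsupported serializer config for " + dataClass + ": " + f);
            }
        }
        return r;
    }
}
```

In this sketch each data class is a single map key, so a class can land in only one registry. That is the property that removes the cross-option priority conflicts described in the motivation.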
The relevant method in TypeInformation:
Method | Annotation |
---|---|
org.apache.flink.api.common.typeinfo.TypeInformation#createSerializer(SerializerConfig config) | @PublicEvolving |
Disable Kryo By Default
Currently, the option pipeline.generic-types is used to enable or disable the Kryo serializer in Flink. Its default value is true, which means Flink uses Kryo as the fallback serializer. As a result, Kryo is used by default to serialize some user-defined data types without the user being aware of it. This can lead to incompatibility between the new and old versions of the job state during version upgrades. To avoid this issue, the default value of pipeline.generic-types can be changed to false in Flink-2.0, which prevents the Kryo serializer from being used without the user's knowledge.
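The effect of the new default can be sketched as a fallback check. The class GenericTypeFallback and the string "KryoSerializer" it returns are hypothetical placeholders, not Flink's API. The sketch only illustrates the intended behavior: with generic types disabled, a type that no other rule matches makes the job fail fast instead of silently using Kryo.

```java
/** Hypothetical sketch of the fallback decision once pipeline.generic-types defaults to false. */
final class GenericTypeFallback {

    private GenericTypeFallback() {}

    /**
     * Returns the name of the serializer used for a type that no other rule matched.
     * When generic types are disabled, fails instead of silently falling back to Kryo.
     */
    static String fallbackSerializer(Class<?> type, boolean genericTypesEnabled) {
        if (!genericTypesEnabled) {
            throw new UnsupportedOperationException(
                    "Generic types have been disabled (pipeline.generic-types: false) and type "
                            + type.getName()
                            + " would otherwise be serialized with Kryo.");
        }
        return "KryoSerializer";
    }
}
```

Failing early makes the Kryo dependency visible. The user can then either set pipeline.generic-types to true or declare the type explicitly in pipeline.serialization-config.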
...
Create Data Serializer
When users create a Flink job, TypeExtractor chooses an appropriate serializer for each data type and creates its TypeInformation. Once the data types and serializer configurations above are supported, Flink will follow the priority below when it creates a serializer and TypeInformation for a data type.
The data types and serializers in the configuration file have the highest priority. For data types that are not in the configuration, Flink checks, in turn, for the TypeInfo annotation, basic data types, Tuple, composite data types, Pojo and generic data types, and creates the corresponding serializer. Besides the current built-in serializers, this FLIP will add more serializers for composite data types such as List, Map and Set. Flink already has TypeInformation for these data types, such as MapTypeInfo, ListTypeInfo and MultisetTypeInfo. However, TypeExtractor cannot recognize these composite data types and therefore cannot use these serializers. We would like to support them in this FLIP.
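The lookup order can be sketched as a chain of checks that returns the first rule that applies. This runs without Flink, so several pieces are assumptions. TypeInfoStandIn is a hypothetical stand-in for Flink's @TypeInfo annotation. The basic-type set is a small illustrative subset. Tuple detection is omitted because it needs Flink's Tuple classes. The POJO check is a crude approximation: a public concrete class with a public no-argument constructor, which is looser than Flink's real POJO rules.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Modifier;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for Flink's @TypeInfo annotation so the sketch runs without Flink. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface TypeInfoStandIn {}

/** Hypothetical sketch of the serializer lookup order: configuration first, generic last. */
final class SerializerResolutionOrder {

    // Illustrative subset of basic types; Flink's actual set is larger.
    private static final Set<Class<?>> BASIC_TYPES = Set.of(
            String.class, Integer.class, Long.class, Double.class, Float.class,
            Short.class, Byte.class, Boolean.class, Character.class);

    private SerializerResolutionOrder() {}

    /** Returns the name of the rule that decides the serializer for {@code type}. */
    static String resolve(Class<?> type, Map<String, String> configuredTypes) {
        if (configuredTypes.containsKey(type.getName())) {
            return "configuration"; // pipeline.serialization-config wins over everything else
        }
        if (type.isAnnotationPresent(TypeInfoStandIn.class)) {
            return "typeinfo-annotation";
        }
        if (type.isPrimitive() || BASIC_TYPES.contains(type)) {
            return "basic";
        }
        // Tuple check omitted: it would need Flink's Tuple classes on the classpath.
        if (List.class.isAssignableFrom(type) || Map.class.isAssignableFrom(type)
                || Set.class.isAssignableFrom(type)) {
            return "composite";
        }
        if (looksLikePojo(type)) {
            return "pojo";
        }
        return "generic"; // would fall back to Kryo if generic types are enabled
    }

    /** Crude approximation of the POJO rules: public, concrete, public no-arg constructor. */
    private static boolean looksLikePojo(Class<?> type) {
        int mods = type.getModifiers();
        if (!Modifier.isPublic(mods) || type.isInterface() || Modifier.isAbstract(mods)) {
            return false;
        }
        try {
            type.getConstructor(); // only finds public constructors
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

For example, java.util.ArrayList resolves to "composite" by default but to "configuration" once it appears in the configured types. This shows the configuration taking priority over the built-in rules.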
...
The Serializers class calls this method to register Avro data types with the Kryo serializer through the ExecutionConfig#registerTypeWithKryoSerializer and ExecutionConfig#addDefaultKryoSerializer methods. The uppermost entry point for this method is in ExecutionEnvironment, which is used by the DataSet API to create jobs. Since DataSet will no longer be used and will be removed in the future, this method in AvroUtils can be marked as Deprecated and removed in flink-2.0.
...
KryoSerializer registers the serializer for Avro's GenericData.Array with Kryo as follows:
a) If flink-avro is in the job classpath, it automatically creates AvroKryoSerializerUtils and registers GenericData.Array and its serializer with the Kryo serializer.
b) If flink-avro is not in the job classpath, a default DefaultAvroUtils is created and a dummy serializer is registered.
As mentioned above, we can add a pipeline.force-kryo-avro option with a default value of false. Flink will look up the flink-avro module and register the Avro serializer in KryoSerializer only when the option is set to true.
...
The default value of the Flink job option pipeline.force-avro is false. When users set it to true, they need to add flink-avro to the job classpath. Pojo data then uses the Avro serializer, and PojoTypeInfo creates an AvroSerializer instance for the data type. This part of the processing can remain unchanged.
...