Status
Current state: Under Discussion
Discussion thread: https://lists.apache.org/thread/4w36oof8dh28b9f593sgtk21o8qh8qx4
JIRA: https://issues.apache.org/jira/browse/FLINK-XXXX
Released: 1.17
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
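While restoring from a snapshot, the StateBackend resolves the schema compatibility between the serializer that wrote the state and the new serializer. It uses the result to decide how to proceed: use the state as-is, migrate it, or fail the restore.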
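The sketch below gives a rough picture of that decision by mapping each compatibility outcome to what a backend could do next. `Compatibility`, `RestoreAction` and `RestoreDecision` are hypothetical names invented for this sketch. Flink's real result type is TypeSerializerSchemaCompatibility, and the real backends differ in detail.

```java
// Hypothetical, simplified stand-in for the outcomes carried by
// TypeSerializerSchemaCompatibility; not Flink's real class.
enum Compatibility {
    COMPATIBLE_AS_IS,
    COMPATIBLE_AFTER_MIGRATION,
    COMPATIBLE_WITH_RECONFIGURED_SERIALIZER,
    INCOMPATIBLE
}

// Hypothetical actions a state backend could take on restore.
enum RestoreAction {
    USE_STATE_AS_IS,
    MIGRATE_STATE,
    USE_RECONFIGURED_SERIALIZER,
    FAIL_RESTORE
}

final class RestoreDecision {
    private RestoreDecision() {}

    // Maps a resolved compatibility to the backend's next step.
    static RestoreAction decide(Compatibility compatibility) {
        switch (compatibility) {
            case COMPATIBLE_AS_IS:
                return RestoreAction.USE_STATE_AS_IS;
            case COMPATIBLE_AFTER_MIGRATION:
                // Read with the old serializer, rewrite with the new one.
                return RestoreAction.MIGRATE_STATE;
            case COMPATIBLE_WITH_RECONFIGURED_SERIALIZER:
                return RestoreAction.USE_RECONFIGURED_SERIALIZER;
            case INCOMPATIBLE:
            default:
                return RestoreAction.FAIL_RESTORE;
        }
    }

    public static void main(String[] args) {
        for (Compatibility c : Compatibility.values()) {
            System.out.println(c + " -> " + decide(c));
        }
    }
}
```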
Currently, users who process data with a custom serializer must make sure that the schema compatibility logic in the old serializer (which may live in a Flink library) meets their needs. Otherwise, they must modify TypeSerializerSnapshot#resolveSchemaCompatibility of the old serializer. The new serializer has no way to declare how it is compatible with the old serializer. As a result, schema evolution is not supported in some scenarios.
For example, users who want to implement a custom serializer that is compatible with ValueSerializer must modify the compatibility logic in ValueSerializer.ValueSerializerSnapshot#resolveSchemaCompatibility. (ValueSerializer is final, so it cannot be extended either.)
Reversing the direction of schema compatibility resolution, so that the new serializer decides whether it is compatible with the old serializer's snapshot, would therefore improve the usability of schema evolution.
Public Interfaces
Add an extra method to TypeSerializer.java. It is used until the deprecated TypeSerializerSnapshot#resolveSchemaCompatibility is removed:
```java
@PublicEvolving
public abstract class TypeSerializer<T> {

    // Check whether the serializer is compatible with the old one.
    public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> typeSerializerSnapshot) {
        // Use the old logic to resolve.
        return typeSerializerSnapshot.resolveSchemaCompatibility(this);
    }
}
```
Make the method abstract after the deprecated TypeSerializerSnapshot#resolveSchemaCompatibility has been removed:
```java
@PublicEvolving
public abstract class TypeSerializer<T> {

    // Check whether the serializer is compatible with the old one.
    public abstract TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> typeSerializerSnapshot);
}
```
Proposed Changes
Because both TypeSerializer and TypeSerializerSnapshot are public interfaces, the logic is migrated in several steps.
Step 1
Add an extra method in TypeSerializer.java as below.
```java
@PublicEvolving
public abstract class TypeSerializer<T> {

    // Check whether the serializer is compatible with the old one.
    public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> typeSerializerSnapshot) {
        // Use the old logic to resolve.
        return typeSerializerSnapshot.resolveSchemaCompatibility(this);
    }
}
```
By default, TypeSerializer#resolveSchemaCompatibility calls TypeSerializerSnapshot#resolveSchemaCompatibility.
All call sites that currently use TypeSerializerSnapshot#resolveSchemaCompatibility to check compatibility will call TypeSerializer#resolveSchemaCompatibility instead.
Users can implement their own compatibility resolution logic in their new serializer, and Flink will use it.
If users do not override TypeSerializer#resolveSchemaCompatibility, the behavior is the same as before.
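The Step 1 behavior can be sketched with simplified, self-contained stand-ins. The classes below only borrow the names of Flink's TypeSerializer, TypeSerializerSnapshot and TypeSerializerSchemaCompatibility. They are not the real API, and `BuiltInSerializer`, `CustomSerializer` and `Step1Demo` are hypothetical. The sketch shows two things. A serializer that does not override the new method keeps the old result. A custom serializer can accept a snapshot that the old direction would reject.

```java
// Simplified, self-contained stand-ins for Flink's classes. These are
// hypothetical sketches of the proposal, not Flink's real API.
final class TypeSerializerSchemaCompatibility<T> {
    enum Type { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    final Type type;

    private TypeSerializerSchemaCompatibility(Type type) {
        this.type = type;
    }

    static <T> TypeSerializerSchemaCompatibility<T> compatibleAsIs() {
        return new TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AS_IS);
    }

    static <T> TypeSerializerSchemaCompatibility<T> compatibleAfterMigration() {
        return new TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AFTER_MIGRATION);
    }

    static <T> TypeSerializerSchemaCompatibility<T> incompatible() {
        return new TypeSerializerSchemaCompatibility<>(Type.INCOMPATIBLE);
    }
}

abstract class TypeSerializerSnapshot<T> {
    // Old direction: the OLD serializer's snapshot judges the NEW serializer.
    abstract TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializer<T> newSerializer);
}

abstract class TypeSerializer<T> {
    // Step 1: the NEW serializer judges the OLD snapshot. The default
    // delegates to the old logic, so existing serializers behave as before.
    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> oldSnapshot) {
        return oldSnapshot.resolveSchemaCompatibility(this);
    }
}

// Plays the role of a final built-in serializer such as ValueSerializer:
// its snapshot only accepts its own serializer class.
final class BuiltInSerializer extends TypeSerializer<Long> {
    static final class Snapshot extends TypeSerializerSnapshot<Long> {
        @Override
        TypeSerializerSchemaCompatibility<Long> resolveSchemaCompatibility(
                TypeSerializer<Long> newSerializer) {
            if (newSerializer instanceof BuiltInSerializer) {
                return TypeSerializerSchemaCompatibility.compatibleAsIs();
            }
            return TypeSerializerSchemaCompatibility.incompatible();
        }
    }
}

// A user serializer that declares, on its own side, that it can take over
// state written by BuiltInSerializer (after migration).
final class CustomSerializer extends TypeSerializer<Long> {
    @Override
    TypeSerializerSchemaCompatibility<Long> resolveSchemaCompatibility(
            TypeSerializerSnapshot<Long> oldSnapshot) {
        if (oldSnapshot instanceof BuiltInSerializer.Snapshot) {
            return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
        }
        // Anything else: fall back to the old snapshot's own logic.
        return super.resolveSchemaCompatibility(oldSnapshot);
    }
}

final class Step1Demo {
    // Step 1: call sites ask the new serializer, not the old snapshot.
    static <T> TypeSerializerSchemaCompatibility.Type check(
            TypeSerializer<T> newSerializer, TypeSerializerSnapshot<T> oldSnapshot) {
        return newSerializer.resolveSchemaCompatibility(oldSnapshot).type;
    }

    public static void main(String[] args) {
        TypeSerializerSnapshot<Long> old = new BuiltInSerializer.Snapshot();
        System.out.println(check(new BuiltInSerializer(), old)); // COMPATIBLE_AS_IS
        System.out.println(check(new CustomSerializer(), old));  // COMPATIBLE_AFTER_MIGRATION
        // The old direction rejects the custom serializer.
        System.out.println(old.resolveSchemaCompatibility(new CustomSerializer()).type); // INCOMPATIBLE
    }
}
```

In this sketch, only the custom serializer had to change. The final built-in serializer and its snapshot are untouched, which is the usability gain the Motivation section describes.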
Step 2
Having two similar methods (TypeSerializerSnapshot#resolveSchemaCompatibility and TypeSerializer#resolveSchemaCompatibility) will confuse users.
So we propose deprecating TypeSerializerSnapshot#resolveSchemaCompatibility in the long run.
In step 2, we need to:
- Implement the new method in all built-in TypeSerializers
- Mark TypeSerializerSnapshot#resolveSchemaCompatibility as deprecated
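The following sketch shows what a built-in serializer might look like after Step 2. It again uses simplified stand-ins rather than Flink's real classes. `VersionedSerializer`, its version numbers, the shared `resolve` helper and `Step2Demo` are all hypothetical. The compatibility logic moves into the serializer. The deprecated snapshot method keeps returning the same results until Step 3 removes it.

```java
// Simplified, self-contained stand-ins for Flink's classes (hypothetical).
final class TypeSerializerSchemaCompatibility<T> {
    enum Type { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }
    final Type type;
    private TypeSerializerSchemaCompatibility(Type type) { this.type = type; }
    static <T> TypeSerializerSchemaCompatibility<T> compatibleAsIs() {
        return new TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AS_IS);
    }
    static <T> TypeSerializerSchemaCompatibility<T> compatibleAfterMigration() {
        return new TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AFTER_MIGRATION);
    }
    static <T> TypeSerializerSchemaCompatibility<T> incompatible() {
        return new TypeSerializerSchemaCompatibility<>(Type.INCOMPATIBLE);
    }
}

abstract class TypeSerializerSnapshot<T> {
    /** @deprecated Resolve compatibility on the new TypeSerializer instead. */
    @Deprecated
    abstract TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializer<T> newSerializer);
}

abstract class TypeSerializer<T> {
    // Default from Step 1, still present during Step 2.
    @SuppressWarnings("deprecation")
    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> oldSnapshot) {
        return oldSnapshot.resolveSchemaCompatibility(this);
    }
}

// A hypothetical built-in serializer whose format has two versions.
final class VersionedSerializer extends TypeSerializer<String> {
    static final int CURRENT_VERSION = 2;

    static final class Snapshot extends TypeSerializerSnapshot<String> {
        final int writtenVersion;

        Snapshot(int writtenVersion) { this.writtenVersion = writtenVersion; }

        // Deprecated path. It deliberately does not call back into the new
        // serializer: a serializer still using the Step 1 default would
        // otherwise bounce between the two methods forever.
        @Deprecated
        @Override
        TypeSerializerSchemaCompatibility<String> resolveSchemaCompatibility(
                TypeSerializer<String> newSerializer) {
            if (!(newSerializer instanceof VersionedSerializer)) {
                return TypeSerializerSchemaCompatibility.incompatible();
            }
            return resolve(writtenVersion);
        }
    }

    // Step 2: the built-in serializer implements the new method itself.
    @Override
    TypeSerializerSchemaCompatibility<String> resolveSchemaCompatibility(
            TypeSerializerSnapshot<String> oldSnapshot) {
        if (!(oldSnapshot instanceof Snapshot)) {
            return TypeSerializerSchemaCompatibility.incompatible();
        }
        return resolve(((Snapshot) oldSnapshot).writtenVersion);
    }

    // Version check shared by both paths during the deprecation period.
    static TypeSerializerSchemaCompatibility<String> resolve(int writtenVersion) {
        if (writtenVersion == CURRENT_VERSION) {
            return TypeSerializerSchemaCompatibility.compatibleAsIs();
        }
        if (writtenVersion < CURRENT_VERSION) {
            return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
        }
        return TypeSerializerSchemaCompatibility.incompatible();
    }
}

final class Step2Demo {
    public static void main(String[] args) {
        VersionedSerializer serializer = new VersionedSerializer();
        for (int v = 1; v <= 3; v++) {
            System.out.println("v" + v + ": " + serializer
                    .resolveSchemaCompatibility(new VersionedSerializer.Snapshot(v)).type);
        }
    }
}
```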
Step 3
After several stable versions, remove TypeSerializerSnapshot#resolveSchemaCompatibility and its related implementations.
At the same time, TypeSerializer#resolveSchemaCompatibility can be made abstract:
```java
@PublicEvolving
public abstract class TypeSerializer<T> {

    // Check whether the serializer is compatible with the old one.
    public abstract TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> typeSerializerSnapshot);
}
```
Compatibility, Deprecation, and Migration Plan
The deprecation and migration plan is described in Proposed Changes above.
Test Plan
- Existing unit tests for resolving compatibility verify that the old behavior still works as before.
- Add extra unit tests to verify the correctness of custom serializers.
- Add ITCases and end-to-end tests that simulate user behavior.
Rejected Alternatives
None.