...
Current state: Under Discussion
Discussion thread: here (<- link to https://mail-archiveslists.apache.org/thread/4w36oof8dh28b9f593sgtk21o8qh8qx4mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
...
Add an extra method in TypeSerializer.java before removing the deprecated TypeSerializerSnapshot#resolveSchemaCompatibility.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
@PublicEvolving public abstract class TypeSerializer<T> { // Check whether the serializer is compatible with the old one. public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> typeSerializerSnapshot) { // Incompatible as default Use the old logic to resolve. return TypeSerializerSchemaCompatibilitytypeSerializerSnapshot.incompatibleresolveSchemaCompatibility(this); } } |
Make the method abstract after removing the deprecated TypeSerializerSnapshot#resolveSchemaCompatibility.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
@PublicEvolving
public abstract class TypeSerializer<T> {
// Check whether the serializer is compatible with the old one.
public abstract TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> typeSerializerSnapshot);
} |
Proposed Changes
Because Both TypeSerializer and TypeSerializerSnapshot are public interfaces, we have several steps to migrate the logic.
...
Add an extra method in TypeSerializer.java as above.below.
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
@PublicEvolving
public abstract class TypeSerializer<T> {
// Check whether the serializer is compatible with the old one.
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> typeSerializerSnapshot) {
// Use the old logic to resolve.
return typeSerializerSnapshot.resolveSchemaCompatibility(this);
}
} |
Typeserializer#resolveSchemaCompatibility will call TypeSerializerSnapshot#resolveSchemaCompatibility as default implementationTypeserializer#resolveSchemaCompatibility will return TYPE.INCOMPATIBLE default.
All places where use TypeSerializerSnapshot#resolveSchemaCompatibility to check the compatibility will call Typeserializer#resolveSchemaCompatibility firstly then call the original TypeSerializerSnapshot#resolveSchemaCompatibility.If the result of Typeserializer#resolveSchemaCompatibility is INCOMPATIBLE, it will continue to use the original logic of TypeSerializerSnapshot#resolveSchemaCompatibility to check, else it will just use the resolving result of Typeserializer#resolveSchemaCompatibility.
Users could implement their own logic of resolving compatibility in their new serializer and it will be called firstly used by flink.
If users haven't implement Typeserializer#resolveSchemaCompatibility, all behaviors are same as before.
...
After several stable version, remove TypeSerializerSnapshot#resolveSchemaCompatibility and related implementation.
At the same time, the interface could be abstract:
Code Block | ||||
---|---|---|---|---|
|
...
| |||
@PublicEvolving
public abstract class TypeSerializer<T> {
// Check whether the serializer is compatible with the old one.
public abstract TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> typeSerializerSnapshot);
} |
Compatibility, Deprecation, and Migration Plan
...