...

Current state: Under Discussion

Discussion thread: here (<- link to https://lists.apache.org/thread/4w36oof8dh28b9f593sgtk21o8qh8qx4)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

...

Add an extra method in TypeSerializer.java before removing the deprecated TypeSerializerSnapshot#resolveSchemaCompatibility.

TypeSerializer.java

@PublicEvolving
public abstract class TypeSerializer<T> {
    // Check whether this serializer is compatible with the old one.
    public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> typeSerializerSnapshot) {
        // By default, use the old logic to resolve compatibility.
        return typeSerializerSnapshot.resolveSchemaCompatibility(this);
    }
}

Make the method abstract after removing the deprecated TypeSerializerSnapshot#resolveSchemaCompatibility.

TypeSerializer.java

@PublicEvolving
public abstract class TypeSerializer<T> {
    // Check whether this serializer is compatible with the old one.
    public abstract TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> typeSerializerSnapshot);
}

Proposed Changes

Because both TypeSerializer and TypeSerializerSnapshot are public interfaces, the logic has to be migrated in several steps.

...

Add an extra method in TypeSerializer.java as below.

TypeSerializer.java

@PublicEvolving
public abstract class TypeSerializer<T> {
    // Check whether this serializer is compatible with the old one.
    public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> typeSerializerSnapshot) {
        // By default, use the old logic to resolve compatibility.
        return typeSerializerSnapshot.resolveSchemaCompatibility(this);
    }
}

By default, TypeSerializer#resolveSchemaCompatibility calls TypeSerializerSnapshot#resolveSchemaCompatibility.

All call sites that currently use TypeSerializerSnapshot#resolveSchemaCompatibility to check compatibility will call TypeSerializer#resolveSchemaCompatibility instead.

Users can implement their own compatibility-resolution logic in their new serializer, and Flink will use it.

If users have not implemented TypeSerializer#resolveSchemaCompatibility, all behavior stays the same as before.
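
The delegation described above can be sketched with simplified stand-in types. This is only an illustration under stated assumptions: Compatibility, SnapshotSketch, SerializerSketch, LegacySerializer, VersionedSerializer and VersionedSnapshot are hypothetical names, not Flink classes, and the version comparison is an invented example of user-defined logic.

```java
// Simplified stand-ins for illustration only; these are NOT Flink's real classes.
enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

// Stand-in for TypeSerializerSnapshot: owns the old (deprecated) check.
abstract class SnapshotSketch {
    abstract Compatibility resolveSchemaCompatibility(SerializerSketch newSerializer);
}

// Stand-in for TypeSerializer with the newly added method.
abstract class SerializerSketch {
    // Default implementation: delegate to the old logic on the snapshot.
    Compatibility resolveSchemaCompatibility(SnapshotSketch oldSnapshot) {
        return oldSnapshot.resolveSchemaCompatibility(this);
    }
}

// A serializer that has not been migrated; it inherits the default.
final class LegacySerializer extends SerializerSketch {}

// Hypothetical snapshot whose old logic only accepts LegacySerializer.
final class VersionedSnapshot extends SnapshotSketch {
    final int version;

    VersionedSnapshot(int version) {
        this.version = version;
    }

    @Override
    Compatibility resolveSchemaCompatibility(SerializerSketch newSerializer) {
        return newSerializer instanceof LegacySerializer
                ? Compatibility.COMPATIBLE_AS_IS
                : Compatibility.INCOMPATIBLE;
    }
}

// A serializer that overrides the new method with its own logic.
final class VersionedSerializer extends SerializerSketch {
    final int version;

    VersionedSerializer(int version) {
        this.version = version;
    }

    @Override
    Compatibility resolveSchemaCompatibility(SnapshotSketch oldSnapshot) {
        if (oldSnapshot instanceof VersionedSnapshot) {
            int oldVersion = ((VersionedSnapshot) oldSnapshot).version;
            if (oldVersion == version) {
                return Compatibility.COMPATIBLE_AS_IS;
            }
            if (oldVersion < version) {
                // Assumption for this sketch: older state can always be migrated.
                return Compatibility.COMPATIBLE_AFTER_MIGRATION;
            }
        }
        return Compatibility.INCOMPATIBLE;
    }
}

final class CompatibilitySketch {
    public static void main(String[] args) {
        SnapshotSketch restored = new VersionedSnapshot(1);
        // Not migrated: the result comes from the snapshot's old logic.
        System.out.println(new LegacySerializer().resolveSchemaCompatibility(restored));
        // Migrated: the result comes from the serializer's own override.
        System.out.println(new VersionedSerializer(2).resolveSchemaCompatibility(restored));
    }
}
```

In this sketch the call site only ever invokes the serializer's method. An unmigrated serializer still gets the snapshot's old answer, while a migrated one can accept state that the old snapshot logic would have rejected.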

...

After several stable versions, remove TypeSerializerSnapshot#resolveSchemaCompatibility and its related implementations.

At the same time, the new method can be made abstract:

TypeSerializer.java

@PublicEvolving
public abstract class TypeSerializer<T> {
    // Check whether this serializer is compatible with the old one.
    public abstract TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> typeSerializerSnapshot);
}
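
Once the method is abstract, every serializer has to supply its own check and the snapshot only carries data. A minimal sketch of that end state follows, again with hypothetical stand-in types (SchemaCompat, FieldCountSnapshot, StateSerializerSketch, RowSerializerSketch and RestoreSketch are not Flink classes, and the field-count rule is an invented example):

```java
// Simplified stand-ins for illustration only; these are NOT Flink's real classes.
enum SchemaCompat { COMPATIBLE_AS_IS, INCOMPATIBLE }

// Stand-in for a snapshot after the removal: plain data, no compatibility logic.
final class FieldCountSnapshot {
    final int fieldCount;

    FieldCountSnapshot(int fieldCount) {
        this.fieldCount = fieldCount;
    }
}

// Stand-in for TypeSerializer once the method is abstract.
abstract class StateSerializerSketch {
    abstract SchemaCompat resolveSchemaCompatibility(FieldCountSnapshot oldSnapshot);
}

// Every concrete serializer must now implement the check itself.
final class RowSerializerSketch extends StateSerializerSketch {
    private final int fieldCount;

    RowSerializerSketch(int fieldCount) {
        this.fieldCount = fieldCount;
    }

    @Override
    SchemaCompat resolveSchemaCompatibility(FieldCountSnapshot oldSnapshot) {
        // Assumption for this sketch: state is readable only if field counts match.
        return oldSnapshot.fieldCount == fieldCount
                ? SchemaCompat.COMPATIBLE_AS_IS
                : SchemaCompat.INCOMPATIBLE;
    }
}

final class RestoreSketch {
    // Hypothetical call site: asks the new serializer, never the snapshot.
    static void checkRestorable(StateSerializerSketch newSerializer, FieldCountSnapshot oldSnapshot) {
        if (newSerializer.resolveSchemaCompatibility(oldSnapshot) == SchemaCompat.INCOMPATIBLE) {
            throw new IllegalStateException("New serializer cannot read the old state");
        }
    }

    public static void main(String[] args) {
        checkRestorable(new RowSerializerSketch(3), new FieldCountSnapshot(3));
        System.out.println("restore check passed");
    }
}
```

A serializer that omits the method no longer compiles in this sketch, which is why the step is deferred until the deprecated snapshot method has been removed.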

Compatibility, Deprecation, and Migration Plan

...