...

Currently, users who process data with a customized serializer must make sure that the schema compatibility logic of the old serializer (which may live in the Flink library) meets their needs. Otherwise, they have to modify TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) of the old serializer. Users have no way to specify, in the new serializer, how it is compatible with the old serializer. As a result, schema evolution is not supported in some scenarios.

For example, if users want to implement a customized serializer that is compatible with ValueSerializer, they have to modify the compatibility logic in ValueSerializer.ValueSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer). (ValueSerializer is final, so it cannot be extended either.)
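To make the limitation concrete, here is a minimal, self-contained Java model of today's resolution direction. Serializer, Snapshot and the Compatibility enum are hypothetical, simplified stand-ins for TypeSerializer, TypeSerializerSnapshot and TypeSerializerSchemaCompatibility, not Flink's real classes. BuiltInSnapshot plays the role of a final built-in snapshot such as ValueSerializer.ValueSerializerSnapshot.

```java
// Hypothetical, simplified stand-ins for Flink's TypeSerializer,
// TypeSerializerSnapshot and TypeSerializerSchemaCompatibility (not the real API).
public class CurrentResolutionDemo {

    // Stand-in for TypeSerializerSchemaCompatibility.
    enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    // Stand-in for TypeSerializer.
    interface Serializer<T> {}

    // Stand-in for TypeSerializerSnapshot: today the OLD snapshot judges the NEW serializer.
    interface Snapshot<T> {
        Compatibility resolveSchemaCompatibility(Serializer<T> newSerializer);
    }

    // Plays the role of a final built-in serializer such as ValueSerializer.
    static final class BuiltInSerializer implements Serializer<String> {}

    // The built-in snapshot only recognizes its own serializer class.
    static final class BuiltInSnapshot implements Snapshot<String> {
        @Override
        public Compatibility resolveSchemaCompatibility(Serializer<String> newSerializer) {
            return newSerializer instanceof BuiltInSerializer
                    ? Compatibility.COMPATIBLE_AS_IS
                    : Compatibility.INCOMPATIBLE;
        }
    }

    // A user serializer that could read the old bytes but has no say in the decision.
    static final class CustomSerializer implements Serializer<String> {}

    public static void main(String[] args) {
        Snapshot<String> oldSnapshot = new BuiltInSnapshot();
        // Changing this answer would require editing BuiltInSnapshot itself.
        System.out.println(oldSnapshot.resolveSchemaCompatibility(new CustomSerializer())); // prints INCOMPATIBLE
    }
}
```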

...

Public Interfaces

Add an extra method to TypeSerializerSnapshot.java, ahead of removing the deprecated TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer).

TypeSerializerSnapshot.java

@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    // Called on the new serializer's snapshot: check whether it is compatible with the old one.
    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> oldSerializerSnapshot) {
        // Use the old logic to resolve.
        return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());
    }
}

Mark the old method as deprecated and provide a default implementation.

TypeSerializerSnapshot.java

@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    @Deprecated
    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializer<T> newSerializer) {
        // Return INCOMPATIBLE as the default until the deprecated method is removed.
        return TypeSerializerSchemaCompatibility.incompatible();
    }
}

Make the new method abstract after removing the deprecated TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer).

TypeSerializerSnapshot.java

@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    // Called on the new serializer's snapshot: check whether it is compatible with the old one.
    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> oldSerializerSnapshot);
}

Proposed Changes

Because both TypeSerializer and TypeSerializerSnapshot are public interfaces, we migrate the logic in several steps.

Step 1

Add an extra method to TypeSerializerSnapshot.java as below.

TypeSerializerSnapshot.java

@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    // Called on the new serializer's snapshot: check whether it is compatible with the old one.
    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> oldSerializerSnapshot) {
        // Use the old logic to resolve.
        return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());
    }
}

By default, TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot) calls TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) on the old snapshot. It passes the new serializer returned by restoreSerializer().

Every place that currently calls TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) to check compatibility will instead call TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot) on the new serializer's snapshot.

Users can then implement their own compatibility logic in the snapshot of their new serializer, and Flink will use it.

If users do not implement TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot), all behavior stays the same as before.
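Both cases (a user override and the unchanged default) can be sketched with a small, self-contained model. Serializer, Snapshot and Compatibility are hypothetical, simplified stand-ins for the Flink classes, not the real API. The default method mirrors the Step 1 code. CustomSnapshot shows a user deciding compatibility with a final built-in snapshot without modifying that snapshot, and the migration rule inside it is invented for illustration.

```java
// Hypothetical, simplified stand-ins for Flink's serializer classes (not the real API).
public class CustomResolutionDemo {

    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    interface Serializer<T> {}

    interface Snapshot<T> {
        // Stand-in for restoreSerializer(): the serializer described by this snapshot.
        Serializer<T> restoreSerializer();

        // Existing direction: this (old) snapshot judges a new serializer.
        Compatibility resolveSchemaCompatibility(Serializer<T> newSerializer);

        // Step 1 direction: this (new) snapshot judges an old snapshot.
        // The default delegates to the old snapshot, so behavior is unchanged.
        default Compatibility resolveSchemaCompatibility(Snapshot<T> oldSerializerSnapshot) {
            return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());
        }
    }

    // Plays the role of a final built-in pair such as ValueSerializer and its snapshot.
    static final class BuiltInSerializer implements Serializer<String> {}

    static final class BuiltInSnapshot implements Snapshot<String> {
        @Override
        public Serializer<String> restoreSerializer() {
            return new BuiltInSerializer();
        }

        @Override
        public Compatibility resolveSchemaCompatibility(Serializer<String> newSerializer) {
            return newSerializer instanceof BuiltInSerializer
                    ? Compatibility.COMPATIBLE_AS_IS
                    : Compatibility.INCOMPATIBLE;
        }
    }

    static final class CustomSerializer implements Serializer<String> {}

    // The user's snapshot states its own compatibility with the built-in snapshot.
    static final class CustomSnapshot implements Snapshot<String> {
        @Override
        public Serializer<String> restoreSerializer() {
            return new CustomSerializer();
        }

        @Override
        public Compatibility resolveSchemaCompatibility(Serializer<String> newSerializer) {
            return newSerializer instanceof CustomSerializer
                    ? Compatibility.COMPATIBLE_AS_IS
                    : Compatibility.INCOMPATIBLE;
        }

        @Override
        public Compatibility resolveSchemaCompatibility(Snapshot<String> oldSerializerSnapshot) {
            if (oldSerializerSnapshot instanceof BuiltInSnapshot) {
                // Hypothetical rule: built-in data can be read after a migration.
                return Compatibility.COMPATIBLE_AFTER_MIGRATION;
            }
            // Anything else falls back to the default, old-snapshot-driven logic.
            return Snapshot.super.resolveSchemaCompatibility(oldSerializerSnapshot);
        }
    }

    public static void main(String[] args) {
        Snapshot<String> oldBuiltIn = new BuiltInSnapshot();
        Snapshot<String> newCustom = new CustomSnapshot();
        Snapshot<String> newBuiltIn = new BuiltInSnapshot();
        // Old direction: the built-in snapshot rejects the custom serializer.
        System.out.println(oldBuiltIn.resolveSchemaCompatibility(newCustom.restoreSerializer()));
        // New direction with a user override: the custom snapshot decides.
        System.out.println(newCustom.resolveSchemaCompatibility(oldBuiltIn));
        // New direction without an override: same answer as the old logic.
        System.out.println(newBuiltIn.resolveSchemaCompatibility(oldBuiltIn));
    }
}
```

Because CustomSnapshot falls back to Snapshot.super for old snapshots it does not recognize, it only changes the outcome for the case it explicitly handles.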

...

Having two similar methods, TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) and TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot), will confuse users.

So we propose deprecating TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) in the long run.

In step 2, we need to:

  1. Implement the new method in all of Flink's built-in TypeSerializerSnapshot implementations.
  2. Mark TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) as deprecated and provide a default implementation for it, as below.
TypeSerializerSnapshot.java

@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    @Deprecated
    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializer<T> newSerializer) {
        // Return INCOMPATIBLE as the default until the deprecated method is removed.
        return TypeSerializerSchemaCompatibility.incompatible();
    }
}
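A minimal sketch of the state after step 2, using hypothetical, simplified stand-ins for the Flink types (not the real API). It contains a built-in snapshot that implements only the new method and a third-party snapshot that still overrides only the deprecated method. Both resolve correctly through the new direction because the new method's default still delegates to the old snapshot.

```java
// Hypothetical, simplified stand-ins for Flink's serializer classes (not the real API).
public class Step2DeprecationDemo {

    enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

    interface Serializer<T> {}

    interface Snapshot<T> {
        Serializer<T> restoreSerializer();

        // Step 2: the old direction is deprecated and defaults to INCOMPATIBLE.
        @Deprecated
        default Compatibility resolveSchemaCompatibility(Serializer<T> newSerializer) {
            return Compatibility.INCOMPATIBLE;
        }

        // The new direction still delegates by default, so snapshots that only
        // override the deprecated method keep working.
        default Compatibility resolveSchemaCompatibility(Snapshot<T> oldSerializerSnapshot) {
            return oldSerializerSnapshot.resolveSchemaCompatibility(restoreSerializer());
        }
    }

    static final class LongSerializer implements Serializer<Long> {}

    // A built-in snapshot migrated in step 2: it implements only the new method.
    static final class BuiltInLongSnapshot implements Snapshot<Long> {
        @Override
        public Serializer<Long> restoreSerializer() {
            return new LongSerializer();
        }

        @Override
        public Compatibility resolveSchemaCompatibility(Snapshot<Long> oldSerializerSnapshot) {
            return oldSerializerSnapshot instanceof BuiltInLongSnapshot
                    ? Compatibility.COMPATIBLE_AS_IS
                    : Compatibility.INCOMPATIBLE;
        }
    }

    static final class LegacySerializer implements Serializer<Long> {}

    // A third-party snapshot not yet migrated: it overrides only the deprecated method.
    static final class LegacySnapshot implements Snapshot<Long> {
        @Override
        public Serializer<Long> restoreSerializer() {
            return new LegacySerializer();
        }

        @Override
        @Deprecated
        public Compatibility resolveSchemaCompatibility(Serializer<Long> newSerializer) {
            return newSerializer instanceof LegacySerializer
                    ? Compatibility.COMPATIBLE_AS_IS
                    : Compatibility.INCOMPATIBLE;
        }
    }

    public static void main(String[] args) {
        // Callers use only the new direction: the new snapshot judges the old snapshot.
        System.out.println(new BuiltInLongSnapshot().resolveSchemaCompatibility(new BuiltInLongSnapshot()));
        System.out.println(new LegacySnapshot().resolveSchemaCompatibility(new LegacySnapshot()));
    }
}
```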

Step 3

After several stable versions, remove TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) and its related implementations.

...

TypeSerializerSnapshot.java

@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    // Called on the new serializer's snapshot: check whether it is compatible with the old one.
    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> oldSerializerSnapshot);
}
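Once only the abstract method remains, a snapshot keeps all of its compatibility logic in one place. The sketch below uses hypothetical, simplified stand-ins for the Flink types (not the real API). VersionedSnapshot and its version-number rule are invented purely for illustration.

```java
// Hypothetical, simplified stand-ins for Flink's serializer classes (not the real API).
public class Step3AbstractDemo {

    enum Compatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

    interface Snapshot<T> {
        // After step 3 this is the only resolution method, and it has no default.
        Compatibility resolveSchemaCompatibility(Snapshot<T> oldSerializerSnapshot);
    }

    // A hypothetical user snapshot that records a format version.
    static final class VersionedSnapshot implements Snapshot<String> {
        private final int version;

        VersionedSnapshot(int version) {
            this.version = version;
        }

        @Override
        public Compatibility resolveSchemaCompatibility(Snapshot<String> oldSerializerSnapshot) {
            if (!(oldSerializerSnapshot instanceof VersionedSnapshot)) {
                return Compatibility.INCOMPATIBLE;
            }
            int oldVersion = ((VersionedSnapshot) oldSerializerSnapshot).version;
            if (oldVersion == version) {
                return Compatibility.COMPATIBLE_AS_IS;
            }
            // Older data can be migrated forward; newer data cannot be read back.
            return oldVersion < version
                    ? Compatibility.COMPATIBLE_AFTER_MIGRATION
                    : Compatibility.INCOMPATIBLE;
        }
    }

    public static void main(String[] args) {
        Snapshot<String> v2 = new VersionedSnapshot(2);
        System.out.println(v2.resolveSchemaCompatibility(new VersionedSnapshot(1))); // prints COMPATIBLE_AFTER_MIGRATION
    }
}
```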

Compatibility, Deprecation, and Migration Plan

...