Versions Compared

...

Add an extra method in TypeSerializerSnapshot.java and provide a default implementation before removing the deprecated TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer).

Code Block (Java): TypeSerializerSnapshot.java
@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    // Check whether the serializer is compatible with the old one.
    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot) {
        // Return INCOMPATIBLE as default before removing the deprecated method.
        return TypeSerializerSchemaCompatibility.incompatible();
    }
}

Mark the old method as deprecated and provide a default implementation.

Code Block (Java): TypeSerializerSnapshot.java
@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    @Deprecated
    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
        // Use the new method to resolve before removing this deprecated method.
        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
    }
}

Make the method abstract after removing the deprecated TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer).
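
To illustrate how the two default methods can coexist during the migration, here is a minimal, self-contained sketch. It does not use Flink's classes: `Compatibility`, `SketchSerializer`, `SketchSnapshot`, `LongSnapshot`, `LongSerializer`, and `BridgeDemo` are hypothetical stand-ins, reduced to the two methods discussed here, and the compatibility rule inside `LongSnapshot` is an assumption made only for the example.

```java
// Hypothetical stand-ins for Flink's types, reduced to what this sketch needs.
enum Compatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

interface SketchSerializer<T> {
    SketchSnapshot<T> snapshotConfiguration();
}

interface SketchSnapshot<T> {
    // New method: invoked on the NEW serializer's snapshot with the OLD snapshot.
    default Compatibility resolveSchemaCompatibility(SketchSnapshot<T> oldSnapshot) {
        // Default until implementors override it.
        return Compatibility.INCOMPATIBLE;
    }

    // Old method: invoked on the OLD snapshot with the NEW serializer.
    @Deprecated
    default Compatibility resolveSchemaCompatibility(SketchSerializer<T> newSerializer) {
        // Delegate to the new method so existing callers keep working.
        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
    }
}

// A snapshot that has been migrated: it overrides only the new method.
final class LongSnapshot implements SketchSnapshot<Long> {
    @Override
    public Compatibility resolveSchemaCompatibility(SketchSnapshot<Long> oldSnapshot) {
        // Assumed rule for the example: same snapshot class means compatible.
        return oldSnapshot instanceof LongSnapshot
                ? Compatibility.COMPATIBLE_AS_IS
                : Compatibility.INCOMPATIBLE;
    }
}

final class LongSerializer implements SketchSerializer<Long> {
    @Override
    public SketchSnapshot<Long> snapshotConfiguration() {
        return new LongSnapshot();
    }
}

final class BridgeDemo {
    // A caller that still uses the deprecated method reaches the new logic.
    static Compatibility viaDeprecatedMethod() {
        SketchSnapshot<Long> restored = new LongSnapshot();
        return restored.resolveSchemaCompatibility(new LongSerializer());
    }

    // A snapshot that overrides nothing falls back to the default of the new method.
    static Compatibility viaUnmigratedSnapshot() {
        SketchSnapshot<Long> unmigrated = new SketchSnapshot<Long>() {};
        return unmigrated.resolveSchemaCompatibility(new LongSnapshot());
    }

    public static void main(String[] args) {
        System.out.println(viaDeprecatedMethod());   // COMPATIBLE_AS_IS
        System.out.println(viaUnmigratedSnapshot()); // INCOMPATIBLE
    }
}
```

In this sketch, overload resolution picks the method by the static type of the argument, so a snapshot argument always reaches the new method and a serializer argument always reaches the deprecated one.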

...

Because TypeSerializerSnapshot is a public interface, we have several steps to migrate the logic.

Step 1

Add an extra method in TypeSerializerSnapshot.java as below.

Code Block (Java): TypeSerializerSnapshot.java
@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    // Check whether the serializer is compatible with the old one.
    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot) {
        // Return INCOMPATIBLE as default before removing the deprecated method.
        return TypeSerializerSchemaCompatibility.incompatible();
    }

    @Deprecated
    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
        // Use the new method to resolve before removing this deprecated method.
        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
    }
}
  1. Add an extra method (TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot)) in TypeSerializerSnapshot.java as above, and return INCOMPATIBLE as default.
  2. Mark the original method as deprecated; by default it uses the new method to resolve.
  3. Implement the new method in all internal TypeSerializerSnapshots.
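
As a rough illustration of what implementing the new method in a snapshot could look like, the sketch below decides compatibility from a version number stored in the snapshot. Everything in it is hypothetical: `SchemaCheck`, `SnapshotSketch`, and `VersionedSnapshot` are simplified stand-ins rather than Flink classes, and the version-based rule is an assumption chosen only to keep the example short.

```java
// Hypothetical stand-ins; Flink's real types are richer than this.
enum SchemaCheck { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

interface SnapshotSketch<T> {
    // Mirrors the proposed default: INCOMPATIBLE unless overridden.
    default SchemaCheck resolveSchemaCompatibility(SnapshotSketch<T> oldSnapshot) {
        return SchemaCheck.INCOMPATIBLE;
    }
}

// Snapshot of a serializer whose written format is identified by a version number.
final class VersionedSnapshot implements SnapshotSketch<String> {
    private final int schemaVersion;

    VersionedSnapshot(int schemaVersion) {
        this.schemaVersion = schemaVersion;
    }

    @Override
    public SchemaCheck resolveSchemaCompatibility(SnapshotSketch<String> oldSnapshot) {
        // The new serializer's snapshot inspects the old snapshot.
        if (!(oldSnapshot instanceof VersionedSnapshot)) {
            return SchemaCheck.INCOMPATIBLE;
        }
        int oldVersion = ((VersionedSnapshot) oldSnapshot).schemaVersion;
        if (oldVersion == schemaVersion) {
            return SchemaCheck.COMPATIBLE_AS_IS;
        }
        // Assumed rule: older formats can be migrated, newer ones cannot be read.
        return oldVersion < schemaVersion
                ? SchemaCheck.COMPATIBLE_AFTER_MIGRATION
                : SchemaCheck.INCOMPATIBLE;
    }
}

final class VersionedSnapshotDemo {
    public static void main(String[] args) {
        VersionedSnapshot current = new VersionedSnapshot(2);
        System.out.println(current.resolveSchemaCompatibility(new VersionedSnapshot(2)));
        System.out.println(current.resolveSchemaCompatibility(new VersionedSnapshot(1)));
        System.out.println(current.resolveSchemaCompatibility(new VersionedSnapshot(3)));
    }
}
```

The point of the sketch is the direction of the check: the decision lives with the new serializer's snapshot, which receives the old snapshot as an argument.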

All call sites that use TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) to check compatibility will call TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot) instead.

For a new customized serializer, users can implement their own logic for resolving compatibility in the new serializer, and Flink will use it.

For an old customized serializer, if users have not implemented TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot), all behavior stays the same as before. If they implement it and remove their implementation of the old method, Flink will use the new logic as well.

Step 2

Having two similar methods (TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) and TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot)) will confuse users, so we propose deprecating TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) in the long run.

In step 2, we need to:

  1. Implement the new method in all internal TypeSerializerSnapshots.
  2. Mark TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) as deprecated and provide a default implementation for it as below.
Code Block (Java): TypeSerializerSnapshot.java
@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    @Deprecated
    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
        // Return INCOMPATIBLE as default before removing the deprecated method.
        return TypeSerializerSchemaCompatibility.incompatible();
    }
}

Step 3

After several stable versions, we need to:

  1. Remove the deprecated method and its related implementation.
  2. Make all call sites that use TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) to check compatibility call TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot) instead.
  3. Remove the default implementation of the new method.

...

Code Block (Java): TypeSerializerSnapshot.java
@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    // Check whether the serializer is compatible with the old one.
    public abstract TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot);
}
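
Once only the abstract method remains, a call site that checks compatibility on restore might look roughly like the sketch below. This is an assumption-laden illustration, not Flink code: `CheckResult`, `FinalSnapshot`, `FinalSerializer`, `IntSnapshot`, `IntSerializer`, and `RestoreCheck` are hypothetical names, and the rule inside `IntSnapshot` exists only to make the example runnable.

```java
// Hypothetical stand-ins for the interface after the default has been removed.
enum CheckResult { COMPATIBLE_AS_IS, INCOMPATIBLE }

interface FinalSnapshot<T> {
    // No default any more: every snapshot has to implement the check.
    CheckResult resolveSchemaCompatibility(FinalSnapshot<T> oldSnapshot);
}

interface FinalSerializer<T> {
    FinalSnapshot<T> snapshotConfiguration();
}

final class IntSnapshot implements FinalSnapshot<Integer> {
    @Override
    public CheckResult resolveSchemaCompatibility(FinalSnapshot<Integer> oldSnapshot) {
        // Assumed rule for the example: same snapshot class means compatible.
        return oldSnapshot instanceof IntSnapshot
                ? CheckResult.COMPATIBLE_AS_IS
                : CheckResult.INCOMPATIBLE;
    }
}

final class IntSerializer implements FinalSerializer<Integer> {
    @Override
    public FinalSnapshot<Integer> snapshotConfiguration() {
        return new IntSnapshot();
    }
}

final class RestoreCheck {
    // The call site asks the NEW serializer's snapshot about the restored (old) snapshot.
    static <T> CheckResult check(FinalSnapshot<T> restoredSnapshot, FinalSerializer<T> newSerializer) {
        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(restoredSnapshot);
    }

    public static void main(String[] args) {
        System.out.println(check(new IntSnapshot(), new IntSerializer()));
    }
}
```

Because the method no longer has a default body, a snapshot class that forgets to implement it fails at compile time instead of silently reporting incompatibility at restore time.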

...