Status

Current state: Under Discussion

Discussion thread:

https://lists.apache.org/thread/4w36oof8dh28b9f593sgtk21o8qh8qx4

https://lists.apache.org/thread/t0bdkx1161rlbnsf06x0kswb05mch164

Vote thread: https://lists.apache.org/thread/0bh530j5ob11lzj48vpm883sqwgmstp8

JIRA: https://issues.apache.org/jira/browse/FLINK-29844

Released: 1.17

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Currently, if users process data with a customized serializer, they have to make sure that the schema compatibility logic of the old serializer (which may be part of the Flink library) meets their needs. Otherwise, they have to modify TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) of the old serializer. There is no way for users to specify, in the new serializer, its compatibility with the old serializer. As a result, schema evolution is not supported in some scenarios.

For example, if users want to implement a customized serializer that is compatible with ValueSerializer, they have to modify the compatibility logic in ValueSerializer.ValueSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer). (ValueSerializer is final, so it cannot be extended either.)
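The following is a simplified, self-contained model of the current situation, not Flink code: `CurrentApiModel`, `Compat`, `Serializer`, `Snapshot`, `ValueLikeSerializer`, and `CustomSerializer` are hypothetical stand-ins, and the `instanceof` check is a simplification of what a library snapshot may check. It illustrates that only the old snapshot decides compatibility, so a new serializer it does not recognize is rejected, whatever that new serializer could actually read.

```java
// Simplified, hypothetical model of today's API; these are not Flink's real classes.
final class CurrentApiModel {
    enum Compat { COMPATIBLE_AS_IS, INCOMPATIBLE }

    interface Serializer<T> {
        Snapshot<T> snapshotConfiguration();
    }

    interface Snapshot<T> {
        // Today only the OLD snapshot decides, given the NEW serializer.
        Compat resolveSchemaCompatibility(Serializer<T> newSerializer);
    }

    // Stand-in for a final library serializer such as ValueSerializer.
    static final class ValueLikeSerializer implements Serializer<String> {
        @Override
        public Snapshot<String> snapshotConfiguration() {
            return new ValueLikeSnapshot();
        }
    }

    static final class ValueLikeSnapshot implements Snapshot<String> {
        @Override
        public Compat resolveSchemaCompatibility(Serializer<String> newSerializer) {
            // Simplified check: only serializers of the same class are accepted.
            return newSerializer instanceof ValueLikeSerializer
                    ? Compat.COMPATIBLE_AS_IS
                    : Compat.INCOMPATIBLE;
        }
    }

    // A user's serializer that could read ValueLikeSerializer's format,
    // but has no way to say so when old ValueLike state is restored.
    static final class CustomSerializer implements Serializer<String> {
        @Override
        public Snapshot<String> snapshotConfiguration() {
            // Placeholder: this snapshot is never consulted for old ValueLike state.
            return newSerializer -> Compat.INCOMPATIBLE;
        }
    }
}
```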

...

Public Interfaces

Add an extra method to TypeSerializerSnapshot.java and provide a default implementation for it until the deprecated TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) is removed.

```java
// TypeSerializerSnapshot.java
package org.apache.flink.api.common.typeutils;

import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    // Check whether this (new) serializer is compatible with the old one.
    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> oldSerializerSnapshot) {
        // Return INCOMPATIBLE as default before removing the deprecated method.
        return TypeSerializerSchemaCompatibility.incompatible();
    }
}
```
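A minimal sketch of how a user's snapshot could implement the new method, using hypothetical stand-in types (`NewMethodModel`, `Compat`, `Snapshot`, `ValueLikeSnapshot`, `CustomSnapshot`) rather than Flink's classes. The new serializer's snapshot receives the old snapshot, accepts a snapshot written by the final library serializer, and falls back to the interface default (INCOMPATIBLE) for anything else.

```java
// Simplified, hypothetical model of the proposed method; these are not Flink's real classes.
final class NewMethodModel {
    enum Compat { COMPATIBLE_AS_IS, INCOMPATIBLE }

    interface Snapshot<T> {
        // Proposed method: called on the NEW snapshot with the OLD snapshot.
        default Compat resolveSchemaCompatibility(Snapshot<T> oldSerializerSnapshot) {
            // Default before the deprecated method is removed.
            return Compat.INCOMPATIBLE;
        }
    }

    // Snapshot written by a final library serializer (stand-in for ValueSerializer's).
    static final class ValueLikeSnapshot implements Snapshot<String> {}

    // Snapshot of a user's serializer that can read ValueLike data directly.
    static final class CustomSnapshot implements Snapshot<String> {
        @Override
        public Compat resolveSchemaCompatibility(Snapshot<String> oldSerializerSnapshot) {
            if (oldSerializerSnapshot instanceof ValueLikeSnapshot
                    || oldSerializerSnapshot instanceof CustomSnapshot) {
                return Compat.COMPATIBLE_AS_IS;
            }
            // Anything else: fall back to the interface default (INCOMPATIBLE).
            return Snapshot.super.resolveSchemaCompatibility(oldSerializerSnapshot);
        }
    }
}
```

The library snapshot itself is left untouched; the compatibility decision now lives with the serializer the user controls.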

Mark the old method as deprecated and provide a default implementation.

```java
// TypeSerializerSnapshot.java
package org.apache.flink.api.common.typeutils;

import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    @Deprecated
    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializer<T> newSerializer) {
        // Use the new method to resolve until this deprecated method is removed:
        // the new serializer's snapshot decides, given this (old) snapshot.
        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
    }
}
```
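Under the same kind of simplified model (hypothetical types `DelegationModel`, `Compat`, `Serializer`, `Snapshot`, `ValueLikeSnapshot`, `CustomSnapshot`, `CustomSerializer`; not Flink's classes), the sketch below shows how the delegating default connects the two methods. An existing call site still asks the old snapshot, and because the old snapshot does not override the deprecated method, the decision is handed to the new serializer's snapshot. This is the path that would let a custom serializer declare itself compatible with a final library serializer such as ValueSerializer.

```java
// Simplified, hypothetical model of the delegating default; these are not Flink's real classes.
final class DelegationModel {
    enum Compat { COMPATIBLE_AS_IS, INCOMPATIBLE }

    interface Serializer<T> {
        Snapshot<T> snapshotConfiguration();
    }

    interface Snapshot<T> {
        // New method: called on the NEW snapshot with the OLD snapshot.
        default Compat resolveSchemaCompatibility(Snapshot<T> oldSerializerSnapshot) {
            return Compat.INCOMPATIBLE;
        }

        // Deprecated method: existing call sites still invoke it on the OLD snapshot.
        @Deprecated
        default Compat resolveSchemaCompatibility(Serializer<T> newSerializer) {
            // Hand the decision to the new serializer's snapshot.
            return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
        }
    }

    // Library snapshot that does not override the deprecated method.
    static final class ValueLikeSnapshot implements Snapshot<String> {}

    // User snapshot that implements only the new method.
    static final class CustomSnapshot implements Snapshot<String> {
        @Override
        public Compat resolveSchemaCompatibility(Snapshot<String> oldSerializerSnapshot) {
            return oldSerializerSnapshot instanceof ValueLikeSnapshot
                    ? Compat.COMPATIBLE_AS_IS
                    : Compat.INCOMPATIBLE;
        }
    }

    static final class CustomSerializer implements Serializer<String> {
        @Override
        public Snapshot<String> snapshotConfiguration() {
            return new CustomSnapshot();
        }
    }
}
```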

Make the method abstract after removing the deprecated TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer).

```java
// TypeSerializerSnapshot.java
package org.apache.flink.api.common.typeutils;

import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    // Check whether the serializer is compatible with the old one.
    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> oldSerializerSnapshot);
}
```

Proposed Changes

Because TypeSerializerSnapshot is a public interface, we migrate the logic in several steps.

Step 1

...

```java
// TypeSerializerSnapshot.java
package org.apache.flink.api.common.typeutils;

import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    // Check whether the serializer is compatible with the old one.
    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> oldSerializerSnapshot) {
        // Return INCOMPATIBLE as default before removing the deprecated method.
        return TypeSerializerSchemaCompatibility.incompatible();
    }

    @Deprecated
    default TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializer<T> newSerializer) {
        // Use the new method to resolve before removing this deprecated method.
        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
    }
}
```


  1. Add an extra method (TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot)) to TypeSerializerSnapshot.java as above, returning INCOMPATIBLE by default.
  2. Mark the original method as deprecated; by default, it delegates to the new method to resolve compatibility.
  3. Implement the new method for all built-in TypeSerializerSnapshots.
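Item 3 matters because, under these defaults, a snapshot that implements neither method resolves to INCOMPATIBLE. The sketch below uses hypothetical stand-in types (`BuiltInMigrationModel`, `BuiltInSnapshot`, `UnmigratedSnapshot`, not Flink's classes) and an illustrative `checkAtRestore` helper that mimics an unchanged call site. It shows a built-in snapshot that implements only the new method, so restoring state written by the same serializer still succeeds.

```java
// Simplified, hypothetical model of Step 1, item 3; these are not Flink's real classes.
final class BuiltInMigrationModel {
    enum Compat { COMPATIBLE_AS_IS, INCOMPATIBLE }

    interface Serializer<T> {
        Snapshot<T> snapshotConfiguration();
    }

    interface Snapshot<T> {
        default Compat resolveSchemaCompatibility(Snapshot<T> oldSerializerSnapshot) {
            return Compat.INCOMPATIBLE;
        }

        @Deprecated
        default Compat resolveSchemaCompatibility(Serializer<T> newSerializer) {
            return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
        }
    }

    // Built-in snapshot migrated to the new method only.
    static final class BuiltInSnapshot implements Snapshot<Integer> {
        @Override
        public Compat resolveSchemaCompatibility(Snapshot<Integer> oldSerializerSnapshot) {
            return oldSerializerSnapshot instanceof BuiltInSnapshot
                    ? Compat.COMPATIBLE_AS_IS
                    : Compat.INCOMPATIBLE;
        }
    }

    // Snapshot that implements neither method and relies on both defaults.
    static final class UnmigratedSnapshot implements Snapshot<Integer> {}

    // Hypothetical helper with the unchanged Step 1 call-site shape:
    // ask the OLD snapshot about the NEW serializer.
    static Compat checkAtRestore(Snapshot<Integer> oldSnapshot, Serializer<Integer> newSerializer) {
        return oldSnapshot.resolveSchemaCompatibility(newSerializer);
    }
}
```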

For new customized serializers, users can implement their own compatibility resolution logic in the snapshot of their new serializer, and Flink will use it.

For old customized serializers, if users haven't implemented TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot), all behaviors stay the same as before.


If they implement it and remove their implementation of the old method, Flink will use the new logic as well.
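The two cases for existing customized serializers can be modelled as follows, again with hypothetical stand-in types (`LegacySnapshotModel`, `LegacySnapshot`, `LegacySerializer`, `MigratedSnapshot`, `MigratedSerializer`) rather than Flink's classes. `LegacySnapshot` keeps overriding the deprecated method, so its old logic takes precedence over the delegating default; `MigratedSnapshot` implements only the new method, which is then reached through that default.

```java
// Simplified, hypothetical model; these are not Flink's real classes.
final class LegacySnapshotModel {
    enum Compat { COMPATIBLE_AS_IS, INCOMPATIBLE }

    interface Serializer<T> {
        Snapshot<T> snapshotConfiguration();
    }

    interface Snapshot<T> {
        default Compat resolveSchemaCompatibility(Snapshot<T> oldSerializerSnapshot) {
            return Compat.INCOMPATIBLE;
        }

        @Deprecated
        default Compat resolveSchemaCompatibility(Serializer<T> newSerializer) {
            return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(this);
        }
    }

    // Existing user snapshot written against the old API.
    static final class LegacySnapshot implements Snapshot<Long> {
        @Override
        @Deprecated
        public Compat resolveSchemaCompatibility(Serializer<Long> newSerializer) {
            // Old logic, unchanged: this override wins over the delegating default.
            return newSerializer instanceof LegacySerializer
                    ? Compat.COMPATIBLE_AS_IS
                    : Compat.INCOMPATIBLE;
        }
    }

    static final class LegacySerializer implements Serializer<Long> {
        @Override
        public Snapshot<Long> snapshotConfiguration() {
            return new LegacySnapshot();
        }
    }

    // The same user snapshot after migrating: only the new method is implemented.
    static final class MigratedSnapshot implements Snapshot<Long> {
        @Override
        public Compat resolveSchemaCompatibility(Snapshot<Long> oldSerializerSnapshot) {
            return oldSerializerSnapshot instanceof MigratedSnapshot
                    ? Compat.COMPATIBLE_AS_IS
                    : Compat.INCOMPATIBLE;
        }
    }

    static final class MigratedSerializer implements Serializer<Long> {
        @Override
        public Snapshot<Long> snapshotConfiguration() {
            return new MigratedSnapshot();
        }
    }
}
```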

Step 2

After several stable versions, we need to:

  1. Remove TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) and its related implementations.
  2. Make all places that use TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer) to check compatibility call TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot<T> oldSerializerSnapshot) on the new serializer's snapshot instead.
  3. Remove the default implementation of the new method and make it abstract, as below.
```java
// TypeSerializerSnapshot.java
package org.apache.flink.api.common.typeutils;

import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface TypeSerializerSnapshot<T> {
    // Check whether the serializer is compatible with the old one.
    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> oldSerializerSnapshot);
}
```
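A minimal sketch of the state after Step 2, with hypothetical stand-in types (`Step2Model`, `Compat`, `Serializer`, `Snapshot`, `IntSnapshot`; not Flink's classes) and an illustrative `checkAtRestore` helper standing in for the updated call sites. The method is abstract, and compatibility is always decided by the new serializer's snapshot, given the old snapshot.

```java
// Simplified, hypothetical model of the end state after Step 2; these are not Flink's real classes.
final class Step2Model {
    enum Compat { COMPATIBLE_AS_IS, INCOMPATIBLE }

    interface Serializer<T> {
        Snapshot<T> snapshotConfiguration();
    }

    interface Snapshot<T> {
        // Abstract: every snapshot must decide compatibility with an old snapshot.
        Compat resolveSchemaCompatibility(Snapshot<T> oldSerializerSnapshot);
    }

    // Hypothetical call-site shape after Step 2:
    // ask the NEW serializer's snapshot about the OLD snapshot.
    static <T> Compat checkAtRestore(Snapshot<T> oldSnapshot, Serializer<T> newSerializer) {
        return newSerializer.snapshotConfiguration().resolveSchemaCompatibility(oldSnapshot);
    }

    static final class IntSnapshot implements Snapshot<Integer> {
        @Override
        public Compat resolveSchemaCompatibility(Snapshot<Integer> oldSerializerSnapshot) {
            return oldSerializerSnapshot instanceof IntSnapshot
                    ? Compat.COMPATIBLE_AS_IS
                    : Compat.INCOMPATIBLE;
        }
    }
}
```

With a single abstract method, a snapshot that forgets to implement it no longer compiles, which is the intended effect of removing the default.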


Compatibility, Deprecation, and Migration Plan

...

  1. Existing unit tests (UT) for resolving compatibility verify that the old behavior still works as before.
  2. Add extra UTs to verify the correctness of customized serializers.
  3. Add ITCases and end-to-end (e2e) test cases that mock user behavior.

Rejected Alternatives

Introduce a new method in TypeSerializer

Compared to introducing a new method in TypeSerializer, introducing it in TypeSerializerSnapshot has the following benefits:

  1. TypeSerializerSnapshot keeps the responsibility of resolving schema compatibility, so TypeSerializer can focus on serialization and deserialization as before.
  2. The new method is convenient to implement on top of the current implementations, using all the information and utilities already available in TypeSerializerSnapshot. This also helps users migrate their external serializers.

Introduce a method without default implementation

Why not:

We should make sure that user jobs still work without modifying any code before the deprecated method is removed.

So we need two steps to complete the migration, and we must make sure that the first version does not introduce breaking changes.