...

Currently, if users process data with a customized serializer, they have to make sure that the schema compatibility logic of the old serializer (possibly one from the Flink library) meets their needs. Otherwise, they have to modify TypeSerializerSnapshot#resolveSchemaCompatibility of the old serializer. There is no way for users to specify, in the new serializer, how it is compatible with the old serializer. As a result, schema evolution is not supported in some scenarios.

For example, if users want to implement a customized serializer that is compatible with ValueSerializer, they have to modify the compatibility logic in ValueSerializer.ValueSerializerSnapshot#resolveSchemaCompatibility. (ValueSerializer is final, so it cannot be extended either.)

Reversing the direction of schema compatibility resolution, so that the new serializer decides whether it is compatible with the old one, would therefore improve the usability of schema evolution.

Public Interfaces

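The affected public interface is TypeSerializer: a new method, resolveSchemaCompatibility, is added to TypeSerializer.java. Its final form, after the migration steps described under Proposed Changes, is shown below.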

TypeSerializer.java

```java
import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public abstract class TypeSerializer<T> {
    // ... existing methods omitted ...

    // Checks whether this (new) serializer is compatible with the old one described by the snapshot.
    public abstract TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> typeSerializerSnapshot);
}
```

Proposed Changes

Because both TypeSerializer and TypeSerializerSnapshot are public interfaces, the logic is migrated in several steps.

Step 1

Add an extra method to TypeSerializer, with a default implementation, as below.

TypeSerializer.java

```java
import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public abstract class TypeSerializer<T> {
    // ... existing methods omitted ...

    // Checks whether this (new) serializer is compatible with the old one described by the snapshot.
    public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> typeSerializerSnapshot) {
        // INCOMPATIBLE by default, so callers fall back to TypeSerializerSnapshot#resolveSchemaCompatibility.
        return TypeSerializerSchemaCompatibility.incompatible();
    }
}
```

...

Every place that currently uses TypeSerializerSnapshot#resolveSchemaCompatibility to check compatibility will first call TypeSerializer#resolveSchemaCompatibility of the new serializer, and only then, if needed, the original TypeSerializerSnapshot#resolveSchemaCompatibility.

If the result of TypeSerializer#resolveSchemaCompatibility is INCOMPATIBLE, the original logic of TypeSerializerSnapshot#resolveSchemaCompatibility is used to check compatibility; otherwise, the result of TypeSerializer#resolveSchemaCompatibility is used directly.
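The resolution order above can be sketched as follows. This is a minimal illustration, not Flink code: `Compat`, `OldSnapshot`, `NewSerializer` and `CompatibilityResolver.resolve` are hypothetical, simplified stand-ins for TypeSerializerSchemaCompatibility, TypeSerializerSnapshot, TypeSerializer and the framework call sites.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for TypeSerializerSchemaCompatibility.
enum Compat { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

// Hypothetical stand-in for TypeSerializerSnapshot: the snapshot of the OLD
// serializer keeps the original direction of the check.
interface OldSnapshot {
    Compat resolveSchemaCompatibility(NewSerializer newSerializer);
}

// Hypothetical stand-in for TypeSerializer with the method added in Step 1.
abstract class NewSerializer {
    // Step 1 default: INCOMPATIBLE, meaning "not decided here, fall back".
    Compat resolveSchemaCompatibility(OldSnapshot oldSnapshot) {
        return Compat.INCOMPATIBLE;
    }
}

// Hypothetical stand-in for the places that resolve compatibility.
final class CompatibilityResolver {
    private CompatibilityResolver() {}

    static Compat resolve(NewSerializer newSerializer, OldSnapshot oldSnapshot) {
        Objects.requireNonNull(newSerializer);
        Objects.requireNonNull(oldSnapshot);
        // 1. Ask the new serializer first.
        Compat fromNewSerializer = newSerializer.resolveSchemaCompatibility(oldSnapshot);
        if (fromNewSerializer != Compat.INCOMPATIBLE) {
            return fromNewSerializer;
        }
        // 2. Only on INCOMPATIBLE, fall back to the old snapshot's original logic.
        return oldSnapshot.resolveSchemaCompatibility(newSerializer);
    }
}
```

Because the default answer is INCOMPATIBLE, a serializer that does not override the new method always falls through to the snapshot's existing check.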

...

Users can implement their own compatibility resolution logic in their new serializer, and Flink will call it first.

If users do not implement TypeSerializer#resolveSchemaCompatibility, the behavior is the same as before.
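For the ValueSerializer example from the motivation, the compatibility decision could then live in the user's serializer instead of in ValueSerializer.ValueSerializerSnapshot. The sketch below is hypothetical: `Snapshot`, `ValueSnapshot` and `MyCustomSerializer` are simplified stand-ins, and returning COMPATIBLE_AFTER_MIGRATION is an assumed choice for a serializer whose binary format differs from ValueSerializer's.

```java
// Hypothetical, simplified stand-ins; not Flink's real classes.
enum Compat { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

interface Snapshot {}

// Stand-in for the snapshot written by the (final) ValueSerializer:
// it only records which type was serialized.
final class ValueSnapshot implements Snapshot {
    final Class<?> valueType;

    ValueSnapshot(Class<?> valueType) {
        this.valueType = valueType;
    }
}

// A user's new serializer declares, on its own side, that it can take over
// state written by the old ValueSerializer for the same type.
final class MyCustomSerializer {
    private final Class<?> type;

    MyCustomSerializer(Class<?> type) {
        this.type = type;
    }

    Compat resolveSchemaCompatibility(Snapshot oldSnapshot) {
        if (oldSnapshot instanceof ValueSnapshot
                && ((ValueSnapshot) oldSnapshot).valueType.equals(type)) {
            // Different binary format: old state must be read with the old serializer and rewritten.
            return Compat.COMPATIBLE_AFTER_MIGRATION;
        }
        // Unknown snapshot: INCOMPATIBLE lets the framework fall back to the old snapshot's own check.
        return Compat.INCOMPATIBLE;
    }
}
```

No change to ValueSerializer is needed; the new serializer recognizes the old snapshot itself.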

Step 2

Having two similar methods (TypeSerializerSnapshot#resolveSchemaCompatibility and TypeSerializer#resolveSchemaCompatibility) would confuse users.

Therefore, TypeSerializerSnapshot#resolveSchemaCompatibility needs to be deprecated.

In Step 2, we need to:

  1. Implement the method in all of Flink's built-in TypeSerializers.
  2. Mark TypeSerializerSnapshot#resolveSchemaCompatibility as deprecated.
  3. Make TypeSerializer#resolveSchemaCompatibility abstract by removing the default implementation, as below:
TypeSerializer.java

```java
import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public abstract class TypeSerializer<T> {
    // ... existing methods omitted ...

    // Checks whether this (new) serializer is compatible with the old one described by the snapshot.
    public abstract TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializerSnapshot<T> typeSerializerSnapshot);
}
```
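The state after Step 2 can be sketched with hypothetical, simplified stand-ins (`Snapshot`, `Serializer`, `IntSnapshot`, `IntSerializer`; not Flink's real classes). The old-direction method is kept but deprecated, the new method is abstract, and a built-in stateless serializer implements it directly. The default body on the deprecated method exists only to keep the sketch short.

```java
// Hypothetical, simplified stand-in for TypeSerializerSchemaCompatibility.
enum Compat { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

// Stand-in for TypeSerializerSnapshot after Step 2: the old direction of the
// check still exists for existing callers, but is deprecated (removed in Step 3).
interface Snapshot {
    @Deprecated
    default Compat resolveSchemaCompatibility(Serializer<?> newSerializer) {
        return Compat.INCOMPATIBLE; // default body only to keep the sketch short
    }
}

// Stand-in for TypeSerializer after Step 2: every serializer must decide itself.
abstract class Serializer<T> {
    abstract Compat resolveSchemaCompatibility(Snapshot oldSnapshot);
}

// Stand-in for a built-in, stateless serializer and its snapshot.
final class IntSnapshot implements Snapshot {}

final class IntSerializer extends Serializer<Integer> {
    @Override
    Compat resolveSchemaCompatibility(Snapshot oldSnapshot) {
        // State written by the same stateless serializer can be read as is.
        return oldSnapshot instanceof IntSnapshot ? Compat.COMPATIBLE_AS_IS : Compat.INCOMPATIBLE;
    }
}
```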

Step 3

Remove TypeSerializerSnapshot#resolveSchemaCompatibility and the related implementations.

Compatibility, Deprecation, and Migration Plan

The deprecation and migration plan is described in Steps 1 to 3 above.

Test Plan

  1. Existing unit tests for compatibility resolution verify that the old behavior still works as before.
  2. Add extra unit tests to verify the correctness of customized serializers.
  3. Add an ITCase and an end-to-end test case that simulate user behavior.

...