Status

Current state: Under Discussion

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: 1.17

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

While restoring from a snapshot, the StateBackend resolves schema compatibility to decide how to proceed (for example, whether the state can be read as is or needs migration).

Currently, users who process data with a custom serializer must make sure that the schema compatibility logic in the old serializer (which may live in the Flink library) behaves the way they need. Otherwise, they have to modify TypeSerializerSnapshot#resolveSchemaCompatibility of the old serializer. There is no way for the new serializer to declare its compatibility with the old serializer, which also makes schema evolution hard to scale.

For example, if users want to implement a custom serializer that is compatible with ValueSerializer, they have to modify the compatibility logic in ValueSerializer.ValueSerializerSnapshot#resolveSchemaCompatibility. (ValueSerializer is final, so it cannot be extended either.)

Reversing the direction in which schema compatibility is resolved would therefore improve the usability of schema evolution.

Public Interfaces

The public interface is TypeSerializer; see below.

Proposed Changes

TypeSerializer.java

@PublicEvolving
public abstract class TypeSerializer<T> {
    /**
     * Checks whether this (new) serializer is compatible with the old serializer
     * described by the given snapshot.
     */
    public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
            TypeSerializerSnapshot<T> typeSerializerSnapshot) {
        // Incompatible by default.
        return TypeSerializerSchemaCompatibility.incompatible();
    }
}

TypeSerializer#resolveSchemaCompatibility returns INCOMPATIBLE by default.

Every place that currently uses TypeSerializerSnapshot#resolveSchemaCompatibility to check compatibility will first call TypeSerializer#resolveSchemaCompatibility.

If the result is INCOMPATIBLE, it falls back to the original logic in TypeSerializerSnapshot#resolveSchemaCompatibility.

Otherwise, it uses the result of TypeSerializer#resolveSchemaCompatibility directly.
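
The resolution order described above can be sketched roughly as follows. This is a minimal illustration and not Flink code: Compat, Snapshot, Serializer, CompatibilityResolver, LegacySnapshot and UserSerializer are hypothetical stand-ins for TypeSerializerSchemaCompatibility, TypeSerializerSnapshot, TypeSerializer and their call sites, and only two of the possible compatibility outcomes are modeled.

```java
// Stand-in for TypeSerializerSchemaCompatibility (assumption: only two outcomes modeled).
enum Compat { COMPATIBLE_AS_IS, INCOMPATIBLE }

// Stand-in for TypeSerializerSnapshot: the existing, old-side check.
interface Snapshot {
    Compat resolveSchemaCompatibility(Serializer newSerializer);
}

// Stand-in for TypeSerializer: the proposed, new-side check.
abstract class Serializer {
    Compat resolveSchemaCompatibility(Snapshot oldSnapshot) {
        return Compat.INCOMPATIBLE; // default, as proposed
    }
}

// Hypothetical call site that combines both checks.
final class CompatibilityResolver {
    static Compat resolve(Serializer newSerializer, Snapshot oldSnapshot) {
        // Step 1: ask the new serializer first.
        Compat fromNew = newSerializer.resolveSchemaCompatibility(oldSnapshot);
        if (fromNew != Compat.INCOMPATIBLE) {
            return fromNew;
        }
        // Step 2: fall back to the original logic on the old snapshot.
        return oldSnapshot.resolveSchemaCompatibility(newSerializer);
    }
}

// Hypothetical old snapshot whose own check rejects every new serializer.
final class LegacySnapshot implements Snapshot {
    @Override
    public Compat resolveSchemaCompatibility(Serializer newSerializer) {
        return Compat.INCOMPATIBLE;
    }
}

// Hypothetical user serializer that declares itself compatible with LegacySnapshot.
final class UserSerializer extends Serializer {
    @Override
    Compat resolveSchemaCompatibility(Snapshot oldSnapshot) {
        return oldSnapshot instanceof LegacySnapshot
                ? Compat.COMPATIBLE_AS_IS
                : Compat.INCOMPATIBLE;
    }
}
```

In this sketch, UserSerializer can be restored from a LegacySnapshot without touching LegacySnapshot itself, while a serializer that keeps the default still gets exactly the answer the old snapshot gives.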

Compatibility, Deprecation, and Migration Plan

If users do not implement TypeSerializer#resolveSchemaCompatibility, all behavior stays the same as before.

Test Plan

  1. The existing unit tests for compatibility resolution verify that the old behavior still works as before.
  2. Add unit tests to verify the correctness of custom serializers.
  3. Add an ITCase and an e2e test case that mimic user behavior.

Rejected Alternatives

None.
