Motivation

Upgrade the Kryo library used by the Flink project from the legacy Kryo 2.24.0 to the current release, Kryo 5.5.0.

One motivation for this upgrade is Java 17 and Java 21 support. Kryo 2.24.0 is not compatible with Java 17 or 21, so moving off Kryo 2.x, either to Kryo 5.x or to something else, is probably a prerequisite for adopting Java 17. Kryo 2.x doesn't support Java records at all, including records embedded in other Java object graphs and data structures. Other Kryo 2.x code also fails at runtime: for example, the ArraysAsListSerializer from chill-java fails on Java 17, while the Kryo 5.x version works. Some fixes could be backported, but overall it's probably more reasonable to just upgrade.

A broader reason is that Flink should use the new, actively maintained version of Kryo rather than the 2.x branch, which stopped receiving updates almost ten years ago. Kryo has improved substantially since 2.24.0. Kryo 5.x has faster runtime performance and more memory-efficient serialization, fixes many bugs, adds functionality, and improves compatibility with newer versions of Java. Additionally, Kryo 5.x will continue to receive improvements that remain fully compatible with existing Kryo 5.x code and serialized data.

Public Interfaces

TypeSerializer<T> changes:

  • Add deserializeWithKeyedBackendVersion to TypeSerializer<T>. This lets the Kryo deserializer know whether to deserialize with Kryo v2 or Kryo v5. The version passed in is the KeyedBackendSerializationProxy version, which is increased from 6 to 7 to reflect the Kryo upgrade.
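Here is a minimal sketch of how the new method could fit into the existing serializer contract. It assumes deserializeWithKeyedBackendVersion gets a default implementation that ignores the version and delegates to deserialize. Under that assumption, only serializers whose binary format changed (the Kryo serializer) need to override it. SketchTypeSerializer and IntSketchSerializer are hypothetical names. java.io.DataInput/DataOutput stand in for Flink's DataInputView/DataOutputView.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Simplified, hypothetical stand-in for Flink's TypeSerializer<T>. The real class
// works with DataInputView/DataOutputView and declares many more methods.
abstract class SketchTypeSerializer<T> {
    public abstract void serialize(T record, DataOutput target) throws IOException;

    public abstract T deserialize(DataInput source) throws IOException;

    // Assumed shape of the proposed hook: the caller passes the KeyedBackendSerializationProxy
    // version of the state being restored. Serializers whose format did not change inherit
    // this default and simply ignore the version.
    public T deserializeWithKeyedBackendVersion(DataInput source, int keyedBackendVersion)
            throws IOException {
        return deserialize(source);
    }
}

// A serializer unaffected by the Kryo upgrade: it does not override the new method.
final class IntSketchSerializer extends SketchTypeSerializer<Integer> {
    @Override
    public void serialize(Integer record, DataOutput target) throws IOException {
        target.writeInt(record);
    }

    @Override
    public Integer deserialize(DataInput source) throws IOException {
        return source.readInt();
    }
}
```

With a default like this, existing serializers keep compiling and behave exactly as before. Only the version-aware Kryo serializer has a reason to look at the version argument.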

ExecutionConfig changes: the following Kryo v2 methods are deprecated and replaced with analogous Kryo v5 versions with the PublicEvolving annotation.

  • Two overloaded variations of addDefaultKryoSerializer -> addDefaultKryo5Serializer
  • Two overloaded variations of registerTypeWithKryoSerializer -> registerTypeWithKryo5Serializer
  • registerKryoType -> registerKryo5Type
  • getRegisteredTypesWithKryoSerializers -> getRegisteredTypesWithKryo5Serializers
  • getRegisteredTypesWithKryoSerializerClasses -> getRegisteredTypesWithKryo5SerializerClasses
  • getDefaultKryoSerializers -> getDefaultKryo5Serializers
  • getDefaultKryoSerializerClasses -> getDefaultKryo5SerializerClasses
  • getRegisteredKryoTypes -> getRegisteredKryo5Types
  • The SerializableSerializer class is deprecated and replaced by the analogous SerializableKryo5Serializer

PipelineOptions: add the following Kryo v5 analogs of the existing Kryo v2 options KRYO_DEFAULT_SERIALIZERS and KRYO_REGISTERED_CLASSES:

  • KRYO5_DEFAULT_SERIALIZERS
  • KRYO5_REGISTERED_CLASSES

On-disk binary formats, including checkpoints and savepoints, will change as follows:

  • KeyedBackendSerializationProxy version is increased from 6 to 7 for new state.
  • New state will use Kryo v5 serialization in places that would have previously used Kryo v2 serialization.

The legacy Flink KryoSerializer for Kryo v2 has the following small changes:

  • Two constructors were changed from package-private to public and tagged with the Internal annotation so that they can be used from the new Kryo v5 version of the Flink KryoSerializer.

Summary of Proposed Changes Included in the Pull Request

The proposed changes are implemented and described in the pull request: https://github.com/apache/flink/pull/22660

This proposes adding the Kryo v5 library dependency while keeping the Kryo v2 and Twitter Chill dependencies for compatibility with existing serialized state. New serialized state will be written with Kryo v5, not Kryo v2. A future version of Flink will eventually drop support for Kryo v2 state.

To summarize code changes:

  • Existing Flink Kryo v2 code is copied to a Kryo v5 variant. Notably, the package org.apache.flink.api.java.typeutils.runtime.kryo is copied to org.apache.flink.api.java.typeutils.runtime.kryo5. The copied code was changed to use the Kryo 5.x package com.esotericsoftware.kryo.kryo5, and fixes were made to make it work. Some fixes that had been backported from newer Kryo versions into the Flink Kryo v2 code, notably one tracked in a Jira issue, can be dropped from the Flink Kryo v5 code.
  • The new Flink Kryo v5 KryoSerializer class includes an instance of the Kryo v2 KryoSerializer for deserialization compatibility with older serialized state.
  • All new state is serialized without Kryo v2: every code path that previously wrote state with Kryo v2 now uses Kryo v5.
  • A new version 7 is added to KeyedBackendSerializationProxy. Older versions are assumed to have any Kryo state serialized with Kryo v2 while newer versions are assumed to have any Kryo state serialized with Kryo v5.
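The read/write split described above can be sketched as follows. This is not the PR's code. KryoCodec is a hypothetical stand-in for a Kryo v2 or Kryo v5 instance, and DualKryoSerializer and TaggingCodec are illustrative names. The threshold of 7 comes from the KeyedBackendSerializationProxy change described here.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical abstraction standing in for a Kryo v2 or Kryo v5 instance.
interface KryoCodec<T> {
    void write(T value, DataOutput out) throws IOException;

    T read(DataInput in) throws IOException;
}

// Sketch of a serializer that writes only Kryo v5 but can still read Kryo v2 state.
final class DualKryoSerializer<T> {
    // Proxy versions below 7 mean the Kryo state was written with Kryo v2.
    static final int FIRST_KRYO5_PROXY_VERSION = 7;

    private final KryoCodec<T> kryo5;
    private final KryoCodec<T> legacyKryo2;

    DualKryoSerializer(KryoCodec<T> kryo5, KryoCodec<T> legacyKryo2) {
        this.kryo5 = kryo5;
        this.legacyKryo2 = legacyKryo2;
    }

    // All new state goes through Kryo v5; the legacy instance is never used for writing.
    void serialize(T value, DataOutput out) throws IOException {
        kryo5.write(value, out);
    }

    // The proxy version recorded alongside the state decides which Kryo instance reads it.
    T deserializeWithKeyedBackendVersion(DataInput in, int proxyVersion) throws IOException {
        KryoCodec<T> reader = proxyVersion < FIRST_KRYO5_PROXY_VERSION ? legacyKryo2 : kryo5;
        return reader.read(in);
    }
}

// Toy codec for demonstration: writes the string as UTF and prefixes values it reads
// with its tag, so a caller can see which instance handled the read.
final class TaggingCodec implements KryoCodec<String> {
    private final String tag;

    TaggingCodec(String tag) {
        this.tag = tag;
    }

    @Override
    public void write(String value, DataOutput out) throws IOException {
        out.writeUTF(value);
    }

    @Override
    public String read(DataInput in) throws IOException {
        return tag + "/" + in.readUTF();
    }
}
```

Writing always goes through the Kryo v5 instance. As a result, state restored from an older savepoint is rewritten in Kryo v5 format the next time it is persisted. This is the behavior a bridge release relies on.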

Additional Proposed Changes Not Yet Implemented

I plan to implement these if the Flink project is seriously considering adopting this FLIP:

  • Add clear, informative error messages and fail fast if the code attempts to deserialize Kryo v2 state on Java 17+. Kryo v5 state is fully supported on Java 17+, but Kryo v2 state is not. That way, anyone who runs Flink on a Java 17+ runtime and tries to read legacy Kryo v2 state gets a clear error message explaining why that doesn't work.
  • With the legacy Kryo v2 code, the flink-scala module uses the Twitter Chill (Scala) dependency, the flink-java module uses the Twitter chill-java dependency, and the flink-core module has the base Kryo 2.24.0 dependency. The Flink Kryo code in flink-core uses runtime class loading and reflection to call the Chill code in flink-scala or flink-java. Since the Kryo v5 code doesn't use the Chill library at all, we should be able to merge the Kryo code from flink-java into flink-core, remove the runtime reflection dependency, and simplify the code.
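To make the reflection point concrete, here is a simplified sketch contrasting the two styles. It assumes the optional registrar implements Consumer. ToyKryo, ReflectiveSetup, CollectionsRegistrar, and DirectSetup are hypothetical names and do not correspond to the actual Flink or Chill classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a Kryo instance: records which registration steps ran.
final class ToyKryo {
    final List<String> registrations = new ArrayList<>();
}

// Reflective style (simplified): core code looks up an optional registrar by name at
// runtime, because the registrar lives in a module that core cannot depend on.
final class ReflectiveSetup {
    static void configure(ToyKryo kryo, String registrarClassName) {
        try {
            Class<?> clazz = Class.forName(registrarClassName);
            @SuppressWarnings("unchecked")
            Consumer<ToyKryo> registrar =
                    (Consumer<ToyKryo>) clazz.getDeclaredConstructor().newInstance();
            registrar.accept(kryo);
        } catch (ClassNotFoundException e) {
            // Optional module not on the classpath: fall back to core defaults only.
            kryo.registrations.add("core-defaults");
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + registrarClassName, e);
        }
    }
}

// Hypothetical registrar that would live in the optional module.
final class CollectionsRegistrar implements Consumer<ToyKryo> {
    @Override
    public void accept(ToyKryo kryo) {
        kryo.registrations.add("collections");
    }
}

// After merging into one module: a plain, compile-time-checked call.
final class DirectSetup {
    static void configure(ToyKryo kryo) {
        new CollectionsRegistrar().accept(kryo);
    }
}
```

The reflective version has to handle a missing class at runtime and loses compile-time checking. Once the code lives in one module, the direct call removes both concerns.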

Compatibility, Deprecation, and Migration Plan

Users of the Kryo v2 APIs in ExecutionConfig, including those who specify custom Kryo v2 serializers, will have to use the Kryo v5 API analogs in addition to the Kryo v2 APIs they currently use. Once all persisted savepoints and checkpoints have been fully migrated, they can and should remove their usage of the Kryo v2 APIs.

All Flink applications currently generate state with Kryo v2. This PR fully supports reading that state and writes new state with Kryo v5. However, a future version of Flink would eventually drop support for Kryo v2. Before upgrading to that version, users will need to make sure all persisted state has been upgraded. There will be a migration pathway so that all applications can upgrade without losing state. Applications will first have to run on a bridge release that reads their Kryo v2 state and writes it back as Kryo v5 data. Only then can they upgrade to a future version of Flink that completely drops support for Kryo v2.

Once Flink adds support for Java 17 and newer, Flink running on those runtimes will not be able to read persisted Kryo v2 state. Users who need to read persisted Kryo v2 state will have to use a Java 11 runtime until they have run a bridge version of Flink and migrated all persisted state to the Kryo v5 format. New applications, and applications that have fully migrated their persisted state, will be free to upgrade to Java 17.
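The fail-fast check proposed earlier for Kryo v2 state on Java 17+ could be as small as the following sketch. It assumes the check runs before any Kryo v2 deserialization. It also assumes the check keys off the KeyedBackendSerializationProxy version, where anything below 7 means Kryo v2 state. LegacyKryoGuard and checkCanRestore are hypothetical names. Runtime.version().feature() (available since Java 10) reports the running Java version.

```java
// Hypothetical fail-fast guard; the class and method names are illustrative only.
final class LegacyKryoGuard {
    // KeyedBackendSerializationProxy versions below 7 hold Kryo v2 state.
    static final int FIRST_KRYO5_PROXY_VERSION = 7;
    // Kryo 2.24.0 is not compatible with Java 17 and newer.
    static final int FIRST_JAVA_WITHOUT_KRYO2 = 17;

    private LegacyKryoGuard() {}

    // Throws before any Kryo v2 deserialization is attempted on an unsupported runtime.
    static void checkCanRestore(int proxyVersion, int javaFeatureVersion) {
        boolean isKryo2State = proxyVersion < FIRST_KRYO5_PROXY_VERSION;
        if (isKryo2State && javaFeatureVersion >= FIRST_JAVA_WITHOUT_KRYO2) {
            throw new UnsupportedOperationException(
                    "This state was written with Kryo v2 (KeyedBackendSerializationProxy version "
                            + proxyVersion + "), which cannot be read on Java " + javaFeatureVersion
                            + ". Restore it once with a bridge Flink release on Java 11 so that it is"
                            + " rewritten with Kryo v5, then move to Java 17+.");
        }
    }

    // Convenience overload that checks the Java version of the current JVM.
    static void checkCanRestore(int proxyVersion) {
        checkCanRestore(proxyVersion, Runtime.version().feature());
    }
}
```

Taking the Java version as a parameter keeps the rule testable on any JDK. The one-argument overload is what production code would call.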

Test Plan

The PR passes the test suite in the Flink CI system and includes new unit tests that cover the changes and new functionality. Additionally, I wrote a complete Flink application to stress-test the Kryo upgrade process: https://github.com/kurtostfeld/flink-kryo-upgrade-demo

If the Flink project is seriously considering adopting this FLIP and PR, I plan to write additional tests that cover more use cases. I also intend to ask the Flink mailing lists for help. That help could take the form of testing scenarios, or of people with non-production applications they would be willing to test with. I suspect some use cases still have backward-compatibility problems that existing test efforts haven't uncovered yet.

Alternatives under Active Consideration and Open For Discussion

Chesnay Schepler says that the presence of Kryo serializers in the execution config is a problem in general. The submitted PR doesn't make that better or worse; it just upgrades Kryo and starts a migration pathway to remove the older Kryo. We should bundle that work with this Kryo upgrade if three conditions hold. First, there is consensus on how to remove Kryo serializers from the execution config. Second, it makes sense to bundle that with this upgrade effort. Third, it can be done with a reasonable amount of effort, or others are willing to assist. Otherwise, a fallback option is to just upgrade Kryo for now and revisit removing Kryo from the execution config in a future FLIP.