

Current state: Under Discussion

Discussion thread: -


JIRA: ASF JIRA



Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


This is a follow-up to FLIP-193, on which this proposal heavily relies. Please read FLIP-193 first for the required context.


Non-incremental savepoints in the canonical format are often prohibitively slow for jobs with large state. Requiring the canonical format makes a savepoint slow both to take and to recover from, because all of the state needs to be rewritten to or from the canonical format. Furthermore, this limitation prevents the use of incremental RocksDB snapshots as savepoints, which makes taking savepoints even slower.

There have been many user requests around these issues over time. In principle, all of them were motivated by the missing option for incremental savepoints in Flink.

Definitions of terms

native format - the "native" data persistence format of the state backend, for example SST files for RocksDB.
canonical format - the "uniform" format that supports switching state backends, as implemented and defined by the corresponding ASF JIRA issue.

Proposed Changes

It will be possible to request, independently for each savepoint (via the CLI), either the canonical format (the current behaviour) or the native format. When the native format is selected, the default checkpoint configuration will determine whether the savepoint is incremental or not.

API changes


--type (native/canonical)

A `type` option will be added to the CLI commands for taking a savepoint and for stopping with a savepoint. We propose `--type native` as the default value, as it is the better choice for most use cases. The only use case for `--type canonical` savepoints seems to be changing the state backend type (for example from HashMap to RocksDB), which is very rare. Note, however, that this changes the pre-existing behaviour of the system.
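A minimal Java sketch of how the proposed `--type` value could be parsed. The enum `SavepointFormat`, the constant `PROPOSED_DEFAULT` and the method `fromCliValue` are hypothetical names chosen for illustration, not Flink's actual CLI code. The default is passed in explicitly because the migration plan below keeps canonical as the default in Flink 1.15.

```java
import java.util.Locale;

/** Hypothetical representation of the proposed --type option (illustrative, not Flink's API). */
enum SavepointFormat {
    CANONICAL,
    NATIVE;

    /** Default proposed in this section; the migration plan keeps CANONICAL for Flink 1.15. */
    static final SavepointFormat PROPOSED_DEFAULT = NATIVE;

    /**
     * Parses the value given to --type. A missing or blank value falls back to
     * {@code defaultFormat}; any other unknown value is rejected.
     */
    static SavepointFormat fromCliValue(String value, SavepointFormat defaultFormat) {
        if (value == null || value.isBlank()) {
            return defaultFormat;
        }
        switch (value.trim().toLowerCase(Locale.ROOT)) {
            case "native":
                return NATIVE;
            case "canonical":
                return CANONICAL;
            default:
                throw new IllegalArgumentException(
                        "Unknown savepoint type '" + value + "'; expected 'native' or 'canonical'");
        }
    }

    public static void main(String[] args) {
        // Usage: the savepoint type taken from the command line, or the proposed default.
        String cliValue = args.length > 0 ? args[0] : null;
        System.out.println(fromCliValue(cliValue, PROPOSED_DEFAULT));
    }
}
```

Rejecting unknown values, rather than silently falling back to the default, avoids taking a savepoint in a format the user did not ask for.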


The REST API endpoints for both commands will get a new parameter:

"type" : {
  "type" : "string" // possible values: {"native", "canonical"}
}

The `state.backend.incremental` setting will decide whether a native-format snapshot is incremental, and it will take effect for both checkpoints and native savepoints.
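The decision above can be sketched as a small function. Everything here (`SnapshotKind`, `SnapshotKindSelector.select` and its parameters) is hypothetical, and falling back to a full native snapshot when the configured backend cannot snapshot incrementally is an assumption of this sketch, not something the FLIP specifies.

```java
/** Hypothetical savepoint format, mirroring the proposed --type option. */
enum SavepointFormat { CANONICAL, NATIVE }

/** Hypothetical kinds of snapshot a savepoint request could produce. */
enum SnapshotKind { CANONICAL_FULL, NATIVE_FULL, NATIVE_INCREMENTAL }

final class SnapshotKindSelector {
    private SnapshotKindSelector() {}

    /**
     * @param format                     format requested for the savepoint (--type)
     * @param incrementalEnabled         value of state.backend.incremental
     * @param backendSupportsIncremental whether the configured state backend can snapshot incrementally
     */
    static SnapshotKind select(
            SavepointFormat format, boolean incrementalEnabled, boolean backendSupportsIncremental) {
        if (format == SavepointFormat.CANONICAL) {
            // Canonical savepoints always rewrite all state; no incremental canonical format is planned.
            return SnapshotKind.CANONICAL_FULL;
        }
        // Native savepoints follow the same setting as checkpoints, if the backend can honour it
        // (falling back to a full native snapshot is an assumption of this sketch).
        return incrementalEnabled && backendSupportsIncremental
                ? SnapshotKind.NATIVE_INCREMENTAL
                : SnapshotKind.NATIVE_FULL;
    }

    public static void main(String[] args) {
        System.out.println(select(SavepointFormat.NATIVE, true, true));    // NATIVE_INCREMENTAL
        System.out.println(select(SavepointFormat.NATIVE, true, false));   // NATIVE_FULL
        System.out.println(select(SavepointFormat.CANONICAL, true, true)); // CANONICAL_FULL
    }
}
```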


As defined in FLIP-193, incremental savepoints won’t be allowed to refer to any pre-existing files used in previous checkpoints, and Flink won’t be allowed to rely on the existence of any files newly created as part of that incremental savepoint. This is because savepoints are owned by the user, while checkpoints are owned by Flink.
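To make this ownership rule concrete, the following hypothetical sketch plans which files a native snapshot uploads. With sharing (checkpoints), files already uploaded by earlier checkpoints are reused; with no sharing (savepoints), every file is copied into the savepoint's own location and nothing is registered for later reuse. `FileSharing`, `UploadPlan` and the use of plain file names as strings are simplifications invented for this sketch, not Flink's internal types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Hypothetical sharing modes; only NO_SHARING is named in this FLIP. */
enum FileSharing { SHARED, NO_SHARING }

/** Which files a native snapshot uploads and which it reuses (illustrative only). */
final class UploadPlan {
    final List<String> toUpload;     // copied into this snapshot's own location
    final List<String> reused;       // referenced from earlier checkpoints
    final boolean registerForReuse;  // may later checkpoints rely on the uploaded files?

    private UploadPlan(List<String> toUpload, List<String> reused, boolean registerForReuse) {
        this.toUpload = Collections.unmodifiableList(toUpload);
        this.reused = Collections.unmodifiableList(reused);
        this.registerForReuse = registerForReuse;
    }

    static UploadPlan plan(List<String> snapshotFiles, Set<String> previouslyUploaded, FileSharing sharing) {
        boolean shared = sharing == FileSharing.SHARED;
        List<String> upload = new ArrayList<>();
        List<String> reuse = new ArrayList<>();
        for (String file : snapshotFiles) {
            // A savepoint must not refer to files owned by earlier checkpoints.
            if (shared && previouslyUploaded.contains(file)) {
                reuse.add(file);
            } else {
                upload.add(file);
            }
        }
        // Flink must not rely on files created as part of a savepoint, so they are not registered.
        return new UploadPlan(upload, reuse, shared);
    }

    public static void main(String[] args) {
        List<String> files = List.of("000011.sst", "000012.sst", "000013.sst");
        Set<String> earlier = Set.of("000011.sst", "000012.sst");
        UploadPlan checkpoint = plan(files, earlier, FileSharing.SHARED);
        UploadPlan savepoint = plan(files, earlier, FileSharing.NO_SHARING);
        System.out.println("checkpoint uploads " + checkpoint.toUpload); // [000013.sst]
        System.out.println("savepoint uploads " + savepoint.toUpload);   // [000011.sst, 000012.sst, 000013.sst]
    }
}
```

Under this rule a native savepoint presumably cannot skip files that an earlier checkpoint already uploaded; its speed-up over a canonical savepoint comes from not rewriting the state into the canonical format.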


Savepoints in the native format should have the same properties as canonical savepoints when it comes to being self-contained and relocatable. All duplicated and newly created files should be located in the user-specified target directory, and all paths should be encoded as relative paths with respect to the metadata file.
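Relative path encoding can be illustrated with `java.nio.file.Path#relativize`. This is only a sketch: Flink uses its own file system abstraction, and the class `RelativeSavepointPaths`, its methods and the example paths are made up for illustration.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helpers for storing savepoint file paths relative to the metadata file. */
final class RelativeSavepointPaths {
    private RelativeSavepointPaths() {}

    /** Encodes {@code stateFile} relative to the directory containing {@code metadataFile}. */
    static String encode(Path metadataFile, Path stateFile) {
        Path base = metadataFile.toAbsolutePath().normalize().getParent();
        Path file = stateFile.toAbsolutePath().normalize();
        if (!file.startsWith(base)) {
            // A self-contained savepoint may not point outside its own target directory.
            throw new IllegalArgumentException(stateFile + " lies outside " + base);
        }
        return base.relativize(file).toString();
    }

    /** Resolves a stored relative path against the metadata file's current location. */
    static Path decode(Path metadataFile, String relativePath) {
        return metadataFile.toAbsolutePath().normalize().getParent().resolve(relativePath).normalize();
    }

    public static void main(String[] args) {
        String stored = encode(Paths.get("/savepoints/sp-1/_metadata"),
                Paths.get("/savepoints/sp-1/shared/000042.sst"));
        System.out.println(stored); // shared/000042.sst (on a POSIX file system)
        // After the whole directory is moved, the same entry resolves to the new location.
        System.out.println(decode(Paths.get("/backup/sp-1/_metadata"), stored));
    }
}
```

Because only the relative part is stored, moving or copying the whole savepoint directory keeps every reference valid, which is what makes the savepoint relocatable.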

Checkpoint vs savepoint guarantees

As part of this FLIP, we would like to finally document explicitly what users can do with checkpoints and savepoints. Currently there are a number of things that are not officially supported, but that we know users are doing. So let's start with the status quo:



Legend: Yes (unofficially)/Maybe (untested); No (but could be done); Very difficult/impossible

Columns: Canonical Savepoint | Aligned Checkpoint | Unaligned Checkpoint

Rows:
  • State backend change
  • Self-contained and relocatable
  • State Processor API (reading)
  • State Processor API (writing)
  • Schema evolution
  • Flink minor (1.x → 1.y) version upgrade
  • Flink bug/patch (1.14.x → 1.14.y) version upgrade
  • Arbitrary job upgrade (changed graph shape/record types)
  • Job upgrade w/o changing graph shape and record types



What about RocksDB upgrades? If we bump the RocksDB version between Flink versions, do we support recovering from a native-format snapshot (for example, an incremental checkpoint)?


Proposed guarantees

Columns: Canonical Savepoint | Native Savepoint | Aligned Checkpoint | Unaligned Checkpoint

Rows:
  • State backend change
  • Self-contained and relocatable
  • State Processor API (reading)
  • State Processor API (writing)
  • Schema evolution
  • Flink minor (1.x → 1.y) version upgrade
  • Flink bug/patch (1.14.x → 1.14.y) version upgrade
  • Arbitrary job upgrade (changed graph shape/record types)
  • Job upgrade w/o changing graph shape and record types



The main aim of this proposal is to document what we can easily provide, given that native savepoints and aligned checkpoints would be virtually the same thing. Its disadvantage is that we would need to document the distinction between aligned and unaligned checkpoints.


Code changes

Passing the selected savepoint type from the CLI through the `CheckpointCoordinator` down to the state backends in the `StreamTask` does not seem to be an issue.

For native savepoints, the `StreamTask` will have to select an appropriate `SnapshotStrategy` (for example `RocksIncrementalSnapshotStrategy`) instead of the `SavepointSnapshotStrategy`, and those strategies will need to support `CheckpointType#SharingFilesStrategy.NO_SHARING`.
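A hypothetical sketch of this selection for a RocksDB backend with incremental snapshots enabled. Only `SavepointSnapshotStrategy`, `RocksIncrementalSnapshotStrategy` and `NO_SHARING` come from this FLIP; the enums, the `SHARED` constant and the method name are assumptions, and strategies are returned as plain names rather than instantiated. Marking canonical savepoints as no-sharing as well is an assumption that follows from savepoints being owned by the user.

```java
/** Hypothetical request types; not Flink's CheckpointType API. */
enum SnapshotRequest { CHECKPOINT, CANONICAL_SAVEPOINT, NATIVE_SAVEPOINT }

/** Hypothetical sharing modes; only NO_SHARING is named in this FLIP. */
enum FileSharing { SHARED, NO_SHARING }

/** Strategy choice for a RocksDB backend with incremental snapshots enabled (illustrative). */
final class StrategyChoice {
    final String strategy;
    final FileSharing sharing;

    private StrategyChoice(String strategy, FileSharing sharing) {
        this.strategy = strategy;
        this.sharing = sharing;
    }

    static StrategyChoice forRocksDbIncremental(SnapshotRequest request) {
        switch (request) {
            case CHECKPOINT:
                // Checkpoints are owned by Flink and may share files with earlier checkpoints.
                return new StrategyChoice("RocksIncrementalSnapshotStrategy", FileSharing.SHARED);
            case NATIVE_SAVEPOINT:
                // Same native strategy, but the savepoint is owned by the user: no file sharing.
                return new StrategyChoice("RocksIncrementalSnapshotStrategy", FileSharing.NO_SHARING);
            case CANONICAL_SAVEPOINT:
                // Current behaviour: rewrite all state into the canonical format.
                return new StrategyChoice("SavepointSnapshotStrategy", FileSharing.NO_SHARING);
            default:
                throw new IllegalArgumentException("Unexpected request: " + request);
        }
    }

    @Override
    public String toString() {
        return strategy + " (" + sharing + ")";
    }

    public static void main(String[] args) {
        for (SnapshotRequest request : SnapshotRequest.values()) {
            System.out.println(request + " -> " + forRocksDbIncremental(request));
        }
    }
}
```

The point of the sketch is that a native savepoint reuses the checkpoint's snapshot strategy and differs only in the sharing mode.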


Only native savepoints would support incremental snapshots. There are no plans for providing incremental savepoints in the canonical format.

Support for incremental savepoints depends on the configured state backend providing such a feature. For example, without FLIP-151, the HashMapStateBackend would not be able to provide incremental savepoints.

FLIP-203 does not intend to define guarantees for using the State Processor API or schema evolution with native-format checkpoints or savepoints. This is left as follow-up work.

Compatibility, Deprecation, and Migration Plan


  1. In Flink 1.15, the native savepoint type is added, but the canonical type is kept as the default.
  2. In Flink 1.16, the native type will become the new default.

Rejected Alternatives

Rejected proposal for checkpoint guarantees 

Columns: Canonical Savepoint | Native Savepoint | Aligned Checkpoint | Unaligned Checkpoint

Rows:
  • State backend change
  • Self-contained and relocatable
  • State Processor API
  • Schema evolution
  • Flink minor (1.x → 1.y) version upgrade
  • Flink bug/patch (1.14.x → 1.14.y) version upgrade
  • Arbitrary job upgrade (changed graph shape/record types)
  • Job upgrade w/o changing graph shape and record types


The main aim of the first proposal was to unify the guarantees of the two types of savepoints and the two types of checkpoints. The only difference between native and canonical savepoints would be the ability to change the state backend, and officially there would be no difference between aligned and unaligned checkpoints. This would simplify the documentation, as we could avoid documenting the distinction between aligned and unaligned checkpoints.

It was rejected because native savepoints and aligned checkpoints are basically the same thing, so it makes little sense to artificially weaken the guarantees of aligned checkpoints.