...

There have been many user requests over time around these issues, for example:
https://issues.apache.org/jira/browse/FLINK-6755
https://issues.apache.org/jira/browse/FLINK-12619
https://issues.apache.org/jira/browse/FLINK-24149
All of those issues were, in principle, motivated by the lack of an incremental savepoint option in Flink.

Terms definition

native format - the "native" data persistence format of the state backend, for example SST files for RocksDB.
canonical format - the "uniform" format that supports switching state backends, as implemented and defined by FLINK-20976.

Proposed Changes

It will be possible to request each savepoint independently (via CLI) to be either in the canonical format (the current behaviour) or the native format. When the native format is selected, the default checkpoint configuration will be used to determine whether the savepoint should be incremental or not.
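The rule above can be sketched as a small decision function. This is only an illustration: `SavepointFormat` and `SavepointModeResolver` are hypothetical names, not Flink classes, and the boolean stands in for whatever the checkpoint configuration reports about incremental checkpoints.

```java
// Hypothetical names throughout; none of these types are Flink classes.
enum SavepointFormat { CANONICAL, NATIVE }

final class SavepointModeResolver {
    private SavepointModeResolver() {}

    /**
     * Returns true if the savepoint should be taken incrementally.
     * Assumption: the caller passes in whether incremental checkpoints
     * are enabled in the default checkpoint configuration.
     */
    static boolean isIncremental(SavepointFormat format, boolean incrementalCheckpointsConfigured) {
        // Canonical savepoints are always full; native ones follow the checkpoint configuration.
        return format == SavepointFormat.NATIVE && incrementalCheckpointsConfigured;
    }
}
```

Under this sketch, a canonical savepoint is never incremental, regardless of how checkpoints are configured.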

...

Semantic

As defined in FLIP-193, incremental savepoints won’t be allowed to refer to any pre-existing files used in previous checkpoints and Flink won’t be allowed to rely on the existence of any newly created files as part of that incremental savepoint. This is because savepoints are owned by the user, while checkpoints are owned by Flink.
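A minimal sketch of these two ownership rules, assuming files are identified by plain path strings. `SavepointOwnershipCheck` and both method names are hypothetical and exist only to make the constraints concrete; Flink tracks files through state handles rather than string sets.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical helper for illustration; file names are plain strings, not Flink state handles.
final class SavepointOwnershipCheck {
    private SavepointOwnershipCheck() {}

    /** A savepoint may only reference files that were written as part of that same savepoint. */
    static boolean savepointIsSelfContained(
            Set<String> referencedBySavepoint, Set<String> writtenBySavepoint) {
        return writtenBySavepoint.containsAll(referencedBySavepoint);
    }

    /** Later checkpoints must not reference any file that the savepoint wrote. */
    static boolean checkpointIgnoresSavepointFiles(
            Set<String> referencedByCheckpoint, Set<String> writtenBySavepoint) {
        return Collections.disjoint(referencedByCheckpoint, writtenBySavepoint);
    }
}
```

The first check keeps the savepoint usable after Flink deletes old checkpoints; the second keeps Flink's checkpoints valid after the user deletes or moves the savepoint.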

...

The main aim of this proposal is to actually document what we can easily provide, based on the fact that native savepoints and aligned checkpoints would be virtually the same thing. The disadvantage of this proposal is that we would need to document the distinction between aligned and unaligned checkpoints.

API changes

CLI

Code Block
--type (native/canonical)

A `--type` option will be added to the CLI commands for taking savepoints and for stopping with savepoints. We propose `--type native` as the default value, as it makes more sense in most use cases. The only use case for `--type canonical` savepoints seems to be when the user wants to change the state backend type (for example from HashMap to RocksDB), which is a very rare situation. However, it should be noted that this will change the pre-existing behaviour of the system.

REST API

Both commands:

Code Block
/jobs/:jobid/stop
/jobs/:jobid/savepoints

will get a new parameter:

Code Block
"type" : {
  "type" : "string" // possible values: {"native", "canonical"}
}
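
As an illustration of how a client might validate the value and render it into a request body, here is a minimal sketch. `SavepointTypeParam` is a hypothetical helper, not part of Flink's REST classes, and the body it produces carries only the proposed `type` field. Falling back to `native` when no value is given is an assumption that mirrors the default proposed for the CLI.

```java
// Hypothetical helper; models only the proposed "type" parameter, not Flink's REST classes.
enum SavepointTypeParam {
    NATIVE("native"),
    CANONICAL("canonical");

    private final String wireValue;

    SavepointTypeParam(String wireValue) {
        this.wireValue = wireValue;
    }

    /** Parses a user-supplied value; a missing value falls back to native (assumed default). */
    static SavepointTypeParam parse(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return NATIVE;
        }
        for (SavepointTypeParam candidate : values()) {
            if (candidate.wireValue.equalsIgnoreCase(raw.trim())) {
                return candidate;
            }
        }
        throw new IllegalArgumentException(
                "Unknown savepoint type '" + raw + "', expected native or canonical");
    }

    /** Renders a request body that carries only the new parameter. */
    String toRequestBody() {
        return "{\"type\": \"" + wireValue + "\"}";
    }
}
```

Rejecting unknown values early, rather than silently defaulting, keeps a typo from producing a savepoint in an unintended format.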

Code changes

Passing the selected savepoint type from the CLI, through the `CheckpointCoordinator`, down to the state backends on the `StreamTask` doesn’t seem to be an issue.

For native savepoints, `StreamTask` will have to select an appropriate `SnapshotStrategy` (for example `RocksIncrementalSnapshotStrategy`) instead of the `SavepointSnapshotStrategy`. Those strategies will need to support `CheckpointType#SharingFilesStrategy.NO_SHARING`.
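
The selection described above could look roughly like the following. This is a heavily simplified, hypothetical model: `SnapshotFormat`, `StrategySketch`, and `StrategySelector` are invented for illustration, and a strategy is reduced to a name plus a flag saying whether it can run without sharing files. The real Flink strategy classes are far richer.

```java
// Hypothetical, simplified model; the real Flink classes carry far more state.
enum SnapshotFormat { CANONICAL, NATIVE }

final class StrategySketch {
    final String name;
    final boolean supportsNoSharing;

    StrategySketch(String name, boolean supportsNoSharing) {
        this.name = name;
        this.supportsNoSharing = supportsNoSharing;
    }
}

final class StrategySelector {
    private StrategySelector() {}

    /**
     * Picks the canonical strategy for canonical savepoints and the state
     * backend's native strategy otherwise. A native strategy that cannot
     * operate without file sharing is rejected, since savepoints must be
     * self-contained.
     */
    static StrategySketch select(
            SnapshotFormat format, StrategySketch canonicalStrategy, StrategySketch nativeStrategy) {
        if (format == SnapshotFormat.CANONICAL) {
            return canonicalStrategy;
        }
        if (!nativeStrategy.supportsNoSharing) {
            throw new IllegalStateException(
                    nativeStrategy.name + " cannot take a native savepoint without sharing files");
        }
        return nativeStrategy;
    }
}
```

Failing fast in the unsupported case is one possible design; the proposal itself does not specify what should happen when a strategy lacks `NO_SHARING` support.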

Limitations

Only native savepoints would support incremental snapshots. There are no plans to provide incremental savepoints in the canonical format.

Support for incremental savepoints is subject to the configured state backend providing such a feature. For example, without FLIP-151, HashMapStateBackend would not be able to provide incremental savepoints.

Compatibility, Deprecation, and Migration Plan

...