
Status

Current state: Under Discussion

Discussion thread: -


Released: 1.15

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This is a follow-up to https://cwiki.apache.org/confluence/display/FLINK/FLIP-193%3A+Snapshots+ownership, on which this FLIP heavily relies. Please read FLIP-193 first for the required context.

Motivation

Non-incremental savepoints in the canonical format are often prohibitively slow for jobs with large state. Because a savepoint must use the canonical format, it is slow both to take and to recover from: all of the state needs to be rewritten to and from the canonical format. Furthermore, this limitation prevents the use of incremental RocksDB snapshots as savepoints, which makes taking savepoints even slower.

There have been many user requests around these issues over time, for example:
https://issues.apache.org/jira/browse/FLINK-6755
https://issues.apache.org/jira/browse/FLINK-12619
https://issues.apache.org/jira/browse/FLINK-24149
All of these issues were, in principle, motivated by the missing option for incremental savepoints in Flink.

Proposed Changes

It will be possible to request, independently for each savepoint (via the CLI or the REST API), either the canonical format (the current behaviour) or the native format. When the native format is selected, the default checkpoint configuration will determine whether the savepoint is incremental or not.
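The decision described above can be sketched as a small mapping from the requested savepoint type and the job's incremental-checkpoint setting (for example the `state.backend.incremental` option) to the kind of snapshot taken. This is a minimal sketch under that assumption; `SavepointType`, `SnapshotKind` and `SavepointKindResolver` are hypothetical names, not Flink classes.

```java
// Hypothetical sketch: mapping a savepoint request to the kind of snapshot taken.
// Assumes the requested type and the job's incremental-checkpoint flag are known.
enum SavepointType { NATIVE, CANONICAL }

enum SnapshotKind { CANONICAL_FULL, NATIVE_FULL, NATIVE_INCREMENTAL }

final class SavepointKindResolver {
    private SavepointKindResolver() {}

    /**
     * @param requested          the per-savepoint type chosen by the user
     * @param incrementalEnabled whether checkpoints of this job are configured as incremental
     */
    static SnapshotKind resolve(SavepointType requested, boolean incrementalEnabled) {
        if (requested == SavepointType.CANONICAL) {
            // Canonical savepoints always rewrite the whole state into the common format.
            return SnapshotKind.CANONICAL_FULL;
        }
        // Native savepoints follow the checkpoint configuration: full or incremental.
        return incrementalEnabled ? SnapshotKind.NATIVE_INCREMENTAL : SnapshotKind.NATIVE_FULL;
    }

    public static void main(String[] args) {
        // Print the full decision table.
        for (SavepointType type : SavepointType.values()) {
            for (boolean incremental : new boolean[] {false, true}) {
                System.out.println(type + ", incremental=" + incremental + " -> " + resolve(type, incremental));
            }
        }
    }
}
```

Note that with a canonical request the incremental setting is ignored entirely, which matches the current behaviour.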

API changes

CLI

--type (native/canonical)

A `--type` option will be added to the CLI commands for taking savepoints and for stopping with a savepoint. We propose `--type native` as the default value, as it is the better fit for most use cases. The only use case for `--type canonical` savepoints seems to be changing the state backend type (for example from HashMap to RocksDB), which is rare. Note, however, that this would change the existing default behaviour of the system.
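The option handling described above could look roughly like the following sketch. It is not Flink's actual CLI parser; `SavepointFormat` and `SavepointTypeOption` are hypothetical, and the default is passed in as a parameter because the migration plan changes it between releases.

```java
import java.util.Locale;

// Hypothetical sketch of parsing the proposed "--type" CLI option.
enum SavepointFormat {
    NATIVE, CANONICAL;

    /** Parses "native" or "canonical" (case-insensitive) and rejects anything else. */
    static SavepointFormat parse(String value) {
        switch (value.trim().toLowerCase(Locale.ROOT)) {
            case "native":
                return NATIVE;
            case "canonical":
                return CANONICAL;
            default:
                throw new IllegalArgumentException(
                        "Unsupported --type value '" + value + "', expected 'native' or 'canonical'");
        }
    }
}

final class SavepointTypeOption {
    private SavepointTypeOption() {}

    /** Looks for "--type <value>" and falls back to the given default when it is absent. */
    static SavepointFormat fromArgs(String[] args, SavepointFormat defaultFormat) {
        for (int i = 0; i < args.length; i++) {
            if ("--type".equals(args[i])) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("--type requires a value");
                }
                return SavepointFormat.parse(args[i + 1]);
            }
        }
        return defaultFormat;
    }

    public static void main(String[] args) {
        // "a1b2c3" is a placeholder job ID.
        System.out.println(fromArgs(new String[] {"savepoint", "--type", "native", "a1b2c3"},
                SavepointFormat.CANONICAL)); // prints NATIVE
        System.out.println(fromArgs(new String[] {"savepoint", "a1b2c3"},
                SavepointFormat.CANONICAL)); // prints CANONICAL
    }
}
```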

REST API

Both endpoints:

/jobs/:jobid/stop
/jobs/:jobid/savepoints

will accept a new parameter:

"type" : {
  "type" : "string" // possible values: {"native", "canonical"}
}
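A client-side helper for the two endpoints above might validate and render the new parameter as in this sketch. `SavepointRestRequest` is hypothetical, and only the proposed `type` field is shown; any other request body fields the endpoints accept are deliberately left out.

```java
import java.util.Locale;

// Hypothetical client-side helper for the proposed "type" REST parameter.
final class SavepointRestRequest {
    /** The two endpoints that would accept the new parameter. */
    enum Endpoint {
        SAVEPOINTS("savepoints"),
        STOP("stop");

        private final String segment;

        Endpoint(String segment) {
            this.segment = segment;
        }
    }

    private SavepointRestRequest() {}

    /** Builds the request path, e.g. /jobs/<jobId>/savepoints. */
    static String path(String jobId, Endpoint endpoint) {
        return "/jobs/" + jobId + "/" + endpoint.segment;
    }

    /** Validates the value against the two proposed options and renders a JSON body. */
    static String body(String type) {
        String normalized = type.trim().toLowerCase(Locale.ROOT);
        if (!normalized.equals("native") && !normalized.equals("canonical")) {
            throw new IllegalArgumentException(
                    "\"type\" must be \"native\" or \"canonical\", got: " + type);
        }
        return "{\"type\": \"" + normalized + "\"}";
    }
}
```

For example, `path("a1b2c3", Endpoint.STOP)` yields `/jobs/a1b2c3/stop`, and `body("NATIVE")` yields `{"type": "native"}`, where `a1b2c3` is a placeholder job ID.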

Semantics

As defined in FLIP-193, incremental savepoints won't be allowed to refer to any pre-existing files used by previous checkpoints, and Flink won't be allowed to rely on the existence of any files newly created as part of that incremental savepoint. This is because savepoints are owned by the user, while checkpoints are owned by Flink.

Incremental savepoints will need to follow a path very similar to that of the first checkpoint taken in the no-claim mode described in FLIP-193. Pre-existing files from previous checkpoints will need to be duplicated into the savepoint location, and state backends will not be allowed to reference the files uploaded as part of that savepoint in any future checkpoint. This exact mode of operation has already been defined in https://issues.apache.org/jira/browse/FLINK-25192 as `CheckpointType#SharingFilesStrategy.NO_SHARING`. Within the scope of FLIP-203, state backends will need to implement this mode properly.
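The file-handling rules above can be modelled with a simplified planner that tracks which local files were already uploaded by earlier checkpoints. This is a minimal sketch, not a state backend implementation: `FileSharing.SHARED` stands in for regular incremental checkpoints, `FileSharing.NO_SHARING` mirrors the semantics of `CheckpointType#SharingFilesStrategy.NO_SHARING`, and all class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the two file-sharing behaviours.
enum FileSharing { SHARED, NO_SHARING }

final class IncrementalUploadPlanner {
    /** Local file name -> remote location of files uploaded by previous checkpoints. */
    private final Map<String, String> uploadedByCheckpoints = new HashMap<>();

    /** What happens to each local file during one snapshot. */
    static final class Plan {
        final List<String> reused = new ArrayList<>();     // referenced in place (checkpoints only)
        final List<String> duplicated = new ArrayList<>(); // copied from a previous checkpoint's location
        final List<String> uploaded = new ArrayList<>();   // newly uploaded from local disk
    }

    Plan plan(List<String> localFiles, FileSharing sharing) {
        Plan plan = new Plan();
        for (String file : localFiles) {
            boolean alreadyUploaded = uploadedByCheckpoints.containsKey(file);
            if (alreadyUploaded && sharing == FileSharing.SHARED) {
                plan.reused.add(file);
            } else if (alreadyUploaded) {
                // NO_SHARING: a savepoint must not point into checkpoint-owned files.
                plan.duplicated.add(file);
            } else {
                plan.uploaded.add(file);
            }
        }
        return plan;
    }

    /** Registers newly uploaded files for reuse, unless they belong to a savepoint. */
    void onSnapshotComplete(Plan plan, FileSharing sharing, String snapshotDir) {
        if (sharing == FileSharing.NO_SHARING) {
            // Files owned by the user's savepoint are never referenced by later checkpoints.
            return;
        }
        for (String file : plan.uploaded) {
            uploadedByCheckpoints.put(file, snapshotDir + "/" + file);
        }
    }
}
```

In this model a savepoint taken after a checkpoint duplicates every file the checkpoint already uploaded, and a later checkpoint re-uploads any file that so far exists only in the savepoint. That is the extra cost the following paragraph refers to.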

This means that if the underlying FileSystem doesn't support fast duplication, incremental savepoints will most likely still be slower than incremental checkpoints. However, they should still be faster than canonical savepoints. In both cases (incremental or not), native savepoints should be much faster to recover from.

Savepoints in the native format should have the same properties as canonical savepoints with respect to being self-contained and relocatable. All duplicated and newly created files should be located in the user-specified target directory, and all paths should be encoded relative to the metadata file.
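The relative-path requirement can be illustrated with `java.nio.file.Path#relativize` and `Path#resolve`. In this minimal sketch, the hypothetical `RelativeSavepointPaths` helper stores every file path relative to the directory containing the metadata file. It rejects files outside that directory, such as files of an earlier checkpoint, and resolves stored paths against the metadata file's new location after the savepoint has been moved.

```java
import java.io.File;
import java.nio.file.Path;

// Hypothetical helper illustrating relocatable savepoints.
final class RelativeSavepointPaths {
    private RelativeSavepointPaths() {}

    /** Encodes a state file path relative to the directory of the metadata file. */
    static String encode(Path metadataFile, Path stateFile) {
        Path base = metadataFile.toAbsolutePath().normalize().getParent();
        Path relative = base.relativize(stateFile.toAbsolutePath().normalize());
        if (relative.startsWith("..")) {
            // A self-contained savepoint may not reference files outside its own directory.
            throw new IllegalArgumentException(stateFile + " is outside the savepoint directory " + base);
        }
        // Store with '/' separators so the encoding does not depend on the platform.
        return relative.toString().replace(File.separatorChar, '/');
    }

    /** Resolves a stored relative path against the (possibly moved) metadata file. */
    static Path decode(Path metadataFile, String relative) {
        return metadataFile.toAbsolutePath().normalize().getParent().resolve(relative).normalize();
    }
}
```

Because only relative paths are stored, copying the whole target directory to another location is enough to relocate the savepoint.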

Code changes

Passing the selected savepoint type from the CLI through the `CheckpointCoordinator` down to the state backends in the `StreamTask` does not seem to pose a problem.

For native savepoints, the `StreamTask` will have to select an appropriate `SnapshotStrategy` (for example `RocksIncrementalSnapshotStrategy`) instead of the `SavepointSnapshotStrategy`, and those strategies will need to support `CheckpointType#SharingFilesStrategy.NO_SHARING`.
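The selection logic described above can be summarised in a heavily simplified sketch. The enums, `StrategySelection`, and `SnapshotStrategySelector` are hypothetical stand-ins, not Flink's interfaces. Strategies are identified by the class names mentioned in this section, and `Sharing.SHARED` loosely represents the file sharing that regular checkpoints use.

```java
// Hypothetical stand-ins for snapshot strategy selection; not Flink's actual interfaces.
enum SnapshotRequest { CHECKPOINT, CANONICAL_SAVEPOINT, NATIVE_SAVEPOINT }

enum Sharing { SHARED, NO_SHARING }

/** Which strategy handles a request and which file-sharing rules it must obey. */
final class StrategySelection {
    final String strategy;
    final Sharing sharing;

    StrategySelection(String strategy, Sharing sharing) {
        this.strategy = strategy;
        this.sharing = sharing;
    }
}

final class SnapshotStrategySelector {
    private SnapshotStrategySelector() {}

    /**
     * @param backendStrategy the backend's own strategy, e.g. "RocksIncrementalSnapshotStrategy"
     */
    static StrategySelection select(SnapshotRequest request, String backendStrategy) {
        switch (request) {
            case CANONICAL_SAVEPOINT:
                // Current behaviour: rewrite all state into the canonical format.
                return new StrategySelection("SavepointSnapshotStrategy", Sharing.NO_SHARING);
            case NATIVE_SAVEPOINT:
                // Proposed: reuse the backend's strategy, but never share files with checkpoints.
                return new StrategySelection(backendStrategy, Sharing.NO_SHARING);
            case CHECKPOINT:
            default:
                // Regular checkpoints keep sharing files with earlier and later checkpoints.
                return new StrategySelection(backendStrategy, Sharing.SHARED);
        }
    }
}
```

The design point this illustrates is that a native savepoint differs from a checkpoint only in its sharing rules, not in the strategy that writes the files.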

Compatibility, Deprecation, and Migration Plan

To minimise the risk of breaking existing deployments and to give users time to migrate, we propose to change the default behaviour (from canonical to native savepoints) one release after this FLIP is completed. For example:

  1. In Flink 1.15, `--type native` is added, but `--type canonical` remains the default.
  2. In Flink 1.16, `--type native` becomes the new default.