
...

Page properties

...


Discussion thread: https://lists.apache.org/thread/zw2crf0c7t7t4cb5cwcwjpvsb3r1ovz2
Vote thread: https://lists.apache.org/thread/tpyros2fl0howbtcb3fc54f7b7pjn1fw

JIRA: FLINK-25154 (ASF JIRA)

Release: 1.15


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Motivation

Problem 1:

Flink can be set up with retained checkpoints, which leave the last checkpoint behind when execution is cancelled or terminally fails. A new job can resume from that checkpoint left by the previous job.

The new job treats a retained checkpoint like a mix of a savepoint and a checkpoint: it does not assume ownership of that checkpoint, meaning it will not remove (clean up) the checkpoint once it is subsumed. But the new job will build incremental checkpoints on top of that retained checkpoint.

This combination means that cleanup of the checkpoint is left to the user; from Flink’s perspective, it just stays forever and is never deleted. Especially for incremental checkpoints, it is next to impossible for users and management tools to figure out when they can remove an incremental checkpoint, because some artefacts (files) of that checkpoint may still be in use by later retained checkpoints.

Problem 2:

Savepoints are under user control and (currently) full canonical snapshots. Those two characteristics mean that they should generally not be considered for regular failure recovery:

...

We could exclude savepoints from the checkpoint timeline by not registering them in the CompletedCheckpointStore. Thus, Intermediate Savepoints would not be used for recovery and would not commit side effects.

ATTENTION: Savepoints created by "stop-with-savepoint" (non-intermediate savepoints) would still commit side effects.

The way we achieve this for Intermediate Savepoints is:

...

NOTE (needs to be documented): Only one job should ever claim a single snapshot. It is not safe and not supported to start multiple jobs from the same snapshot in "claim" mode.

NOTE (needs to be documented as a limitation): Retained checkpoints are stored in a path like: {{<checkpoint_dir>/<job_id>/chk_<x>}}

...

If we do not claim a snapshot from which we are restoring, we should not reference any artefacts from it. The problem occurs only for incremental snapshots, because those are the only ones that reference increments from previous checkpoints. We can solve the issue by forcing the first checkpoint after a restore to be a “full checkpoint”.

NOTE (needs to be documented): Once the first checkpoint has completed successfully, the job no longer depends in any way on the snapshot used for restoring. Therefore, one can start as many jobs from a single snapshot as desired.

NOTE: In this context, and throughout the entire document, when talking about “full checkpoints/snapshots” we mean checkpoints that do not cross-reference other snapshots. This does not enforce any particular format. In particular, in the case of RocksDB, it could still be a set of SST files, which is what happens if we enable “incremental checkpoint” in RocksDB. We must make sure that all used files are uploaded independently. Thus, if we use the same files as the original snapshot, we must either re-upload or duplicate them.

Pros:

  • Clean ownership
  • Old Job’s checkpoint/savepoint directory can be deleted as soon as the first checkpoint is taken.

Cons:

  • We can delete the artefacts only once the first checkpoint is taken; it would be nice to be able to delete them as soon as a job is RUNNING
  • It’s up to the state backend’s implementation to make sure it creates a “full snapshot” when requested
  • Compared to the current state, the first checkpoint would take longer: if we restore from an incremental retained checkpoint, it cannot upload just the diff files but must re-upload the base files as well. We can improve the situation here by implementing an optimized API for fast duplication of artefacts

...



In this mode, users are safe to delete the initial checkpoint after another checkpoint has been taken successfully after the restore. In order to reliably tell whether a checkpoint has completed or not, users can e.g. query Flink's UI for the number of completed checkpoints or check the completed checkpoint store (ZooKeeper/Kubernetes). Checking for the existence of files is not a reliable way to verify that a checkpoint succeeded.
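For illustration, here is a minimal sketch of such a check against the checkpointing statistics of Flink's REST API (the JobManager address and job id are placeholders, and the naive JSON handling is only for brevity; a real tool would use a proper JSON parser):

Code Block
languagejava
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Illustrative sketch: how many checkpoints has the restored job completed? */
public class CompletedCheckpointCheck {

    public static void main(String[] args) throws Exception {
        // Placeholders: JobManager REST address and the id of the restored job.
        String restAddress = "http://localhost:8081";
        String jobId = args[0];

        // The checkpointing statistics endpoint exposes a "counts" object that
        // includes the number of completed checkpoints.
        URL url = new URL(restAddress + "/jobs/" + jobId + "/checkpoints");
        String body;
        try (InputStream in = url.openStream()) {
            body = new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }

        // Naive extraction of counts.completed, good enough for a sketch.
        int idx = body.indexOf("\"completed\":");
        int completed = Integer.parseInt(
                body.substring(idx + "\"completed\":".length()).split("[,}]")[0].trim());

        System.out.println("Completed checkpoints reported by the REST API: " + completed);
    }
}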

Gotcha:

There is still a caveat in case of multiple retained externalized checkpoints. Assume we have retained incremental checkpoints chk-42 and chk-43. If we restore from chk-42, it is not enough to wait for the first checkpoint after restoring from chk-42 before removing chk-42, because chk-43 builds on top of chk-42. However, restoring from a checkpoint does not create any additional dependants.

Implementation:

The biggest challenge here is how we want to force the first checkpoint to be a full snapshot.

When triggering the first checkpoint we would need to add a “force full snapshot” flag to the CheckpointBarrier. It would be up to the state backend to respect that flag and not use any of the shared increments.
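As a rough illustration (not existing Flink API; the Options stand-in and method names below are assumptions), a state backend's snapshot strategy could react to such a flag roughly like this:

Code Block
languagejava
import java.util.List;

/**
 * Illustrative sketch only: how a snapshot strategy could react to a
 * "force full snapshot" flag carried by the first checkpoint after a restore.
 */
public class ForceFullSnapshotSketch {

    /** Hypothetical stand-in for options derived from the CheckpointBarrier. */
    static final class Options {
        final boolean forceFullSnapshot;

        Options(boolean forceFullSnapshot) {
            this.forceFullSnapshot = forceFullSnapshot;
        }
    }

    List<String> filesToUpload(
            Options options, List<String> allLiveFiles, List<String> newFilesSinceLastCheckpoint) {
        if (options.forceFullSnapshot) {
            // First checkpoint after restoring in no-claim mode: do not reference any
            // increments of the restored snapshot; upload (or duplicate) every live file.
            return allLiveFiles;
        }
        // Regular incremental checkpoint: only files created since the last checkpoint.
        return newFilesSinceLastCheckpoint;
    }
}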

The approach could be easily extended to use a “duplicate” API instead of reuploading artefacts. The “duplicate” would happen on TMs in that case, which is what we most likely want.

Furthermore, as a next step, we could build incremental savepoints on top of that, which would react to the same flag and would not reuse any increments from previous checkpoints.

Forcing full snapshots

There is one more caveat that would have to be handled internally by state backends. If a state backend can share files not just between checkpoints, but also between Tasks (e.g. in case of rescaling, a single file could be used for different key group subranges), the state backend would need to make sure such files are re-uploaded/duplicated from a single Task.

RocksDB in its current implementation does not have this problem, because in case of rescaling it does not use any of the previous increments (we register shared increments from the restored checkpoints in `org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation#restorePreviousIncrementalFilesStatus`, which is called only when restoring without rescaling).

One option for how state backends can decide which task should duplicate/re-upload a shared file is to embed that information in the KeyedStateHandle. This could apply to RocksDB if we change how we perform rescaling. An example algorithm: randomly choose a key group within the handle's range; the task that gets assigned the key group range containing this group would be responsible for duplicating the file.
Example implementation:

Code Block
languagejava
    public IncrementalRemoteKeyedStateHandle(
            UUID backendIdentifier,
            KeyGroupRange keyGroupRange,
            long checkpointId,
            Map<StateHandleID, StreamStateHandle> sharedState,
            Map<StateHandleID, StreamStateHandle> privateState,
            StreamStateHandle metaStateHandle) {

        ....

        // Pick a random key group within this handle's range; the task that is assigned
        // the key group range containing it duplicates/re-uploads the shared files.
        this.keyGroupThatShouldDuplicate =
                new Random().nextInt(keyGroupRange.getNumberOfKeyGroups())
                        + keyGroupRange.getStartKeyGroup();
    }


The changelog state backend is designed in a way that it can share diff files not just between key group ranges, but also between different operators. Thus, the logic of deciding which task should duplicate shared files cannot be easily embedded in KeyedStateHandles. We propose two ways this can be achieved:

  1. (We would go for this option in the first version.) Trigger a snapshot of the base state backend in the first checkpoint, which induces materializing the changelog. In this approach we could duplicate the SST files, but we would not duplicate the diff files.
  2. Add a hook for computing which task should duplicate the diff files. We would have to do a pass over all states after the state assignment in StateAssignmentOperation (see the sketch below).
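A rough sketch of option 2, under the assumption of a hypothetical post-assignment planner (none of these names exist in Flink; picking the lowest subtask index that references a file is just one possible deterministic choice):

Code Block
languagejava
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch only: after state assignment, decide for every shared diff file
 * which subtask should re-upload/duplicate it in the first checkpoint after the restore.
 */
public class SharedFileDuplicationPlanner {

    /**
     * @param assignedSharedFiles per subtask index, the shared files its restored state references
     * @return for each shared file, the single subtask responsible for duplicating it
     */
    static Map<String, Integer> planDuplication(Map<Integer, List<String>> assignedSharedFiles) {
        Map<String, Integer> responsibleSubtask = new HashMap<>();
        assignedSharedFiles.forEach(
                (subtask, files) ->
                        files.forEach(
                                // Keep the lowest subtask index seen for each file; any
                                // deterministic, unique choice per file would work.
                                file -> responsibleSubtask.merge(file, subtask, Math::min)));
        return responsibleSubtask;
    }
}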
Backwards compatibility

This mode would require support from state backends, and we need to decide how we want state backends that are not part of Flink to behave and what kind of backwards compatibility we provide. An option would be to add a method like:

Code Block
languagejava
default boolean supportsForcedFullSnapshots() {
    return false;
}

to the org.apache.flink.runtime.state.Snapshotable interface. If a state backend does not support reacting to the flag, we would throw an exception. All state backends that are part of Flink would override this method.
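For illustration, the restore path could then guard on that capability roughly as follows (a sketch; the guard location and the stand-in interface are assumptions, not existing code):

Code Block
languagejava
/** Illustrative sketch only: reject a no-claim restore if the backend cannot comply. */
public class NoClaimRestoreGuard {

    /** Minimal stand-in mirroring the proposed default method on Snapshotable. */
    interface SnapshotableLike {
        default boolean supportsForcedFullSnapshots() {
            return false;
        }
    }

    static void checkNoClaimRestoreSupported(SnapshotableLike backend) {
        if (!backend.supportsForcedFullSnapshots()) {
            throw new UnsupportedOperationException(
                    "The state backend cannot force a full (self-contained) first checkpoint, "
                            + "which is required when restoring without claiming the snapshot.");
        }
    }
}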

Duplicating snapshots artefacts

Duplicating the artefacts (e.g., SST files) can be done by uploading them again, or by asking the checkpoint storage to duplicate them. Various systems have dedicated calls to copy files, which are more efficient than uploading them a second time. POSIX file systems can create hard links, and there is an S3 copy API call, which can supposedly cheaply copy objects when they are in the same bucket.

We could use those kinds of APIs to duplicate artefacts from the snapshot we restored, instead of reuploading them again.

We should extend the FileSystem / Checkpoint Storage abstraction to support duplicating the references to artefacts.
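One possible shape for such an extension (a sketch under the assumption of a new `duplicate` hook; the method name and the default fallback are not part of any existing interface):

Code Block
languagejava
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/**
 * Illustrative sketch only: a duplication hook on the checkpoint storage / file system
 * abstraction. Object-store implementations could map this to a server-side copy
 * (e.g. the S3 copy API); POSIX implementations could create hard links instead.
 */
public interface DuplicatingCheckpointStorage {

    /** Duplicates an artefact so that the new checkpoint owns its own copy or reference. */
    default void duplicate(Path source, Path target) throws IOException {
        // Fallback: a plain copy, equivalent to re-uploading the artefact.
        Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
    }
}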

Example filesystems that support some kind of DFS side COPY API:

Public Interfaces

REST API

...

Code Block
./bin/flink run \
      --fromSavepoint <savepointPath> \
      --allowNonRestoredState ... \
      --restoreMode (claim/no-claim/...)

Code

org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.java

...