Motivation

Problem 1:

Flink can be configured to retain checkpoints, keeping the last checkpoint when execution is cancelled or fails terminally. A new job can then resume from the checkpoint left behind by the previous job.

The new job treats a retained checkpoint like a mix of a savepoint and a checkpoint: It doesn’t assume ownership of that checkpoint, meaning it will not remove (clean up) that checkpoint once the checkpoint is subsumed. But, the new job will build incremental checkpoints on top of that retained checkpoint.

This combination means that cleanup of the checkpoint is left to the user; from Flink’s perspective, it simply stays around forever and is never deleted. Especially for incremental checkpoints, it is next to impossible for users and management tools to figure out when they can remove a retained checkpoint, because some of its artefacts (files) may still be referenced by the new job’s later incremental checkpoints.

Problem 2:

Savepoints are under user control and (currently) full canonical snapshots. Those two characteristics mean that they should generally not be considered for regular failure recovery:

  • They might already be deleted (because they are controlled by the user)
  • They take much longer to recover from because they are full snapshots in canonical format.

However, currently, if you skip Savepoints for recovery, you get result duplication in the transactional sinks (and in other operators with side effects) because a previously processed and committed part of the stream is re-processed. (An option to skip savepoints was actually dropped in 1.14 for these reasons.)

NOTE: The FLIP does not aim to make savepoints as cheap as checkpoints with regard to creating them and recovering from them. It focuses on solving the ownership issue. We do want to make savepoints cheaper, but this will be part of a separate, follow-up FLIP.

Proposed Changes

The proposal is built around the assumption that ownership should be the only difference between Checkpoints and Savepoints.

Checkpoints

  • Owned by the Flink system
  • Owned by one and only one job; a checkpoint should never be used by multiple jobs
  • Cleaned up when subsumed

⇒ These properties are essential, so Flink can assume full ownership of all artefacts and reference, re-use, and garbage-collect them as needed.

Savepoints

  • Owned by the user, never deleted by Flink
  • Self-contained, meaning not coupled to a job or to other shared artefacts.
  • Immutable, can be freely shared between applications.

⇒ Self-contained means that the Savepoint must not depend on any shared snapshot artefacts from its originating job. The property of being self-contained is vital to support easy cleanup by users and external tools. Savepoints may still exist when the Flink job that created them is no longer running, so their cleanup cannot be tied to the running Flink cluster.

Skipping Savepoints for Recovery

Savepoints are under user control, and therefore they should generally not be considered for regular failure recovery because they might already be deleted.

However, currently, if you skip Savepoints for recovery, you get result duplication in the transactional sinks (and in other operators with side effects) because a previously processed and committed part of the stream is re-processed.

We can exclude savepoints from the checkpoint timeline by not registering them in the CompletedCheckpointStore. Thus, Intermediate Savepoints are not used for recovery and do not commit side effects.

The way we achieve this for Intermediate Savepoints is the following (a sketch follows the list):

  • Savepoints are triggered similarly to checkpoints.
  • But when they are complete, we do not send “notifyCheckpointComplete()”.
  • When they are complete, they are not inserted into the CompletedCheckpointStore.
  • Operators never receive a “restore()” for that snapshot, nor a “notifyCheckpointComplete()”. So they never trigger the committing of side effects.
    To the operators, it looks as if the Savepoint didn’t complete (was aborted).
  • The next regular checkpoint commits the range of side effects since the previous regular checkpoint.
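
A minimal sketch of the control flow described above (the class and its nested types are illustrative stand-ins, not the actual CheckpointCoordinator code):

Code Block
languagejava
// Illustrative sketch only, not the actual CheckpointCoordinator code.
// The nested types stand in for CompletedCheckpoint / CompletedCheckpointStore.
final class IntermediateSavepointSketch {

    interface Snapshot {
        long checkpointId();

        boolean isIntermediateSavepoint();
    }

    interface CheckpointStore {
        // Registers the snapshot for recovery and may subsume older checkpoints.
        void add(Snapshot snapshot);
    }

    private final CheckpointStore store;

    IntermediateSavepointSketch(CheckpointStore store) {
        this.store = store;
    }

    void onSnapshotComplete(Snapshot snapshot) {
        if (snapshot.isIntermediateSavepoint()) {
            // Not registered in the store and no notifyCheckpointComplete() is
            // sent: operators never commit side effects for this snapshot and
            // it is never used for failure recovery.
            return;
        }
        store.add(snapshot);
        notifyCheckpointComplete(snapshot.checkpointId());
    }

    private void notifyCheckpointComplete(long checkpointId) {
        // In Flink this would be an RPC to all tasks; here it is just a stub.
    }
}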

NOTE: With this solution it is possible that the original job fails after taking a savepoint and falls back to a checkpoint taken prior to that savepoint. If we then resume a new job from the savepoint, we might commit transactions that (assuming non-determinism) might never have happened in the original job after it fell back to the earlier checkpoint.

However, we think this is a rare corner case and we should not complicate the design because of it (see the rejected alternatives, copying out checkpoints as savepoints). When resuming from intermediate savepoints, you need to ignore side effects (sinks), typically by giving the operators a different name. Otherwise you would be writing to the same sink from two jobs that are at different points in the stream, which is, in general, not well defined.

We also considered dropping that state automatically when taking the savepoint. This has the drawback that the sink’s state would be lost permanently, while there might be cases where it actually makes sense to reuse the pending side effects.

We might add tooling to simplify ignoring such side-effect/sink state at a later point in time. For now, we would document this as a gotcha.

Restoring from a retained snapshot (savepoint or retained checkpoint)

How it works now (Flink 1.14)

Savepoints

Taking a Savepoint at the end of one job and restoring it as a Savepoint for the next job is the cleanest thing, semantically.

Pros:

  • Simple to understand
  • The resource that survives the first job (the Savepoint) is easy to clean up (once a checkpoint is taken by the new job)

Cons:

  • Savepoints are more expensive to take (full snapshots in canonical format)
  • Sometimes one doesn’t have a Savepoint (the job was cancelled or failed terminally), so we will always need an additional mechanism that works with checkpoints
  • Restoring is more expensive, mainly because the next checkpoint must be full and cannot incrementally build on the Savepoint.

Retained checkpoints

The new job references the retained checkpoint from the old job. It builds an incremental checkpoint on it, but never garbage collects the retained checkpoint because it comes from a different job and checkpoint timeline.

Pros:

  • Fast: New Job can immediately do incremental checkpoints

Cons:

  • Very unclear semantics. A new job depends on artefacts of the old job (references them in incremental checkpoints) but does not assume ownership of them and does not clean them up.
  • Users and tools have no way of knowing when it is safe to delete the old checkpoint directory.
  • Results in long lingering unneeded checkpoint artefacts (files) that are never cleaned up.

Proposal (Flink 1.15+)

We suggest introducing two modes of restoring from a retained snapshot (savepoint or checkpoint). (See the rejected alternatives at the end for a third option.)

Claiming the snapshot

Claiming the snapshot means becoming its owner and deleting the snapshot and its artefacts once they can be removed. In this mode Flink will eventually clean up the snapshot as configured for the running job (e.g. on cancel or when it is subsumed by another checkpoint).

Pros:

  • Fast: New Job can immediately do incremental checkpoints
  • Eventually, retained checkpoint data is cleaned up

Cons:

  • Complex semantics: New job depends on artefacts of the old job (references them in incremental checkpoints).


NOTE (needs to be documented as a limitation): Retained checkpoints are stored in a path like <checkpoint_dir>/<job_id>/chk_<x>.

At least in the first version we would not take ownership of the <checkpoint_dir>/<job_id> directory, but only of the chk_<x> directory. We could probably delete the <checkpoint_dir>/<job_id> directory as soon as it becomes empty. It is probably a bad idea to claim a checkpoint from the directory of a running job in the first place, so from that perspective we would most likely be fine. However, the restore procedure treats savepoints and retained checkpoints the same way, and as far as I can tell there is nothing to tell the two apart and thus to decide whether the parent directory should be removed or not.

Implementation:

When we receive a request to restore from a snapshot, instead of putting the initial snapshot into the CompletedCheckpointStore with flags

(discardSubsumed = false, discardFinished = false, discardCancelled = false, discardFailed = false, discardSuspended = false),

we would use the cleanup configuration that was specified for regular checkpoints (configured via ExternalizedCheckpointCleanup / “execution.checkpointing.externalized-checkpoint-retention”).
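
A rough sketch of the difference, using an illustrative holder for the discard flags above (these names are stand-ins, not the actual restore code):

Code Block
languagejava
// Hypothetical sketch of the restore path; DiscardFlags is an illustrative
// stand-in for the discard options of CheckpointProperties.
final class ClaimModeSketch {

    enum RestoreMode { CLAIM, NO_CLAIM }

    static final class DiscardFlags {
        final boolean discardSubsumed;
        final boolean discardFinished;
        final boolean discardCancelled;
        final boolean discardFailed;
        final boolean discardSuspended;

        DiscardFlags(
                boolean discardSubsumed,
                boolean discardFinished,
                boolean discardCancelled,
                boolean discardFailed,
                boolean discardSuspended) {
            this.discardSubsumed = discardSubsumed;
            this.discardFinished = discardFinished;
            this.discardCancelled = discardCancelled;
            this.discardFailed = discardFailed;
            this.discardSuspended = discardSuspended;
        }
    }

    static DiscardFlags flagsForRestoredSnapshot(
            RestoreMode mode, DiscardFlags regularCheckpointFlags) {
        if (mode == RestoreMode.CLAIM) {
            // Flink owns the snapshot: apply the retention configured for regular
            // checkpoints, so the snapshot is eventually cleaned up (on cancel,
            // when subsumed, etc.).
            return regularCheckpointFlags;
        }
        // No-claim / current behaviour: never let Flink delete the restored snapshot.
        return new DiscardFlags(false, false, false, false, false);
    }
}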

No-claim mode (default mode)

If we do not claim the snapshot from which we are restoring, we should not reference any artefacts from it. The problem occurs only for incremental snapshots, because those are the only ones that reference increments from previous checkpoints. We can solve the issue by forcing the first checkpoint after a restore to be a “full checkpoint”.

NOTE: In this context, and throughout the entire document, “full checkpoints/snapshots” means checkpoints that do not cross-reference other snapshots. It does not imply any particular format. In particular, in the case of RocksDB, it could still be a set of SST files, which is what happens if we enable “incremental checkpoints” in RocksDB. We must make sure that all used files are uploaded independently. Thus, if we use the same files as the original snapshot, we must either re-upload or duplicate them.

Pros:

  • Clean ownership
  • Old Job’s checkpoint/savepoint directory can be deleted as soon as the first checkpoint is taken.

Cons:

  • We can delete the artefacts only once the first checkpoint is taken; it would be nicer to be able to delete them as soon as the job is RUNNING
  • It is up to the state backend implementation to make sure it creates a “full snapshot” when requested
  • Compared to the current state, the first checkpoint would take longer: if we restore from an incremental retained checkpoint, we cannot upload just the diff files but have to re-upload the base files as well. We can improve the situation by implementing an optimized API for fast duplication of artefacts

Implementation:

The biggest challenge here is how we want to force the first checkpoint as a full snapshot.

When triggering the first checkpoint we would need to add a “force full snapshot” flag to the CheckpointBarrier. It would be up to the state backend to respect that flag and not use any of the shared increments.

The approach could be easily extended to use a “duplicate” API instead of reuploading artefacts. The “duplicate” would happen on TMs in that case, which is what we most likely want.

Furthermore, as a next step, we could build incremental savepoints on top of that, which would react to the same flag and would not reuse any increments from previous checkpoints.
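
To illustrate, the per-file decision inside an incremental state backend might look roughly like the following sketch (the types and helper methods are illustrative, not the actual RocksDB snapshot code):

Code Block
languagejava
// Illustrative sketch of the per-file decision an incremental state backend
// would make when the "force full snapshot" flag is set.
final class ForcedFullSnapshotSketch {

    interface SnapshotFile {
        boolean alreadyUploadedByPreviousCheckpoint();
    }

    interface ArtefactWriter {
        void upload(SnapshotFile file);            // write the bytes to checkpoint storage
        void duplicate(SnapshotFile file);         // cheap copy, if the storage supports it
        void referencePrevious(SnapshotFile file); // reuse the shared increment
    }

    static void handleFile(SnapshotFile file, boolean forceFullSnapshot, ArtefactWriter writer) {
        if (!file.alreadyUploadedByPreviousCheckpoint()) {
            // New file since the last snapshot: always uploaded.
            writer.upload(file);
        } else if (forceFullSnapshot) {
            // Do not cross-reference the restored snapshot: re-upload or, once
            // the duplicate API exists, duplicate the base file instead.
            writer.duplicate(file);
        } else {
            // Regular incremental checkpoint: reference the shared increment.
            writer.referencePrevious(file);
        }
    }
}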

There is one more caveat that would have to be handled internally by state backends. If a state backend can share files not just between checkpoints but also between tasks (e.g., in case of rescaling, a single file could be used for different key-group sub-ranges), the state backend would need to make sure such files are re-uploaded/duplicated from a single task.

RocksDB in its current implementation does not have this problem, because in case of rescaling it does not use any of the previous increments (we register shared increments from the restored checkpoints in `org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation#restorePreviousIncrementalFilesStatus`, which is called only when restoring without rescaling).

One option for how state backends can decide which task should duplicate/re-upload a shared file is to change `org.apache.flink.runtime.state.KeyedStateHandle#getIntersection` so that it embeds information about which KeyedStateHandle should trigger the duplication.

This option would require support from state backends, so we need to decide how state backends that are not part of Flink should behave, i.e. what kind of backwards compatibility we provide. An option would be to add a method like:


Code Block
languagejava
default boolean supportsForcedFullSnapshots() {
    return false;
}


to the org.apache.flink.runtime.state.Snapshotable interface. If a state backend does not support reacting to the flag, we would throw an exception. All state backends that are part of Flink would override this method.
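
Conceptually, the resulting check could be as simple as the following sketch (the surrounding names are simplified; only supportsForcedFullSnapshots() comes from the proposal above):

Code Block
languagejava
// Sketch of the backwards-compatibility check; BackendLike stands in for
// org.apache.flink.runtime.state.Snapshotable.
final class ForcedFullSnapshotSupportCheck {

    interface BackendLike {
        default boolean supportsForcedFullSnapshots() {
            return false;
        }
    }

    static void checkSupport(boolean forceFullSnapshot, BackendLike backend) {
        if (forceFullSnapshot && !backend.supportsForcedFullSnapshots()) {
            throw new UnsupportedOperationException(
                    "The configured state backend does not support forced full snapshots, "
                            + "which are required when restoring in no-claim mode.");
        }
    }
}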

Duplicating snapshot artefacts

Duplicating the artefacts (e.g., SST files) can be done by uploading them again, or by asking the checkpoint storage to duplicate them. Various systems have dedicated calls to copy files, which are more efficient than uploading them a second time. POSIX file systems can create hard links, and there is an S3 copy API call, which can supposedly cheaply copy objects when they are in the same bucket.

We could use those kinds of APIs to duplicate artefacts from the snapshot we restored, instead of reuploading them again.

We should extend the FileSystem / Checkpoint Storage abstraction to support duplicating the references to artefacts.
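
As an illustration, on a POSIX file system such a duplication could be implemented with hard links. The sketch below uses plain java.nio, independent of Flink’s FileSystem abstraction, and the paths are made up:

Code Block
languagejava
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Sketch: duplicating a snapshot artefact on a POSIX file system by creating a
// hard link instead of copying the bytes. An S3 implementation would issue a
// server-side object copy instead.
public final class HardLinkDuplicationExample {

    static void duplicate(Path src, Path dst) throws IOException {
        Files.createDirectories(dst.getParent());
        // Both paths now point to the same inode; no data is copied.
        Files.createLink(dst, src);
    }

    public static void main(String[] args) throws IOException {
        duplicate(
                Paths.get("/checkpoints/old-job/chk-42/shared/000001.sst"),
                Paths.get("/checkpoints/new-job/chk-1/shared/000001.sst"));
    }
}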

Public Interfaces

REST API

/jars/:jarid/run

Code Block
{
   "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunRequestBody",
  "properties" : {
    "allowNonRestoredState" : {
      "type" : "boolean"
    },
    "entryClass" : {
      "type" : "string"
    },
    "jobId" : {
      "type" : "any"
    },
    "parallelism" : {
      "type" : "integer"
    },
    "programArgs" : {
      "type" : "string"
    },
    "programArgsList" : {
      "type" : "array",
      "items" : {
        "type" : "string"
      }
    },
    "savepointPath" : {
      "type" : "string"
    },
    "restoreMode" : {
      "type" : "string" // possible values: {"claim", "no-claim"}
    }
  }
}

CLI

Code Block
./bin/flink run \
      --fromSavepoint <savepointPath> \
      --allowNonRestoredState ...
      --claim/(no-claim)

Code

org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.java

Code Block
languagejava
/** Savepoint restore settings. */
public class SavepointRestoreSettings implements Serializable {
        public RestoreMode getRestoreMode();
}

org.apache.flink.runtime.jobgraph.RestoreMode

Code Block
languagejava
public enum RestoreMode {
    CLAIM,
    NO_CLAIM;
}

org.apache.flink.runtime.jobgraph.SavepointConfigOptions

Code Block
/** The {@link ConfigOption configuration options} used when restoring from a savepoint. */

@PublicEvolving
public class SavepointConfigOptions {
    public static final ConfigOption<RestoreMode> SAVEPOINT_RESTORE_MODE =
                key("execution.savepoint.restore-mode")
                    .enumType(RestoreMode.class)
                    .defaultValue(RestoreMode.NO_CLAIM);
}
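
If the option is added as proposed, selecting the restore mode programmatically might look like this (a sketch; the option and enum come from this proposal, the rest is standard configuration handling):

Code Block
languagejava
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;

// Sketch: equivalent to setting execution.savepoint.restore-mode: claim
// in the configuration file.
public final class RestoreModeConfigExample {

    public static Configuration claimRestoredSnapshot() {
        Configuration configuration = new Configuration();
        configuration.set(SavepointConfigOptions.SAVEPOINT_RESTORE_MODE, RestoreMode.CLAIM);
        return configuration;
    }
}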

org.apache.flink.runtime.checkpoint.CheckpointCoordinator#restoreSavepoint

Code Block
public boolean restoreSavepoint(
        SavepointRestoreSettings restoreSettings,
        Map<JobVertexID, ExecutionJobVertex> tasks,
        ClassLoader userClassLoader)
        throws Exception {
    ...
}

org.apache.flink.runtime.checkpoint.CheckpointProperties.java

Code Block
public class CheckpointProperties implements Serializable {
     public boolean isForcedFullSnapshot();
}

org.apache.flink.runtime.checkpoint.CheckpointOptions

Code Block
public class CheckpointOptions implements Serializable {
     public boolean isForcedFullSnapshot();
}

org.apache.flink.core.fs.FileSystem.java

Code Block
public abstract class FileSystem {
    public void duplicate(Path srcPath, Path dstPath) throws IOException {
        // Default implementation for backwards compatibility; implementations
        // should override this with a more performant, storage-specific version.
        final FSDataInputStream src = open(srcPath);
        final FSDataOutputStream dst = create(dstPath, WriteMode.NO_OVERWRITE);
        IOUtils.copyBytes(src, dst, true);
    }
}

Implementation plan

  1. Skipping savepoints for recovery (can be done in parallel)
  2. Claim restore
  3. No-claim mode - implement the “force full snapshot” flag and its handling in state backends. At this point, we would simply re-upload previous increments.
  4. Copy/Duplicate API (this can be done in parallel to most of the other efforts)
  5. Replace re-uploading shared increments with the duplicate API (depends on 4.)

Compatibility, Deprecation, and Migration Plan

The default behaviour for restoring a retained checkpoint would change. By default, when restoring from an incremental retained checkpoint, we would take a full first snapshot instead of an incremental one. However, this results in clean ownership semantics: we would never depend on a checkpoint that is not under the restored job’s control.

Rejected alternatives

Copying retained snapshot

Any reference to the previous Job’s checkpoint timeline prevents easy cleanup of those resources, which can be circumvented by copying the snapshot artefacts into the new Job’s timeline.

Pros:

  • Clean semantics, easy to understand
  • Old Job’s checkpoint/savepoint directory can be deleted as soon as a new Job is RUNNING.

Cons:

  • Still expensive: Copying all snapshot artefacts might be an expensive operation (see also the “Duplicating snapshot artefacts” section)
  • Unclear where to do the actual copy (see the section below)


The initial copying of the artefacts might be an expensive operation. In case of retained checkpoints, it is balanced out by the possibility to take the first checkpoint after restore as an incremental one. 

This is not possible in the case of savepoints (at least for now), because savepoints are in canonical format and therefore require the first checkpoint to be a full snapshot anyway. Thus the only benefit is that we can remove the original savepoint directory as soon as the new job starts.

Where to do the actual copy

The duplicate/copy API can be implemented at three different levels:

  1. bump up a counter in, e.g., an external key-value store
  2. trigger a copy/duplicate action on the file system side; this needs to be supported by the file system, e.g. hard links or the S3 copy API
  3. download and re-upload (HDFS neither supports hard links nor has a copy API)

We could consider noth 1 & 2 to be executed from the CheckpointCoordinator as they might be relatively cheap operations (from the point of view of the coordinator). However not necessarily, e.g. S3 copy API might still be too expensive to execute from a single process for 1000s of files. 

Option 3 would most certainly be an expensive operation (depending on the checkpoint size). A natural solution would be to execute the copy/duplication on the TaskManager side. Unfortunately, this has problems of its own.

When restoring from a snapshot, we put a CompletedCheckpoint with the checkpoint’s metadata into the CompletedCheckpointStore. The problem is that we cannot easily create the metadata for the duplicated/copied checkpoint:

  1. We could create placeholder metadata; however, it would be invalid until the actual duplication happens, which poses problems for failover to such a checkpoint.
  2. A checkpoint might be assigned to a different number of tasks than originally, so in the general case multiple tasks might read from the same checkpoint file. Which task should be responsible for duplicating the file? Is it fine that some of the tasks are restored from the original copy?

Given that we would need coordination between tasks as well as a way to mark when the duplication has finished, this option starts to look very similar to the no-claim mode. Therefore we decided to drop it and go with the two modes listed previously.

Copying out checkpoints as savepoints

Savepoints are under user control, and therefore they should generally not be considered for regular failure recovery because they might already be deleted.

In order to circumvent the issue, we could create a regular checkpoint and copy it out (possibly using the “Duplicating snapshot artefacts API”). That way we would have two independent copies of the same snapshot and we would not depend on the user-owned one.

Pros:

  • we would not have to rewind so much into the past (compared to “skipping savepoints for recovery”)
  • cleaner savepoints side effect semantics
    • we do not keep uncommitted state in savepoints
    • we do not rollback past snapshotted, but uncommitted state
  • we do want to implement native, incremental savepoints. 


The approach of copying out checkpoints as savepoints would let us reuse diff files (the new files created for the savepoint since the last checkpoint) for a subsequent, incremental checkpoint. However, the benefit would be smaller than one could imagine, as with frequent checkpoints there should not be many diff files.

Cons:

  • still, an expensive operation for full snapshots, especially if we do not have a near no-op implementation of the duplicate API
  • more complicated implementation (a single checkpoint creates two sets of artefacts)

Implementation

In order to support copying checkpoints as savepoints, we could modify state backends slightly to produce two independent sets of artefacts when we trigger a savepoint. We would need to extend the org.apache.flink.runtime.state.Snapshotable interface with a method which would create a pair of StateObjects: one for a checkpoint and one for the savepoint.
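
A sketch of what such an extension might have looked like (purely illustrative, since this alternative was rejected; the names below are hypothetical):

Code Block
languagejava
// Hypothetical extension for the rejected "copy out checkpoints as savepoints"
// alternative: one snapshot call materialises two independent sets of artefacts.
public interface SnapshotPairProducer<S> {

    /** Holder for the two independent sets of snapshot artefacts. */
    final class SnapshotPair<S> {
        public final S checkpointState; // registered in the CompletedCheckpointStore
        public final S savepointState;  // handed to the user as the savepoint

        public SnapshotPair(S checkpointState, S savepointState) {
            this.checkpointState = checkpointState;
            this.savepointState = savepointState;
        }
    }

    SnapshotPair<S> snapshotWithSavepointCopy(long checkpointId);
}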

Later we would modify the org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint message to contain handles to those two sets of artefacts. CheckpointCoordinator would use one to register it in the CompletedCheckpointStore and the second one to create a savepoint.

Similarly to option 1) of the no-claim mode, this would require collaboration with the state backend implementations.

Using a boolean flag instead of RestoreMode enum

Having just two options (claim vs. no-claim), we could go with a boolean flag. The main difference would be in the REST API (in the code it would be much easier to change later). Therefore the REST API could look as follows:

Code Block
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunRequestBody",
  "properties" : {
    "allowNonRestoredState" : {
      "type" : "boolean"
    },
    "entryClass" : {
      "type" : "string"
    },
    "jobId" : {
      "type" : "any"
    },
    "parallelism" : {
      "type" : "integer"
    },
    "programArgs" : {
      "type" : "string"
    },
    "programArgsList" : {
      "type" : "array",
      "items" : {
        "type" : "string"
      }
    },
    "savepointPath" : {
      "type" : "string"
    },
    "claim" : {
      "type" : "boolean"
    }
  }
}

However, for the sake of extensibility in the future, I suggest going with the enum.