Discussion thread: https://lists.apache.org/thread/zw2crf0c7t7t4cb5cwcwjpvsb3r1ovz2
Vote thread: https://lists.apache.org/thread/tpyros2fl0howbtcb3fc54f7b7pjn1fw
JIRA


Release: 1.15

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Problem 1:

Flink can be set up to retain checkpoints, keeping the last checkpoint when the job is cancelled or fails terminally. A new job can resume from that checkpoint left by the previous job.

The new job treats a retained checkpoint like a mix of a savepoint and a checkpoint: It doesn’t assume ownership of that checkpoint, meaning it will not remove (clean up) that checkpoint once the checkpoint is subsumed. But, the new job will build incremental checkpoints on top of that retained checkpoint.

This combination means that the cleanup of the checkpoint is left to the user; from Flink’s perspective, it just stays forever and is never deleted. But especially for incremental checkpoints, it is next to impossible for users and management tools to figure out when they can remove an incremental checkpoint (because some artefacts (files) of the incremental checkpoint may still be in use in the later retained checkpoints).

Problem 2:

Savepoints are under user control and (currently) full canonical snapshots. Those two characteristics mean that they should generally not be considered for regular failure recovery:

  • They might already be deleted (because they are controlled by the user)
  • They take much longer to recover from due to being a full snapshot in canonical format.

However, currently, if you skip Savepoints for recovery, you get result duplication in the transactional sinks (and in other operators with side effects) because a previously processed and committed part of the stream is re-processed. (An option to skip savepoints was actually dropped in 1.14 for these reasons.)

NOTE: The FLIP does not aim to make savepoints as cheap as checkpoints with regard to creating them and recovering from them. It focuses on solving the ownership issue. We do want to make savepoints cheaper, but this will be part of a separate, follow-up FLIP.

Proposed Changes

The proposal is built around the assumption that ownership should be the only difference between Checkpoints and Savepoints.

Checkpoints

  • Owned by the Flink system
  • Owned by one and only one job; a checkpoint should never be used by multiple jobs
  • Cleaned up when subsumed

⇒ These properties are essential, so Flink can assume full ownership of all artefacts and reference, re-use, and garbage-collect them as needed.

Savepoints

  • Owned by the user, never deleted by Flink
  • Self-contained, meaning not coupled to a job or to other shared artefacts.
  • Immutable, can be freely shared between applications.

⇒ Self-contained means that the Savepoint must not depend on any shared snapshot artefacts from its originating job. The property of being self-contained is vital to support easy cleanup by users and external tools. Savepoints may still exist when the Flink job that created them is no longer running, so their cleanup cannot be tied to the running Flink cluster.

Skipping Savepoints for Recovery

Savepoints are under user control, and therefore they should generally not be considered for regular failure recovery because they might already be deleted.

However, currently, if you skip Savepoints for recovery, you get result duplication in the transactional sinks (and in other operators with side effects) because a previously processed and committed part of the stream is re-processed.

We can exclude intermediate savepoints from the checkpoint timeline by not registering them in the CompletedCheckpointStore. Thus, intermediate savepoints are not used for recovery and do not commit side effects.

ATTENTION: Savepoints created by "stop-with-savepoint" (non-intermediate savepoints) would still commit side effects.

The way we achieve this for intermediate savepoints is as follows (see the sketch after this list):

  • Savepoints are triggered similarly to checkpoints.
  • But when they are complete, we do not send “notifyCheckpointComplete()”.
  • When they are complete, they are not inserted into the CompletedCheckpointStore.
  • Operators never receive a “restore()” for that snapshot, nor a “notifyCheckpointComplete()”. So they never trigger the committing of side effects.
    To the operators, it looks as if the Savepoint didn’t complete (was aborted).
  • The next regular checkpoint commits the range of side effects since the previous regular checkpoint.
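As an illustration, the following is a minimal sketch of that behaviour (the method and field names are hypothetical, not the actual CheckpointCoordinator code):

// Sketch only: how completion of an intermediate savepoint could be treated differently
// from a regular checkpoint (or a stop-with-savepoint savepoint).
private void onSnapshotCompleted(CompletedCheckpoint completed, CheckpointProperties props) throws Exception {
    if (isIntermediateSavepoint(props)) {
        // Do not register the savepoint in the CompletedCheckpointStore and do not send
        // notifyCheckpointComplete(); to the operators it looks as if the snapshot was aborted.
        return;
    }
    // Regular checkpoints and stop-with-savepoint keep the current behaviour:
    completedCheckpointStore.addCheckpoint(completed);
    sendNotifyCheckpointComplete(completed.getCheckpointID());
}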

NOTE: With this solution it is possible that the original job fails after taking a savepoint and falls back to a checkpoint taken before that savepoint. If we then resume another job from the savepoint, it might commit transactions that never happened in the original job, because the original job fell back to the earlier checkpoint (assuming non-deterministic processing).

However, we think this is a rare corner case and we should not complicate the design because of it (see the rejected alternative "Copying out checkpoints as savepoints"). When resuming from intermediate savepoints, you need to ignore the sinks' side-effect state, typically by giving the sink operators a different name. Otherwise you are writing to the same sink from two jobs that are at different points in the stream, which is not well defined in general.

We were also thinking about dropping the state automatically when taking the savepoint. This has the drawback that the sink’s state is lost permanently in such a case. At the same time there might be cases where it actually makes sense to reuse the pending side-effects.

We might add tooling to simplify ignoring such side effects/sinks state at a later point in time. For now we would document this as a gotcha.

Restoring from a retained snapshot (savepoint or retained checkpoint)

How it works now (Flink 1.14)

Savepoints

Taking a Savepoint at the end of one job and restoring it as a Savepoint for the next job is the cleanest thing, semantically.

Pros:

  • Simple to understand
  • The resource that survives the first job (the Savepoint) is easy to clean up (once a checkpoint is taken by the new job)

Cons:

  • Savepoints are more expensive to take (full snapshots in canonical format)
  • Sometimes one doesn’t have a Savepoint (the job was cancelled or failed terminally), so we will always need an additional mechanism that works with checkpoints
  • Restoring is more expensive, mainly because the next checkpoint must be full and cannot incrementally build on the Savepoint.

Retained checkpoints

The new job references the retained checkpoint from the old job. It builds an incremental checkpoint on it, but never garbage collects the retained checkpoint because it comes from a different job and checkpoint timeline.

Pros:

  • Fast: New Job can immediately do incremental checkpoints

Cons:

  • Very unclear semantics. The new job depends on artefacts of the old job (references them in incremental checkpoints) but does not assume ownership of them or clean them up.
  • Users and tools have no way of knowing when it is safe to delete the old checkpoint directory.
  • Results in long lingering unneeded checkpoint artefacts (files) that are never cleaned up.

Proposal (Flink 1.15+)

We suggest introducing two modes of restoring from a retained snapshot (savepoint or checkpoint). (A third option is discussed in the rejected alternatives at the end.)

Claiming the snapshot

Claiming the snapshot means becoming its owner and deleting the snapshot and its artefacts once they can be removed. In this mode Flink will eventually clean up the snapshot as configured for the running job (e.g. on cancel or when subsumed by another checkpoint).

Pros:

  • Fast: New Job can immediately do incremental checkpoints
  • Eventually, retained checkpoint data is cleaned up

Cons:

  • Complex semantics: New job depends on artefacts of the old job (references them in incremental checkpoints).


NOTE (needs to be documented): Only one job should ever claim a single snapshot. It is not safe and not supported to start multiple jobs from the same snapshot in the "claim" mode.

NOTE (needs to be documented as a limitation): Retained checkpoints are stored in a path like: <checkpoint_dir>/<job_id>/chk_<x>

At least in the first version we would not take ownership of the <checkpoint_dir>/<job_id> directory, but only of the chk_<x> directory. We could probably delete the <checkpoint_dir>/<job_id> directory as soon as it becomes empty. Claiming a checkpoint from the directory of a still-running job is probably a bad idea in the first place, so we should be fine from that perspective. However, the restore procedure treats savepoints and retained checkpoints the same way; as far as we can tell there is nothing to tell the two apart and thus no way to decide whether the parent directory should be removed or not.

Implementation:

When we receive a request to restore from a snapshot, instead of putting the initial snapshot into the CompletedCheckpointStore with the flags (discardSubsumed = false, discardFinished = false, discardCancelled = false, discardFailed = false, discardSuspended = false), we would use the configuration that was specified for regular checkpoints (configured with ExternalizedCheckpointCleanup / “execution.checkpointing.externalized-checkpoint-retention”).
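A minimal sketch of the intended change (the retention value shown is illustrative; in practice it would be taken from the job configuration):

// Sketch only: in "claim" mode the restored snapshot is registered with the retention
// configured for regular checkpoints, instead of the all-false discard flags listed above.
CheckpointProperties claimProperties =
        CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION);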

No-claim mode (default mode)

If we do not claim the snapshot from which we are restoring, we must not reference any artefacts from it. The problem occurs only for incremental snapshots, because those are the only ones that reference increments from previous checkpoints. We can solve the issue by forcing the first checkpoint after a restore to be a “full checkpoint”.

NOTE (needs to be documented): Once the first checkpoint has completed successfully, the job won't depend in any way on the snapshot used for restoring. Therefore one can start as many jobs from a single snapshot as they wish.

NOTE: In this context and throughout the entire document, when talking about “full checkpoints/snapshots” we mean checkpoints that do not cross-reference other snapshots. This does not enforce any particular format. In particular, in the case of RocksDB, it could still be a set of SST files, which is what happens if we enable “incremental checkpoints” in RocksDB. We must make sure that all used files are uploaded independently. Thus, if we use the same files as the original snapshot, we must either re-upload or duplicate them.

Pros:

  • Clean ownership
  • Old Job’s checkpoint/savepoint directory can be deleted as soon as the first checkpoint is taken.

Cons:

  • We can delete the artefacts only once the first checkpoint is taken; it would be nicer to be able to delete them as soon as the job is RUNNING
  • It is up to the state backend implementations to make sure they create a “full snapshot” when requested
  • Compared to the current state, the first checkpoint takes longer: when restoring from an incremental retained checkpoint, it cannot upload just the diff files but must re-upload the base files as well. We can improve the situation here by implementing an optimized API for fast duplication of artefacts.


In this mode, users can safely delete the initial snapshot after another checkpoint has been taken successfully after the restore. In order to reliably tell whether a checkpoint has completed, users can e.g. query Flink's UI for the number of completed checkpoints or check the completed checkpoint store (ZooKeeper/Kubernetes). Checking for the existence of files is not a reliable way to determine whether a checkpoint succeeded.

Gotcha:

There is still a caveat in the case of multiple retained externalized checkpoints. Assume we have retained incremental checkpoints chk-42 and chk-43. If we restore from chk-42, it is not enough to wait for the first checkpoint after restoring from chk-42 in order to remove chk-42, because chk-43 may still build on top of it. However, restoring from a checkpoint does not create any additional dependants.

Implementation:

The biggest challenge here is how we want to force the first checkpoint as a full snapshot.

When triggering the first checkpoint we would need to add a “force full snapshot” flag to the CheckpointBarrier. It would be up to the state backend to respect that flag and not use any of the shared increments.
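A rough sketch of how a state backend could honour that flag (the helper methods are hypothetical; the flag accessor is the one proposed in "Public Interfaces" below):

// Sketch only: a keyed state backend reacting to the "force full snapshot" flag.
public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory,
        CheckpointOptions checkpointOptions) throws Exception {

    if (checkpointOptions.isForcedFullSnapshot()) {
        // Do not reference increments of previous (possibly foreign) checkpoints;
        // upload (or, later, duplicate) all files for this snapshot independently.
        return snapshotWithoutSharedIncrements(checkpointId, timestamp, streamFactory);
    }
    return snapshotIncrementally(checkpointId, timestamp, streamFactory);
}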

The approach could be easily extended to use a “duplicate” API instead of reuploading artefacts. The “duplicate” would happen on TMs in that case, which is what we most likely want.

Furthermore, as a next step, we could build incremental savepoints on top of that, which would react to the same flag and would not reuse any increments from previous checkpoints.

Forcing full snapshots

There is one more caveat that would have to be handled internally by state backends. If a state backend can share files not just between checkpoints, but also between tasks (e.g. in case of rescaling, a single file could be used for different key-group subranges), the state backend would need to make sure such files are re-uploaded/duplicated from a single task.

RocksDB in its current implementation does not have this problem, because in case of rescaling it does not use any of the previous increments (we register shared increments from the restored checkpoint in `org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation#restorePreviousIncrementalFilesStatus`, which is called only when restoring without rescaling).

One option for how state backends can decide which task should duplicate/re-upload a shared file is to embed that information in the KeyedStateHandle. This could apply to RocksDB if we change how we perform rescaling. For example, we could randomly choose a key group in the range; the task that gets assigned the key-group range containing this key group would be responsible for duplicating the file.
Example implementation:

    // hypothetical new field: the key group whose owning task re-uploads/duplicates shared files
    private final int keyGroupThatShouldDuplicate;

    public IncrementalRemoteKeyedStateHandle(
            UUID backendIdentifier,
            KeyGroupRange keyGroupRange,
            long checkpointId,
            Map<StateHandleID, StreamStateHandle> sharedState,
            Map<StateHandleID, StreamStateHandle> privateState,
            StreamStateHandle metaStateHandle) {

        ....
        // pick a random key group within this handle's range; the task assigned the range
        // containing that key group duplicates the shared files
        this.keyGroupThatShouldDuplicate =
                new Random().nextInt(keyGroupRange.getNumberOfKeyGroups())
                        + keyGroupRange.getStartKeyGroup();
    }


The changelog state backend is designed in a way that it can share diff files not just between key-group ranges, but also between different operators. Thus the logic for deciding which task should duplicate shared files cannot be easily embedded in KeyedStateHandles. We propose two ways to achieve this:

  1.  (We would go for this option in the first version.) Trigger a snapshot of the base state backend in the first checkpoint, which induces materializing the changelog. In this approach we could duplicate the SST files, but we would not duplicate the diff files.
  2.  Add a hook for the logic that computes which task should duplicate the diff files. We would have to do a pass over all states after the state assignment in StateAssignmentOperation.
Backwards compatibility

This mode would require support from state backends, and we need to decide how we want state backends that are not part of Flink to behave, i.e. what kind of backwards compatibility we provide. An option would be to add a method like:

default boolean supportsForcedFullSnapshots() {
    return false;
}

to the org.apache.flink.runtime.state.Snapshotable interface. If a state backend does not support reacting to the flag, we would throw an exception. All state backends that are part of Flink would override this method.
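For illustration, the check on the runtime side could look roughly like this (the exception message and surrounding variable names are illustrative):

// Sketch only: reject the restore if a custom state backend cannot produce a self-contained snapshot.
if (forceFullSnapshot && !keyedStateBackend.supportsForcedFullSnapshots()) {
    throw new UnsupportedOperationException(
            "The used state backend cannot create a snapshot that is independent of the restored one, "
                    + "which is required for the NO_CLAIM restore mode.");
}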

Duplicating snapshots artefacts

Duplicating the artefacts (e.g., SST files) can be done by uploading them again, or by asking the checkpoint storage to duplicate them. Various systems have dedicated calls to copy files, which are more efficient than uploading them a second time. POSIX file systems can create hard links, and there is an S3 copy API call, which can supposedly cheaply copy objects when they are in the same bucket.

We could use those kinds of APIs to duplicate artefacts from the snapshot we restored from, instead of re-uploading them.

We should extend the FileSystem / Checkpoint Storage abstraction to support duplicating the references to artefacts.

Examples of file systems that support some kind of DFS-side COPY API include POSIX file systems (hard links) and S3 (copy API), as mentioned above.
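As an illustration, a local file system could override the proposed duplicate() method with a hard link instead of a byte-wise copy (a sketch, assuming hard links are an acceptable form of duplication for that storage):

// Sketch only: possible LocalFileSystem override of the proposed duplicate() method.
@Override
public void duplicate(Path srcPath, Path dstPath) throws IOException {
    // A hard link turns the "copy" into a constant-time metadata operation.
    java.nio.file.Files.createLink(
            java.nio.file.Paths.get(dstPath.getPath()),  // link to create
            java.nio.file.Paths.get(srcPath.getPath())); // existing file
}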

Public Interfaces

REST API

/jars/:jarid/run

{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunRequestBody",
  "properties" : {
    "allowNonRestoredState" : {
      "type" : "boolean"
    },
    "entryClass" : {
      "type" : "string"
    },
    "jobId" : {
      "type" : "any"
    },
    "parallelism" : {
      "type" : "integer"
    },
    "programArgs" : {
      "type" : "string"
    },
    "programArgsList" : {
      "type" : "array",
      "items" : {
        "type" : "string"
      }
    },
    "savepointPath" : {
      "type" : "string"
    },
    "restoreMode" : {
      “type” : “string” // possible values: {“claim”, “no-claim”}
    }
  }
}

CLI

./bin/flink run \
      --fromSavepoint <savepointPath> \
      --allowNonRestoredState ...
      --restoreMode (claim/no-claim/...)

Code

org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.java

/** Savepoint restore settings. */
public class SavepointRestoreSettings implements Serializable {
    public RestoreMode getRestoreMode();
}

org.apache.flink.runtime.jobgraph.RestoreMode

public enum RestoreMode {
    CLAIM,
    NO_CLAIM;
}

org.apache.flink.runtime.jobgraph.SavepointConfigOptions

/** The {@link ConfigOption configuration options} used when restoring from a savepoint. */

@PublicEvolving
public class SavepointConfigOptions {
    public static final ConfigOption<RestoreMode> SAVEPOINT_RESTORE_MODE =
            key("execution.savepoint.restore-mode")
                    .enumType(RestoreMode.class)
                    .defaultValue(RestoreMode.NO_CLAIM);
}
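For example, the new option could be set programmatically next to the savepoint path (the path below is only a placeholder):

// Sketch only: selecting the restore mode via the proposed configuration option.
Configuration configuration = new Configuration();
configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, "s3://my-bucket/savepoints/savepoint-abc123");
configuration.set(SavepointConfigOptions.SAVEPOINT_RESTORE_MODE, RestoreMode.CLAIM);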

org.apache.flink.runtime.checkpoint.CheckpointCoordinator#restoreSavepoint

public boolean restoreSavepoint(
        SavepointRestoreSettings restoreSettings,
        Map<JobVertexID, ExecutionJobVertex> tasks,
        ClassLoader userClassLoader)
        throws Exception {

    ...
}

org.apache.flink.runtime.checkpoint.CheckpointProperties.java

public class CheckpointProperties implements Serializable {
    public boolean isForcedFullSnapshot();
}

org.apache.flink.runtime.checkpoint.CheckpointOptions

public class CheckpointOptions implements Serializable {
    public boolean isForcedFullSnapshot();
}

org.apache.flink.core.fs.FileSystem.java

public abstract class FileSystem {
    public void duplicate(Path srcPath, Path dstPath) throws IOException {
        // Default implementation for backwards compatibility; implementations should
        // override it with more performant, storage-native methods where available.
        final FSDataInputStream src = open(srcPath);
        final FSDataOutputStream dst = create(dstPath, WriteMode.NO_OVERWRITE);
        IOUtils.copyBytes(src, dst, true);
    }
}

Implementation plan

  1. Skipping savepoints for recovery (can be done in parallel)
  2. Claim restore
  3. No claim mode - Implement the “force full snapshot” flag and its handling in state backends. At this point, we would simply reupload previous increments.
  4. Copy/Duplicate API (this can be done in parallel to most of other efforts)
  5. Replace re-uploading shared increments with the duplicate API (depends on 4.)

Compatibility, Deprecation, and Migration Plan

The default behaviour for restoring a retained checkpoint would change. By default, when restoring from an incremental retained checkpoint, we would take a full first snapshot instead of an incremental one. However, this results in clean ownership semantics: we would never depend on a checkpoint that is not under the restored job's control.

Rejected alternatives

Copying retained snapshot

Any reference to the previous Job’s checkpoint timeline prevents easy cleanup of those resources, which can be circumvented by copying the snapshot artefacts into the new Job’s timeline.

Pros:

  • Clean semantics, easy to understand
  • Old Job’s checkpoint/savepoint directory can be deleted as soon as a new Job is RUNNING.

Cons:

  • Still expensive: copying all snapshot artefacts might be an expensive operation (see also the “Duplicating snapshots artefacts” section)
  • Unclear where to do the actual copy (see the section below)


The initial copying of the artefacts might be an expensive operation. In case of retained checkpoints, it is balanced out by the possibility to take the first checkpoint after restore as an incremental one. 

This is not possible in the case of savepoints (at least for now) because savepoints are in canonical format and for that reason, they anyhow require the first checkpoint to be a full snapshot. Therefore the only benefit is that we can remove the original savepoint directory as soon as the new job starts.

Where to do the actual copy

The duplicate/copy API can be implemented at three different levels:

  1. bump up counter in e.g. an external key-value store
  2. trigger copy/duplicate action on the filesystem side. This needs to be supported by the filesystem e.g. hardlinks, s3 copy API
  3. download and reupload (HDFS does not support hard links nor has a copy API)

We could consider executing both 1 & 2 from the CheckpointCoordinator, as they might be relatively cheap operations (from the point of view of the coordinator). However, this is not guaranteed; e.g. the S3 copy API might still be too expensive to execute from a single process for thousands of files.

Option 3 would almost certainly be an expensive operation (depending on the checkpoint size). A natural solution would be to execute the copy/duplicate on the TaskManager side. Unfortunately, this has problems of its own.

When restoring from a snapshot, we put a CompletedCheckpoint with the checkpoint's metadata into the CompletedCheckpointStore. The problem is that we cannot easily create the metadata for the duplicated/copied checkpoint:

  1. We could create placeholder metadata; however, it would be invalid until the actual duplication happens. This poses problems with a failover to such a checkpoint.
  2. A checkpoint might be assigned to a different number of tasks than originally. In turn, multiple tasks might read from the same checkpoint file (in the general case). Which task should be responsible for duplicating the file? Is it fine that some of the tasks are restored from the original copy?

Given we would need some kind of coordination between tasks and coordination to mark when the duplication finished, the options start to look very similar to the no-claim option. Therefore we decided to drop that option and go with the two listed previously.

Copying out checkpoints as savepoints

Savepoints are under user control, and therefore they should generally not be considered for regular failure recovery because they might already be deleted.

In order to circumvent the issue, we could create a regular checkpoint and copy it out (possibly using the “Duplicating snapshot artefacts API”). That way we would have two independent copies of the same snapshot and we would not depend on the user-owned one.

Pros:

  • we would not have to rewind so much into the past (compared to “skipping savepoints for recovery”)
  • cleaner savepoints side effect semantics
    • we do not keep uncommitted state in savepoints
    • we do not rollback past snapshotted, but uncommitted state
  • we do want to implement native, incremental savepoints. 


The approach of copying out checkpoints as savepoints would let us reuse the diff files (the new files created for the savepoint since the last checkpoint) for a subsequent, incremental checkpoint. However, the benefit would be smaller than one might imagine, as with frequent checkpoints there should not be many diff files.

Cons:

  • still, an expensive operation for full snapshots, especially if we do not have a near no-op implementation of the duplicate API
  • more complicated implementation (a single checkpoint, creates two sets of artefacts)

Implementation

In order to support copying checkpoints as savepoints, we could modify state backends slightly to produce two independent sets of artefacts when we trigger a savepoint. We would need to extend the org.apache.flink.runtime.state.Snapshotable interface with a method which would create a pair of StateObjects: one for a checkpoint and one for the savepoint.
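A very rough sketch of what such an extension could look like (all names are illustrative; note that this alternative was rejected):

// Sketch only: a snapshot call producing two independent sets of artefacts per trigger.
public interface SavepointCopyingSnapshotable<S extends StateObject> {

    /**
     * Takes one snapshot but writes two independent sets of artefacts: the first result would be
     * registered as a regular checkpoint, the second one copied out as a savepoint.
     */
    RunnableFuture<Tuple2<SnapshotResult<S>, SnapshotResult<S>>> snapshotWithSavepoint(
            long checkpointId,
            long timestamp,
            CheckpointStreamFactory checkpointStreamFactory,
            CheckpointStreamFactory savepointStreamFactory,
            CheckpointOptions checkpointOptions) throws Exception;
}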

Later we would modify the org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint message to contain handles to those two sets of artefacts. CheckpointCoordinator would use one to register it in the CompletedCheckpointStore and the second one to create a savepoint.

Similarly to option 1) of the no-claim mode, this would require collaboration with the state backend implementations.

Using a boolean flag instead of RestoreMode enum

Having just two options (claim vs. no-claim), we could go with a boolean flag. The main difference would be for the REST API (it would be much easier to change in the code). We could then have the REST API as follows:

{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunRequestBody",
  "properties" : {
    "allowNonRestoredState" : {
      "type" : "boolean"
    },
    "entryClass" : {
      "type" : "string"
    },
    "jobId" : {
      "type" : "any"
    },
    "parallelism" : {
      "type" : "integer"
    },
    "programArgs" : {
      "type" : "string"
    },
    "programArgsList" : {
      "type" : "array",
      "items" : {
        "type" : "string"
      }
    },
    "savepointPath" : {
      "type" : "string"
    },
    "claim" : {
      “type” : “boolean”
    }
  }
}

However, for the sake of extensibility in the future, I suggest going with the enum.
