Discussion thread: https://lists.apache.org/thread/2oscz4k3ckbvlqt54o4mdfb4ssq6s7q8
Vote thread: tba
JIRA: tba
Release: tba

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

FLIP-194 (Introduce the JobResultStore) introduced repeatable cleanup on the Dispatcher side: if a component like the JobGraphStore failed to clean up job-related data, the cleanup was retried. Initially, the plan was to let the retry logic run until the root cause of the failure was resolved. The goal was for Flink to own its data and take care of the cleanup, rather than handing ownership over to the user (via log messages) as soon as some artifacts failed to be cleaned up. In the end, the retry logic was made configurable. This enables the user to switch back to the old (pre-1.15) approach or to make Flink retry the cleanup only a limited number of times.

The efforts around FLIP-194 didn't include the CompletedCheckpoint cleanup that is performed by the CheckpointsCleaner in a separate thread. As a consequence, Flink doesn't provide consistent cleanup semantics. This FLIP tries to fix this inconsistency.

Interfaces being affected

Public Interfaces

No public interfaces have to be changed. We can use the existing configuration parameters that were introduced as part of FLIP-194.

Other Components

The following components participate in the Checkpoint lifecycle management right now:

  • Checkpoint (interface implemented by PendingCheckpoint, CompletedCheckpoint)
    • Data objects that contain all the relevant metadata defining a Checkpoint
    • markAsDiscarded needs to be called on the main thread
    • Checkpoint.DiscardObject holds all the relevant information for cleaning up the Checkpoint without relying on the main thread
  • DefaultCompletedCheckpointStore
    • Makes the CompletedCheckpoints highly available
    • StateHandleStore implementations are used internally as the abstraction layer for different HA backends (i.e. k8s, ZooKeeper)
  • CheckpointsCleaner
    • Up for discussion: Should the CheckpointsCleaner be in charge of triggering the next checkpoint? It feels like the CheckpointCoordinator should be in charge of it.
    • Handles the deletion of Checkpoint instances
    • markAsDiscarded is called on the main thread
    • Actual cleanup is performed in the ioExecutor
    • Schedules next checkpoint creation trigger (through callback) after cleanup is done
    • See the corresponding Jira issue for the discussion on why the CheckpointsCleaner was added
  • CheckpointCoordinator
    • Manages Checkpoint creation
  • Scheduler
    • Parent for the three previously mentioned components
    • Scheduler#closeAsync triggers shutdown of CompletedCheckpointStore

Proposed Changes

Repeatable Cleanup in CheckpointsCleaner

Currently, the CheckpointsCleaner is passed around wherever some cleanup needs to be done, e.g. to the CheckpointCoordinator and the DefaultCompletedCheckpointStore. It tries to delete the underlying data and prints a log message if it fails. We could add a retry loop similar to the one in the DefaultResourceCleaner implementation, utilizing the same configuration parameters. This would also allow switching back to the old behavior, where the CheckpointsCleaner tries to clean things up only once.

This approach lacks proper failover handling. We would lose the checkpoint information after a JobManager failover because Checkpoints that are subject to deletion are not stored in the CompletedCheckpointStore. The same applies to PendingCheckpoints.
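The retry loop discussed above could look roughly like the following sketch. All names here (RetryingCheckpointCleanup, CleanupAction, runWithRetry, and the maxAttempts and delayMillis parameters) are hypothetical and only illustrate the idea; they are not Flink's DefaultResourceCleaner API. Setting maxAttempts to 1 corresponds to the old try-once behavior.

```java
/** Hypothetical helper illustrating a bounded retry loop; not part of Flink. */
final class RetryingCheckpointCleanup {

    /** A cleanup step that may fail, e.g. deleting a checkpoint directory. */
    interface CleanupAction {
        void run() throws Exception;
    }

    /** Raised once every permitted attempt has failed. */
    static final class CleanupFailedException extends RuntimeException {
        CleanupFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Runs the action until it succeeds or maxAttempts is exhausted.
     * maxAttempts == 1 reproduces the try-once behavior.
     *
     * @return the number of attempts that were needed
     */
    static int runWithRetry(CleanupAction action, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.run();
                return attempt;
            } catch (Exception e) {
                lastFailure = e; // a real cleaner would log this failure
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new CleanupFailedException("Interrupted while waiting to retry", ie);
                }
            }
        }
        throw new CleanupFailedException(
                "Cleanup failed after " + maxAttempts + " attempt(s)", lastFailure);
    }
}
```

Note that the loop keeps its progress only in memory, which is exactly the failover gap described above: a restarted JobManager would not know which cleanup was still outstanding.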

HA-backed Storage

The DefaultCompletedCheckpointStore (along with the DefaultJobGraphStore) utilizes the StateHandleStore internally. StateHandleStore itself is the abstraction layer for supporting different HA backends (i.e. ZooKeeper and k8s ConfigMaps). They store Serializable objects (like CompletedCheckpoints and JobGraphs) by serializing them to a FileSystem folder and persisting the corresponding file path wrapped in a RetrievableStateHandle instance into the HA backend.

The lifecycle of the RetrievableStateHandle (and the underlying file) is handled by the StateHandleStore right now. This works well for the JobGraph instances: All artifacts (the serialized JobGraph stored on the FileSystem and the serialized reference to that artifact stored in the HA backend) are handled by the corresponding StateHandleStore:

  1. Marking the JobGraph for deletion
  2. Deleting the serialized JobGraph from the FileSystem
  3. Removing the entry from HA backend for good

The mark-for-deletion logic was introduced as part of FLIP-194. Currently, marking an entry for deletion is internal logic of the StateHandleStore. There are no interface methods exposing this functionality to components accessing the StateHandleStore.
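To make the indirection and the three deletion steps concrete, here is a minimal sketch. SketchStateHandleStore and its methods are hypothetical stand-ins, an in-memory map plays the role of the HA backend, and the code does not mirror Flink's actual StateHandleStore interface.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified stand-in for a StateHandleStore; not Flink's API. */
final class SketchStateHandleStore {

    /** Pointer kept in the "HA backend": file path plus deletion marker. */
    private static final class Entry {
        final Path file;
        volatile boolean markedForDeletion;

        Entry(Path file) {
            this.file = file;
        }
    }

    private final Path haStorageDir;
    // Stand-in for ZooKeeper nodes or a k8s ConfigMap.
    private final Map<String, Entry> haBackend = new ConcurrentHashMap<>();

    SketchStateHandleStore(Path haStorageDir) {
        this.haStorageDir = haStorageDir;
    }

    /** Writes the payload to the FileSystem first, then stores only the pointer. */
    void add(String name, byte[] serialized) {
        try {
            Path file = Files.write(haStorageDir.resolve(name), serialized);
            haBackend.put(name, new Entry(file));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Three-step removal; calling it again after a failure in step 2 is safe. */
    void remove(String name) {
        Entry entry = haBackend.get(name);
        if (entry == null) {
            return; // already removed
        }
        entry.markedForDeletion = true;           // 1. mark for deletion
        try {
            Files.deleteIfExists(entry.file);     // 2. delete the serialized object
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        haBackend.remove(name);                   // 3. remove the entry for good
    }

    boolean contains(String name) {
        return haBackend.containsKey(name);
    }
}
```

Because the marker is set before the file is deleted, an entry that survives a failed step 2 still identifies the leftover file on the next attempt.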

For Checkpoints, we have the issue that there are artifacts living outside of the StateHandleStore (namely the checkpoint state folders of PendingCheckpoints or CompletedCheckpoints that are subject to deletion). These artifacts are going to be missed in case of a JobManager failover which makes the deletion of Checkpoints a non-atomic operation.

The following table presents a summary of job-related entities and the owner of their artifacts in Flink 1.16 for the JobGraph and Checkpoints (since both are handled by the StateHandleStore):

| Flink Entity | Artifact | Location | Owner [1] |
|---|---|---|---|
| JobGraph | Serialized JobGraph | FileSystem (HA storage dir) | StateHandleStore |
| JobGraph | FileSystem reference to serialized JobGraph | HA backend | StateHandleStore |
| CompletedCheckpoint (including checkpoints that are marked for deletion) | Checkpoint state folder | FileSystem (checkpoints folder) | CheckpointCoordinator/CheckpointsCleaner |
| CompletedCheckpoint (including checkpoints that are marked for deletion) | Serialized CompletedCheckpoint | FileSystem (HA storage dir) | StateHandleStore |
| CompletedCheckpoint (including checkpoints that are marked for deletion) | FileSystem reference to serialized CompletedCheckpoint | HA backend | StateHandleStore |
| PendingCheckpoint (including pending savepoints) | Checkpoint state folder | FileSystem (checkpoints folder; or some other locations for pending savepoints) | CheckpointCoordinator/CheckpointsCleaner |

[1] “Owner” in the sense that this component is in charge of cleaning things up.

The DefaultCompletedCheckpointStore maintains an in-memory list of CompletedCheckpoints and a handle to the corresponding HA backend through the StateHandleStore interface. The fact that the artifacts of the CompletedCheckpoints are owned by different components makes it impossible to do an atomic cleanup. Additionally, PendingCheckpoints are handled separately in the CheckpointCoordinator. The same applies to savepoints since they are just PendingCheckpoints with an external location. The following sections will only talk about PendingCheckpoints. But anything that applies to PendingCheckpoints also applies to savepoints that are in the process of creation. The ownership of savepoints is transitioned to the user as soon as the savepoint creation is completed successfully.

Transform CompletedCheckpointStore into a CheckpointStore

This FLIP proposes to transform the CompletedCheckpointStore into a CheckpointStore. The CheckpointStore would be in charge of storing all Checkpoints (PendingCheckpoints and CompletedCheckpoints that are marked for deletion, alongside the regular CompletedCheckpoints). It will store these Checkpoints in memory and maintain the state that is persisted in the HA backend. This enables us to move the cleanup logic for checkpoints into a single location: the CheckpointStore.

Registering a PendingCheckpoint in the CheckpointStore needs to happen before any checkpoint state creation is triggered. This ensures that, in case of a JobManager failover, we’re able to clean up the checkpoint state folder even if it’s empty.
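The ordering requirement can be illustrated with a small sketch. PendingCheckpointRegistrationSketch, triggerCheckpoint, and stateDirsKnownAfterFailover are hypothetical names; the map is only a stand-in for whatever the CheckpointStore would persist for recovery.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; class and method names are not Flink's API. */
final class PendingCheckpointRegistrationSketch {

    // Stand-in for the information the CheckpointStore keeps for failover recovery.
    private final Map<Long, String> registeredStateDirs = new LinkedHashMap<>();

    /** Registers the checkpoint first and only then lets the caller create state. */
    void triggerCheckpoint(long checkpointId, String stateDir, Runnable createState) {
        registeredStateDirs.put(checkpointId, stateDir); // 1. register
        createState.run();                               // 2. only now write state
    }

    /** State directories a recovering JobManager would know it has to clean up. */
    List<String> stateDirsKnownAfterFailover() {
        return new ArrayList<>(registeredStateDirs.values());
    }
}
```

If state creation fails or the JobManager dies during step 2, the directory is already registered and can still be found and removed.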

The next two subsections elaborate on two different approaches to implementing a CheckpointStore.

Proposal #1: Use StateHandleStore for all Checkpoints

This approach leverages the existing functionality of the StateHandleStore. The StateHandleStore interface needs to be extended to include the functionality around marking entries for deletion. This would enable the CheckpointStore to mark the Checkpoint for deletion in the HA backend. The deletion of the artifacts can be triggered next. The entry which is marked for deletion is only entirely removed after all artifacts of the checkpoint are successfully discarded.

After a JobManager failover, the CheckpointStore would load all relevant information from the HA backend as it’s done in the Flink 1.16 codebase. The only difference is that more checkpoint information is stored in the ZooKeeper nodes or the k8s ConfigMap. PendingCheckpoints and CompletedCheckpoints that are marked for deletion can be picked up again to retrigger the deletion process.
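A rough sketch of Proposal #1 follows. The Status enum, the method names, and the in-memory map standing in for the HA backend are all assumptions made for illustration; the actual extension of the StateHandleStore interface is left open by this FLIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of Proposal #1; not Flink's StateHandleStore interface. */
final class MarkForDeletionStoreSketch {

    enum Status { PENDING, COMPLETED, MARKED_FOR_DELETION }

    // Stand-in for the HA backend; its content survives a JobManager failover.
    private final Map<Long, Status> haBackend = new TreeMap<>();

    void registerPending(long id) {
        haBackend.put(id, Status.PENDING);
    }

    void complete(long id) {
        haBackend.replace(id, Status.PENDING, Status.COMPLETED);
    }

    void markForDeletion(long id) {
        haBackend.computeIfPresent(id, (key, old) -> Status.MARKED_FOR_DELETION);
    }

    /** Only called once all artifacts of the checkpoint were discarded. */
    void removeAfterSuccessfulDiscard(long id) {
        haBackend.remove(id, Status.MARKED_FOR_DELETION);
    }

    /** Checkpoints that can be restored from after a failover. */
    List<Long> recoverCompleted() {
        return idsWith(Status.COMPLETED);
    }

    /** Checkpoints whose deletion has to be retriggered after a failover. */
    List<Long> recoverCheckpointsToDiscard() {
        List<Long> result = idsWith(Status.PENDING);
        result.addAll(idsWith(Status.MARKED_FOR_DELETION));
        result.sort(null); // natural ordering by checkpoint ID
        return result;
    }

    private List<Long> idsWith(Status status) {
        List<Long> ids = new ArrayList<>();
        for (Map.Entry<Long, Status> entry : haBackend.entrySet()) {
            if (entry.getValue() == status) {
                ids.add(entry.getKey());
            }
        }
        return ids;
    }
}
```

Every status change in this sketch is a write to the HA backend, which makes the additional write load of this proposal visible.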

This approach is the closest to what is currently implemented in Flink 1.16. The flaws of this approach are:

  • More state is stored in the HA backend, which might grow if the cleanup of checkpoints takes too long.
  • The HA backend will have to deal with more writes due to the operations for maintaining PendingCheckpoints and marking CompletedCheckpoints for deletion.

Proposal #2: Move lifecycle management for FileSystem-based artifacts out of StateHandleStore

As mentioned previously, DefaultJobGraphStore and DefaultCompletedCheckpointStore share common functionality through the StateHandleStore, which takes care of serializing the Java object (i.e. JobGraph or CompletedCheckpoint) onto the FileSystem and storing the corresponding pointer in the HA backend. Additionally, the StateHandleStore takes care of removing the serialized version of the Java object from the FileSystem and deleting the corresponding entry from the HA backend.

This proposal is about moving the cleanup logic of the FileSystem-based artifacts out of the StateHandleStore. The StateHandleStore would be only in charge of storing the references to the serialized Java object in the HA backend. The serialized artifacts would be handled by the DefaultJobGraphStore and DefaultCheckpointStore respectively. Therefore, both components would be in charge of cleaning up the corresponding artifacts.

As stated in the intro of this chapter on transforming the CompletedCheckpointStore into a CheckpointStore, persisting any Checkpoint onto the FileSystem (i.e. into the HA storage folder) will be performed by the CheckpointStore (the same goes for the DefaultJobGraphStore). The reference to this serialized version will then be stored in the StateHandleStore once the Checkpoint reaches the completed state.

Failover handling works as follows: The DefaultCheckpointStore will load CompletedCheckpoints from the HA backend as it is done in the Flink 1.16 codebase. Additionally, it will need to load all serialized Checkpoints from the FileSystem. Any Checkpoint that doesn’t match a CompletedCheckpoint loaded from the HA backend will be subject to removal.

Marking CompletedCheckpoints for deletion is simply done by removing the Checkpoint from the HA backend. PendingCheckpoints aren’t listed in the HA backend, but only in-memory and in the HA storage directory. The in-memory version will be lost after a JobManager failover. Its serialized version only needs to contain a reference to the Checkpoint state’s directory. Loading the serialized version of the PendingCheckpoint enables us to clean this directory up and remove the serialized PendingCheckpoint at the end.
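The reconciliation after a failover in Proposal #2 boils down to a set difference. The sketch below assumes, purely for illustration, that checkpoints can be identified by their numeric ID in both places; the class and method names are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of the failover reconciliation in Proposal #2. */
final class FailoverReconciliationSketch {

    /**
     * Returns the IDs of all Checkpoints that have a serialized version on the
     * FileSystem but no entry in the HA backend. These are PendingCheckpoints
     * and CompletedCheckpoints whose deletion was started but not finished.
     */
    static Set<Long> subjectToRemoval(
            Set<Long> serializedOnFileSystem, Set<Long> listedInHaBackend) {
        Set<Long> orphans = new TreeSet<>(serializedOnFileSystem);
        orphans.removeAll(listedInHaBackend);
        return orphans;
    }
}
```

The result is only as reliable as the FileSystem listing it is computed from, which is why this proposal depends on the consistency of the FileSystem backend.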

Flaws of this approach are:

  • We’re relying on strong consistency guarantees of the FileSystem backend. A workaround would be to do a final check of the FileSystem before shutting down the CheckpointStore.

Checkpoint Cleanup

In either of the proposed solutions, cleaning up Checkpoints needs to happen in the following order:

  1. Mark Checkpoint for deletion
    1. Proposal #1: Mark entry for deletion in HA backend
    2. Proposal #2: Remove entry from HA backend
  2. Delete checkpoint state folder
  3. Delete artifact containing the serialized Java object
  4. (remove entry from HA backend for proposal #1)
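The order above can be sketched as a single method that differs between the two proposals only in its first and last step. The Steps interface and the Proposal enum are hypothetical and exist only to make the ordering explicit.

```java
/** Hypothetical sketch of the cleanup order; not Flink's API. */
final class CheckpointCleanupOrderSketch {

    enum Proposal { STATE_HANDLE_STORE_FOR_ALL, FILESYSTEM_LIFECYCLE_MOVED_OUT }

    /** Hypothetical hooks for the individual cleanup steps. */
    interface Steps {
        void markForDeletionInHaBackend(long checkpointId);

        void removeEntryFromHaBackend(long checkpointId);

        void deleteStateFolder(long checkpointId);

        void deleteSerializedCheckpoint(long checkpointId);
    }

    static void cleanUp(long checkpointId, Proposal proposal, Steps steps) {
        // 1. Mark the Checkpoint for deletion.
        if (proposal == Proposal.STATE_HANDLE_STORE_FOR_ALL) {
            steps.markForDeletionInHaBackend(checkpointId);
        } else {
            steps.removeEntryFromHaBackend(checkpointId);
        }
        // 2. Delete the checkpoint state folder.
        steps.deleteStateFolder(checkpointId);
        // 3. Delete the artifact containing the serialized Java object.
        steps.deleteSerializedCheckpoint(checkpointId);
        // 4. Proposal #1 only: remove the marked entry for good.
        if (proposal == Proposal.STATE_HANDLE_STORE_FOR_ALL) {
            steps.removeEntryFromHaBackend(checkpointId);
        }
    }
}
```

If any step throws, the later steps are skipped, so the information needed to repeat the cleanup (the marked entry or the serialized Checkpoint) is still available.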

Idempotent discardState in StateObject

For the Checkpoint cleanup to be repeatable, we have to make sure that the StateObject#discardState method is implemented idempotently. Not all implementations of StateObject follow this contract right now. The contract needs to be documented in the JavaDoc of StateObject#discardState. Additionally, all implementations of this method need to be reviewed. I went through the implementations and collected the classes that need to be updated in the corresponding Jira issue.
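As an illustration of what idempotency means here, consider a state object backed by a single file. FileBackedStateSketch is a hypothetical class that does not implement Flink's StateObject interface; it only contrasts the two standard java.nio.file deletion calls.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical file-backed state; it does not implement Flink's StateObject. */
final class FileBackedStateSketch {

    private final Path file;

    FileBackedStateSketch(Path file) {
        this.file = file;
    }

    /** Not idempotent: a second call fails with NoSuchFileException. */
    void discardStateStrict() throws IOException {
        Files.delete(file);
    }

    /** Idempotent: a file that is already gone counts as discarded. */
    void discardState() throws IOException {
        Files.deleteIfExists(file);
    }
}
```

With the strict variant, a retry after a partially successful cleanup would fail on artifacts that were already removed and could therefore never complete.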

CheckpointsCleaner merged into CheckpointStore

The CheckpointsCleaner currently “lives” in the Scheduler component. It is used by the CheckpointCoordinator and during shutdown of the CompletedCheckpointStore. The logic of the CheckpointsCleaner can be moved into the CheckpointStore.

Compatibility, Deprecation, and Migration Plan

In Flink 1.16, cleaning up checkpoints is tried once, and any error is communicated to the user through log messages. The new implementation would retry the cleanup until it succeeds. As a consequence, cleaning up checkpoints might block the shutdown of the Flink cluster until all artifacts are deleted (as is already the case for the JobGraph, the BlobStore, and other job-related HA data). The user would still be able to switch back to the old behavior through the retryable cleanup configuration parameters introduced as part of FLIP-194.

Test Plan

  • Unit tests:
    • The idempotency of StateObject#discardState implementations
    • The adapted functionality of DefaultJobGraphStore and DefaultCheckpointStore
    • Changed StateHandleStore functionality
  • e2e tests: For FLIP-194, we performed a manual test by temporarily changing the permissions in a Minio bucket. This could be implemented in an ITCase where we verify that the relevant objects in the bucket are cleaned up afterwards and the cluster is shut down properly.

User Feedback

Rejected Alternatives

One of the proposals mentioned above is subject to rejection.