Current state: Approved

Discussion thread: https://lists.apache.org/thread/56px3kvn3tlwpc7sl12kx6notfmk9g7q
Vote thread: https://lists.apache.org/thread/dn15ndp7x9qz9s8kor1plhohw1jnrlr4
JIRA: FLINK-32070

Release


1. Motivation

Currently, there are many types of checkpoint file in Flink:

...

This FLIP proposes a unified file merging framework to solve the file flood problem during Flink checkpoints. It can be applied to all types of checkpoint files, including keyed state, non-keyed state, channel state and changelog state. The primary objective is to minimize file open and close operations in order to reduce access to and modification of metadata in the DFS. The proposed approach enables the TM to write several data segments within an opened file, with the written files owned by the TM. Ownership of a single checkpoint file is transferred to the TM, while the JM manages the parent directory of these files. The TM leverages checkpoint notifications (aborted, subsumed) from the JM to manage (delete) specific files. The implementation is straightforward and does not require modifications on the JM side. It introduces a new code path in the TM, which can be safely enabled and disabled as needed.



Fig.1 Overview of file ownership

...

As Flink stores shared and private states in different paths, file merging is performed separately for each scope. The table below lists the behavior of shared and private states in different aspects. For further information, please consult the following sections.


| | Granularity of merging | Support merging across checkpoints | Need file copy after failover | Need file copy after restoring from retained checkpoint (a new job) |
|---|---|---|---|---|
| Shared state | per subtask | yes | no | yes |
| Private state | per taskmanager | yes | no | no |

4.1.1 Merge files within a checkpoint or across checkpoints

...

  • SEGMENTED_WITHIN_CP_BOUNDARY: Only merge checkpoint files from the same checkpoint.
  • SEGMENTED_ACROSS_CP_BOUNDARY: Able to merge checkpoint files across different checkpoints. This option is only supported in some file systems.


Fig.2 File visibility when checkpoint finished and two merging modes in time dimension
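The difference between the two modes can be sketched as a simple reuse rule. This is a minimal illustration only: the enum constants mirror the mode names above, while the `canAppend` method and its parameters are hypothetical and not part of the proposed interfaces.

```java
/** Hypothetical sketch of the two merging modes; only the constant names come from this FLIP. */
enum MergingMode {
    SEGMENTED_WITHIN_CP_BOUNDARY,
    SEGMENTED_ACROSS_CP_BOUNDARY;

    /**
     * Decides whether an already opened physical file may receive another segment.
     *
     * @param fileCheckpointId    checkpoint that first wrote into the physical file (assumed bookkeeping)
     * @param currentCheckpointId checkpoint that wants to write a new segment
     */
    boolean canAppend(long fileCheckpointId, long currentCheckpointId) {
        if (this == SEGMENTED_ACROSS_CP_BOUNDARY) {
            // Segments from different checkpoints may share one physical file.
            return true;
        }
        // Within the boundary, only segments of the same checkpoint are merged.
        return fileCheckpointId == currentCheckpointId;
    }
}
```

Under this sketch, a file opened by checkpoint 3 would be closed to checkpoint 4 in the first mode and remain writable in the second.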

...

When merging files, there are two levels of granularity to consider: subtask or TM. Generally, merging at the TM level is more effective since there are typically more files to merge. However, when restarting a job, the limitations of merging at the TM level become apparent. During restoration, checkpoint files may still be managed by the new job (see 'Claiming the snapshot' in FLIP-193). Given that TMs are reassigned after restoration, it is difficult to manage physical files that contain data from multiple subtasks scattered across different TMs (as depicted in Fig.3). There is no synchronization mechanism between TMs, making file management in this scenario challenging. One possibility is to make multiple copies of the file and have each TM manage a unique copy, but this would result in significant data transfer over the network.



Fig.3 The shared state file management problem when merging files per TM level after job restoring.

...

  • For shared states, files are merged within each subtask.
  • For private states, files are merged within each TM.



Info: Merging DSTL files (changelog state files)

DSTL files are a special case of shared states, since these files are no longer shared after a job restore. Merging at the TM level therefore does NOT bring additional network or space overhead. Currently, the batch uploading of GIC (FLIP-158) combines small changes within a TM (across subtasks) into a single file. To prevent a performance regression relative to this existing merging, DSTL files are still merged within each TM.

  • DSTL files (changelog state files) are merged within each TM.


4.2 Rescaling and Physical File Lifecycle

Restarting a job with a new configuration is a common operation for Flink users. As previously discussed, for shared states in CLAIM recovery mode, the TMs of the new job must take responsibility for managing the old checkpoint files. As files are merged within each subtask for shared states, a physical file belongs to one subtask. If the user does not change the job's parallelism, the old and new subtasks correspond one-to-one, making it a simple take-over scenario. In this case, the new subtask only manages files from the corresponding old subtask. If the user does change the parallelism, an old physical file will be retained by several new subtasks. To avoid communication between subtasks in file management, multiple copies of old files are created, with each new subtask managing a unique copy. The original files are deleted by the JM when the old checkpoint is subsumed, since no TM (or subtask) will claim ownership of these files. Fig.4 and 5 show the checkpoint file management after restoring the job without and with a change in parallelism, respectively.



Fig.4 Shared state file management after restoring with same parallelism


Fig.5 Shared state file management after restoring with a different parallelism

...

In section 4.2, it was noted that sometimes the JM deletes old checkpoint files entirely when a new checkpoint is finished after restoring. To simplify file deletion and file ownership transfer, this FLIP proposes a new physical file layout for checkpoint storage. The directories for exclusive scope, shared scope, and task-owned states remain the same. For shared states, a sub-directory is created in the shared directory for each subtask. And for private states, a taskmanager-owned directory is introduced under the checkpoint storage, and a sub-directory is created in the taskmanager-owned directory for each TM. These sub-directories are not under the chk-x directory because the chk-x directories do not survive across checkpoints, but physical files may contain states from different checkpoints. The difference between shared and private states is due to the different file merging granularities, as discussed in Section 4.1.2. Fig.6 illustrates the newly proposed checkpoint storage file layout.


Fig.6 Proposed checkpoint storage layout.

...

In the case of shared states, when a job failover occurs, the subtask-owned sub-directory remains the same. The JM receives the same DirectoryStateHandles and does not delete any directories or files. However, when a user restarts a job manually, the sub-directory for each subtask changes due to a change in job parallelism or checkpoint storage base path. The JM receives different DirectoryStateHandles and deletes the original directories later. Fig.7 and 8 describe these two cases.



Fig.7 Subtask-owned folder consistent across job restarts (failover).

...



Fig.8 Subtask-owned folder migration after restoring from a retained checkpoint.

...

For private states, each TM claims a new sub-directory and DirectoryStateHandle under the taskmanager-owned path whenever a job restoration occurs. This is because the old private state files will not be included in the new checkpoint, rendering the closed files for private state useless for further file merging.

The sub-directory path contains information of current subtask/TM, which is enough for TMs to determine whether to reuse the old one or migrate to a new one.

  • Path for shared states of each subtask: ${checkpointBaseDir}/shared/subtask-{index}-{parallelism}
  • Path for private states of each task manager: ${checkpointBaseDir}/taskmanager-owned/${tmResourceId}
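The path patterns above can be sketched as plain string construction. This is an illustrative sketch, not the proposed implementation: the class and method names are hypothetical, and the reuse check simply assumes that a subtask keeps its old sub-directory only when the recomputed path is identical.

```java
/** Hypothetical helper; only the path patterns are taken from this FLIP. */
class CheckpointSubDirs {

    /** ${checkpointBaseDir}/shared/subtask-{index}-{parallelism} */
    static String sharedDir(String checkpointBaseDir, int subtaskIndex, int parallelism) {
        return checkpointBaseDir + "/shared/subtask-" + subtaskIndex + "-" + parallelism;
    }

    /** ${checkpointBaseDir}/taskmanager-owned/${tmResourceId} */
    static String privateDir(String checkpointBaseDir, String tmResourceId) {
        return checkpointBaseDir + "/taskmanager-owned/" + tmResourceId;
    }

    /**
     * Assumption: the old sub-directory is reused only if base path, subtask index
     * and parallelism are all unchanged; otherwise the subtask migrates to a new one.
     */
    static boolean canReuseSharedDir(
            String oldDir, String checkpointBaseDir, int subtaskIndex, int parallelism) {
        return sharedDir(checkpointBaseDir, subtaskIndex, parallelism).equals(oldDir);
    }
}
```

For example, a subtask with index 2 restored at parallelism 8 would compute `.../shared/subtask-2-8`, which differs from an old `.../shared/subtask-2-4` directory, so under this sketch it would migrate rather than reuse.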

Creating a sub-directory for each granularity unit is a good plan, as it provides a quick way for JM to delete a batch of unclaimed files. Compared to deleting specified physical files, deleting directories does not require JM to know the complicated relationship between checkpoints and physical files, which is maintained by the TMs. Additionally, deleting the directories as a whole helps to remove leftover files (half-written files in the directory that have not been reported to JM), improving the stability of the system.

...

TM records the references between checkpoint data and underlying files. On checkpoint subsumption or abortion, TM deletes the files that are no longer needed. The overall process is shown in Fig.9.


Fig.9 Overview of the file management

...

To determine which checkpoint data is no longer needed, TM uses a watermark of the subsumed checkpoint ID. This approach effectively avoids any issues resulting from the loss of checkpoint notifications. When a checkpoint is removed, the related data segments or state handles are also deleted, and their references to underlying files are released. As a result, TM can safely delete the files that are no longer being used.

Fig.10 Reference counting between logical and physical files.
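The combination of a subsumed-checkpoint watermark and reference counting can be sketched as follows. This is a simplified illustration under stated assumptions: the class `SegmentRefTracker`, its methods, and the rule that a segment is released once its last-using checkpoint ID is at or below the watermark are all hypothetical, and real physical files would additionally need to be closed before deletion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical TM-side bookkeeping of logical segments and their physical files. */
class SegmentRefTracker {

    /** A logical file: one data segment stored inside a physical file. */
    static final class Segment {
        final String physicalFile;
        long lastUsedCheckpointId;

        Segment(String physicalFile, long checkpointId) {
            this.physicalFile = physicalFile;
            this.lastUsedCheckpointId = checkpointId;
        }
    }

    private final Map<String, Integer> refCounts = new HashMap<>();
    private final List<Segment> segments = new ArrayList<>();
    private long subsumedWatermark = -1L;

    /** Registers a new segment and adds one reference to its physical file. */
    Segment register(String physicalFile, long checkpointId) {
        Segment segment = new Segment(physicalFile, checkpointId);
        segments.add(segment);
        refCounts.merge(physicalFile, 1, Integer::sum);
        return segment;
    }

    /** Marks a segment as still used by a later checkpoint. */
    void reuse(Segment segment, long checkpointId) {
        segment.lastUsedCheckpointId = Math.max(segment.lastUsedCheckpointId, checkpointId);
    }

    /** Advances the watermark and returns the physical files that became unreferenced. */
    List<String> notifyCheckpointSubsumed(long checkpointId) {
        subsumedWatermark = Math.max(subsumedWatermark, checkpointId);
        List<String> deletable = new ArrayList<>();
        Iterator<Segment> it = segments.iterator();
        while (it.hasNext()) {
            Segment segment = it.next();
            if (segment.lastUsedCheckpointId <= subsumedWatermark) {
                it.remove();
                // Release the reference; delete the file once nothing points to it.
                if (refCounts.merge(segment.physicalFile, -1, Integer::sum) == 0) {
                    refCounts.remove(segment.physicalFile);
                    deletable.add(segment.physicalFile);
                }
            }
        }
        return deletable;
    }
}
```

Because the watermark only moves forward, a lost notification for checkpoint 2 is compensated by a later notification for checkpoint 3: all segments last used by checkpoint 2 or earlier are released at that point.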

...

The JM and TM may fail during a checkpoint, and the file merging framework should properly handle these edge cases.

  • Failover: The new subtask will take over the previously managed sub-directory and delete the unreferenced files on a best-effort basis.

...

  • Checkpoint notification message lost: There is a possibility that the notification message from JM to TM may be lost due to some reason. To address this, TM uses a watermark to keep track of the latest checkpoint that utilizes a file or segment, and releases the file or segment when a later checkpoint is subsumed. This ensures that even if notification is lost, the arrival of later messages will still trigger the file cleaning procedure.
  • Job stops before the first checkpoint finishes: If the TM has not reported the DirectoryStateHandle of the sub-directory, the TM will delete the sub-directory on exit. Otherwise, the JM will take care of the sub-directory.

4.9. State ownership comparison with other designs

FLINK-23342 and its design doc [1] describe several state ownership designs intended to overcome the disadvantages of having the state files managed by the JM. The state ownership design of this FLIP closely resembles option 3 in that doc. The main difference is that this FLIP embeds that doc's concept of an 'epoch' implicitly in the folder path for each granularity (as described in 4.3). The following table compares the JM-TM mixed ownership designs of FLINK-23342 with this FLIP:



| | Current Flink | Option 5 from the doc: Split counters | Option 3 from the doc: Epochs | Design of this FLIP |
|---|---|---|---|---|
| Ownership | JM | Mixed | Mixed | Mixed |
| Granularity of tracking shared state | File | File | Folder | Folder |
| Fragility | Bad | Good | Good | Good |
| Reupload | On abort | - | On epoch change | On job restart by user |
| Cleanup efficiency | Bad | Medium (files but distributed) | Good (folder at once) | Good (folder at once) |
| Cleanup reliability | Bad | Bad | Good | Good |
| RPC overhead | Bad | Good | Good | Good |
| Dev. time | - | Medium / Bad | Medium | Medium |
| Invasive change of shared state registry | - | Bad | Good | Very Good |


5. Public interfaces and User Cases

Several new configuration options are added:

  • state.checkpoints.file-merging: A configuration option that allows the user to enable or disable the file merging feature.
  • state.checkpoints.file-merging.across-checkpoint-boundary: A configuration option that selects whether files are merged only within a checkpoint or across multiple checkpoints (as discussed in section 4.1.1).
  • state.checkpoints.file-merging.max-file-size: A configuration option that sets a maximum size limit for physical files.
  • state.checkpoints.file-merging.max-file-pool-size: A configuration option that specifies the upper limit of the file pool size for concurrent writing.
  • state.checkpoints.file-merging.max-subtasks-per-file:  A configuration option that specifies the lower limit of the file pool size based on the number of subtasks within each TM (only for merging at TM level).
  • state.checkpoints.file-merging.max-space-amplification: A threshold that triggers a compaction (re-uploading) of files (as discussed in section 4.7).

Both forward and backward compatibility are supported, and these options only affect new files, not old ones. If the user enables the merging feature and restores a job from an old checkpoint, the new files will be merged while the old checkpoints remain separate. Conversely, if the user disables the feature and restores the job from a checkpoint with merged files, the new files will remain separate while the old files remain merged. The reason for keeping old files untouched is to save significant data transfer over the network, as file copying is unnecessary.
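To make the effect of state.checkpoints.file-merging.max-file-size more concrete, the following sketch shows one way segments could be placed into size-bounded physical files. It is only an illustration: the class `RollingSegmentAllocator`, the generated file names, and the rule that a file stops accepting segments once its size has reached the limit (so a file may slightly exceed it) are assumptions, not behavior specified by this FLIP.

```java
/** Hypothetical allocator that places data segments into size-bounded physical files. */
class RollingSegmentAllocator {

    /** Location of one logical file inside a physical file. */
    static final class SegmentLocation {
        final String physicalFile;
        final long offset;
        final long length;

        SegmentLocation(String physicalFile, long offset, long length) {
            this.physicalFile = physicalFile;
            this.offset = offset;
            this.length = length;
        }
    }

    private final long maxFileSize; // assumed to come from ...file-merging.max-file-size
    private int fileCounter = 0;
    private String currentFile = null;
    private long currentSize = 0L;

    RollingSegmentAllocator(long maxFileSize) {
        this.maxFileSize = maxFileSize;
    }

    /** Appends a segment to the current file, rolling to a new file once the limit is reached. */
    SegmentLocation allocate(long length) {
        if (currentFile == null || currentSize >= maxFileSize) {
            currentFile = "physical-file-" + fileCounter++; // illustrative naming only
            currentSize = 0L;
        }
        SegmentLocation location = new SegmentLocation(currentFile, currentSize, length);
        currentSize += length;
        return location;
    }
}
```

With a limit of 100 bytes, two 60-byte segments would share the first file at offsets 0 and 60, and the next segment would start a second file.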

...

We have implemented a POC (in branch https://github.com/Zakelly/flink/tree/flip306_poc) which only supports merging state in the EXCLUSIVE scope (e.g. operator state and channel state), and tested it on a simple word count job with a tumbling window, running with a parallelism of 4 in a single TM. We collected and compared the file creation and deletion counts from our logs, with and without file merging enabled. Only the files in the EXCLUSIVE scope are counted. The merging target size was set to 32MB and the job ran for 4 hours. Here is the result:


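| Checkpoint type | Operation | No merging | Merge within each checkpoint | Merge across checkpoints |
|---|---|---|---|---|
| Aligned checkpoint | create | 5756 | 3295 | 1920 |
| Aligned checkpoint | delete | 5752 | 3292 | 1916 |
| Unaligned checkpoint | create | 10222 | 6081 | 1940 |
| Unaligned checkpoint | delete | 10214 | 6077 | 1932 |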

As shown above, this sample job demonstrates that merging only the operator state and channel state within each checkpoint eliminates over 40% of file creation and deletion operations. When merging across multiple checkpoints, the reduction computed from the table rises to about 67% with aligned checkpoints and about 81% with unaligned checkpoints. Additionally, the number of small files is higher with unaligned checkpoints than with aligned checkpoints, so our solution performs better when unaligned checkpoints are enabled.
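The reduction ratios can be recomputed directly from the "create" counts in the table. The small helper below is only a convenience sketch; the class and method names are hypothetical.

```java
/** Hypothetical helper that recomputes the reduction from the POC counts. */
class FileOpReduction {

    /** Percentage of operations eliminated relative to the no-merging baseline. */
    static double reductionPercent(long baseline, long merged) {
        return 100.0 * (baseline - merged) / baseline;
    }

    public static void main(String[] args) {
        // "create" counts taken from the result table.
        System.out.printf("aligned, within checkpoint:    %.1f%%%n", reductionPercent(5756, 3295));
        System.out.printf("aligned, across checkpoints:   %.1f%%%n", reductionPercent(5756, 1920));
        System.out.printf("unaligned, within checkpoint:  %.1f%%%n", reductionPercent(10222, 6081));
        System.out.printf("unaligned, across checkpoints: %.1f%%%n", reductionPercent(10222, 1940));
    }
}
```

The "delete" counts can be passed to the same method and yield nearly identical ratios.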

...

11. Rejected Alternatives

None for now.



[1] The design doc of FLINK-23342: https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#