Current state: Approved

Discussion thread: https://lists.apache.org/thread/56px3kvn3tlwpc7sl12kx6notfmk9g7q
Vote thread: https://lists.apache.org/thread/dn15ndp7x9qz9s8kor1plhohw1jnrlr4
JIRA: FLINK-32070

Release


1. Motivation

Currently, there are many types of checkpoint file in Flink:

...

  • For shared states, files are merged within each subtask.
  • For private states, files are merged within each TM.


Info: Merging DSTL files (changelog state files)

DSTL files are a special case of shared states, since these files are no longer shared after a job restore. Merging them at the TM level therefore does NOT bring additional network or space overhead. Currently, the batch uploading of GIC (FLIP-158) combines small changes within a TM (across subtasks) into a single file. To avoid a performance regression relative to that existing merging, DSTL files are still merged within each TM.

  • DSTL files (changelog state files) are merged within each TM.


4.2 Rescaling and Physical File Lifecycle

Restarting a job with a new configuration is a common operation for Flink users. As previously discussed, for shared states in CLAIM recovery mode, the TMs of the new job must take responsibility for managing the old checkpoint files. As files are merged within each subtask for shared states, a physical file belongs to one subtask. If the user does not change the job's parallelism, the old and new subtasks correspond one-to-one, making it a simple take-over scenario. In this case, the new subtask only manages files from the corresponding old subtask. If the user does change the parallelism, an old physical file will be retained by several new subtasks. To avoid communication between subtasks in file management, multiple copies of old files are created, with each new subtask managing a unique copy. The original files are deleted by the JM when the old checkpoint is subsumed, since no TM (or subtask) will claim ownership of these files. Fig.4 and 5 show the checkpoint file management after restoring the job without and with a change in parallelism, respectively.

...

For private states, each TM claims a new sub-directory and DirectoryStateHandle under the taskmanager-owned path whenever a job restoration occurs. This is because the old private state files are not included in the new checkpoint, which makes the closed private-state files useless for further file merging.

The sub-directory path contains information about the current subtask/TM, which is enough for TMs to determine whether to reuse the old sub-directory or migrate to a new one.

  • Path for shared states of each subtask: ${checkpointBaseDir}/shared/subtask-{index}-{parallelism}
  • Path for private states of each task manager: ${checkpointBaseDir}/taskmanager-owned/${tmResourceId}
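The two path layouts above can be sketched as a small helper. This is only an illustration: the class and method names (MergedFileDirs, sharedStateDir, privateStateDir, canReuseSharedDir) are hypothetical and not part of Flink's API, and the reuse rule shown (reuse only when both subtask index and parallelism match) is an assumption about how the path information might be used.

```java
/** Hypothetical helper for illustration only; not part of Flink's API. */
final class MergedFileDirs {

    private MergedFileDirs() {}

    /** Builds ${checkpointBaseDir}/shared/subtask-{index}-{parallelism}. */
    static String sharedStateDir(String checkpointBaseDir, int subtaskIndex, int parallelism) {
        return checkpointBaseDir + "/shared/subtask-" + subtaskIndex + "-" + parallelism;
    }

    /** Builds ${checkpointBaseDir}/taskmanager-owned/${tmResourceId}. */
    static String privateStateDir(String checkpointBaseDir, String tmResourceId) {
        return checkpointBaseDir + "/taskmanager-owned/" + tmResourceId;
    }

    /**
     * Assumed rule: a restored subtask can keep using an old shared sub-directory
     * only if the path it would build for itself is identical to the old path,
     * i.e. both its index and the job parallelism are unchanged.
     */
    static boolean canReuseSharedDir(
            String oldDir, String checkpointBaseDir, int subtaskIndex, int parallelism) {
        return sharedStateDir(checkpointBaseDir, subtaskIndex, parallelism).equals(oldDir);
    }

    public static void main(String[] args) {
        String base = "hdfs:///checkpoints/job-a"; // example base directory
        String oldDir = sharedStateDir(base, 2, 4);
        System.out.println(oldDir);
        System.out.println(canReuseSharedDir(oldDir, base, 2, 4)); // same index and parallelism
        System.out.println(canReuseSharedDir(oldDir, base, 2, 8)); // parallelism changed
    }
}
```

Because the parallelism is part of the directory name, a rescaled job would never build a path that collides with a directory of the previous run under this assumption.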

Creating a sub-directory for each granularity unit is a good plan, as it provides a quick way for the JM to delete a batch of unclaimed files. Compared to deleting specified physical files, deleting directories does not require the JM to know the complicated relationship between checkpoints and physical files, which is maintained by the TMs. Additionally, deleting the directories as a whole helps to remove leftover files (half-written files in the directory that have not been reported to the JM), improving the stability of the system.

...

The JM and TM may fail during a checkpoint, and the file merging framework should properly handle these edge cases.

  • Failover: The new subtask takes over the previously managed sub-directory and deletes the unreferenced files on a best-effort basis.
  • Checkpoint notification message lost: The notification message from the JM to a TM may be lost. To address this, the TM uses a watermark to keep track of the latest checkpoint that uses a file or segment, and releases the file or segment when a later checkpoint is subsumed. This ensures that even if a notification is lost, the arrival of later messages still triggers the file cleaning procedure.

  • Job stops before the first checkpoint finishes: If the TM has not reported the DirectoryStateHandle of the sub-directory, the TM deletes the sub-directory on exit. Otherwise, the JM takes care of the sub-directory.
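
The watermark idea for lost notifications can be sketched as follows. This is a minimal illustration under stated assumptions, not the FLIP's implementation: the class FileUsageWatermarks and its methods are hypothetical names, files are identified by plain strings, and a file is treated as releasable once any checkpoint with an ID at or above its last-use watermark is reported as subsumed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical TM-side tracker for illustration only; not part of Flink's API. */
final class FileUsageWatermarks {

    // For each file (or segment), the highest checkpoint ID that still uses it.
    private final Map<String, Long> lastUsedBy = new HashMap<>();

    /** Records that the given checkpoint references the file, raising its watermark if needed. */
    void recordUsage(String file, long checkpointId) {
        lastUsedBy.merge(file, checkpointId, Math::max);
    }

    /**
     * Handles a "checkpoint subsumed" notification. Every file whose watermark is
     * at or below the subsumed checkpoint ID is released (assumed rule), so a
     * notification for a later checkpoint also cleans up files whose own
     * notification never arrived.
     */
    List<String> onCheckpointSubsumed(long subsumedCheckpointId) {
        List<String> released = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUsedBy.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= subsumedCheckpointId) {
                released.add(entry.getKey());
                it.remove();
            }
        }
        return released;
    }

    public static void main(String[] args) {
        FileUsageWatermarks tracker = new FileUsageWatermarks();
        tracker.recordUsage("file-a", 1L);
        tracker.recordUsage("file-b", 2L);
        tracker.recordUsage("file-b", 3L);
        // The notification for checkpoint 1 is lost; the one for checkpoint 2 arrives.
        System.out.println(tracker.onCheckpointSubsumed(2L));
        System.out.println(tracker.onCheckpointSubsumed(3L));
    }
}
```

In the usage above, file-a is released by the notification for checkpoint 2 even though the notification for checkpoint 1 never arrived, while file-b stays until checkpoint 3 is subsumed.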

4.9. State ownership comparison with other designs

FLINK-23342 and its design doc [1] propose several state ownership designs to overcome the disadvantages of having state files managed by the JM. The state ownership design of this FLIP closely resembles option 3 in that doc. The main difference is that this FLIP expresses that doc's concept of an 'epoch' implicitly in the folder path for each granularity (as described in 4.3). The table below compares the JM-TM mixed ownership designs of FLINK-23342 with this FLIP:



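|                                          | Current Flink | Option 5 from the doc: Split counters | Option 3 from the doc: Epochs | Design of this FLIP    |
|------------------------------------------|---------------|---------------------------------------|-------------------------------|------------------------|
| Ownership                                | JM            | Mixed                                 | Mixed                         | Mixed                  |
| Granularity of tracking shared state     | File          | File                                  | Folder                        | Folder                 |
| Fragility                                | Bad           | Good                                  | Good                          | Good                   |
| Reupload                                 | On abort      | -                                     | On epoch change               | On job restart by user |
| Cleanup efficiency                       | Bad           | Medium (files but distributed)        | Good (folder at once)         | Good (folder at once)  |
| Cleanup reliability                      | Bad           | Bad                                   | Good                          | Good                   |
| RPC overhead                             | Bad           | Good                                  | Good                          | Good                   |
| Dev. time                                | -             | Medium / Bad                          | Medium                        | Medium                 |
| Invasive change of shared state registry | -             | Bad                                   | Good                          | Very Good              |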


5. Public interfaces and User Cases

Several new configuration options are added:

  • state.checkpoints.file-merging: A configuration option that allows the user to enable or disable the file merging feature.
  • state.checkpoints.file-merging.across-checkpoint-boundary: A configuration option that enables merging of files within a checkpoint or across multiple checkpoints (as discussed in section 4.1.1).
  • state.checkpoints.file-merging.max-file-size: A configuration option that sets a maximum size limit for physical files.
  • state.checkpoints.file-merging.max-file-pool-size: A configuration option that specifies the upper limit of the file pool size for concurrent writing.
  • state.checkpoints.file-merging.max-subtasks-per-file: A configuration option that specifies the lower limit of the file pool size based on the number of subtasks within each TM (only for merging at TM level).
  • state.checkpoints.file-merging.max-space-amplification: A threshold that triggers a compaction (re-uploading) of files (as discussed in section 4.7).

Both forward and backward compatibility are supported, and these options only affect new files, not old ones. If the user enables the merging feature and restores a job from an old checkpoint, the new files will be merged while the old files remain separate. Conversely, if the user disables the feature and restores the job from a checkpoint with merged files, the new files will remain separate while the old files remain merged. Old files are left untouched because copying them is unnecessary, and skipping the copy saves significant data transfer over the network.

...

11. Rejected Alternatives

None for now.



[1] The design doc of FLINK-23342: https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#