Status

Discussion thread: https://lists.apache.org/thread/902my7twsy0zo2d8bqw2sm59k2dpmqvy
Vote thread: https://lists.apache.org/thread/5wylbxworyjfqnd9vtt3p3o3ntws121n
Release: 1.15

Motivation

Currently, Flink processes (JobManager and TaskManager) can only persist information via Flink's HA services. The stored information is usually cluster-scoped, so there is no easy way to store process-specific information. Moreover, to store large blobs, Flink currently has to use a remote object store and download the blobs again after a restart. If a Flink process is restarted on the same node, or on a node where the same volume is mounted, it could simply use the local disk or mounted volume to store process-specific information. For example, this volume could hold local state, which would allow Flink to use local state when recovering from a process failure. Flink could also store cached blobs on this volume, which would avoid downloading them again from the BlobServer.

Since we cannot be sure that a volume can be remounted or that Flink gets redeployed on the same node, we can use this storage only as a cache. This means there must always be another place from which the required information can be retrieved in case it is no longer cached locally.

Proposed Changes

We propose to introduce a working directory for Flink processes. Flink processes can use this working directory to store instance-specific information that might be reusable after a process failover. By default, the working directory will be created in the temporary directory of the node Flink is running on. The path will include the ResourceID of the process to make it unique. The user can specify the working directory root in order to place the working directory on a specific volume.

By assigning a deterministic ID to a Flink process (stable across restarts), a restarted Flink process will be started with the same working directory.

Given <WORKING_DIRECTORY_BASE> and <RESOURCE_ID>, the effective working directory would be <WORKING_DIRECTORY_BASE>/jm_<RESOURCE_ID> for the JobManager process and <WORKING_DIRECTORY_BASE>/tm_<RESOURCE_ID> for the TaskManager process.
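
The naming scheme above can be illustrated with a minimal sketch. The class and method names below are hypothetical and are not part of Flink's API; only the jm_ and tm_ prefixes come from this proposal.

```java
import java.nio.file.Path;

/** Hypothetical helper illustrating the proposed directory layout; not Flink code. */
final class WorkingDirectoryPaths {

    private WorkingDirectoryPaths() {}

    /** Returns <WORKING_DIRECTORY_BASE>/jm_<RESOURCE_ID>. */
    static Path forJobManager(Path workingDirectoryBase, String resourceId) {
        return workingDirectoryBase.resolve("jm_" + resourceId);
    }

    /** Returns <WORKING_DIRECTORY_BASE>/tm_<RESOURCE_ID>. */
    static Path forTaskManager(Path workingDirectoryBase, String resourceId) {
        return workingDirectoryBase.resolve("tm_" + resourceId);
    }
}
```

Because the result depends only on the base directory and the resource ID, a process that is restarted with the same ID resolves to the same directory.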

Additionally, the effective working directory will contain a tmp directory that will be cleaned whenever the Flink process starts. That way this directory can be used to store temporary information that is only relevant for the lifetime of the process.
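
As a rough sketch of the start-up cleanup, a process could delete and recreate the tmp subdirectory while leaving everything else in the working directory untouched. TmpDirectoryCleaner and resetTmpDirectory are hypothetical names chosen for illustration; the actual implementation may differ.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of cleaning <WORKING_DIRECTORY>/tmp on process start; not Flink code. */
final class TmpDirectoryCleaner {

    private TmpDirectoryCleaner() {}

    /** Deletes <workingDirectory>/tmp recursively (if present) and recreates it empty. */
    static Path resetTmpDirectory(Path workingDirectory) throws IOException {
        Path tmp = workingDirectory.resolve("tmp");
        if (Files.exists(tmp)) {
            List<Path> toDelete;
            try (Stream<Path> paths = Files.walk(tmp)) {
                // Reverse order puts children before their parent directories.
                toDelete = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path path : toDelete) {
                Files.delete(path);
            }
        }
        return Files.createDirectories(tmp);
    }
}
```

Sibling directories of tmp are not touched, so information meant to survive a restart stays available.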

Local state

If local state is enabled, then taskmanager.state.local.root-dirs (currently defaulting to the tmp directory) should default to <WORKING_DIRECTORY>/localState. That way one can easily store local state on a mounted volume by configuring the working directory.

RocksDB local directory

RocksDB's local directory, configured via state.backend.rocksdb.localdir, should default to <WORKING_DIRECTORY>/tmp. That way no additional configuration is required if local state is enabled, because the local state directory and the RocksDB local directory are on the same volume, which allows creating hard links for duplicating the checkpoint data.
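
To illustrate why a shared volume matters: hard links can only be created within one file system, so duplicating a file from the RocksDB directory into the local state directory without copying requires both to live on the same volume. The following is a minimal sketch using the JDK's Files.createLink; the class name and directory arguments are hypothetical and this is not how Flink or RocksDB implement it internally.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch of duplicating a file via a hard link; not Flink code. */
final class HardLinkDuplication {

    private HardLinkDuplication() {}

    /**
     * Creates a hard link to {@code source} inside {@code targetDir} and returns the link.
     * Fails if the two paths are on different file systems or if hard links are unsupported.
     */
    static Path duplicate(Path source, Path targetDir) throws IOException {
        Files.createDirectories(targetDir);
        Path link = targetDir.resolve(source.getFileName());
        // Files.createLink(link, existing): no data is copied, both names refer to the same file.
        return Files.createLink(link, source);
    }
}
```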

Public Interfaces

In order to configure the working directory base, we propose to introduce the process.working-dir, process.jobmanager.working-dir and process.taskmanager.working-dir configuration options. By default, process.working-dir will be set to System.getProperty("java.io.tmpdir"), and the process-specific options will fall back to process.working-dir if not configured.
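
The fallback order can be sketched as follows. The configuration is modeled as a plain Map<String, String> to keep the example self-contained; the class and method names are hypothetical and do not reflect Flink's Configuration or ConfigOption API.

```java
import java.util.Map;

/** Hypothetical sketch of resolving the working directory base; not Flink code. */
final class WorkingDirOptions {

    static final String BASE = "process.working-dir";
    static final String JOBMANAGER = "process.jobmanager.working-dir";
    static final String TASKMANAGER = "process.taskmanager.working-dir";

    private WorkingDirOptions() {}

    static String jobManagerBase(Map<String, String> config) {
        return resolve(config, JOBMANAGER);
    }

    static String taskManagerBase(Map<String, String> config) {
        return resolve(config, TASKMANAGER);
    }

    /** Process-specific option first, then process.working-dir, then java.io.tmpdir. */
    private static String resolve(Map<String, String> config, String processSpecificKey) {
        String specific = config.get(processSpecificKey);
        if (specific != null) {
            return specific;
        }
        return config.getOrDefault(BASE, System.getProperty("java.io.tmpdir"));
    }
}
```

For example, setting only process.working-dir moves both JobManager and TaskManager working directories to that location, while process.taskmanager.working-dir overrides it for TaskManagers alone.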

Compatibility, Deprecation, and Migration Plan

  • We don't expect this change to affect the default behaviour of Flink. The only difference is that, by default, some directories will now live under /tmp/xy_<RESOURCE_ID> instead of directly under /tmp, which should not make a big difference.
  • What changes is that we no longer support multiple different tmp directories from which one picks randomly for storing local state.

Test Plan

  • The changes need to be covered by unit and integration tests

Follow ups

Blob server

With the working directory in place, we can let blob.storage.directory default to <WORKING_DIRECTORY>/blobStorage. If we remove the shutdown hook from BlobServer that deletes this directory upon termination, Flink will be able to reuse cached blobs. We might have to add logic that searches for orphaned blobs and cleans them up in order to avoid cluttering our disks.

Additionally, we probably have to add logic that detects aborted or corrupted blob uploads and removes them. This has to work for the BlobServer as well as for the BlobStore. So far this was never a problem because every process restart would use a fresh blob storage directory for the BlobServer and overwrite any blobs stored in the BlobStore.

Luckily, we can use the already existing MessageDigest to check for corrupted files. However, we might have to add logic that checks this value before accessing blobs in the BlobServer and BlobStore and invalidates corrupted entries.
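
A digest check of this kind could look roughly like the sketch below, which recomputes the digest of a locally cached file and compares it with an expected value. The class and method names are hypothetical, and the choice of SHA-1 is an assumption made for illustration; the check would have to use whatever algorithm the blob keys are actually derived from.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch of validating a cached blob against its digest; not Flink code. */
final class BlobIntegrityCheck {

    // Assumption for this sketch: the expected digest was computed with SHA-1.
    private static final String ALGORITHM = "SHA-1";

    private BlobIntegrityCheck() {}

    /** Streams the file through a MessageDigest and returns the resulting hash. */
    static byte[] digestOf(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance(ALGORITHM);
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read);
            }
        }
        return digest.digest();
    }

    /** Returns true if the file's content still matches the expected digest. */
    static boolean isIntact(Path file, byte[] expectedDigest)
            throws IOException, NoSuchAlgorithmException {
        return MessageDigest.isEqual(expectedDigest, digestOf(file));
    }
}
```

A caller could run isIntact before serving a cached blob and, if it returns false, delete the local file and fetch the blob again from its original source.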

Rejected Alternatives