Status

...


Discussion thread

thread/5wylbxworyjfqnd9vtt3p3o3ntws121n
JIRA

FLINK-25402

Release
1.15


Motivation

Currently, Flink processes (JobManager and TaskManager) can only persist information via Flink's HA services. The stored information is usually cluster-scoped, meaning that it is currently not easily possible to store process-specific information. Moreover, for storing large blobs Flink currently needs to go to a remote object store, from which it has to download these blobs again when restarting. If a Flink process were restarted on the same node, or on a node where the same volume is mounted, then we could simply use the local disk/mounted volume to store process-specific information. For example, we could use this volume to store local state, which would allow us to use local state when recovering from a process failure. Flink could also store cached blobs on this volume, avoiding the need to download them again from the BlobServer.

...

By assigning a deterministic id to a Flink process (stable across restarts), a restarted Flink process will be started with the same working directory.

Given <WORKING_DIRECTORY_BASE> and <RESOURCE_ID>, the effective working directory would be <WORKING_DIRECTORY_BASE>/jm_<RESOURCE_ID> for the JobManager process and <WORKING_DIRECTORY_BASE>/tm_<RESOURCE_ID> for the TaskManager process.
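The path composition above can be sketched as follows; the helper name and example paths are hypothetical, not the actual Flink implementation:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Sketch of how the effective working directory could be derived. */
public class WorkingDirectoryExample {

    // Hypothetical helper; Flink's real code may structure this differently.
    static Path effectiveWorkingDirectory(Path base, String processPrefix, String resourceId) {
        return base.resolve(processPrefix + "_" + resourceId);
    }

    public static void main(String[] args) {
        Path base = Paths.get("/opt/flink/workdir"); // example base directory
        System.out.println(effectiveWorkingDirectory(base, "jm", "resource-1"));
        System.out.println(effectiveWorkingDirectory(base, "tm", "resource-1"));
    }
}
```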

Additionally, the effective working directory will contain a tmp directory that will be cleaned whenever the Flink process starts. That way this directory can be used to store temporary information that is only relevant for the lifetime of the process.

Local state

If local state is enabled, then taskmanager.state.local.root-dirs (currently defaulting to tmp directory) should also default to <WORKING_DIRECTORY>/localState. That way one can easily store local state on a mounted volume by configuring the working directory.
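With this default, storing local state on a mounted volume only requires pointing the working directory at that volume. An illustrative configuration sketch (the mount path is an example value, not part of this proposal):

```yaml
# flink-conf.yaml (illustrative)
# Point the TaskManager working directory at a mounted volume.
process.taskmanager.working-dir: /mnt/flink-volume
# taskmanager.state.local.root-dirs then defaults to
# /mnt/flink-volume/tm_<RESOURCE_ID>/localState
state.backend.local-recovery: true
```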

RocksDB local directory

RocksDB's local directory, configured via state.backend.rocksdb.localdir, should default to <WORKING_DIRECTORY>/tmp. That way no additional configuration is required if local state is enabled, because the local state directory and the RocksDB local directory are on the same volume, which allows hard links to be created for duplicating the checkpoint data.

Public Interfaces

In order to configure the working directory base, we propose to introduce the process.working-dir, process.jobmanager.working-dir and process.taskmanager.working-dir configuration options. Per default, process.working-dir will default to System.getProperty("java.io.tmpdir"), and the process-specific options will fall back to process.working-dir if not configured.
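The proposed fallback chain could be resolved along these lines; the method and its signature are a sketch for illustration, not the actual Flink configuration code:

```java
import java.util.Optional;

/**
 * Sketch of the proposed fallback:
 * process-specific option -> process.working-dir -> java.io.tmpdir.
 */
public class WorkingDirConfig {

    static String resolveWorkingDir(Optional<String> processSpecific, Optional<String> generic) {
        return processSpecific
                .or(() -> generic)
                .orElseGet(() -> System.getProperty("java.io.tmpdir"));
    }
}
```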

Compatibility, Deprecation, and Migration Plan


  • We don't expect this change to affect Flink's default behaviour: by default, some directories will now live under /tmp/xy_<RESOURCE_ID> instead of directly under /tmp, which should not make a big difference.
  • What changes is that we no longer support configuring multiple tmp directories from which one is picked randomly for storing local state.

Test Plan

  • The changes need to be covered by unit and integration tests

Follow ups

Blob server

With the working directory in place, we can let the blob.storage.directory default to <WORKING_DIRECTORY>/blobStorage. If we remove the shutdown hook from BlobServer that deletes this directory upon termination, Flink will be able to reuse cached blobs. We might have to add logic that searches for orphaned blobs and cleans them up in order to avoid cluttering our disks.
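The orphan cleanup mentioned above could look roughly like the following sketch; the helper name and the idea of passing in the set of referenced file names are assumptions, since the real cleanup would consult the BlobServer's own bookkeeping:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Sketch: remove blob files that are no longer referenced by any known blob key. */
public class OrphanedBlobCleanup {

    // Hypothetical helper; returns the number of deleted orphans.
    static int deleteOrphans(Path blobStorageDir, Set<String> referencedFileNames) throws IOException {
        List<Path> candidates;
        try (Stream<Path> files = Files.walk(blobStorageDir)) {
            // Collect first so we don't delete files while the walk is in progress.
            candidates = files.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        int deleted = 0;
        for (Path p : candidates) {
            if (!referencedFileNames.contains(p.getFileName().toString())) {
                Files.delete(p);
                deleted++;
            }
        }
        return deleted;
    }
}
```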

Additionally, we probably have to add logic that can detect aborted/corrupted blob uploads and removes them. This has to work for the BlobServer as well as for the BlobStore. So far this was never a problem because every process restart would use a fresh blob storage directory for the BlobServer and overwrite any blobs stored in the BlobStore.

Luckily, we can use the already existing MessageDigest for detecting corrupted files. However, we might have to add logic that checks this digest before accessing blobs in the BlobServer and BlobStore and invalidates corrupted entries.
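Such a digest check could be sketched with the standard java.security.MessageDigest API as follows; the helper name and the SHA-1 choice are illustrative assumptions (the real check would use whatever digest the BlobServer already records):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Sketch: validate a stored blob against its expected digest before serving it. */
public class BlobIntegrityCheck {

    // Hypothetical helper; the actual check would live in BlobServer/BlobStore.
    static boolean isUncorrupted(Path blobFile, byte[] expectedDigest)
            throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-1");
        try (InputStream in = Files.newInputStream(blobFile)) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                md.update(buffer, 0, read);
            }
        }
        // Constant-time comparison of the computed and expected digests.
        return MessageDigest.isEqual(md.digest(), expectedDigest);
    }
}
```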

Rejected Alternatives