Status
...
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
...
...
...
JIRA: None created yet
...
Motivation
Currently, Flink processes (JobManager and TaskManager) can only persist information via Flink's HA services. The stored information is usually cluster-scoped, meaning that it is currently not easily possible to store process-specific information. Moreover, for storing large blobs Flink currently needs to go to a remote object store, from which it needs to download these blobs again when restarting. If a Flink process were restarted on the same node, or on a node where the same volume was mounted, then we could simply use the local disk/mounted volume to store process-specific information. For example, we could use this volume to store local state, which would allow us to recover with local state after a process failure. Flink could also store the cached blobs on this volume, avoiding another download from the BlobServer.
...
Additionally, the effective working directory will contain a tmp directory that will be cleaned whenever the Flink process starts. That way, this directory can be used to store temporary information that is only relevant for the lifetime of the process.
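A minimal sketch of this start-up clean-up, using plain java.nio.file APIs (the class and method names below are illustrative, not Flink's actual implementation):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Illustrative sketch of cleaning <workingDirectory>/tmp on process start. */
public final class WorkingDirectoryUtil {

    /** Deletes any leftover contents of {@code <workingDirectory>/tmp} and
     *  recreates the directory, returning its path. */
    public static Path prepareTmpDirectory(Path workingDirectory) throws IOException {
        Path tmp = workingDirectory.resolve("tmp");
        if (Files.exists(tmp)) {
            // Walk the tree in reverse order so children are deleted before parents.
            try (Stream<Path> paths = Files.walk(tmp)) {
                paths.sorted(Comparator.reverseOrder())
                     .forEach(p -> p.toFile().delete());
            }
        }
        return Files.createDirectories(tmp);
    }
}
```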
...
Local state
If local state is enabled, then taskmanager.state.local.root-dirs (currently defaulting to the tmp directory) should also default to <WORKING_DIRECTORY>/localState. That way one can easily store local state on a mounted volume by configuring the working directory.
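As an illustration, the proposed defaults could be derived from the working directory roughly like this (a sketch; the helper names are hypothetical):

```java
import java.nio.file.Path;

/** Hypothetical helpers deriving the proposed defaults from the working directory. */
public final class DefaultDirectories {

    /** Proposed default for taskmanager.state.local.root-dirs: <WORKING_DIRECTORY>/localState. */
    public static Path localStateDir(Path workingDirectory) {
        return workingDirectory.resolve("localState");
    }

    /** Proposed default for blob.storage.directory: <WORKING_DIRECTORY>/blobStorage. */
    public static Path blobStorageDir(Path workingDirectory) {
        return workingDirectory.resolve("blobStorage");
    }
}
```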
...
- The changes need to be covered by unit and integration tests
Follow ups
Blob server
With the working directory in place, we can let blob.storage.directory default to <WORKING_DIRECTORY>/blobStorage. If we remove the shutdown hook from the BlobServer that deletes this directory upon termination, Flink will be able to reuse cached blobs. We might have to add logic that searches for orphaned blobs and cleans them up in order to avoid cluttering our disks.
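Such an orphan clean-up could look roughly as follows. How the set of live blob keys would be determined is left open here and is simply passed in by the caller; all names are hypothetical:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of removing orphaned blobs from the blob storage directory. */
public final class OrphanedBlobCleaner {

    /** Deletes every regular file under {@code blobStorageDir} whose file name
     *  is not in {@code liveBlobKeys}; returns the number of deleted files. */
    public static int cleanUp(Path blobStorageDir, Set<String> liveBlobKeys)
            throws IOException {
        List<Path> regularFiles;
        try (Stream<Path> files = Files.walk(blobStorageDir)) {
            regularFiles = files.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        int deleted = 0;
        for (Path file : regularFiles) {
            if (!liveBlobKeys.contains(file.getFileName().toString())) {
                Files.delete(file);
                deleted++;
            }
        }
        return deleted;
    }
}
```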
Additionally, we probably have to add logic that can detect aborted or corrupted blob uploads and remove them. This has to work for the BlobServer as well as for the BlobStore. So far this has never been a problem because every process restart used a fresh blob storage directory for the BlobServer and overwrote any blobs stored in the BlobStore.
Luckily, we can use the already existing MessageDigest to check for corrupted files. However, we might have to add logic that verifies this value before accessing blobs in the BlobServer and BlobStore and invalidates corrupted entries.
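A sketch of such a digest check, assuming SHA-1; the algorithm choice and the point at which the check runs are assumptions here, not something this FLIP fixes:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Sketch of a MessageDigest-based corruption check for stored blobs. */
public final class BlobIntegrity {

    /** Returns true iff the file's SHA-1 digest equals {@code expectedDigest}. */
    public static boolean isValid(Path blobFile, byte[] expectedDigest)
            throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-1");
        try (InputStream in = Files.newInputStream(blobFile)) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                md.update(buffer, 0, read);
            }
        }
        // Constant-time comparison of the computed and expected digests.
        return MessageDigest.isEqual(md.digest(), expectedDigest);
    }
}
```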