...

Supported BLOB File Types

We design the BLOB storage with the following use cases for BLOB files in mind:

  • JarFile: a library, for use by the user-ClassLoader
  • RpcMessage: for (a) off-loading large RPC messages that do not fit into the akka.framesize, (b) distributing a single (large) RPC message, e.g. the TaskDeploymentDescriptor, to multiple recipients by leveraging any underlying distributed file system in HA mode, (c) the ability to re-use a (large) cached RPC message during re-deployment after failures.
  • LogFile: retrieving log files from task managers (currently only used by the Web-UI)

The maximum life span of any of these types is limited by the life span of the job it belongs to. Files will be ref-counted so that shorter life spans are possible, e.g. for the latter two types.

Move BLOB Ref-counting & Cleanup from the LibraryCacheManager to the BlobCache

...

  • bridge between a task's classloader and cached library BLOBs, i.e. jar files, at the BlobCache

BLOB Lifecycle

The BlobCache uses ref-counting for all of its BLOBs and starts deleting unreferenced files after "blob.retention.interval" seconds. The <jobId> subdirectory is also ref-counted along with each of its BLOBs and will be deleted similarly. Note that, here, we currently do not know when a job enters a final state and thus have to rely on reference counting only for the removal of a job's BLOBs. Any remaining files can only be cleaned up when the TaskManager shuts down.

The BlobServer will use the same ref-counting technique and will delete files from both local and HA storage appropriately. Additionally, it has a safety net since it knows when a job enters a final state: in that case, the job's BLOB storage directory (local and HA), i.e. "<path>/<jobId>", will be deleted, including all its BLOB files.

Staged Cleanup

Up to Flink 1.3, the LibraryCacheManager runs a periodic cleanup task every hour (see ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL or the "library-cache-manager.cleanup.interval" configuration parameter) which deletes any unreferenced jar files. If a job's task fails right before the cleanup starts, a subsequent recovery can thus no longer access the cached file. We'd like to change this with a "staged cleanup":

In a staged cleanup, a BLOB file is only deleted the second time the (periodic) cleanup task finds it unreferenced, e.g. by keeping two cleanup lists: one for the actual cleanup, i.e. "delete now", and one for staging files, i.e. "delete next time". Each time the cleanup task runs, the files on the actual cleanup list are deleted and the staging list becomes the new actual list. This is cheaper than keeping a time-to-live for each unreferenced jar and good enough for the cleanup task at hand. With the cleanup task running every "blob.retention.interval" seconds, a (currently) unreferenced BLOB will thus stay around for at least "blob.retention.interval" seconds and at most twice this amount. As long as it stays, it may be re-used by future (recovery) tasks and does not need to be downloaded again.
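A minimal Java sketch of the two-list staged cleanup described above. The class and method names (StagedCleanup, stage, unstage, run) are hypothetical and not part of Flink's API; the deleter callback stands in for the actual local/HA file deletion, and run() is assumed to be scheduled every "blob.retention.interval" seconds.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical two-list staged cleanup: a BLOB is deleted on the second run that sees it unreferenced. */
final class StagedCleanup<K> {
    // BLOBs to delete at the next run ("delete now").
    private Set<K> deleteNow = new HashSet<>();
    // BLOBs that became unreferenced since the last run ("delete next time").
    private Set<K> deleteNextTime = new HashSet<>();
    // Stand-in for deleting the file from local/HA storage.
    private final Consumer<K> deleter;

    StagedCleanup(Consumer<K> deleter) {
        this.deleter = deleter;
    }

    /** Called when a BLOB's reference count drops to 0. */
    synchronized void stage(K key) {
        deleteNextTime.add(key);
    }

    /** Called when an unreferenced BLOB is referenced again, e.g. by a recovering task. */
    synchronized void unstage(K key) {
        deleteNow.remove(key);
        deleteNextTime.remove(key);
    }

    /** The periodic cleanup task (assumed to run every "blob.retention.interval" seconds). */
    synchronized void run() {
        for (K key : deleteNow) {
            deleter.accept(key);
        }
        // The staging list becomes the actual list; a fresh staging list starts collecting.
        deleteNow = deleteNextTime;
        deleteNextTime = new HashSet<>();
    }
}
```

Swapping the two sets is what makes this cheaper than a per-BLOB time-to-live: each run is one pass over the "delete now" list, with no timestamps to track or compare.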

Note: since the <jobId> directory is also ref-counted and it is generally cheaper (especially in HA file systems) to delete whole directories rather than each single file, we will adapt the final delete process: it first sweeps over the list of files/directories to delete and, if a <jobId> directory is found in it, issues a single directory delete instead of deleting that directory's files individually.
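The directory-aware final delete could look roughly like the following sketch. It uses java.nio on a local file system as a stand-in for local and HA storage; DirectoryAwareSweep and its methods are hypothetical names. Entries lying below another listed entry are dropped, so a listed <jobId> directory costs one (recursive) delete instead of one call per BLOB.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical final delete step of the staged cleanup that prefers directory deletes. */
final class DirectoryAwareSweep {

    /** Drops every entry lying below another listed entry, e.g. a BLOB inside a listed "<path>/<jobId>". */
    static List<Path> collapse(Collection<Path> toDelete) {
        Set<Path> all = new HashSet<>();
        for (Path p : toDelete) {
            all.add(p.toAbsolutePath().normalize());
        }
        List<Path> result = new ArrayList<>();
        for (Path p : all) {
            boolean covered = false;
            for (Path parent = p.getParent(); parent != null && !covered; parent = parent.getParent()) {
                covered = all.contains(parent);
            }
            if (!covered) {
                result.add(p);
            }
        }
        Collections.sort(result);
        return result;
    }

    /** One recursive delete per remaining directory, one delete per remaining file. */
    static void delete(Collection<Path> toDelete) throws IOException {
        for (Path p : collapse(toDelete)) {
            if (Files.isDirectory(p)) {
                List<Path> entries;
                try (Stream<Path> walk = Files.walk(p)) {
                    // Reverse order visits children before their parent directory.
                    entries = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
                }
                for (Path e : entries) {
                    Files.deleteIfExists(e);
                }
            } else {
                Files.deleteIfExists(p);
            }
        }
    }
}
```

On a file system with a native recursive delete, e.g. Hadoop's FileSystem#delete(Path, boolean), the walk over the directory would be replaced by a single call.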

BlobCache

  • all blobs are ref-counted, starting from first BLOB retrieval/delivery
  • the job-specific BLOB sub-directory is also ref-counted with each job-related file
  • if reference = 0 the BLOB enters the staged cleanup (see above)
  • if a task succeeds, fails or is cancelled, all its BLOBs' references are counted down appropriately (at a later point in time, we may optimise the case when a task enters a final state and delete these files immediately once the reference counter is 0, if possible)
  • all blobs should be deleted when the BlobCache shuts down, i.e. the TaskManager exits
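A sketch of the BlobCache bookkeeping from the list above, assuming string identifiers for jobs and BLOB keys and callbacks into a staged cleanup; CacheRefCounter and its methods are hypothetical names. Every reference to a job's BLOB also counts towards its "<jobId>" directory, which covers several tasks of the same job sharing BLOBs on one TaskManager.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical BlobCache-side ref-counting of BLOBs and of their "<jobId>" directory. */
final class CacheRefCounter {
    private final Map<String, Integer> blobRefs = new HashMap<>();   // "<jobId>/<blobKey>" -> references
    private final Map<String, Integer> jobDirRefs = new HashMap<>(); // "<jobId>" -> references of all its BLOBs
    private final Consumer<String> stage;   // hands an unreferenced entry to the staged cleanup
    private final Consumer<String> unstage; // an entry is referenced again before being deleted

    CacheRefCounter(Consumer<String> stage, Consumer<String> unstage) {
        this.stage = stage;
        this.unstage = unstage;
    }

    /** A task starts using a BLOB (first retrieval/delivery or re-use of a cached file). */
    synchronized void retain(String jobId, String blobKey) {
        String blob = jobId + "/" + blobKey;
        if (blobRefs.merge(blob, 1, Integer::sum) == 1) {
            unstage.accept(blob);
        }
        if (jobDirRefs.merge(jobId, 1, Integer::sum) == 1) {
            unstage.accept(jobId);
        }
    }

    /** A task using the BLOB succeeded, failed or was cancelled. */
    synchronized void release(String jobId, String blobKey) {
        String blob = jobId + "/" + blobKey;
        if (decrement(blobRefs, blob) == 0) {
            stage.accept(blob);
        }
        if (decrement(jobDirRefs, jobId) == 0) {
            stage.accept(jobId);
        }
    }

    private static int decrement(Map<String, Integer> refs, String key) {
        Integer current = refs.get(key);
        if (current == null) {
            throw new IllegalStateException("not referenced: " + key);
        }
        if (current == 1) {
            refs.remove(key);
            return 0;
        }
        refs.put(key, current - 1);
        return current - 1;
    }
}
```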

Note that several tasks running on the same TaskManager may use BLOB files of the same job!

BlobServer

All BLOB files stored at the BlobServer should also be cleaned up periodically and not just when the BlobServer shuts down (as is the case in Flink 1.3). The actual lifecycle differs slightly between the BLOB types, as described below, but is always based on reference counting and a staged cleanup as in the BlobCache.

JarFile

...

  • all blobs are ref-counted, starting from the initial upload
  • the job-specific BLOB sub-directory ("<path>/<jobId>") is not ref-counted (it may be, but this is not necessary here)
  • if a job fails, all its BLOBs' references are counted down appropriately, once (if possible)
  • if a job enters a final state, i.e. finished or cancelled, the job-specific BLOB subdirectory ("<path>/<jobId>") and all its BLOBs are deleted immediately and are removed from ref-counting (despite their actual ref-count!)
  • if reference = 0 the BLOB enters the staged cleanup (see above)
  • all blobs should be deleted when the BlobServer exits

Note that jar files are mostly deleted along with the job-specific BLOB directory while short-lived BLOBs like RPC messages or logs are deleted mostly based on ref-counting. The job-directory delete acts as a safety-net for them in case the ref-counting is wrong.
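The JarFile lifecycle at the BlobServer, with the job-directory delete overriding any remaining reference counts, might be sketched as follows. ServerJobBlobs and its two callbacks are hypothetical; they stand in for the staged cleanup and for deleting "<path>/<jobId>" from local and HA storage. As in the list above, the job directory itself is not ref-counted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical BlobServer-side bookkeeping for job-related BLOBs such as jar files. */
final class ServerJobBlobs {
    // jobId -> (blobKey -> references); the "<jobId>" directory has no counter of its own.
    private final Map<String, Map<String, Integer>> refs = new HashMap<>();
    private final Consumer<String> stage;        // hands "<jobId>/<blobKey>" to the staged cleanup
    private final Consumer<String> deleteJobDir; // deletes "<path>/<jobId>" locally and in HA storage

    ServerJobBlobs(Consumer<String> stage, Consumer<String> deleteJobDir) {
        this.stage = stage;
        this.deleteJobDir = deleteJobDir;
    }

    synchronized void retain(String jobId, String blobKey) {
        refs.computeIfAbsent(jobId, j -> new HashMap<>()).merge(blobKey, 1, Integer::sum);
    }

    synchronized void release(String jobId, String blobKey) {
        Map<String, Integer> jobRefs = refs.get(jobId);
        if (jobRefs == null || !jobRefs.containsKey(blobKey)) {
            return; // job directory already deleted: nothing left to count down
        }
        int remaining = jobRefs.merge(blobKey, -1, Integer::sum);
        if (remaining == 0) {
            jobRefs.remove(blobKey);
            stage.accept(jobId + "/" + blobKey);
        }
    }

    /** Safety net: the job finished or was cancelled. */
    synchronized void onJobFinalState(String jobId) {
        refs.remove(jobId); // drop all counters, despite their actual values
        deleteJobDir.accept(jobId);
    }

    synchronized int refCount(String jobId, String blobKey) {
        Map<String, Integer> jobRefs = refs.get(jobId);
        return jobRefs == null ? 0 : jobRefs.getOrDefault(blobKey, 0);
    }
}
```

Releases arriving after the job directory was deleted are ignored rather than treated as errors. BLOBs that were already staged are simply gone when the staged cleanup reaches them, so that delete should tolerate missing files.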

BlobCache Download

When a file for a given BlobKey is requested, ...

When files are retrieved, the BlobCache tries to download them from the HA store (if available) or directly from the BlobServer. During the transfer, these files are put into a temporary directory and only moved to the job-specific path once they have been completely transferred and checksum-verified.
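The download path (temporary file, checksum verification, then a move into the job-specific directory) could be sketched in Java as below. Assumptions: the expected checksum is a SHA-1 content digest, the source InputStream comes from the HA store or the BlobServer, and the temporary directory lives on the same file system as the target so that the move can be atomic. VerifiedDownload is a hypothetical name.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical BlobCache download: temp file, checksum check, then atomic move into place. */
final class VerifiedDownload {

    static Path download(InputStream source, byte[] expectedSha1, Path tmpDir, Path target)
            throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-1");
        Path tmp = Files.createTempFile(tmpDir, "blob-", ".tmp");
        try {
            // Hash the bytes while copying them into the temporary file.
            try (DigestInputStream in = new DigestInputStream(source, md)) {
                Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
            }
            if (!MessageDigest.isEqual(md.digest(), expectedSha1)) {
                throw new IOException("checksum mismatch for " + target);
            }
            // Only a complete, verified file becomes visible under "<path>/<jobId>/...".
            Files.createDirectories(target.getParent());
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
            return target;
        } finally {
            Files.deleteIfExists(tmp); // no-op after a successful move
        }
    }
}
```

Because readers only ever see the final path after the atomic move, a partially transferred or corrupted file never appears under the job-specific path.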

BlobServer Upload

While user jars are being uploaded, the corresponding job has not been submitted yet, so we cannot bind the jar files to the lifecycle of a not-yet-existing job. We also cannot upload each file individually with a ref-count of 0, or some files may already have been deleted when the job is started. Instead, we will upload all jar files together and, only after receiving the last one, put all of them into the staging list of the staged cleanup. The job then needs to be submitted within "blob.retention.interval" seconds, or we cannot guarantee that the jar files still exist. This ensures a proper cleanup if the client aborts or crashes between the upload and the job submission.
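The batched upload could be sketched like this, assuming the client announces the number of jar files up front (an assumption; this section does not specify how the BlobServer learns that the last jar has arrived). JarUploadSession is a hypothetical name and stageAll stands in for adding keys to the staging list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical BlobServer-side session for uploading all jar files of one (future) job. */
final class JarUploadSession {
    private final int expectedJars; // assumed to be announced by the client
    private final List<String> received = new ArrayList<>();
    private final Consumer<List<String>> stageAll; // adds all keys to the staging list at once

    JarUploadSession(int expectedJars, Consumer<List<String>> stageAll) {
        if (expectedJars <= 0) {
            throw new IllegalArgumentException("expectedJars must be positive");
        }
        this.expectedJars = expectedJars;
        this.stageAll = stageAll;
    }

    /** Returns true once the last jar was received and the whole batch was staged. */
    synchronized boolean onJarReceived(String blobKey) {
        if (received.size() == expectedJars) {
            throw new IllegalStateException("all " + expectedJars + " jars already received");
        }
        received.add(blobKey);
        if (received.size() < expectedJars) {
            return false; // earlier jars are held back, i.e. not eligible for cleanup yet
        }
        // From here on, the job must be submitted within "blob.retention.interval" seconds.
        stageAll.accept(Collections.unmodifiableList(new ArrayList<>(received)));
        return true;
    }
}
```

Staging the whole batch in one round gives all jars the same retention window. Aborts in the middle of the upload, before the last jar arrives, are not covered by this sketch.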

...

RpcMessage

While we can certainly delete RPC message BLOBs after the job enters a final state, RPC messages generally have a shorter lifetime than jobs. We will thus let the user of this service, e.g. some RPC daemon/wrapper class, actively acquire a number of references when uploading/retrieving the message and release them again when done with it.
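For RPC message BLOBs, the caller-driven reference handling might look like the following sketch; RpcMessageBlobs and its methods are hypothetical. In the example use case of a TaskDeploymentDescriptor sent to three recipients, the sender would acquire three references and each recipient would release one after retrieving the message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical reference handling for RPC message BLOBs, driven by the RPC wrapper. */
final class RpcMessageBlobs {
    private final Map<String, Integer> refs = new HashMap<>();
    private final Consumer<String> stage; // hands an unreferenced BLOB to the staged cleanup

    RpcMessageBlobs(Consumer<String> stage) {
        this.stage = stage;
    }

    /** Called when uploading or retrieving a message, e.g. with one reference per recipient. */
    synchronized void acquire(String blobKey, int references) {
        if (references <= 0) {
            throw new IllegalArgumentException("references must be positive");
        }
        refs.merge(blobKey, references, Integer::sum);
    }

    /** Called when (some of) the holders are done with the message. */
    synchronized void release(String blobKey, int references) {
        Integer current = refs.get(blobKey);
        if (current == null || references <= 0 || references > current) {
            throw new IllegalStateException("cannot release " + references + " reference(s) of " + blobKey);
        }
        if (current == references) {
            refs.remove(blobKey);
            stage.accept(blobKey);
        } else {
            refs.put(blobKey, current - references);
        }
    }
}
```

Re-using a cached message during re-deployment would acquire new references; if the key had already been staged, it would also need to be removed from the staging list.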

...

This is similar to the RpcMessage type. One could argue about keeping logs after a job enters a final state, but since they remain available at the TaskManagers, we will delete unreferenced logs in this case.

BlobServer Upload

...

BlobServer Recovery (HA)

...

  • fetch all jobs to recover
  • download their BLOBs and increase reference counts appropriately (at the JobManager only after successful job submission)
  • put any other, i.e. orphaned, file in the configured storage path into the staged cleanup
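The recovery steps above can be expressed as a small planning function over the contents of the storage path; BlobRecovery, Plan and the map-based view of "<path>/<jobId>/<BlobKey>" are hypothetical. Files of recovered jobs are kept for download, with references only counted up after successful job submission, and everything else is treated as orphaned and handed to the staged cleanup.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical planning step of the BlobServer recovery after a failover. */
final class BlobRecovery {

    /** Outcome of scanning the configured storage path. */
    static final class Plan {
        /** jobId -> BLOB keys to download; references are counted up only after job submission. */
        final Map<String, List<String>> toRecover = new TreeMap<>();
        /** "<jobId>/<blobKey>" entries without a job to recover, destined for the staged cleanup. */
        final List<String> orphaned = new ArrayList<>();
    }

    /**
     * @param recoveredJobs     jobs fetched for recovery
     * @param storedBlobsPerJob contents of "<path>", grouped by "<jobId>" directory
     */
    static Plan plan(Set<String> recoveredJobs, Map<String, List<String>> storedBlobsPerJob) {
        Plan plan = new Plan();
        for (Map.Entry<String, List<String>> job : storedBlobsPerJob.entrySet()) {
            if (recoveredJobs.contains(job.getKey())) {
                plan.toRecover.put(job.getKey(), new ArrayList<>(job.getValue()));
            } else {
                for (String blobKey : job.getValue()) {
                    plan.orphaned.add(job.getKey() + "/" + blobKey);
                }
            }
        }
        Collections.sort(plan.orphaned);
        return plan;
    }
}
```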

Compatibility, Deprecation, and Migration Plan

...

We will adapt the current unit tests to any new API to verify that the behaviour of the BLOB storage components does not change. Also, proper unit tests for the different BLOB types will be added. These additions should already cover more than the existing tests and, since this component is close to the back-end, further (integration) tests will indirectly verify that the changes work as expected.

Rejected Alternatives

Flat file system hierarchy

If we used "<path>/<BlobKey>" instead of "<path>/<jobId>/<BlobKey>", we could easily share common BLOBs among jobs but, on the BlobServer, we would lose the safety net for when a job enters a final state. In that case, if the ref-counting is wrong, we could neither delete all BLOBs of the job nor even decrement their references appropriately.

BLOB-Sharing among different Jobs

The use case of several jobs sharing the same jar file(s), and an optimisation that keeps only a single copy of such a file, may be covered at a later point in time. This time, we want to keep it simple and instead develop a proper cleanup story, which is more important.