
...

We design the BLOB storage with the following use cases for BLOB files in mind:

  • JarFile: storing jar files, i.e. libraries for use by the user ClassLoader
  • RpcMessage: off-loading (large) RPC messages, used for (a) off-loading large RPC messages that do not fit into the akka.framesize, (b) distributing a single (large) RPC message, e.g. the TaskDeploymentDescriptor, to multiple recipients by leveraging any underlying distributed file system in HA mode, and (c) re-using a (large) cached RPC message during re-deployment after failures.
  • LogFile: retrieving log files from task managers (currently only used for the exchange of task manager log files in order to show them in the Web-UI)

The maximum life span of any of these types is limited by the life span of the job it belongs to. Files will be ref-counted so that shorter life spans are possible, e.g. for the latter two types.

...

Ref-counting and cleanup are tasks generic to any BLOB file in the system and should therefore be handled generically by the BlobCache (holding locally cached BLOBs) and the BlobServer (a global and central store to retrieve local copies from, with support for high-availability (HA) storage).

BLOB Store High-Level Components

[Gliffy diagram: blob-store-architecture]

BlobServer

  • offers file upload and download facilities based on jobId and BlobKey
  • local store (file system): read/write access, using "<path>/<jobId>/<BlobKey>"
  • HA store: read/write access for high availability, using "<path>/<jobId>/<BlobKey>"
  • responsible for cleanup of local and HA storage
  • upload to local store, then to HA (possibly in parallel, but waiting for both to finish before acknowledging)
  • downloads will be served from local storage only
  • on recovery (HA): download needed files from the HA store to the local store and take over cleanup responsibility for all other files on the path, i.e. orphaned files, too (see below)
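The path layout and upload order above can be sketched as follows. This is a minimal illustration under the stated "<path>/<jobId>/<BlobKey>" scheme; the class and method names are hypothetical, not Flink's actual API:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of the "<path>/<jobId>/<BlobKey>" layout and the upload
// order described above: local store first, then HA store, and an upload is
// only acknowledged once both writes have finished.
class BlobServerSketch {

    /** Builds the "<path>/<jobId>/<BlobKey>" location inside a store. */
    static Path blobPath(Path basePath, String jobId, String blobKey) {
        return basePath.resolve(jobId).resolve(blobKey);
    }

    static void upload(Path localBase, Path haBase, String jobId,
                       String blobKey, byte[] data) throws IOException {
        // 1. write to the local store
        Path local = blobPath(localBase, jobId, blobKey);
        Files.createDirectories(local.getParent());
        Files.write(local, data);

        // 2. replicate to the HA store (could run in parallel to step 1)
        Path ha = blobPath(haBase, jobId, blobKey);
        Files.createDirectories(ha.getParent());
        Files.copy(local, ha, StandardCopyOption.REPLACE_EXISTING);

        // 3. only now acknowledge the upload to the client
    }
}
```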

BlobCache

  • offers transparent local cache of BlobServer files (based on jobId and BlobKey)
  • local store (file system): read/write access, using "<path>/<jobId>/<BlobKey>"
  • HA store: read access only (if available)
  • download either from HA store or BlobServer
  • responsible for cleanup of local storage

BlobClient

  • offers interface to upload and download files
  • communicates with BlobServer

LibraryCacheManager

  • bridge between a task's classloader and cached library BLOBs, i.e. jar files, at the BlobCache

...

The BlobCache uses ref-counting of all of its BLOBs and starts deleting unreferenced files after "blob.retention.interval" seconds. The <jobId> subdirectory is also ref-counted along with each of its BLOBs and will be deleted similarly. Note that the BlobCache currently does not have the information that a job entered a final state and thus has to rely on reference counting alone for the removal of a job's BLOBs. We can only clean up any remaining files in the <jobId> subdirectory when the TaskManager shuts down.
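The per-BLOB reference counting described above can be sketched as follows. The class and method names are illustrative, not Flink's actual classes; a BLOB whose count drops to zero merely becomes a candidate for deletion after "blob.retention.interval" seconds:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative per-BLOB reference counting (not Flink's actual classes).
class BlobRefCounter {
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Called when a job/task starts using the BLOB. */
    void increment(String blobKey) {
        refCounts.merge(blobKey, 1, Integer::sum);
    }

    /**
     * Called when a user of the BLOB goes away.
     *
     * @return true if the BLOB just became unreferenced, i.e. is now a
     *         candidate for cleanup after "blob.retention.interval" seconds
     */
    boolean decrement(String blobKey) {
        Integer count = refCounts.get(blobKey);
        if (count == null) {
            return false; // unknown BLOB, nothing to do
        }
        if (count <= 1) {
            refCounts.remove(blobKey);
            return true;  // dropped to zero -> eligible for (staged) cleanup
        }
        refCounts.put(blobKey, count - 1);
        return false;
    }
}
```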

The BlobServer will use the same ref-counting technique and will delete files from both local and HA storage appropriately. Additionally, it will have a safety net by using the knowledge of when a job enters a final state. In that case, the job's BLOB storage directory (local and HA), i.e. "<path>/<jobId>", will be deleted, including all its BLOB files.

...

Up to Flink 1.3, the LibraryCacheManager runs a periodic cleanup task every hour (see ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL or the "library-cache-manager.cleanup.interval" configuration parameter) which deletes any unreferenced jar files. If a job's task fails right before the cleanup starts, a subsequent recovery can thus no longer access the cached file. We'd like to change this with a "staged cleanup":

In a staged cleanup, a BLOB file is only deleted the second time the (periodic) cleanup task encounters this BLOB as being unreferenced, e.g. by having two cleanup lists: one for the actual cleanup, i.e. "delete now", and one for staging files, i.e. "delete next time". Each time the cleanup task runs, the actual cleanup list (and its files) will be deleted and the staging list becomes the actual list. This is cheaper than keeping a time-to-live for each unreferenced jar and good enough for the cleanup task at hand. A (currently) unreferenced BLOB will thus stay around for at least "blob.retention.interval" seconds and at most twice this amount. As long as it stays, it may be re-used by future (recovery) tasks and does not need to be downloaded again.
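The staged cleanup above can be sketched as follows. This is a simplification under illustrative names (not Flink's actual classes): since the rotation of the two lists happens within a single run, only the staging set needs to be kept between runs, and a BLOB is deleted when it is seen unreferenced on two consecutive runs:

```java
import java.util.HashSet;
import java.util.Set;

// Minimal sketch of the staged ("two-list") cleanup: a BLOB is deleted only
// when the periodic cleanup task sees it unreferenced twice in a row.
// Names are illustrative, not Flink's actual classes.
class StagedCleanup {
    // "delete next time": BLOBs seen unreferenced by the previous run
    private Set<String> staged = new HashSet<>();

    /**
     * One run of the periodic cleanup task.
     *
     * @param unreferenced BLOB keys currently having a reference count of zero
     * @return the "delete now" list: BLOBs unreferenced in this run and the previous one
     */
    Set<String> run(Set<String> unreferenced) {
        // second consecutive sighting as unreferenced -> delete now
        Set<String> deleteNow = new HashSet<>(staged);
        deleteNow.retainAll(unreferenced);

        // first sighting -> stage for the next run; BLOBs that were
        // re-referenced in the meantime simply drop off the staging list
        staged = new HashSet<>(unreferenced);
        staged.removeAll(deleteNow);

        return deleteNow; // the caller deletes these files
    }
}
```

A BLOB staged on one run but re-referenced before the next is silently un-staged, which gives exactly the "at least one, at most two retention intervals" lifetime described above.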

...

  • fetch all jobs to recover
  • download their BLOBs and increase reference counts appropriately (at the JobManager only after successful job submission)
  • put any other, i.e. orphaned, file in the configured storage path into staged cleanup
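The last recovery step above can be sketched as follows: any file found in the configured storage path whose job was not recovered is treated as orphaned and put into staged cleanup. The names and the flat "<jobId>/<BlobKey>" listing are illustrative assumptions, not Flink's actual API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of orphan detection during HA recovery; names are
// illustrative, not Flink's actual classes.
class RecoverySketch {

    /**
     * @param haFiles       relative "<jobId>/<BlobKey>" entries found in the
     *                      configured storage path
     * @param recoveredJobs IDs of jobs that were successfully re-submitted
     * @return files owned by no recovered job, to be put into staged cleanup
     */
    static List<String> orphanedFiles(List<String> haFiles, Set<String> recoveredJobs) {
        List<String> orphaned = new ArrayList<>();
        for (String file : haFiles) {
            String jobId = file.substring(0, file.indexOf('/'));
            if (!recoveredJobs.contains(jobId)) {
                orphaned.add(file); // no recovered job owns it -> stage for deletion
            }
        }
        return orphaned;
    }
}
```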

Use Cases Details

Jar files

...

RPC Messages

...

Log Files

...

Compatibility, Deprecation, and Migration Plan

...