
Status

Current state: "Under Discussion"

Discussion thread: -

JIRA: -

Released: -

Motivation

The current architecture around the BLOB server and cache components seems rather patched up and has some issues regarding concurrency ([FLINK-6380]), cleanup, and API inconsistencies / currently unused API ([FLINK-6329], [FLINK-6008]). These make future integration with FLIP-6 or extensions like offloading oversized RPC messages ([FLINK-6046]) difficult. We therefore propose the improvements to the current architecture described below, which tackle these issues, provide some cleanup, and enable further BLOB server use cases.

Public Interfaces

The proposed changes mainly affect the back-end and are not user-facing.
Currently, we also do not plan any changes to the configuration or the monitoring information, except for:

  • "library-cache-manager.cleanup.interval": replace with "blob.retention.interval" and reduce its default value (currently ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = 1h) to half of the current value so that BLOBs will be deleted with roughly the same expected delay as now (see "BLOB Lifecycle - Staged Cleanup" below)

Proposed Changes

Add Checksum Verifications

Since we already have the checksums in the CONTENT_ADDRESSABLE blobs, we might as well verify them when reading/copying the files, i.e. upon download, or on first use of an existing file that does not need to be downloaded.
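A minimal sketch of such a check, assuming that the content-addressable BlobKey wraps a SHA-1 digest of the file contents. BlobChecksums and its methods are hypothetical names for illustration, not existing Flink API.

```java
import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: verifies BLOB contents against the digest stored in its key. */
final class BlobChecksums {
    private BlobChecksums() {}

    /** Computes the SHA-1 digest (assumed key algorithm) of the stream's remaining contents. */
    static byte[] sha1(InputStream in) throws IOException {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance("SHA-1");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 not available", e);
        }
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) != -1) {
            md.update(buf, 0, n);
        }
        return md.digest();
    }

    /** Throws if the contents do not match the expected digest, e.g. after a download. */
    static void verify(InputStream in, byte[] expected) throws IOException {
        byte[] actual = sha1(in);
        if (!MessageDigest.isEqual(actual, expected)) {
            throw new IOException("BLOB checksum mismatch");
        }
    }
}
```

On download, the check would run on the freshly written file before it becomes visible in the local store; for a file that already exists locally, it would run once on first use.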

Remove NAME_ADDRESSABLE BLOBs

Support for name-addressable blobs was implemented but has never been used so far and thus has some shortcomings and issues, especially regarding cleanup (some are presented in [FLINK-6008]).
This is dead code that we do not actually need, and we should remove it.

Supported BLOB File Types

For now, we'd like to support the following file types to be stored in our BLOB store:

  • JarFile: a library, for use by the user-ClassLoader
  • RpcMessage: for (a) off-loading large RPC messages that do not fit into the akka.framesize, (b) distributing a single (large) RPC message, e.g. the TaskDeploymentDescriptor, to multiple recipients by leveraging any underlying distributed file system in HA mode, (c) the ability to re-use a (large) cached RPC message during re-deployment after failures.
  • LogFile: retrieving log files from task managers (currently only used by the Web-UI)

Each type may have a different lifecycle, which is described below.

Move BLOB Ref-counting & Cleanup from the LibraryCacheManager to the BlobCache

Ref-counting and cleanup are tasks generic to any BLOB file in the system and should therefore be handled more generically in the BlobCache (holding locally cached BLOBs) and BlobServer (a global and central store to retrieve local copies from; with support for high availability storage (HA)).

BLOB Store High-Level Components

BlobServer

  • offers file upload and download facilities
  • local store (file system): read/write access, using "<path>/<jobId>/<BlobKey>"
  • HA store: read/write access for high availability, using "<path>/<jobId>/<BlobKey>"
  • responsible for cleanup of local+HA stores
  • upload to local store, then to HA (possibly in parallel)
  • downloads will be served from local storage only
  • on recovery (HA): download used files from HA to the local store and also take over cleanup responsibility for all other files on the path (see below)

BlobCache

  • offers transparent local cache of BlobServer files
  • local store (file system): read/write access, using "<path>/<jobId>/<BlobKey>"
  • HA store: read access only (if available)
  • download either from HA store or BlobServer
  • responsible for cleanup of local storage
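The BlobCache lookup described above could look roughly as follows. This is only a sketch: BlobSource and BlobCacheLookup are hypothetical names, the actual HA and network transfers are hidden behind the BlobSource interface, and trying the HA store before the BlobServer is an assumption (the text only says "either").

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

/** Hypothetical source of BLOB contents, e.g. the HA store or the BlobServer. */
interface BlobSource {
    /** Copies the BLOB to target; returns false if this source does not have it. */
    boolean copyTo(String jobId, String blobKey, Path target) throws IOException;
}

/** Sketch of the BlobCache lookup: local store, then HA store (if any), then BlobServer. */
final class BlobCacheLookup {
    private final Path localBase;
    private final Optional<BlobSource> haStore; // read-only access, may be absent
    private final BlobSource blobServer;

    BlobCacheLookup(Path localBase, Optional<BlobSource> haStore, BlobSource blobServer) {
        this.localBase = localBase;
        this.haStore = haStore;
        this.blobServer = blobServer;
    }

    Path getFile(String jobId, String blobKey) throws IOException {
        // Same "<path>/<jobId>/<BlobKey>" layout as on the BlobServer.
        Path local = localBase.resolve(jobId).resolve(blobKey);
        if (Files.exists(local)) {
            return local; // cache hit, no download needed
        }
        Files.createDirectories(local.getParent());
        if (haStore.isPresent() && haStore.get().copyTo(jobId, blobKey, local)) {
            return local;
        }
        if (blobServer.copyTo(jobId, blobKey, local)) {
            return local;
        }
        throw new IOException("BLOB " + blobKey + " of job " + jobId + " not found");
    }
}
```

A real implementation would additionally download into a temporary file, move it into place atomically, and serialise concurrent requests for the same BLOB, given the concurrency issues mentioned in the motivation.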

LibraryCacheManager

  • bridge between a task's classloader and cached library BLOBs, i.e. jar files, at the BlobCache
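To illustrate this bridge, a sketch that turns the local paths of a job's cached jar files into a user-code classloader. UserCodeClassLoaders is a hypothetical name, and the paths are assumed to be the local files handed out by the BlobCache.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.List;

/** Sketch: builds a task's user-code classloader from jar files cached by the BlobCache. */
final class UserCodeClassLoaders {
    private UserCodeClassLoaders() {}

    static URLClassLoader create(List<Path> cachedJars, ClassLoader parent) {
        URL[] urls = new URL[cachedJars.size()];
        for (int i = 0; i < urls.length; i++) {
            try {
                urls[i] = cachedJars.get(i).toUri().toURL();
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Invalid jar path: " + cachedJars.get(i), e);
            }
        }
        return new URLClassLoader(urls, parent);
    }
}
```

In this design, the LibraryCacheManager would hold one BlobCache reference per jar while the classloader is in use and release them once the task reaches a final state.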

BLOB Lifecycle

In general, the BlobCache should ref-count all of its BLOBs and should start deleting unreferenced ones after "blob.retention.interval" seconds. On the BlobServer, we will also use ref-counting and introduce a proper lifecycle for each BLOB file and delete similarly from both local and HA storage.

Staged Cleanup

Up to Flink 1.3, the LibraryCacheManager runs a periodic cleanup task every hour (see ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL or the "library-cache-manager.cleanup.interval" configuration parameter) which deletes any unreferenced jar files. If a job's task fails right before the cleanup runs, a subsequent recovery may therefore no longer find the cached file. We'd like to change this with a "staged cleanup":

In a staged cleanup, a BLOB file is only deleted the second time the (periodic) cleanup task encounters it as unreferenced, e.g. by keeping two cleanup lists: one for the actual cleanup, i.e. "delete now", and one for staging files, i.e. "delete next time". Each time the cleanup task runs, the files on the actual cleanup list are deleted and the staging list becomes the new actual list. This is cheaper than maintaining a time-to-live for each unreferenced jar and good enough for the cleanup task at hand. A (currently) unreferenced BLOB will thus stay around for at least "blob.retention.interval" seconds and at most twice this amount. As long as it stays, it may be re-used by future (recovery) tasks and does not need to be downloaded again.
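The two-list scheme can be sketched as follows. StagedCleanup and its methods are hypothetical names, and the deleter callback stands in for deleting the actual file(s).

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Sketch: an unreferenced BLOB is deleted only on the second cleanup run after it was staged. */
final class StagedCleanup<K> {
    private Set<K> deleteNow = new HashSet<>();  // staged before the previous run
    private Set<K> deleteNext = new HashSet<>(); // became unreferenced since the previous run
    private final Consumer<K> deleter;

    StagedCleanup(Consumer<K> deleter) {
        this.deleter = deleter;
    }

    /** Called when a BLOB's reference count drops to 0. */
    synchronized void stage(K key) {
        deleteNext.add(key);
    }

    /** Called when an unreferenced BLOB is referenced again before it was deleted. */
    synchronized void unstage(K key) {
        deleteNow.remove(key);
        deleteNext.remove(key);
    }

    /** Periodic task, e.g. scheduled every "blob.retention.interval" seconds. */
    synchronized void run() {
        for (K key : deleteNow) {
            deleter.accept(key);
        }
        deleteNow = deleteNext; // the staging list becomes the actual list
        deleteNext = new HashSet<>();
    }
}
```

A BLOB staged between two runs is moved to the "delete now" list by the next run and deleted by the run after that, which yields the bounds of one to two cleanup intervals stated above.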

BlobCache

  • all blobs are ref-counted, starting from first BLOB retrieval/delivery
  • if reference = 0 the BLOB enters the staged cleanup (see above)
  • if a task succeeds, fails or is cancelled, all its BLOBs' references are counted down appropriately (at a later point in time, we may optimise the case of a task entering a final state by deleting these files immediately once the reference counter reaches 0)
  • all blobs should be deleted when the TM exits
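A minimal sketch of this ref-counting, assuming the staged cleanup offers "stage" and "unstage" operations, represented here by two callbacks. BlobRefCounter and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Sketch of BlobCache ref-counting feeding a staged cleanup. */
final class BlobRefCounter<K> {
    private final Map<K, Integer> refs = new HashMap<>();
    private final Consumer<K> stage;   // reference count dropped to 0: enter staged cleanup
    private final Consumer<K> unstage; // referenced again: leave staged cleanup

    BlobRefCounter(Consumer<K> stage, Consumer<K> unstage) {
        this.stage = stage;
        this.unstage = unstage;
    }

    /** Called on each BLOB retrieval/delivery for a task. */
    synchronized void retain(K key) {
        if (refs.merge(key, 1, Integer::sum) == 1) {
            unstage.accept(key); // no-op unless the BLOB was waiting for deletion
        }
    }

    /** Called for each of a task's BLOBs once the task succeeds, fails, or is cancelled. */
    synchronized void release(K key) {
        Integer count = refs.get(key);
        if (count == null) {
            throw new IllegalStateException("BLOB is not referenced: " + key);
        }
        if (count == 1) {
            refs.remove(key);
            stage.accept(key);
        } else {
            refs.put(key, count - 1);
        }
    }

    synchronized int referenceCount(K key) {
        return refs.getOrDefault(key, 0);
    }
}
```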

BlobServer

All BLOB files stored at the BlobServer should also be cleaned up periodically and not just when the BlobServer shuts down (as is the case in Flink 1.3). The lifecycles of the different BLOB types differ slightly, as described below, but are all based on reference counting and a staged cleanup as in the BlobCache.

JarFile

Jar files should be job-specific and will be bound to the corresponding job's lifecycle. The use case of several jobs sharing the same jar file(s), and an optimisation that keeps only a single copy of such a file, may be covered at a later point in time: for now, we want to keep things simple and instead develop a proper cleanup story, which is more important.

  • if a job fails, its BLOBs' references are counted down appropriately
  • similarly, once a job enters a final state (finished or cancelled), all its BLOB files' references are counted down appropriately (we may optimise this at a later point in time and delete these files immediately)
  • During the upload: while user jars are being uploaded, the corresponding job has not been submitted yet and we cannot bind the jar files to the lifecycle of a non-existing job. We also cannot upload each file individually with a reference count of 0, or some files may already have been deleted by the time the job is started. Instead, we will upload all jar files together and, only after receiving the last one, put all of them into the staging list of the staged cleanup. The job then needs to be submitted within "blob.retention.interval" seconds, or we cannot guarantee that the jar files still exist. This ensures proper cleanup if the client aborts or crashes between the upload and the job submission.
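The upload step can be sketched as follows. JarUpload and BlobStoreWriter are hypothetical names, the stage callback stands for putting a key into the staging list, and also staging the already received jars when the upload aborts (the finally block) is an assumption beyond the text.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Sketch: all jars of a job are uploaded together before being staged for cleanup. */
final class JarUpload {
    /** Hypothetical write access to the BlobServer's store; returns the new BLOB's key. */
    interface BlobStoreWriter {
        String put(byte[] contents) throws IOException;
    }

    private JarUpload() {}

    static List<String> uploadJars(List<byte[]> jars, BlobStoreWriter store, Consumer<String> stage)
            throws IOException {
        List<String> keys = new ArrayList<>();
        try {
            for (byte[] jar : jars) {
                keys.add(store.put(jar)); // not yet eligible for cleanup
            }
        } finally {
            // Staged only after the last jar was received (or the upload aborted), so the job
            // must be submitted within "blob.retention.interval" seconds.
            keys.forEach(stage);
        }
        return keys;
    }
}
```

On job submission, each returned key would then be referenced once, which removes it from the staged cleanup and binds it to the job's lifecycle.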

RpcMessage

While we can certainly delete RPC message BLOBs after the job enters a final state, in general, RPC messages have a shorter lifetime than jobs. We will thus let the user of this service, e.g. some RPC daemon/wrapper class, actively ask for a number of references when retrieving/uploading the message and when releasing it.
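A sketch of such caller-driven reference counts, e.g. for a large TaskDeploymentDescriptor uploaded once with one reference per recipient. RpcMessageRefs, acquire, and release are hypothetical names; unstaging on re-acquisition, as in the BlobCache, is omitted for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Sketch of explicit, caller-driven reference counts for RPC message BLOBs. */
final class RpcMessageRefs {
    private final Map<String, Integer> refs = new HashMap<>();
    private final Consumer<String> stage; // all references released: enter staged cleanup

    RpcMessageRefs(Consumer<String> stage) {
        this.stage = stage;
    }

    /** Upload/retrieval: the caller asks for count references, e.g. one per recipient. */
    synchronized void acquire(String key, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        refs.merge(key, count, Integer::sum);
    }

    /** Releases count references; the BLOB is staged for cleanup once none remain. */
    synchronized void release(String key, int count) {
        int current = refs.getOrDefault(key, 0);
        if (count <= 0 || count > current) {
            throw new IllegalArgumentException(
                    "cannot release " + count + " of " + current + " references");
        }
        if (current == count) {
            refs.remove(key);
            stage.accept(key);
        } else {
            refs.put(key, current - count);
        }
    }
}
```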

LogFile

This is similar to the RpcMessage type. One could argue about keeping logs after a job enters a final state, but since they always remain available at the task managers, we will delete unreferenced logs in this case.

BlobServer Upload

...

BlobServer Recovery (HA)

During recovery, the JobManager (or the Dispatcher for FLIP-6) will:

  • fetch all jobs to recover
  • download their BLOBs and increase reference counts appropriately (at the JobManager only after successful job submission)
  • delete any other, i.e. orphaned, files in the configured storage path
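Determining the orphaned files can be sketched as a set difference over the "<path>/<jobId>/<BlobKey>" layout. OrphanedBlobs is a hypothetical name; the sketch assumes that the storage listing and the BLOB keys of the recovered jobs are already available as maps and that no new uploads happen while the listing is taken. The actual deletion from local and HA storage is omitted.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Sketch of finding orphaned BLOBs after recovery. */
final class OrphanedBlobs {
    private OrphanedBlobs() {}

    /**
     * @param stored    BLOB keys found in storage, per job ID
     * @param recovered BLOB keys needed by the recovered jobs, per job ID
     * @return "jobId/blobKey" entries that no recovered job references
     */
    static Set<String> find(Map<String, Set<String>> stored, Map<String, Set<String>> recovered) {
        Set<String> orphans = new TreeSet<>();
        for (Map.Entry<String, Set<String>> job : stored.entrySet()) {
            // Jobs that are not recovered at all need none of their BLOBs.
            Set<String> needed = recovered.getOrDefault(job.getKey(), Collections.emptySet());
            for (String key : job.getValue()) {
                if (!needed.contains(key)) {
                    orphans.add(job.getKey() + "/" + key);
                }
            }
        }
        return orphans;
    }
}
```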

Compatibility, Deprecation, and Migration Plan

The impact on users will be minimal, except for the (currently undocumented) configuration parameter that we'd like to change. We will keep the old one and mark it as deprecated. Other than that, users may notice that the cleanup now works properly.
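A sketch of reading the new parameter with a fallback to the deprecated one, assuming a plain string map as configuration and values in seconds. Since the staged cleanup keeps an unreferenced BLOB for between one and two intervals, halving the default from 3600 s to 1800 s keeps the maximum deletion delay at the current one hour. BlobRetentionConfig is a hypothetical name; in Flink itself this would presumably be a ConfigOption declared with withDeprecatedKeys. Using a value set under the deprecated key unchanged (rather than halving it as well) is a choice this sketch makes, not one the proposal states.

```java
import java.util.Map;

/** Sketch: retention interval lookup with a fallback to the deprecated key. */
final class BlobRetentionConfig {
    static final String RETENTION_KEY = "blob.retention.interval";
    static final String DEPRECATED_KEY = "library-cache-manager.cleanup.interval";
    /** Half of the old default of one hour, in seconds. */
    static final long DEFAULT_RETENTION_SECONDS = 3600 / 2;

    private BlobRetentionConfig() {}

    static long retentionSeconds(Map<String, String> conf) {
        String value = conf.get(RETENTION_KEY);
        if (value == null) {
            value = conf.get(DEPRECATED_KEY); // deprecated, but still honoured
        }
        return value == null ? DEFAULT_RETENTION_SECONDS : Long.parseLong(value.trim());
    }
}
```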

Test Plan

We will adapt the current unit tests to any new API to verify that the behaviour of the BLOB storage components did not change. Also, proper unit tests for the different BLOB types will be added. These additions should already provide better coverage than before and, since these components are close to the back-end, other (integration) tests will indirectly verify that the changes work as expected.

 
