Status
Current state: "Under Discussion"
Discussion thread: -
JIRA: -
Released: -
Motivation
The current architecture of the BLOB server and cache components has grown through successive patches and has issues regarding concurrency ([FLINK-6380]), cleanup, and API inconsistencies or currently unused API ([FLINK-6329], [FLINK-6008]). These make future integration with FLIP-6 or extensions such as offloading oversized RPC messages ([FLINK-6046]) difficult. We therefore propose improvements to the current architecture, described below, that tackle these issues, provide some cleanup, and enable further BLOB server use cases.
Public Interfaces
The proposed changes mainly affect the back-end and are not user-facing. Currently, we also do not plan any changes to the configuration or the monitoring information, except for:
- "library-cache-manager.cleanup.interval": we will reduce the ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL to half of the current value so that BLOBs will be deleted at roughly the same expected delay as now (see "BLOB Lifecycle - Staged Cleanup" below)
Proposed Changes
Add Checksum Verifications
Since CONTENT_ADDRESSABLE BLOBs already carry a checksum, we should verify it whenever a file is read or copied, i.e. upon download or upon first use of an existing local file that does not require a download.
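As an illustration of the verification step, the following is a minimal sketch and not the proposed implementation. It assumes that the BLOB key is a SHA-1 digest of the file content. The class BlobChecksumVerifier and its methods are hypothetical names introduced only for this example.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper for illustration; not part of the existing BLOB API. */
final class BlobChecksumVerifier {

    /** Streams the file and returns its digest (assumption: SHA-1). */
    static byte[] digestOf(Path file) throws IOException {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance("SHA-1");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 not available", e);
        }
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);
            }
        }
        return md.digest();
    }

    /** Throws an IOException if the file content does not match the expected key. */
    static void verify(Path file, byte[] expectedKey) throws IOException {
        if (!MessageDigest.isEqual(digestOf(file), expectedKey)) {
            throw new IOException("Checksum mismatch for BLOB file " + file);
        }
    }
}
```

Such a check could run once after a download completes and once when an already cached file is used for the first time, so that a corrupted local copy is detected before it is handed to a consumer.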
Remove NAME_ADDRESSABLE BLOBs
Support for name-addressable BLOBs was implemented but has never been used, and thus has some shortcomings and issues, especially regarding cleanup (some are presented in [FLINK-6008]).
This is dead code that we do not need and should remove.
Supported BLOB File Types
For now, we'd like to support the following file types to be stored in our BLOB store:
- JarFile: a library, for use by the user-ClassLoader
- RpcMessage: for (1) off-loading large RPC messages that do not fit into the akka.framesize, (2) distributing a single (large) RPC message, e.g. the TaskDeploymentDescriptor, to multiple recipients by leveraging any underlying distributed file system in HA mode, (3) the ability to re-use a (large) cached RPC message during re-deployment after failures.
- LogFile: retrieving log files from task managers (currently only used by the Web-UI)
Each type may have a different lifecycle, which we describe below.
Move BLOB Ref-counting & Cleanup from the LibraryCacheManager to the BlobCache
Ref-counting and cleanup are tasks generic to any BLOB file in the system and should therefore be handled more generically in the BlobCache (holding locally cached BLOBs) and BlobServer (a global and central store to retrieve local copies from; with support for high availability storage (HA)).
BLOB Lifecycle
In general, the BlobCache should ref-count all of its BLOBs and should start deleting unreferenced ones after "library-cache-manager.cleanup.interval" seconds.
Staged Cleanup
Up to Flink 1.3, the LibraryCacheManager runs a periodic cleanup task every hour (see ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL or the "library-cache-manager.cleanup.interval" configuration parameter) which deletes any unreferenced jar files. If a job's task fails right before the cleanup starts, a subsequent recovery can then no longer access the cached file. We'd like to change this with a "staged cleanup":
In a staged cleanup, a BLOB file is only deleted the second time the (periodic) cleanup task finds it unreferenced, e.g. by keeping two cleanup lists: one for the actual cleanup, i.e. "delete now", and one for staging files, i.e. "delete next time". Each time the cleanup task runs, the files on the actual cleanup list are deleted and the staging list becomes the actual list. This is cheaper than keeping a time-to-live for each unreferenced jar and good enough for the cleanup task at hand. A (currently) unreferenced BLOB will thus stay around for at least "library-cache-manager.cleanup.interval" seconds and at most twice this amount. As long as it stays, it may be re-used by future (recovery) tasks.
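The two-list scheme can be sketched as follows. This is only an illustration under stated assumptions: the class StagedCleanup, its method names, and the deleter callback are hypothetical and not part of the existing code base, and the periodic scheduling of runCleanup() is left to the caller.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch of a staged cleanup with two lists. */
final class StagedCleanup<K> {
    private Set<K> deleteNow = new HashSet<>();      // deleted at the next run
    private Set<K> deleteNextTime = new HashSet<>(); // staged, deleted one run later
    private final Consumer<K> deleter;               // performs the actual file deletion

    StagedCleanup(Consumer<K> deleter) {
        this.deleter = deleter;
    }

    /** Called when a BLOB's reference count drops to zero. */
    synchronized void markUnreferenced(K key) {
        if (!deleteNow.contains(key)) {
            deleteNextTime.add(key);
        }
    }

    /** Called when a BLOB is re-used; returns true if it was rescued from cleanup. */
    synchronized boolean markReferenced(K key) {
        // non-short-circuit OR so that the key is removed from both sets
        return deleteNow.remove(key) | deleteNextTime.remove(key);
    }

    /** One run of the periodic cleanup task. */
    synchronized void runCleanup() {
        for (K key : deleteNow) {
            deleter.accept(key);
        }
        deleteNow = deleteNextTime;       // the staging list becomes the actual list
        deleteNextTime = new HashSet<>();
    }
}
```

With a cleanup interval T, a key that becomes unreferenced between two runs is moved to the "delete now" list at the next run and deleted at the run after that, which gives the delay of at least T and at most 2T described above.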
BlobCache
- all blobs are ref-counted
- if the reference count of a BLOB drops to 0, it enters the staged cleanup (see above)
- all blobs should be deleted when the TM exits
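The reference counting listed above could look roughly like the following sketch. BlobRefCounter and its methods are hypothetical names used for illustration only. The sketch assumes that the caller hands a key to the staged cleanup whenever release() reports that the count has dropped to zero.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of per-BLOB reference counting. */
final class BlobRefCounter<K> {
    private final Map<K, Integer> refCounts = new HashMap<>();

    /** Registers one more user of the BLOB and returns the new count. */
    synchronized int acquire(K key) {
        return refCounts.merge(key, 1, Integer::sum);
    }

    /**
     * Releases one reference.
     *
     * @return true if the BLOB became unreferenced and should enter the staged cleanup
     */
    synchronized boolean release(K key) {
        Integer count = refCounts.get(key);
        if (count == null) {
            throw new IllegalStateException("BLOB is not referenced: " + key);
        }
        if (count == 1) {
            refCounts.remove(key);
            return true;
        }
        refCounts.put(key, count - 1);
        return false;
    }

    synchronized boolean isReferenced(K key) {
        return refCounts.containsKey(key);
    }
}
```

Keeping only referenced keys in the map means that an entry's absence is equivalent to a count of zero, so the map does not grow with BLOBs that are already waiting for cleanup.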
BlobServer
All BLOB files stored at the BlobServer should now also be subject to cleanup during operation, not just at the BlobServer's shutdown. The lifecycle differs slightly between the BLOB types, as described here:
JarFile
RpcMessage
LogFile
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.