...

  • all BLOBs are ref-counted, starting from the first BLOB retrieval/delivery
  • if the reference count reaches 0, the BLOB enters the staged cleanup (see above)
  • if a job task succeeds, fails or is cancelled, all its BLOBs' references are counted down appropriately (we may, at a later point in time, optimise the case when a task enters a final state and delete these files immediately once the reference count reaches 0)
  • all BLOBs should be deleted when the TaskManager (TM) exits
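The TaskManager-side rules above can be illustrated with a minimal, in-memory sketch. All names (`RefCountedBlobCache`, `retain`, `release`, `cleanup`, `shutdown`) are hypothetical and are not Flink's actual `BlobCache` API. BLOB keys are plain strings, local files are simulated by a set, and the clock is injected so that the retention interval can be exercised deterministically.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;

/** Hypothetical sketch (not Flink's BlobCache): reference counting plus a staged cleanup. */
class RefCountedBlobCache {
    private final Map<String, Integer> refCounts = new HashMap<>();
    // BLOBs whose count dropped to 0, mapped to the time at which they were staged
    private final Map<String, Long> stagedSince = new HashMap<>();
    // stands in for the locally stored BLOB files
    private final Set<String> storedFiles = new HashSet<>();
    private final long retentionMillis;
    private final LongSupplier clock;

    RefCountedBlobCache(long retentionMillis, LongSupplier clock) {
        this.retentionMillis = retentionMillis;
        this.clock = clock;
    }

    /** Takes a reference, starting with the first retrieval/delivery of the BLOB. */
    synchronized void retain(String blobKey) {
        storedFiles.add(blobKey);                  // "download" the file if not yet present
        refCounts.merge(blobKey, 1, Integer::sum);
        stagedSince.remove(blobKey);               // referenced again: no longer a cleanup candidate
    }

    /** Drops a reference, e.g. when a task succeeds, fails or is cancelled. */
    synchronized void release(String blobKey) {
        Integer count = refCounts.get(blobKey);
        if (count == null) {
            throw new IllegalStateException("BLOB not referenced: " + blobKey);
        }
        if (count == 1) {
            refCounts.remove(blobKey);
            stagedSince.put(blobKey, clock.getAsLong()); // enters the staged cleanup
        } else {
            refCounts.put(blobKey, count - 1);
        }
    }

    /** Periodic task: deletes BLOBs that stayed unreferenced for the whole retention interval. */
    synchronized int cleanup() {
        long now = clock.getAsLong();
        int deleted = 0;
        Iterator<Map.Entry<String, Long>> it = stagedSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() >= retentionMillis) {
                storedFiles.remove(entry.getKey());
                it.remove();
                deleted++;
            }
        }
        return deleted;
    }

    /** TaskManager exit: all BLOBs are deleted regardless of their counts. */
    synchronized void shutdown() {
        refCounts.clear();
        stagedSince.clear();
        storedFiles.clear();
    }

    synchronized boolean isStored(String blobKey) {
        return storedFiles.contains(blobKey);
    }
}
```

Every method simply synchronizes on the cache. A real implementation would additionally have to coordinate the periodic cleanup with concurrent downloads of the same BLOB.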

...

All BLOB files stored at the BlobServer should also be cleaned up periodically, not just deleted when the BlobServer shuts down (as of Flink 1.3). The lifecycle of each BLOB type differs slightly, as described below, but all of them are based on reference counting and a staged cleanup as in the BlobCache.
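At the BlobServer, the periodic part of the staged cleanup could be driven by a `ScheduledExecutorService`, which deletes staged files from the storage directory once they have stayed unreferenced for the retention interval. This is a sketch under assumptions. `PeriodicBlobCleanup`, `stage`, `unstage` and `runOnce` are hypothetical names, and the reference counting itself is assumed to happen elsewhere and to call `stage`/`unstage`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: periodic, staged deletion of unreferenced BLOB files at the BlobServer. */
class PeriodicBlobCleanup implements AutoCloseable {
    private final Map<Path, Long> stagedSince = new HashMap<>();
    private final long retentionMillis;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    PeriodicBlobCleanup(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    /** Runs the cleanup in the background, once per retention interval. */
    void start() {
        timer.scheduleWithFixedDelay(() -> runOnce(System.currentTimeMillis()),
                retentionMillis, retentionMillis, TimeUnit.MILLISECONDS);
    }

    /** Called when the reference count of a BLOB file drops to 0. */
    synchronized void stage(Path file, long nowMillis) {
        stagedSince.putIfAbsent(file, nowMillis);
    }

    /** Called when a staged BLOB file is referenced again. */
    synchronized void unstage(Path file) {
        stagedSince.remove(file);
    }

    /** Deletes every file that has been staged for at least the retention interval. */
    synchronized int runOnce(long nowMillis) {
        int deleted = 0;
        Iterator<Map.Entry<Path, Long>> it = stagedSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Path, Long> entry = it.next();
            if (nowMillis - entry.getValue() < retentionMillis) {
                continue;
            }
            try {
                if (Files.deleteIfExists(entry.getKey())) {
                    deleted++;
                }
                it.remove();
            } catch (IOException e) {
                // keep the entry so that the deletion is retried in the next round
            }
        }
        return deleted;
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```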

JarFile

Jar files should be job-specific and will be bound to the corresponding job's lifecycle.
...

The use case of several jobs sharing the same jar file(s), and an optimisation that stores only a single copy of such a file, may be covered at a later point in time. For now, we want to keep it simple and focus on a proper cleanup story, which is more important.
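Binding jar files to the job's lifecycle could look roughly like the following sketch. Here a job holds one reference on each of its jars and releases them once it fails or reaches a final state. `JobJarRegistry` and `JobState` are hypothetical. Job IDs and BLOB keys are plain strings, and the returned keys are the ones that would enter the staged cleanup.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical job states; only "running" versus "failed or final" matters here. */
enum JobState { RUNNING, FAILED, FINISHED, CANCELED }

/** Hypothetical sketch: jar BLOB references bound to the lifecycle of the job that owns them. */
class JobJarRegistry {
    private final Map<String, List<String>> jarsByJob = new HashMap<>();
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** The job takes one reference on each of its jar files. */
    synchronized void registerJob(String jobId, List<String> jarKeys) {
        jarsByJob.put(jobId, new ArrayList<>(jarKeys));
        for (String key : jarKeys) {
            refCounts.merge(key, 1, Integer::sum);
        }
    }

    /**
     * Counts the job's references down once it fails or reaches a final state.
     * Returns the jar keys that became unreferenced, i.e. that now enter the staged cleanup.
     */
    synchronized List<String> onJobStateChange(String jobId, JobState state) {
        List<String> unreferenced = new ArrayList<>();
        if (state == JobState.RUNNING) {
            return unreferenced;
        }
        List<String> jars = jarsByJob.remove(jobId);
        if (jars == null) {
            return unreferenced; // references were already released
        }
        for (String key : jars) {
            int remaining = refCounts.get(key) - 1;
            if (remaining == 0) {
                refCounts.remove(key);
                unreferenced.add(key);
            } else {
                refCounts.put(key, remaining);
            }
        }
        return unreferenced;
    }
}
```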

  • if a job fails, its BLOBs' references are counted down appropriately
  • similarly, once a job enters a final state (finished or cancelled), all its BLOB files' references are counted down appropriately (we may optimise this at a later point in time and delete these files immediately)
  • During the upload: While user jars are being uploaded, the corresponding job has not been submitted yet, so we cannot bind the jar files to a non-existing job's lifecycle. We also cannot upload each file individually with a reference count of 0, since some files might then already be deleted by the time the job is started. Instead, we will upload all jar files together and, only after receiving the last one, put all of them into the staging list of the staged cleanup. The job then needs to be submitted within "blob.retention.interval" seconds; otherwise, we cannot guarantee that the jar files still exist. This ensures a proper cleanup if the client aborts or crashes between the upload and the job submission.
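The upload protocol above can be sketched as follows. All jars of a future job are stored first and only then staged together. If the client crashes before submission, the jars are left to the periodic cleanup. A timely submission instead takes references and removes them from the staging list. `JarUploadStaging` and its methods are hypothetical, and `retentionMillis` merely plays the role of the "blob.retention.interval" setting.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: user jars are staged for cleanup as one batch until the job is submitted. */
class JarUploadStaging {
    private final long retentionMillis;        // plays the role of "blob.retention.interval"
    private final Set<String> stored = new HashSet<>();
    private final Map<String, Long> stagedSince = new HashMap<>();
    private final Map<String, Integer> refCounts = new HashMap<>();

    JarUploadStaging(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    /** Stores all jars first; only after the last one arrived do they enter the staging list together. */
    synchronized void uploadBatch(List<String> jarKeys, long nowMillis) {
        stored.addAll(jarKeys);                 // placeholder for writing the files
        for (String key : jarKeys) {
            stagedSince.put(key, nowMillis);    // same timestamp for the whole batch
        }
    }

    /** Job submission takes a reference on each jar; fails if any jar was already cleaned up. */
    synchronized boolean submitJob(List<String> jarKeys) {
        if (!stored.containsAll(jarKeys)) {
            return false;
        }
        for (String key : jarKeys) {
            stagedSince.remove(key);
            refCounts.merge(key, 1, Integer::sum);
        }
        return true;
    }

    /** Periodic cleanup: jars still staged after the retention interval are deleted. */
    synchronized void cleanup(long nowMillis) {
        stagedSince.entrySet().removeIf(entry -> {
            boolean expired = nowMillis - entry.getValue() >= retentionMillis;
            if (expired) {
                stored.remove(entry.getKey());
            }
            return expired;
        });
    }

    synchronized boolean isStored(String key) {
        return stored.contains(key);
    }
}
```

Submitting after the interval has elapsed fails rather than starting a job with missing jars. This mirrors the statement that no guarantees can be made once "blob.retention.interval" has passed.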

RpcMessage

While we can certainly delete RPC message BLOBs after the job enters a final state, RPC messages generally have a shorter lifetime than jobs. We will thus let the user of this service, e.g. some RPC daemon/wrapper class, actively increase the reference count when retrieving/uploading a message and decrease it when releasing the message.
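For RPC message BLOBs, explicit reference handling by the service user could be wrapped in an `AutoCloseable` handle, so that a try-with-resources block releases the reference even on errors. `UserManagedBlobRefs` and `Ref` are hypothetical names. When a count reaches 0, a real implementation would hand the BLOB to the staged cleanup instead of just dropping the counter.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: the service user (e.g. an RPC wrapper) explicitly takes and releases references. */
class UserManagedBlobRefs {
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** A single reference; closing it releases the reference exactly once. */
    final class Ref implements AutoCloseable {
        final String key;
        private final AtomicBoolean released = new AtomicBoolean(false);

        private Ref(String key) {
            this.key = key;
        }

        @Override
        public void close() {
            if (released.compareAndSet(false, true)) {
                release(key);
            }
        }
    }

    /** Called when retrieving or uploading an RPC message BLOB. */
    synchronized Ref acquire(String key) {
        refCounts.merge(key, 1, Integer::sum);
        return new Ref(key);
    }

    private synchronized void release(String key) {
        int remaining = refCounts.get(key) - 1;
        if (remaining == 0) {
            refCounts.remove(key); // a real implementation would stage the BLOB for cleanup here
        } else {
            refCounts.put(key, remaining);
        }
    }

    synchronized int refCount(String key) {
        return refCounts.getOrDefault(key, 0);
    }
}
```

Making `close()` idempotent guards against a wrapper releasing the same reference twice, which would otherwise corrupt the count.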

LogFile

This is similar to the RpcMessage type. One may argue whether to keep logs after a job enters a final state, but since they are always still available at the TaskManagers, we will delete unreferenced logs in this case.

BlobServer Upload

...

...

BlobServer Recovery (HA)

During recovery, the JobManager (or the Dispatcher for FLIP-6) will:

  • fetch all jobs to recover
  • download their BLOBs and increase reference counts appropriately (at the JobManager only after successful job submission)
  • delete any other, i.e. orphaned, files in the configured storage path
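The recovery steps above could be sketched as pure bookkeeping. The sketch counts one reference per recovered job and BLOB, then treats every other stored key as orphaned. `BlobRecovery` is hypothetical, the download itself is omitted, and the configured storage path is modelled as a set of keys. As noted in the steps, the JobManager would only take the references after a successful job submission, which this sketch does not model.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of BLOB bookkeeping during HA recovery. */
class BlobRecovery {
    /** Reference counts for the BLOBs of all recovered jobs. */
    final Map<String, Integer> refCounts = new HashMap<>();
    /** BLOB keys in the storage path that no recovered job references. */
    final Set<String> orphans = new TreeSet<>();

    /**
     * @param jobsToRecover job ID mapped to the BLOB keys of that job
     * @param storedKeys    all BLOB keys currently present in the configured storage path
     */
    void recover(Map<String, List<String>> jobsToRecover, Set<String> storedKeys) {
        // Steps 1 and 2: for every job to recover, (download and) reference its BLOBs.
        for (List<String> keys : jobsToRecover.values()) {
            for (String key : keys) {
                if (!storedKeys.contains(key)) {
                    throw new IllegalStateException("BLOB missing from storage: " + key);
                }
                refCounts.merge(key, 1, Integer::sum);
            }
        }
        // Step 3: everything else in the storage path is orphaned and may be deleted.
        for (String key : storedKeys) {
            if (!refCounts.containsKey(key)) {
                orphans.add(key);
            }
        }
    }
}
```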

Compatibility, Deprecation, and Migration Plan

...