Status

Current state: Under Discussion

...

[Diagram: FLIP-193 - Recovery Workflow]

Cleanup (Draft)

Currently, the different artifacts are cleaned up in different locations:

...

The plan is to unify the cleanup logic in a single component, namely the Dispatcher, since it is also the component in charge of accessing the JobResultStore.

Implementation

Interface

[Diagram: FLIP-140 - Interfaces]

JobResultEntry functions as the struct to hold the actual JobResult and the corresponding cleanup state (i.e. either dirty or cleaned up). The JobResultStore interface provides all methods required to manage the JobResultEntries.

InMemoryJobResultStore (aka Standalone- or EmbeddedJobResultStore)

The InMemoryJobResultStore is a simple implementation of the JobResultStore interface that stores the data in memory (analogous to the current StandaloneRunningJobsRegistry). Any failover would result in the information being lost. This implementation can serve as the default option for non-HA setups.

This implementation only covers the execution of multiple jobs without failover.
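The entry, interface and in-memory variant described above could be sketched roughly as follows. This is a minimal illustration, not the proposed API: the method names, the `CleanupState` enum and the use of plain strings in place of Flink's JobID and JobResult types are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

/** Hypothetical cleanup states; the FLIP only names "dirty" and "cleaned up". */
enum CleanupState { DIRTY, CLEAN }

/** Sketch of an entry: the job's result plus its cleanup state. */
final class JobResultEntry {
    final String jobId;       // stand-in for Flink's JobID (assumption)
    final String jobResult;   // stand-in for the actual JobResult (assumption)
    final CleanupState state;

    JobResultEntry(String jobId, String jobResult, CleanupState state) {
        this.jobId = jobId;
        this.jobResult = jobResult;
        this.state = state;
    }
}

/** Hypothetical method set; the real interface is the one shown in the Interfaces diagram. */
interface JobResultStore {
    void createDirtyResult(String jobId, String jobResult);
    void markResultAsClean(String jobId);
    boolean hasDirtyEntry(String jobId);
    boolean hasCleanEntry(String jobId);
}

/** Keeps all entries in a map, so they are lost whenever the process restarts. */
final class InMemoryJobResultStore implements JobResultStore {
    private final Map<String, JobResultEntry> entries = new HashMap<>();

    @Override
    public synchronized void createDirtyResult(String jobId, String jobResult) {
        if (entries.containsKey(jobId)) {
            throw new IllegalStateException("Entry already exists for job " + jobId);
        }
        entries.put(jobId, new JobResultEntry(jobId, jobResult, CleanupState.DIRTY));
    }

    @Override
    public synchronized void markResultAsClean(String jobId) {
        JobResultEntry entry = entries.get(jobId);
        if (entry == null) {
            throw new NoSuchElementException("No entry for job " + jobId);
        }
        // Entries are immutable, so the state change replaces the entry.
        entries.put(jobId, new JobResultEntry(jobId, entry.jobResult, CleanupState.CLEAN));
    }

    @Override
    public synchronized boolean hasDirtyEntry(String jobId) {
        return hasEntryInState(jobId, CleanupState.DIRTY);
    }

    @Override
    public synchronized boolean hasCleanEntry(String jobId) {
        return hasEntryInState(jobId, CleanupState.CLEAN);
    }

    private boolean hasEntryInState(String jobId, CleanupState state) {
        JobResultEntry entry = entries.get(jobId);
        return entry != null && entry.state == state;
    }
}
```

In this sketch a job moves from dirty to clean exactly once; because the map lives only in the JVM heap, nothing survives a failover, which matches the non-HA scope stated above.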

FileSystemJobResultStore

The FileSystemJobResultStore is a simple persistent JobResultStore that uses a distributed file system to store the JobResultEntries as files. This also provides an easy and intuitive way for a third party to clean up the JRS after the Flink cluster has been terminated.

We assume that the underlying file system provides strong read-after-write consistency. We checked the most common cloud providers as well as HDFS and MinIO to confirm that this assumption holds for the relevant vendors.

This implementation would store the JobResult with additional metadata (e.g. the final checkpoint / savepoint) in JSON format as a file with the naming convention outlined below:

[Diagram: FLIP-194 - Filename format]

As mentioned in the section about Job ID semantics earlier in this FLIP, the JobID needs to be put in some namespace context that is provided through the ClusterID. This can be achieved either by storing all JobResultEntry files of a single HA cluster in an individual subfolder or by using the HA cluster ID as a prefix of the filename.

The file format is versioned to allow for future extensions. Having a version present in the filename helps us choose the right serializer / deserializer and allows for effective filtering and migration from older formats. Each serializer is responsible for writing the version inside the serialized content as a safeguard against external changes. It is still open for discussion whether we want to keep the version in the JSON structure only, to avoid redundancy. This would solve the issue of conflict resolution if the content of the file does not match the filename anymore.
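To make the two namespace options and the version safeguard more concrete, here is a small sketch. The `<jobId>_v<version>.json` layout, the separators and all names below are hypothetical and chosen only for illustration; the actual convention is the one given in the filename format diagram.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative helper; the naming layout used here is an assumption, not the FLIP's format. */
final class JobResultFileNames {
    // Matches the hypothetical "_v<version>.json" suffix at the end of a file name.
    private static final Pattern VERSION_SUFFIX = Pattern.compile("_v(\\d+)\\.json$");

    private JobResultFileNames() {}

    /** Option 1: one subfolder per HA cluster. */
    static String inSubfolder(String clusterId, String jobId, int version) {
        return clusterId + "/" + jobId + "_v" + version + ".json";
    }

    /** Option 2: the HA cluster ID as a prefix of the file name. */
    static String withPrefix(String clusterId, String jobId, int version) {
        return clusterId + "_" + jobId + "_v" + version + ".json";
    }

    /** Reads the format version from the file name so a matching deserializer can be chosen. */
    static int parseVersion(String fileName) {
        Matcher matcher = VERSION_SUFFIX.matcher(fileName);
        if (!matcher.find()) {
            throw new IllegalArgumentException("No version in file name: " + fileName);
        }
        return Integer.parseInt(matcher.group(1));
    }

    /** Safeguard: the version written inside the content must agree with the file name. */
    static void requireMatchingVersion(String fileName, int versionInContent) {
        int versionInName = parseVersion(fileName);
        if (versionInName != versionInContent) {
            throw new IllegalStateException(
                    "File name says v" + versionInName + " but content says v" + versionInContent);
        }
    }
}
```

The `requireMatchingVersion` check illustrates the conflict that arises when the version is stored in both places; keeping it in the JSON structure only would make that check unnecessary.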

Configuration

The FileSystemJobResultStore would introduce two new configuration parameters:

  • JobResultStoreOptions#STORAGE_PATH: This parameter defines the storage path of the JobResultEntry files. To align it with other similar configuration parameters, we would set the default to ${HighAvailabilityOptions#HA_STORAGE_PATH}/job-results-store/${HighAvailabilityOptions#HA_CLUSTER_ID}
  • JobResultStoreOptions#DELETE_ON_COMMIT: This parameter is used to simulate the current behavior, i.e. deleting the entries right after the cleanup is finalized and committed. It would be enabled by default, so that no resources are left behind and no manual cleanup is required. As a consequence, having this flag enabled also weakens the guarantees on failover stability.
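The effect of the two parameters can be sketched as follows. This is only an illustration under stated assumptions: it uses `java.nio.file` on a local directory as a stand-in for the distributed file system, and the class name, method names and the `_DIRTY.json` / `.json` file suffixes are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Illustrative only; not the proposed implementation. */
final class FileSystemJobResultStoreSketch {
    private final Path storagePath;
    private final boolean deleteOnCommit;

    FileSystemJobResultStoreSketch(Path storagePath, boolean deleteOnCommit) {
        this.storagePath = storagePath;
        this.deleteOnCommit = deleteOnCommit;
    }

    /** Mirrors the proposed default: <HA storage path>/job-results-store/<HA cluster ID>. */
    static Path defaultStoragePath(Path haStoragePath, String haClusterId) {
        return haStoragePath.resolve("job-results-store").resolve(haClusterId);
    }

    /** Persists the result as a dirty entry before any cleanup starts. */
    void createDirtyResult(String jobId, String json) throws IOException {
        Files.createDirectories(storagePath);
        Files.write(dirtyFile(jobId), json.getBytes(StandardCharsets.UTF_8));
    }

    /** Commits the cleanup: either drop the entry or keep it as a clean one. */
    void markResultAsClean(String jobId) throws IOException {
        if (deleteOnCommit) {
            // Current behavior: nothing is left behind, but the result is forgotten.
            Files.delete(dirtyFile(jobId));
        } else {
            // The entry stays until a third party removes it.
            Files.move(dirtyFile(jobId), cleanFile(jobId), StandardCopyOption.ATOMIC_MOVE);
        }
    }

    boolean hasDirtyResult(String jobId) {
        return Files.exists(dirtyFile(jobId));
    }

    boolean hasCleanResult(String jobId) {
        return Files.exists(cleanFile(jobId));
    }

    private Path dirtyFile(String jobId) {
        return storagePath.resolve(jobId + "_DIRTY.json"); // hypothetical suffix
    }

    private Path cleanFile(String jobId) {
        return storagePath.resolve(jobId + ".json"); // hypothetical suffix
    }
}
```

With delete-on-commit enabled, the store cannot tell a cleaned-up job from one it has never seen once the entry is gone, which is one way to read the weaker failover guarantees mentioned above.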

Other Implementations of JobResultStore

Other HA implementations like KubernetesJobResultStore or ZooKeeperJobResultStore were not considered for the following reasons:

  • Both HA implementations (ZK and k8s) are bundled with an object store (e.g. S3, GCS) to store the actual data. The metadata was stored in ZooKeeper or k8s ConfigMaps because they support read-after-write consistency guarantees, which S3, for instance, didn't provide in the past. Nowadays object stores provide this guarantee.
  • The file-based approach makes it easier for a 3rd party to clean job results up.


Compatibility, Deprecation, and Migration Plan

This change removes the “semi-public” RunningJobsRegistry. This only affects users with custom implementations of HA Services. For regular users who rely on the ZooKeeper or Kubernetes implementations of HA Services (which are provided by Flink), this won’t be noticeable.

Test Plan

The proposed change should be tested with unit, integration and e2e tests for common failover scenarios.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.