This page is meant as a template for writing a FLIP. To create a FLIP choose Tools->Copy on this page and modify with your content and replace the heading with the next FLIP number and a description of your issue. Replace anything in italics with your own description.
Status
Current state: Under Discussion
...
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
Cleanup (Draft)
Currently, the different artifacts are cleaned up in different locations:
...
The plan is to unite the cleanup logic in a single component that is, i.e. the Dispatcher, as it is also the component being in charge of accessing the JobResultStore
.
Implementation
Interface
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
JobResultEntry
functions as the struct to hold the actual JobResult
and the corresponding cleanup state (i.e. either dirty or cleaned up). The JobResultStore
interface provides all methods required to manage the JobResultEntries
.
InMemoryJobResultStore (aka Standalone- or EmbeddedJobResultStore)
The InMemoryJobResultStore
is a simple implementation of the JobResultStore
interface that stores the data in memory (analogous to the current StandaloneRunningJobsRigstry
). Any failover would result in the information being lost. This implementation can work as the default option for non-HA setups.
This implementation only covers the execution of multiple jobs without failover.
FileSystemJobResultStore
A simple persistent JobResultStore
that uses a distributed file system to store the JobResultEntries
in the form of files. This also provides an easy and intuitive way for the 3rd party to clean up the JRS after the Flink cluster has been terminated.
We’re assuming that the underlying file-system provides strong read-after-write consistency. Research was done for the most common cloud providers in addition to HDFS and Minio to ensure that this assumption holds for relevant vendors.
This implementation would store the JobResult
with additional metadata (eg. the final checkpoint / savepoint) in json format as a file with the naming convention outlined below:
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
As mentioned in the section about Job ID semantics earlier in this FLIP, the JobID needs to be put in some namespace context that is provided through the ClusterID. This can be achieved by either storing all JobResultEntry files of a single HA Cluster in an individual subfolder or provide the HA Cluster ID as a prefix of the filename.
Used file format is versioned to allow for future extensions. Having a version present in the filename helps us to choose the right serializer / deserializer and allows for effective filtering and migration from older formats. Each serializer is responsible for writing the version inside serialized content as a safeguard against external changes. It's still an open for discussion whether we want to keep the version in the JSON structure only to avoid redundancy. This would solve the issue of conflict resolution if the content of the file does not match the filename anymore.
Configuration
The FileSystemJobResultStore would introduce two new configuration parameters:
JobResultStore#STORAGE_PATH
: This parameter is used for defining the storage path of the JobResultEntry files. In order to align it with other similar configuration parameters, we would set the default to${HighAvailabilityOptions#HA_STORAGE_PATH}/job-results-store/${HighAvailabilityOptions#HA_CLUSTER_ID}
JobResultStoreOptions#DELETE_ON_COMMIT:
This parameter is used to simulate the current behavior, i.e. deleting the entries right after the cleanup is finalized and committed. This parameter would be enabled by default leading to no resources being left and no manual cleanup being required. Having this flag enabled would, as a consequence, also lead to weaker guarantees on the failover stability.
Other Implementations of JobResultStore
Other HA implementations like KubernetesJobResultStore or ZooKeeperJobResultStore were not considered due to the following reasons:
- Both HA implementations (ZK and k8s) are bundled with an object store (e.g. s3, gcs) to store the actual data. The metadata was stored in ZooKeeper or k8s ConfigMaps due to their support for of read-after-write consistency guarantees which, for instance, S3 didn't provide in the past. Nowadays object stores provide this guarantee.
- The file-based approach makes it easier for a 3rd party to clean job results up.
Compatibility, Deprecation, and Migration Plan
This change removes a “semi-public” RunningJobsRegistry. This only affects users with custom implementations of HA Services. For regular users that are using Zookeeper or Kubernetes implementation of HA Services (which are provided by Flink), this won’t be noticeable.
Test Plan
The proposed change should be tested with unit, integration and e2e tests for common failover scenariosDescribe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.