Status
Page properties |
---|
This page is meant as a template for writing a FLIP. To create a FLIP choose Tools->Copy on this page and modify with your content and replace the heading with the next FLIP number and a description of your issue. Replace anything in italics with your own description.
Status
...
|
...
|
|
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In highly-available setups with multiple masters, we need a component that provides the newly elected leader with the current state of a job that it’s trying to recover. We already have the first version of this component, the RunningJobsRegistry
(RJR), that has several limitations we want to address in this FLIP.
Main limitation of the RJR is that we clear it once the job reaches a globally terminal state. Some of the issues that could be solved by introducing a component that persists job state after this job is finished and probably even after the Flink cluster is gone, which may lead to re-execution of the job by the failed-over dispatcher, because there is no persistent record of the successful job execution that would outlive the cluster.Another limitation is that are:
- Any standby Dispatcher would know about sucessfully finished jobs which do not need to be re-executed after a failover while the job was already terminated but the JobGraph was still around (
).Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key FLINK-11813 - The RJR does not provide access to the JobResult of a completed job
...
- . We have to return
...
- UNKNOWN as a result
...
- when we failover in application mode (
).Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key FLINK-21928 - Having access to a
JobResult
, after the job has completed, would also pave the road for supporting multi-stage jobs inApplicationMode
and highly available job drivers in general.
Public Interfaces
Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.
A public interface is any change to the following:
Binary log formatThe network protocol and api behaviorAny class in the public packages under clientsConfiguration, especially client configurationorg/apache/kafka/common/serializationorg/apache/kafka/commonorg/apache/kafka/common/errorsorg/apache/kafka/clients/producerorg/apache/kafka/clients/consumer (eventually, once stable)
MonitoringCommand line tools and argumentsAnything else that will likely break existing users in some way when they upgrade
RunningJobsRegistry
will be replaced byJobResultStore
- k8s- and ZK-specific implementations of
RunningJobsRegistry
will be replaced by a file-based approach- Users that use a customized HA implementation might be affected by this change because the HAServices interface is going to be modified
- New configuration parameters are going to be introduced to make the file-based
JobResultStore
implementation configurable - REST API for the
JobResult
might be extended to also include the cleanup state of this Job
Proposed Changes
In this document we propose removing RJR and introducing a new opt-in component, the JobResultStore (JSR).
JobId semantics
For the purpose of this FLIP and the future work on the ApplicationMode, we need to re-define the JobId semantics to meet the following properties.
...
Multiple jobs with the same JobId (within the same cluster) are treated as a duplicate, even if JobGraph changes between the two.
We’re going to treat HighAvailabilityOptions#HA_CLUSTER_ID as the ClusterId.
We don’t need any changes to interfaces around JobID as we can simply use ClusterId as a JobResultStore namespace.
JobResultStore
JobResultStore (JRS) is the successor to RJR, which is able to outlive a concrete job in order to solve recovery issues in multi-master setups. The idea of the JobResultStore
is to persist information about successfully completed jobs that is under the control of a 3rd party. The cleanup of the information stored in the JRS, is the responsibility of the 3rd party (e.g. after the Flink cluster has been fully stopped).
...
- Access to the
JobResult
of a globally terminated job (especially useful for highly-available job drivers). JobResult
is initially marked as dirty and needs to be committed after resource cleanup.- Ensure that we don’t restart jobs that have already been completed.
Atomic cleanup of job resources
In multi-master setups, we need to cleanup following resources after the job finishes:
...
draw.io Diagram border true diagramName FLIP-193 194 - Idempotent Cleanup simpleViewer false width links auto tbstyle top lbox true diagramWidth 1131 revision 2
...
For a client accessing the job result we could think of providing the actual job's result since the actual computation is done even with the job still being in the cleanup phase.
Recovery workflow
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
Cleanup (Draft)
Currently, the different artifacts are cleaned up in different locations:
Component | Who is responsible? | Why? |
|
| It needs to work without a |
|
| We need to be able to fetch a |
|
| We can clean this up after there won’t be any more contenders. |
BlobStore
|
|
|
| This is tied to a single job execution (JobMasterLifecycle). |
|
|
|
The plan is to unite the cleanup logic in a single component that is, i.e. the Dispatcher, as it is also the component being in charge of accessing the JobResultStore
.
Implementation
to provide a common interface that enables all components to trigger the cleanup based on the JobID. This cleanup component can encapsulate the retry mechanism. The actual cleanup can then be triggered either in the JobMaster or the Dispatcher depending on whether a JobGraph is still available or not.
Implementation
Interface
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
JobResultEntry
functions as the struct to hold the actual JobResult
and the corresponding cleanup state (i.e. either dirty or cleaned up). The JobResultStore
interface provides all methods required to manage the JobResultEntries
.
InMemoryJobResultStore (aka Standalone- or EmbeddedJobResultStore)
The InMemoryJobResultStore
is a simple implementation of the JobResultStore
interface that stores the data in memory (analogous to the current StandaloneRunningJobsRigstry
). Any failover would result in the information being lost. This implementation can work as the default option for non-HA setups.
This implementation only covers the execution of multiple jobs without failover.
FileSystemJobResultStore
A simple persistent JobResultStore
that uses a distributed file system to store the JobResultEntries
in the form of files. This also provides an easy and intuitive way for the 3rd party to clean up the JRS after the Flink cluster has been terminated.
We’re assuming that the underlying file-system provides strong read-after-write consistency. Research was done for the most common cloud providers in addition to HDFS and Minio to ensure that this assumption holds for relevant vendors.
This implementation would store the JobResult
with additional metadata (eg. the final checkpoint / savepoint) in json format as a file with the naming convention outlined below:
draw.io Diagram | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
As mentioned in the section about Job ID semantics earlier in this FLIP, the JobID needs to be put in some namespace context that is provided through the ClusterID. This can be achieved by either storing all JobResultEntry files of a single HA Cluster in an individual subfolder or provide the HA Cluster ID as a prefix of the filename.
Used file format is versioned to allow for future extensions. Having a version present in the filename helps us to choose the right serializer / deserializer and allows for effective filtering and migration from older formats. Each serializer is responsible for writing the version inside serialized content as a safeguard against external changes. It's still an open for discussion whether we want to keep the version in the JSON structure only to avoid redundancy. This would solve the issue of conflict resolution if the content of the file does not match the filename anymore.
Configuration
The FileSystemJobResultStore would introduce two new configuration parameters:
JobResultStore#STORAGE_PATH
: This parameter is used for defining the storage path of the JobResultEntry files. In order to align it with other similar configuration parameters, we would set the default to${HighAvailabilityOptions#HA_STORAGE_PATH}
/job-results-store/${HighAvailabilityOptions#HA_CLUSTER_ID}
JobResultStoreOptions#DELETE_ON_COMMIT:
This parameter is used to simulate the current behavior, i.e. deleting the entries right after the cleanup is finalized and committed. This parameter would be enabled by default leading to no resources being left and no manual cleanup being required. Having this flag enabled would, as a consequence, also lead to weaker guarantees on the failover stability.
...
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
...
This change removes a “semi-public” RunningJobsRegistry. This only affects users with custom implementations of HA Services. For regular users that are using Zookeeper or Kubernetes implementation of HA Services (which are provided by Flink), this won’t be noticeable.
Test Plan
The proposed change should be tested with unit, integration and e2e tests for common failover scenarios
Rejected Alternatives
Other Implementations of JobResultStore
Other HA implementations like KubernetesJobResultStore or ZooKeeperJobResultStore were not considered due to the following reasons:
- Both HA implementations (ZK and k8s) are bundled with an object store (e.g. s3, gcs) to store the actual data. The metadata was stored in ZooKeeper or k8s ConfigMaps due to their support for of read-after-write consistency guarantees which, for instance, S3 didn't provide in the past. Nowadays object stores provide this guarantee.
- The file-based approach makes it easier for a 3rd party to clean job results up.