Status

Discussion thread: https://lists.apache.org/thread/wlzv02jqtq221kb8dnm82v4xj8tomd94
Vote thread:
JIRA:

Release: 1.15

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In highly-available setups with multiple masters, we need a component that provides the newly elected leader with the current state of a job that it’s trying to recover. We already have a first version of this component, the RunningJobsRegistry (RJR), which has several limitations that we want to address in this FLIP.

The main limitation of the RJR is that it is cleared once the job reaches a globally terminal state. Introducing a component that persists job state after the job is finished, and possibly even after the Flink cluster is gone, could solve issues such as:

  • Any standby Dispatcher would know about successfully finished jobs and would not re-execute them after a failover that occurs while the job is already terminated but its JobGraph is still around.
  • The RJR does not provide access to the JobResult of a completed job, so we have to return UNKNOWN as the result when we fail over in application mode.
  • Having access to the JobResult after the job has completed would also pave the road for supporting multi-stage jobs in ApplicationMode and highly available job drivers in general.

Public Interfaces

  • RunningJobsRegistry will be replaced by JobResultStore
  • k8s- and ZK-specific implementations of RunningJobsRegistry will be replaced by a file-based approach
    • Users that use a customized HA implementation might be affected by this change because the HAServices interface is going to be modified
  • New configuration parameters are going to be introduced to make the file-based JobResultStore implementation configurable
  • The REST API for the JobResult might be extended to also include the cleanup state of the job

Proposed Changes

In this document we propose removing the RJR and introducing a new opt-in component, the JobResultStore (JRS).

JobId semantics

For the purpose of this FLIP and the future work on the ApplicationMode, we need to re-define the JobId semantics to meet the following properties.

  1. JobId is a stable unique identifier of the job within a Flink cluster.
  2. The tuple (ClusterId, JobId) is a globally unique job identifier.

Multiple jobs with the same JobId (within the same cluster) are treated as duplicates, even if the JobGraph changes between them.

We’re going to treat HighAvailabilityOptions#HA_CLUSTER_ID as the ClusterId.

We don’t need any changes to the interfaces around JobID, as we can simply use the ClusterId as the JobResultStore namespace.
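
To make this concrete, here is a minimal Java sketch (all names here are hypothetical, not part of this FLIP) of the (ClusterId, JobId) tuple acting as the globally unique identifier, with the ClusterId serving as the namespace:

```java
import org.apache.flink.api.common.JobID;

/** Hypothetical helper: illustrates the (ClusterId, JobId) tuple as the globally unique key. */
public final class GlobalJobIdentifier {

    private final String clusterId; // e.g. the value of HighAvailabilityOptions#HA_CLUSTER_ID
    private final JobID jobId;      // stable within a single Flink cluster

    public GlobalJobIdentifier(String clusterId, JobID jobId) {
        this.clusterId = clusterId;
        this.jobId = jobId;
    }

    /** The ClusterId acts as a namespace, e.g. as a sub-path or filename prefix in the JRS. */
    public String asStorageKey() {
        return clusterId + "/" + jobId;
    }
}
```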

JobResultStore

The JobResultStore (JRS) is the successor to the RJR and is able to outlive a concrete job in order to solve recovery issues in multi-master setups. The idea of the JobResultStore is to persist information about successfully completed jobs in a location that is under the control of a 3rd party. The cleanup of the information stored in the JRS is the responsibility of that 3rd party (e.g. after the Flink cluster has been fully stopped).

To avoid undesirable resource leaks, Flink will, by default, start highly available setups with a FileSystemJobResultStore that has JobResultStoreOptions#DELETE_ON_COMMIT enabled. This means that the user needs to opt in to get the stronger guarantees and manual JRS cleanup.

For non-HA setups, Flink will be started with InMemoryJobResultStore.

The following should be covered by a new component:

  • Access to the JobResult of a globally terminated job (especially useful for highly-available job drivers).
  • JobResult is initially marked as dirty and needs to be committed after resource cleanup.
  • Ensure that we don’t restart jobs that have already been completed.
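
As an illustration of the last point, here is a minimal sketch (the interface and method names are assumptions) of how a recovering Dispatcher could consult the new component before re-submitting a recovered JobGraph:

```java
import org.apache.flink.api.common.JobID;

/**
 * Sketch only: how a recovering Dispatcher could use the new component to decide whether a
 * recovered job needs to be re-executed. The interface and method names are assumptions.
 */
public class RecoveryDecisionSketch {

    /** Hypothetical, minimal view of the JobResultStore as used during recovery. */
    interface TerminatedJobsView {
        /** True if the job already reached a globally terminal state (dirty or clean entry). */
        boolean hasJobResultEntry(JobID jobId);
    }

    static boolean needsReExecution(TerminatedJobsView jobResultStore, JobID recoveredJobId) {
        // A job with an entry in the store has already completed; at most its cleanup is pending,
        // so it must not be re-executed after a failover.
        return !jobResultStore.hasJobResultEntry(recoveredJobId);
    }
}
```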

Atomic cleanup of job resources

In multi-master setups, we need to clean up the following resources after the job finishes:

  • JobGraphStore
  • CheckpointCounter
  • CheckpointStore
  • RunningJobsRegistry (this step would be removed)
  • HighAvailabilityServices (leader election)
  • BlobStore

Currently, we may easily run into resource leaks or inconsistent states if the cleanup process fails before all components have been cleaned up. In this scenario we need the failed-over Dispatcher to re-execute the cleanup.

This introduces a new requirement on the job resources: the cleanup operation has to be idempotent. We should also introduce a retry mechanism to allow the cleanup to progress without a failover, for example in case of a transient failure of the underlying storage. Unlike with the RJR, we now require all cleanup steps to succeed.

Retrying can continue indefinitely until the cleanup succeeds or the user intervenes (e.g. through some limiting configuration parameter such as a maximum number of retries). We could also add a dynamically increasing backoff for the retry intervals, as sketched below.
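
A possible shape of such a retry mechanism, as a hedged sketch (the helper class and the doubling backoff are assumptions, not part of this FLIP):

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * Sketch of a retry helper for idempotent cleanup steps; names and the doubling backoff are
 * assumptions. The FLIP only requires that cleanup is retried until it succeeds or the user
 * intervenes (e.g. via a configurable retry limit).
 */
public class RetryingCleanupSketch {

    static CompletableFuture<Void> retryWithBackoff(
            Supplier<CompletableFuture<Void>> idempotentCleanup,
            Duration initialBackoff,
            Duration maxBackoff,
            ScheduledExecutorService scheduler) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        attempt(idempotentCleanup, initialBackoff, maxBackoff, scheduler, result);
        return result;
    }

    private static void attempt(
            Supplier<CompletableFuture<Void>> cleanup,
            Duration backoff,
            Duration maxBackoff,
            ScheduledExecutorService scheduler,
            CompletableFuture<Void> result) {
        cleanup.get()
                .whenComplete(
                        (ignored, failure) -> {
                            if (failure == null) {
                                result.complete(null);
                                return;
                            }
                            // Transient failure (e.g. of the underlying storage): schedule another
                            // attempt with an increased delay. Because the cleanup is idempotent,
                            // re-running a partially completed attempt is safe.
                            Duration doubled = backoff.multipliedBy(2);
                            Duration next = doubled.compareTo(maxBackoff) > 0 ? maxBackoff : doubled;
                            scheduler.schedule(
                                    () -> attempt(cleanup, next, maxBackoff, scheduler, result),
                                    backoff.toMillis(),
                                    TimeUnit.MILLISECONDS);
                        });
    }
}
```

The concrete retry limit and backoff strategy would be driven by configuration, as mentioned above.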

For a client accessing the job result, we could consider returning the actual result even while the job is still in the cleanup phase, since the actual computation has already finished at that point.

Recovery workflow

Cleanup (Draft)

Currently, the different artifacts are cleaned up in different locations:

| Component | Who is responsible? | Why? |
| --- | --- | --- |
| JobResultStore | Dispatcher | It needs to work without a JobMaster. |
| JobGraphStore | Dispatcher | We need to be able to fetch a JobGraph for recovery before starting JobMaster. |
| HighAvailabilityServices (JM LeaderElection, RM LeaderElection) | Dispatcher | We can clean this up after there won’t be any more contenders. |
| BlobServer (for a given job) | Dispatcher | |
| CheckpointCounter / CheckpointStore | JobMaster | This is tied to a single job execution (JobMasterLifecycle). |
| JobMaster | Dispatcher | Dispatcher is responsible for JobMaster management. |

The plan is to provide a common interface that enables all components to trigger the cleanup based on the JobID. This cleanup component can encapsulate the retry mechanism. The actual cleanup can then be triggered either in the JobMaster or the Dispatcher depending on whether a JobGraph is still available or not.
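
A hedged sketch of what such a common cleanup interface could look like (all names here are assumptions):

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;

/**
 * Sketch of a common, JobID-based cleanup abstraction; all names are assumptions. Each per-job
 * resource (JobGraphStore, BlobServer, ...) would implement the idempotent cleanup, while the
 * composing component encapsulates retries and decides whether the Dispatcher or the JobMaster
 * triggers it.
 */
public class JobCleanupSketch {

    /** Implemented by every component that keeps per-job state. */
    interface CleanableResource {
        /** Must be idempotent: repeated invocations for the same job have to succeed. */
        CompletableFuture<Void> cleanupAsync(JobID jobId);
    }

    /** Triggers the cleanup of all registered resources; a retry strategy can be plugged in here. */
    static CompletableFuture<Void> cleanupJob(
            Collection<CleanableResource> resources, JobID jobId) {
        return CompletableFuture.allOf(
                resources.stream()
                        .map(resource -> resource.cleanupAsync(jobId))
                        .toArray(CompletableFuture[]::new));
    }
}
```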

Implementation

Interface

JobResultEntry functions as the struct to hold the actual JobResult and the corresponding cleanup state (i.e. either dirty or cleaned up). The JobResultStore interface provides all methods required to manage the JobResultEntries.
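
One possible shape of these interfaces, as a sketch (method names and signatures are illustrative and may differ in the final implementation):

```java
import java.io.IOException;
import java.util.Set;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmaster.JobResult;

/** A possible shape of the proposed interface; method names and signatures are illustrative. */
public interface JobResultStore {

    /** Registers the job as globally terminated, with its resources not yet cleaned up. */
    void createDirtyResult(JobResultEntry entry) throws IOException;

    /** Commits the entry once all per-job resources have been cleaned up. */
    void markResultAsClean(JobID jobId) throws IOException;

    /** True if there is an entry (dirty or clean) for the given job. */
    boolean hasJobResultEntry(JobID jobId) throws IOException;

    /** Dirty results whose cleanup still has to be (re-)executed, e.g. after a failover. */
    Set<JobResult> getDirtyResults() throws IOException;

    /** Pairs a JobResult with its cleanup state; the state is implied by how the entry is stored. */
    final class JobResultEntry {
        private final JobResult jobResult;

        public JobResultEntry(JobResult jobResult) {
            this.jobResult = jobResult;
        }

        public JobResult getJobResult() {
            return jobResult;
        }
    }
}
```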

InMemoryJobResultStore (aka Standalone- or EmbeddedJobResultStore)

The InMemoryJobResultStore is a simple implementation of the JobResultStore interface that stores the data in memory (analogous to the current StandaloneRunningJobsRegistry). Any failover would result in the information being lost. This implementation can work as the default option for non-HA setups.

This implementation only covers the execution of multiple jobs without failover.
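
For illustration, a sketch of such an in-memory implementation, based on the interface sketched above (again, names and signatures are assumptions):

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmaster.JobResult;

/**
 * Sketch of an in-memory implementation of the interface sketched above. All state is lost on
 * failover, which is acceptable for non-HA setups.
 */
public class InMemoryJobResultStore implements JobResultStore {

    private final Map<JobID, JobResultEntry> dirtyResults = new ConcurrentHashMap<>();
    private final Map<JobID, JobResultEntry> cleanResults = new ConcurrentHashMap<>();

    @Override
    public void createDirtyResult(JobResultEntry entry) {
        dirtyResults.put(entry.getJobResult().getJobId(), entry);
    }

    @Override
    public void markResultAsClean(JobID jobId) {
        JobResultEntry entry = dirtyResults.remove(jobId);
        if (entry != null) {
            cleanResults.put(jobId, entry);
        }
    }

    @Override
    public boolean hasJobResultEntry(JobID jobId) {
        return dirtyResults.containsKey(jobId) || cleanResults.containsKey(jobId);
    }

    @Override
    public Set<JobResult> getDirtyResults() {
        return dirtyResults.values().stream()
                .map(JobResultEntry::getJobResult)
                .collect(Collectors.toSet());
    }
}
```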

FileSystemJobResultStore

A simple persistent JobResultStore that uses a distributed file system to store the JobResultEntries in the form of files. This also provides an easy and intuitive way for the 3rd party to clean up the JRS after the Flink cluster has been terminated.

We’re assuming that the underlying file-system provides strong read-after-write consistency. Research was done for the most common cloud providers in addition to HDFS and Minio to ensure that this assumption holds for relevant vendors.

This implementation would store the JobResult with additional metadata (e.g. the final checkpoint / savepoint) in JSON format as a file, following the naming convention described below.

As mentioned in the section about JobId semantics earlier in this FLIP, the JobID needs to be put into some namespace context that is provided through the ClusterID. This can be achieved either by storing all JobResultEntry files of a single HA cluster in an individual subfolder or by providing the HA cluster ID as a prefix of the filename.

The file format is versioned to allow for future extensions. Having the version present in the filename helps us choose the right serializer / deserializer and allows for effective filtering and migration from older formats. Each serializer is responsible for writing the version inside the serialized content as a safeguard against external changes. It is still open for discussion whether we want to keep the version in the JSON structure only, to avoid redundancy; this would also solve the issue of conflict resolution if the content of the file no longer matches the filename.
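
To illustrate the ideas above, a hedged sketch of how the file path could be derived; the concrete naming convention shown here (version placement, dirty marker, cluster-id subfolder) is an assumption for illustration only:

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.Path;

/**
 * Sketch of how the file-based store could derive an entry's location. The naming convention
 * shown here is an assumption; the actual implementation would define the final layout.
 */
public class FileSystemJobResultStorePaths {

    private static final int FORMAT_VERSION = 1;

    static Path entryPath(Path basePath, String clusterId, JobID jobId, boolean dirty) {
        // Namespace by the HA cluster id (subfolder variant) and encode the format version and
        // the cleanup state in the filename, e.g. <basePath>/<clusterId>/v1-<jobId>-DIRTY.json
        String fileName = "v" + FORMAT_VERSION + "-" + jobId + (dirty ? "-DIRTY" : "") + ".json";
        return new Path(new Path(basePath, clusterId), fileName);
    }
}
```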

Configuration

The FileSystemJobResultStore would introduce two new configuration parameters (a sketch of both follows the list below):

  • JobResultStoreOptions#STORAGE_PATH: This parameter defines the storage path of the JobResultEntry files. In order to align it with other similar configuration parameters, we would set the default to ${HighAvailabilityOptions#HA_STORAGE_PATH}/job-results-store/${HighAvailabilityOptions#HA_CLUSTER_ID}
  • JobResultStoreOptions#DELETE_ON_COMMIT: This parameter is used to simulate the current behavior, i.e. deleting the entries right after the cleanup is finalized and committed. This parameter would be enabled by default, leading to no resources being left behind and no manual cleanup being required. Having this flag enabled would, as a consequence, also lead to weaker guarantees on failover stability.
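
A sketch of how these two options could be defined; the configuration keys, defaults, and the containing class are illustrative assumptions:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/**
 * Sketch of the two proposed options; keys, defaults, and the class name are assumptions. The
 * storage-path default would be derived from the HA storage directory and the HA cluster id as
 * described above.
 */
public class JobResultStoreOptions {

    public static final ConfigOption<String> STORAGE_PATH =
            ConfigOptions.key("job-result-store.storage-path")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Where the JobResultEntry files are stored; if unset, a sub-directory"
                                    + " of the HA storage path, namespaced by the HA cluster id,"
                                    + " is used.");

    public static final ConfigOption<Boolean> DELETE_ON_COMMIT =
            ConfigOptions.key("job-result-store.delete-on-commit")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "If enabled, an entry is deleted as soon as its cleanup has been"
                                    + " committed; this mirrors the current behaviour but offers"
                                    + " weaker failover guarantees.");
}
```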


Compatibility, Deprecation, and Migration Plan

This change removes the “semi-public” RunningJobsRegistry. It only affects users with custom implementations of the HA services. For regular users relying on the ZooKeeper or Kubernetes implementations of the HA services (which are provided by Flink), this won’t be noticeable.

Test Plan

The proposed change should be tested with unit, integration, and e2e tests for common failover scenarios.

Rejected Alternatives

Other Implementations of JobResultStore

Other HA-backed implementations such as a KubernetesJobResultStore or a ZooKeeperJobResultStore were rejected for the following reasons:

  • Both HA implementations (ZK and k8s) are bundled with an object store (e.g. S3, GCS) to store the actual data. The metadata was stored in ZooKeeper or k8s ConfigMaps due to their support for read-after-write consistency guarantees, which, for instance, S3 didn't provide in the past. Nowadays, object stores provide this guarantee.
  • The file-based approach makes it easier for a 3rd party to clean up job results.