

Status

Current state: Under Discussion

Discussion thread: tba

JIRA: FLINK-11813

Released: 1.15

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In highly-available setups with multiple masters, we need a component that provides the newly elected leader with the current state of a job that it’s trying to recover. We already have a first version of this component, the RunningJobsRegistry (RJR), which has several limitations that we want to address in this FLIP.

The main limitation of the RJR is that we clear it once the job reaches a globally terminal state. This may lead to re-execution of the job by a failed-over dispatcher, because there is no persistent record of the successful job execution that outlives the cluster.

Another limitation is that the RJR does not provide access to the JobResult of a completed job, so we may need to return the UNKNOWN result when we fail over in ApplicationMode (FLINK-21928). Having access to the JobResult after the job has completed would also pave the road for supporting multi-stage jobs in ApplicationMode and highly available job drivers in general.

Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clients

    • Configuration, especially client configuration

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

In this document we propose removing the RJR and introducing a new opt-in component, the JobResultStore (JRS).

JobId semantics

For the purpose of this FLIP and the future work on the ApplicationMode, we need to re-define the JobId semantics to meet the following properties.

  1. JobId is a stable unique identifier of the job within a Flink cluster.
  2. Tuple of (ClusterId, JobId) is a globally unique job identifier.

Multiple jobs with the same JobId (within the same cluster) are treated as duplicates, even if the JobGraph changes between them.

We’re going to treat HighAvailabilityOptions#HA_CLUSTER_ID as the ClusterId.

We don’t need any changes to interfaces around JobID as we can simply use ClusterId as a JobResultStore namespace.
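To make this concrete, the following sketch shows one possible way the (ClusterId, JobId) tuple could be mapped onto a storage location of a file-system-based JRS. The path layout and helper class are purely illustrative and not part of this proposal.

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.core.fs.Path;

    /** Illustrative only: derive a JRS entry location from (ClusterId, JobId). */
    final class JobResultEntryPaths {

        // The layout <storagePath>/<cluster-id>/<job-id>.json is an assumption made
        // for this example; the FLIP only requires that ClusterId namespaces the entries.
        static Path forJob(Path storagePath, String clusterId, JobID jobId) {
            return new Path(new Path(storagePath, clusterId), jobId.toHexString() + ".json");
        }
    }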

JobResultStore

The JobResultStore (JRS) is the successor to the RJR and is able to outlive a concrete job in order to solve recovery issues in multi-master setups. The idea of the JobResultStore is to persist information about successfully completed jobs; this information is then under the control of a 3rd party. Cleaning up the information stored in the JRS is the responsibility of that 3rd party (e.g. after the Flink cluster has been fully stopped).

To avoid undesirable resource leaks, highly available setups will by default be started with a FileSystemJobResultStore that has JobResultStoreOptions#DELETE_ON_COMMIT enabled. This means that the user needs to opt in to get the stronger guarantees and manual JRS cleanup.

For non-HA setups, Flink will be started with InMemoryJobResultStore.
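As a rough illustration (the exact option keys are an implementation detail and may differ from the names used here), opting in to the stricter behaviour with manual JRS cleanup could look like this:

    import org.apache.flink.configuration.Configuration;

    final class JobResultStoreConfigExample {

        // The option keys below are assumptions for illustration; the final names
        // would be defined by JobResultStoreOptions.
        static Configuration strictJrsConfig() {
            Configuration config = new Configuration();
            config.setBoolean("job-result-store.delete-on-commit", false); // opt in to manual cleanup
            config.setString("job-result-store.storage-path", "s3://my-bucket/flink/job-result-store");
            return config;
        }
    }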

The following should be covered by a new component:

  • Access to the JobResult of a globally terminated job (especially useful for highly-available job drivers).
  • JobResult is initially marked as dirty and needs to be committed after resource cleanup.
  • Ensure that we don’t restart jobs that have already been completed.

Atomic cleanup of job resources

In multi-master setups, we need to clean up the following resources after the job finishes:

  • JobGraphStore
  • CheckpointCounter
  • CheckpointStore
  • RunningJobsRegistry (this step would be removed)
  • HighAvailabilityServices (leader election)
  • BlobStore

Currently we may easily run into resource leaks / inconsistent states if the cleanup process fails before all the components have been cleaned up. In this scenario we need the failed-over dispatcher to re-execute the cleanup.

(draw.io diagram; the attachment could not be displayed.)

This introduces a new requirement on the job resources: the cleanup operation has to be idempotent. We should also introduce a retry mechanism to allow the cleanup to progress without a fail-over, for example in case of a transient failure of the underlying storage. Unlike with the RJR, we now require all cleanup steps to succeed.

Retrying can continue indefinitely until the cleanup succeeds or until the user intervenes (e.g. through some limiting configuration parameter such as a maximum number of retries). We could also add a dynamically growing backoff for the retry intervals, as sketched below.
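A minimal sketch of such a retry loop, assuming a made-up helper and parameters (the actual mechanism and its configuration are left to the implementation):

    import java.util.concurrent.Callable;

    final class CleanupRetrier {

        /**
         * Runs an idempotent cleanup until it succeeds, with a growing backoff
         * between attempts. A negative maxRetries means "retry indefinitely".
         */
        static void runWithRetry(Callable<Void> idempotentCleanup, int maxRetries) throws Exception {
            long backoffMillis = 100;
            for (int attempt = 0; ; attempt++) {
                try {
                    idempotentCleanup.call(); // safe to execute multiple times
                    return;
                } catch (Exception e) {
                    if (maxRetries >= 0 && attempt >= maxRetries) {
                        throw e; // give up; user intervention / fail-over takes over
                    }
                    Thread.sleep(backoffMillis);
                    backoffMillis = Math.min(backoffMillis * 2, 60_000L); // grow the retry interval
                }
            }
        }
    }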

For a client accessing the job result, we could consider returning the actual JobResult even while the job is still in the cleanup phase, since the computation itself has already finished.

Recovery workflow


(draw.io diagram; the attachment could not be displayed.)
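Since the diagram cannot be displayed here, the recovery decision described in this FLIP can be roughly summarised in pseudocode. The field and method names below are illustrative only and not part of the proposed API.

    // Sketch of the dispatcher-side recovery decision (names are assumptions):
    void recoverJob(JobID jobId) throws Exception {
        if (jobResultStore.hasCleanJobResultEntry(jobId)) {
            // The job already reached a globally terminal state and was fully
            // cleaned up: never re-execute it.
            return;
        }
        if (jobResultStore.hasDirtyJobResultEntry(jobId)) {
            // The job finished, but a previous cleanup did not complete:
            // re-run the (idempotent) cleanup, then commit the entry.
            cleanupJobResources(jobId);
            jobResultStore.markResultAsClean(jobId);
            return;
        }
        // No entry in the JRS: recover the JobGraph from the JobGraphStore and
        // resume the job as usual.
        runRecoveredJob(jobGraphStore.recoverJobGraph(jobId));
    }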

Cleanup (Draft)

Currently, the different artifacts are cleaned up in different locations:

Component | Who is responsible? | Why?
JobResultStore | Dispatcher | It needs to work without a JobMaster.
JobGraphStore | Dispatcher | We need to be able to fetch a JobGraph for recovery before starting a JobMaster.
HighAvailabilityServices (JM LeaderElection, RM LeaderElection) | Dispatcher | We can clean this up once there won’t be any more contenders.
BlobStore (for a given job) | Dispatcher | User JARs; these are needed for recovery.
CheckpointCounter / CheckpointStore | JobMaster | These are tied to a single job execution (JobMaster lifecycle).
JobMaster | Dispatcher | The Dispatcher is responsible for JobMaster management.

The plan is to unite the cleanup logic in a single component, i.e. the Dispatcher, as it is also the component in charge of accessing the JobResultStore. A rough sketch of how this could look is given below.
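Illustrative only, assuming a hypothetical per-resource cleanup interface (the actual interface and composition are left to the implementation):

    import java.util.List;
    import org.apache.flink.api.common.JobID;

    /** Illustrative only: one way the Dispatcher could drive a unified, idempotent cleanup. */
    final class UnifiedJobCleanup {

        /** Hypothetical contract each job resource (JobGraphStore, CheckpointStore, BlobStore, ...) could expose. */
        interface CleanableJobResource {
            void cleanupJobData(JobID jobId) throws Exception; // must be idempotent
        }

        static void cleanupAll(JobID jobId, List<CleanableJobResource> resources) throws Exception {
            for (CleanableJobResource resource : resources) {
                // Every step is idempotent, so the whole sequence can safely be
                // re-executed (or retried, see the retry sketch above) after a failure.
                resource.cleanupJobData(jobId);
            }
            // Only after every step succeeded would the JRS entry be committed ("marked clean").
        }
    }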

Implementation

Interface
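The concrete interface still has to be fleshed out. A minimal sketch that matches the lifecycle described above (a dirty entry is created when the job terminates globally, and the entry is committed once the cleanup has finished) might look roughly like the following; the method names and signatures are assumptions, not the final contract.

    import java.io.IOException;
    import java.util.Set;
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.jobmaster.JobResult;

    public interface JobResultStore {

        /** Registers the result of a globally terminated job; the entry starts out as "dirty". */
        void createDirtyResult(JobResult jobResult) throws IOException;

        /** Commits the entry once all resources of the job have been cleaned up. */
        void markResultAsClean(JobID jobId) throws IOException;

        /** Returns true if the store contains an entry (dirty or clean) for the given job. */
        boolean hasJobResultEntry(JobID jobId) throws IOException;

        /** Returns all results whose cleanup has not been committed yet (used during recovery). */
        Set<JobResult> getDirtyResults() throws IOException;
    }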



Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
