Status

Discussion threadhttps://lists.apache.org/thread/074z237c07vtj74685nxo6bttkq3kshz
Vote threadhttps://lists.apache.org/thread/vkmghnohx3tl6h19of43hs75c9tnxh4w
JIRA

Unable to render Jira issues macro, execution error.

Release<Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when Flink’s job manager crashes or gets killed, possibly due to unexpected errors or planned nodes decommission, it will cause the following two situations:

  1. Failed, if the job does not enable HA.
  2. Restart, if the job enable HA. If it’s a streaming job, the job will be resumed from the last successful checkpoint. If it’s a batch job, it has to run from beginning, all previous progress will be lost.

In view of this, we think the JM crash may cause great regression for batch jobs, especially long running batch jobs. This FLIP is mainly to solve this problem so that batch jobs can recover most job progress after JM crashes. In this FLIP, our goal is to let most finished tasks not need to be re-run.

Public Interfaces

Introduce new configuration options

We intend to introduce the following new configuration parameters.

Key

Type

Default Value

Description

execution.batch.job-recovery.enabled

Boolean

false

A flag to enable or disable the job recovery

execution.batch.job-recovery.previous-worker.recovery.timeout

Duration

30 s

The timeout for a new job master to wait for the previous worker to reconnect

execution.batch.job-recovery.snapshot.min-pause

Duration

3 min

The minimal pause between snapshots. To avoid performance issues caused by too frequent snapshots. Used to take snapshots for operator coordinator or other components.

job-event.store.write-buffer.size

MemorySize

1M

The size of the write buffer of JobEventStore, the content will be flushed to external file system once it's full

job-event.store.write-buffer.flush-interval

Duration

1 s

The flush interval of write buffer of JobEventStore, over this time, the content will be flushed to external file system

Introduce SupportsBatchSnapshot interface

In order to support Job Recovery, we will reuse the SplitEnumerator#snapshotState method to obtain the internal state of the split enumerator. In order to distinguish it from a normal checkpoint, we will pass -1(NO_CHECKPOINT) to indicate that this is a snapshot for the no-checkpoint/batch scenarios. Some current split enumerator implementations may rely on an postive checkpoint id, or does not support taking snapshot in no-checkpoint/batch scenarios. Therefore, unless the source developer explicitly specifies that it can support taking snapshots in no checkpoint/batch scenarios, the state of SourceCoordinator/SplitEnumerator will not be recored and restored(see the JobEvent section for how to handle this case). We will introduce a new interface SupportsBatchSnapshot to indicate whether the split enumerator supports  taking snapshots in batch scenarios. 

SupportsBatchSnapshot
/**
 * An identifier interface for {@link SplitEnumerator}, which is used to support taking snapshot in
 * no-checkpoint/batch scenarios (for example to support job recovery in batch jobs). Once a {@link
 * SplitEnumerator} implements this interface, its {@link SplitEnumerator#snapshotState} method
 * needs to accept -1 (NO_CHECKPOINT) as the argument.
 */
@PublicEvolving
public interface SupportsBatchSnapshot {}


Proposed Changes

The Whole Workflow of Job Recovery

The whole workflow of job recovery, mainly includes 3 parts:

  1. During the normal execution, Flink will record states of JM (ExecutionGraph, OperatorCoordinator, etc) to persistent storage so that we can recover based on these states after JM crash. We will introduce an event-based method to record the state of JM.
  2. During the JM crashes and restarts (generally HA will be responsible for restarting JM), the shuffle service and TM will retain the partitions related to the target job and try to continuously reconnect.
  3. After JM restarts, the connection with shuffle service and TM will be re-established. Then JM will recover the job progress based on the previous recorded states and partitions currently existing in the cluster, and restart the scheduling.

Record states during normal execution

In order to recover the previous state after JM crash, we need to record the state of JM. We will introduce an event-based method to record the state of JM, each event(JobEvent) records the increment of the JM state. After JM crashes, we can recover the JM state by replaying the JobEvents.

JobEvent

We will record 2 kind of JobEvents:

  1. ExecutionJobVertexInitializedEvent: This event is responsible for recording the initialization information of ExecutionJobVertex, its content contains the decided parallelism of this job vertex, and its input information. This event will be triggered and written out when an ExecutionJobVertex is initialized.

  2. ExecutionVertexFinishedEvent: This event is responsible for recording the information of finished task. Our goal is that all finished tasks don’t need to re-run, so the simple idea is to trigger an event when a task is finished.  The content of this event contains:

    1. The state of the finished task/ExecutionVertex, including IO metrics, accumulators, etc. These contents can be easily obtained from ExecutionGraph.

    2. If the job vertex which this task belongs to has operator coordinators, the states of the operator coordinators also need to be recorded. See section OperatorCoordinator for details.
    3. The state of shuffle master, see section ShuffleMaster for details.

OperatorCoordinator

In order to obtain the state of operator coordinators, we will enrich the OperatorCoordinator#checkpointCoordinator method to let it accept  -1 (NO_CHECKPOINT) as the value of checkpointId, to support snapshotting the state of operator coordinator in batch jobs. After JM crashes, the operator coordinator can be restored from the previous recorded state. In addition to a simple restore(by OperatorCoordinator#resetToCheckpoint method), it also needs to call OperatorCoordinator#subtaskReset for the non-finished tasks (which may in running state before JM crashes) , because these tasks will be reset and re-run after JM crashes.

Consider that the operator coordinators may have large state, we believe it may cause large overhead if we snapshot the operator coordinator at each execution vertex finished. To solve this problem, we will add a new configuration option "execution.batch.job-recovery.snapshot.min-pause" to control the minimum pause between snapshots. When restoring, we will also reconcile the execution job vertex state with the operator coordinator state to be consistent. In other words, we will adjust the execution job vertex to the state at the time of the lastest snapshot operator coordinators.

However, currently, some operator coordinators may not support taking snapshot in no-checkpoint/batch scenarios (or does not accept -1 as argument), we need to add the following method to the operator coordinator:

OperatorCoordinator
public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {

    // other methods
    
    /**
     * Whether the operator coordinator supports taking snapshot in no-checkpoint/batch scenarios.
     * If it returns true, it's {@link OperatorCoordinator#checkpointCoordinator} and {@link
     * OperatorCoordinator#resetToCheckpoint} methods need to accept -1 (NO_CHECKPOINT) as the value
     * of checkpoint id. Otherwise returns false.
     */
    default boolean supportsBatchSnapshot() {
        return false;
    }


Method OperatorCoordinator#supportsBatchSnapshot is used to identify whether the current operator coordinator supports taking snapshot in no-checkpoint/batch scenarios. If an operator coordinator does not support snapshot, which means it will return the inital state after JM restarts, we will handle it as follows:
1. If not all tasks corresponding to this operator coordinator are finished, we will reset all these tasks and re-run them.
2. If all tasks corresponding to this operator coordinator finished, we don't need to do anything and the job can continue to run. However, once one of these tasks needs to be restarted at some point in the future (for example, due to PartitionNotFound exception), we need to reset all these tasks and re-run them.

For the source coordinator, whether it supports batch snapshot depends on whether it's SplitEnumerator supports batch snapshot. For it, we introduce the SupportsBatchSnapshot mentioned above to identify it.

ShuffleMaster

In order to obtain the state of shuffle master, we will add the following methods to shuffle master. Before the job starts running, we will check whether the shuffle master supports taking snapshots(through method supportsBatchSnapshot). If it is not supported, we will disable Job Recovery for jobs. 

Note that it's mainly designed for external/remote shuffle service, the Flink default Netty/TM shuffle is stateless and only needs to be an empty implementation.

ShuffleMaster
interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

    // other methods

    /**
     * Whether the shuffle master supports taking snapshot in batch scenarios, which will be used
     * when enable Job Recovery. If it returns true, we will call {@link #snapshotState} to take
     * snapshot, and call {@link #restoreState} to restore the state of shuffle master.
     */
    default boolean supportsBatchSnapshot() {
        return false;
    }
 
    default void snapshotState(
            CompletableFuture<ShuffleMasterSnapshot> snapshotFuture,
            ShuffleMasterSnapshotContext context) {
        snapshotFuture.complete(null);
    }
 
    default void restoreState(List<ShuffleMasterSnapshot> snapshots) {} 
}

 /**
 * This class represents a snapshot of shuffle master, which can be used to restore the internal
 * state of the shuffle master. It is full or incremental.
 */
interface ShuffleMasterSnapshot extends Serializable {

    /** Returns whether the snapshot is incremental. */
    boolean isIncremental();
}

/**
 * Snapshot context used to create {@link ShuffleMasterSnapshot}, which can provide necessary
 * informations.
 */
interface ShuffleMasterSnapshotContext {
}

Persistent JobEventStore

We intend to introduce a persistent JobEventStore to record the JobEvents, the store is based on the file system and has the following features:

  1. To avoid IO operations blocking the JM main thread, the JobEventStore will write each event out in an asynchronous thread.
  2. To avoid frequent IO operations causing great pressure on external file system, there will be a write buffer inside the JobEventStore. The JobEvents will be written to the buffer first, and then flushed to external file system when the buffer is full or the flush time is reached. The flush frequency will be controlled by the following 2 configuration options:
    1. job-event.store.write-buffer.size: The size of the write buffer, the content will be flushed to external file system once it's full.
    2. job-event.store.write-buffer.flush-interval: The flush interval of write buffer, over this time, the content will be flushed to external file system.

Reconnection

Currently, when a JM crashes, TM will sense it through HA or heartbeat timeout, and then it will release all resources related to this job (including slots, and partitions when using TM shuffle), and then disconnect from JM.

To support job recovery, we need do the following changes:

  1. When it is found that the JM lost, TM will fail all tasks belongs to the target job and and release the corresponding slots.
  2. If there are partitions belongs the target job on TM, the TM should retain the partitions, and wait for HA to notify the new JM and try to establish connection to the JM. We need to register a timeout for waiting, and release the partition after the timeout. We can reuse the existing configuration option “taskmanager.registration.timeout” here, the default value is 5 minutes.
  3. If there is no partitions belongs the target job on TM, keep the same logic as current.

If it‘s using other external shuffle services, it should be the same as TM shuffle, when it detects JM crash, it should retain the partitions and wait the JM to reconnect.

Re-schedule after JM restart

After JM restarts and becomes leader again, it will wait for a period of time for the TMs to re-establish connection with itself. The length of waiting time is controlled by "execution.batch.job-recovery.previous-worker.recovery.timeout", only TMs connected within this period will be accepted, and those that time out will be rejected. Once we have enough partitions (all partitions required to continue running are registered), we can end this wait early and continue to the next step.

After re-establish connection with TMs, JM will try to obtain all partitions existing in cluster through ShuffleMaster, and re-establish the partition information in JobMasterPartitionTracker. To do that, we need to add a new method getAllPartitionWithMetrics to ShuffleMaster.

After re-establish JobMasterPartitionTracker, JM begins to replay the JobEvents from the JobEventStore, recover the execution graph state, and then starts rescheduling based on the execution graph state and the partitions currently existing in the cluster:

  1. Initialize all ExecutionJobVertex whose parallelism has been decided. We can obtain the initialization information from the replayed events (ExecutionJobVertexInitializedEvent).
  2. According to the information in JobMasterPartitionTracker, the execution vertices whose produced partitions are all tracked will be marked as finished. 
  3. For execution vertices that are not marked as finished, as mentioned above, if its corresponding job vertex has operator coordinators, we need to call OperatorCoordinator#subtaskReset for them.
  4. Find all sink/leaf execution vertices in ExecutionGraph. For each sink/leaf execution vertex in the non-finish state, recursively find all its upstream vertices that need to be restarted (which are in unfinished state), and then start scheduling based on this.
ShuffleMaster
interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

    // other methods  

    /**
     * Get all partitions and their metrics, the metrics mainly includes the meta information of partition(partition bytes, etc).
     * @param jobId ID of the target job
     * @return All partitions belongs to the target job and their metrics
     */
    CompletableFuture<Collection<PartitionWithMetrics>> getAllPartitionWithMetrics(JobID jobId);

    interface PartitionWithMetrics {

        ShuffleMetrics getPartitionMetrics();

        ShuffleDescriptor getPartition();
    }

    interface ShuffleMetrics {

        ResultPartitionBytes getPartitionBytes();
    }
}

Compatibility, Deprecation, and Migration Plan

In the first version, the job recovery will be an optional optimization which the user has to activate explicitly by setting the config option execution.batch.job-recovery.enabled: true. This entails that Flink's default behavior won't change.

Limitations

Only support new source

Currently, the legacy source(SourceFunction, InputFormat) have already been depcreated, so we intend to only support new source.

Only work with adaptive batch scheduler

In FLIP-283, adaptive batch scheduler has been the default scheduler of Flink batch jobs, so we intend to only support working with the adaptive batch scheduler.

When using ApplicationMode, only support single-execute job

As mentioned in flink docs HA in Application Mode is only supported for single-execute applications. Job Recovery relies on HA,so when using ApplicationMode, the Job Recovery can only support single-execution applications.

Test Plan

The proposed changes will be tested for correctness and stability in a real cluster.


  • No labels