Page properties

Discussion thread: https://lists.apache.org/thread/074z237c07vtj74685nxo6bttkq3kshz
Vote thread: https://lists.apache.org/thread/vkmghnohx3tl6h19of43hs75c9tnxh4w
JIRA: FLINK-33892
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when Flink’s job manager (JM) crashes or is killed, whether due to an unexpected error or a planned node decommission, one of two things happens:

  1. The job fails, if HA is not enabled.
  2. The job restarts, if HA is enabled. A streaming job resumes from the last successful checkpoint. A batch job has to run from the beginning, and all previous progress is lost.

In view of this, we think a JM crash can cause a significant regression for batch jobs, especially long-running ones. This FLIP mainly aims to solve this problem so that batch jobs can recover most of their progress after a JM crash. Our goal is that most finished tasks do not need to be re-run.

...


Method OperatorCoordinator#supportsBatchSnapshot is used to identify whether the current operator coordinator supports taking snapshots in no-checkpoint/batch scenarios. If an operator coordinator does not support snapshots, which means it will return to its initial state after the JM restarts, we will handle it as follows:
1. If not all tasks corresponding to this operator coordinator are finished, we will reset all these tasks and re-run them.
2. If all tasks corresponding to this operator coordinator are finished, we don't need to do anything and the job can continue to run. However, once one of these tasks needs to be restarted at some point in the future (for example, due to a PartitionNotFound exception), we need to reset all these tasks and re-run them.
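
The two cases above can be sketched as a small decision function. This is only an illustration under stated assumptions: `RecoveryAction` and `CoordinatorRecoveryPolicy` are hypothetical names invented for this example and are not part of Flink's scheduler. The behavior for a coordinator that does support batch snapshots (no coordinator-wide reset) is also an assumption of the sketch, since the text above only describes the unsupported case.

```java
import java.util.List;

/** Hypothetical outcome for the tasks of one operator coordinator. */
enum RecoveryAction {
    NO_COORDINATOR_WIDE_RESET, // nothing to reset on behalf of the coordinator
    RESET_ALL_TASKS            // reset and re-run every task of this coordinator
}

/** Hypothetical helper; it only mirrors the two rules described in the text. */
final class CoordinatorRecoveryPolicy {

    private CoordinatorRecoveryPolicy() {}

    /** Decision taken right after the JM restarts. */
    static RecoveryAction afterJobMasterRestart(
            boolean supportsBatchSnapshot, List<Boolean> taskFinished) {
        if (supportsBatchSnapshot) {
            // Assumption: the coordinator state is restored from its snapshot.
            return RecoveryAction.NO_COORDINATOR_WIDE_RESET;
        }
        boolean allFinished = taskFinished.stream().allMatch(Boolean::booleanValue);
        // Rule 1: some task is unfinished -> reset all. Rule 2: all finished -> keep.
        return allFinished
                ? RecoveryAction.NO_COORDINATOR_WIDE_RESET
                : RecoveryAction.RESET_ALL_TASKS;
    }

    /** Decision taken when a previously finished task must be restarted later. */
    static RecoveryAction whenFinishedTaskRestartsLater(boolean supportsBatchSnapshot) {
        // Second half of rule 2: without a snapshot, all tasks are reset together.
        return supportsBatchSnapshot
                ? RecoveryAction.NO_COORDINATOR_WIDE_RESET
                : RecoveryAction.RESET_ALL_TASKS;
    }
}
```

The second method covers the delayed case of rule 2: a coordinator without snapshot support has lost its state, so a later restart of any single task (for example, after a PartitionNotFound exception) forces all of its tasks to be reset.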

For the source coordinator, whether it supports batch snapshots depends on whether its SplitEnumerator supports them. We introduce the SupportsBatchSnapshot mentioned above to identify this.

ShuffleMaster

In order to obtain the state of the shuffle master, we will add the following methods to the shuffle master. Before the job starts running, we will check whether the shuffle master supports taking snapshots (through method supportsBatchSnapshot). If it does not, we will disable Job Recovery for the job.

Note that this is mainly designed for external/remote shuffle services. Flink's default Netty/TM shuffle is stateless and only needs an empty implementation.

Code Block
titleShuffleMaster
interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

    // other methods

    /**
     * Whether the shuffle master supports taking snapshots in batch scenarios. This is used when
     * Job Recovery is enabled. If it returns true, we will call {@link #snapshotState} to take a
     * snapshot, and call {@link #restoreState} to restore the state of the shuffle master.
     */
    default boolean supportsBatchSnapshot() {
        return false;
    }

    /** Takes a snapshot of the shuffle master and completes the given future with it. */
    default void snapshotState(
            CompletableFuture<ShuffleMasterSnapshot> snapshotFuture,
            ShuffleMasterSnapshotContext context) {
        snapshotFuture.complete(null);
    }

    /** Restores the state of the shuffle master from the given snapshots. */
    default void restoreState(List<ShuffleMasterSnapshot> snapshots) {}
}

/**
 * This class represents a snapshot of the shuffle master, which can be used to restore the
 * internal state of the shuffle master. A snapshot is either full or incremental.
 */
interface ShuffleMasterSnapshot extends Serializable {

    /** Returns whether the snapshot is incremental. */
    boolean isIncremental();
}

/**
 * Snapshot context used to create {@link ShuffleMasterSnapshot}, which can provide necessary
 * information.
 */
interface ShuffleMasterSnapshotContext {
}
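
To illustrate how an external/remote shuffle service might use these methods, the following is a minimal sketch, not a real implementation. `PartitionIdsSnapshot`, `RemoteShuffleMasterSketch`, `registerPartition`, and `knownPartitions` are hypothetical names, and partitions are reduced to plain strings. The two snapshot interfaces are re-declared as stand-ins only so that the example compiles on its own.

```java
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Stand-ins mirroring the interfaces above, so the sketch is self-contained.
interface ShuffleMasterSnapshot extends Serializable {
    boolean isIncremental();
}

interface ShuffleMasterSnapshotContext {}

/** Hypothetical full snapshot: the partition IDs known to the shuffle master. */
final class PartitionIdsSnapshot implements ShuffleMasterSnapshot {
    private static final long serialVersionUID = 1L;

    final HashSet<String> partitionIds;

    PartitionIdsSnapshot(Set<String> partitionIds) {
        this.partitionIds = new HashSet<>(partitionIds); // defensive copy
    }

    @Override
    public boolean isIncremental() {
        return false; // this sketch only produces full snapshots
    }
}

/** Hypothetical stateful shuffle master; only snapshot-related methods are shown. */
final class RemoteShuffleMasterSketch {
    private final Set<String> partitionIds = new HashSet<>();

    void registerPartition(String partitionId) {
        partitionIds.add(partitionId);
    }

    Set<String> knownPartitions() {
        return new HashSet<>(partitionIds);
    }

    public boolean supportsBatchSnapshot() {
        return true; // opt in, so that Job Recovery stays enabled
    }

    public void snapshotState(
            CompletableFuture<ShuffleMasterSnapshot> snapshotFuture,
            ShuffleMasterSnapshotContext context) {
        snapshotFuture.complete(new PartitionIdsSnapshot(partitionIds));
    }

    public void restoreState(List<ShuffleMasterSnapshot> snapshots) {
        for (ShuffleMasterSnapshot snapshot : snapshots) {
            if (snapshot instanceof PartitionIdsSnapshot) {
                if (!snapshot.isIncremental()) {
                    partitionIds.clear(); // a full snapshot replaces earlier state
                }
                partitionIds.addAll(((PartitionIdsSnapshot) snapshot).partitionIds);
            }
        }
    }
}
```

A stateless shuffle master, such as the default Netty/TM shuffle described above, would have no state of this kind to capture.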

...

  1. Initialize all ExecutionJobVertex whose parallelism has been decided. We can obtain the initialization information from the replayed events (ExecutionJobVertexInitializedEvent).
  2. According to the information in JobMasterPartitionTracker, the execution vertices whose produced partitions are all tracked will be marked as finished.
  3. For each execution vertex that is not marked as finished, as mentioned above, if its corresponding job vertex has operator coordinators, we need to call OperatorCoordinator#subtaskReset for them.
  4. Find all sink/leaf execution vertices in ExecutionGraph. For each sink/leaf execution vertex in an unfinished state, recursively find all its upstream vertices that need to be restarted (those in an unfinished state), and then start scheduling based on this.
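
Step 4 above can be sketched as a graph traversal. The example below is a simplified illustration, not the scheduler's actual code: `RestartSetFinder` is a hypothetical name, vertices are plain strings, and the `upstream` map is assumed to contain every vertex as a key (sources map to an empty list). It uses an explicit stack instead of recursion, and it assumes the search does not continue past a finished vertex.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper that collects the vertices to restart after recovery. */
final class RestartSetFinder {

    private RestartSetFinder() {}

    /**
     * @param upstream maps each vertex to its direct upstream vertices
     * @param finished vertices that were marked as finished in step 2
     * @return unfinished sinks plus all unfinished vertices reachable upstream of them
     */
    static Set<String> verticesToRestart(
            Map<String, List<String>> upstream, Set<String> finished) {
        // A vertex is a sink/leaf if no other vertex lists it as upstream.
        Set<String> nonSinks = new HashSet<>();
        for (List<String> ups : upstream.values()) {
            nonSinks.addAll(ups);
        }

        Deque<String> stack = new ArrayDeque<>();
        for (String vertex : upstream.keySet()) {
            if (!nonSinks.contains(vertex) && !finished.contains(vertex)) {
                stack.push(vertex); // start from every unfinished sink
            }
        }

        Set<String> toRestart = new LinkedHashSet<>();
        while (!stack.isEmpty()) {
            String vertex = stack.pop();
            if (!toRestart.add(vertex)) {
                continue; // already visited
            }
            for (String up : upstream.getOrDefault(vertex, Collections.emptyList())) {
                if (!finished.contains(up)) {
                    stack.push(up); // only unfinished upstream vertices are followed
                }
            }
        }
        return toRestart;
    }
}
```

For a graph A -> B -> C and A -> D in which A and D are finished, the only unfinished sink is C, so the traversal collects C and B and leaves A and D untouched.
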
Code Block
titleShuffleMaster
interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

    // other methods

    /**
     * Get all partitions and their metrics. The metrics mainly include the meta information of
     * the partition (partition bytes, etc.).
     *
     * @param jobId ID of the target job
     * @return All partitions belonging to the target job and their metrics
     */
    CompletableFuture<Collection<PartitionWithMetrics>> getAllPartitionWithMetrics(JobID jobId);

    interface PartitionWithMetrics {

        ShuffleMetrics getPartitionMetrics();

        ShuffleDescriptor getPartition();
    }

    interface ShuffleMetrics {

        ResultPartitionBytes getPartitionBytes();
    }
}

Compatibility, Deprecation, and Migration Plan

...