...

Page properties

Discussion thread: https://lists.apache.org/thread/074z237c07vtj74685nxo6bttkq3kshz
Vote thread: https://lists.apache.org/thread/vkmghnohx3tl6h19of43hs75c9tnxh4w
JIRA: FLINK-33892
Release: <Flink Version>


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Key | Type | Default Value | Description
execution.batch.job-recovery.enabled | Boolean | false | A flag to enable or disable job recovery.
execution.batch.job-recovery.previous-worker.recovery.timeout | Duration | 30 s | The timeout for a new job master to wait for the previous workers to reconnect.
execution.batch.job-recovery.snapshot.min-pause | Duration | 3 min | The minimal pause between snapshots, which avoids performance issues caused by overly frequent snapshots. Snapshots are taken for operator coordinators or other components.
job-event.store.write-buffer.size | MemorySize | 1M | The size of the write buffer of the JobEventStore. The content is flushed to the external file system once the buffer is full.
job-event.store.write-buffer.flush-interval | Duration | 1 s | The flush interval of the write buffer of the JobEventStore. Once this interval has elapsed, the content is flushed to the external file system.
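
The two write-buffer options describe a buffer that is flushed either when it fills up or when the flush interval elapses. The following is a minimal sketch of that policy only, not the JobEventStore proposed here: the class BufferedEventWriterSketch, its methods, the in-memory list standing in for the external file system, and the injectable clock are all hypothetical names chosen for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Hypothetical sketch of a size- and time-triggered write buffer. */
class BufferedEventWriterSketch {
    private final int bufferCapacityBytes;   // cf. job-event.store.write-buffer.size
    private final long flushIntervalMillis;  // cf. job-event.store.write-buffer.flush-interval
    private final LongSupplier clock;        // injectable clock, keeps the demo deterministic
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    // Stands in for the external file system in this sketch.
    private final List<byte[]> flushedChunks = new ArrayList<>();
    private long lastFlushMillis;

    BufferedEventWriterSketch(int bufferCapacityBytes, long flushIntervalMillis, LongSupplier clock) {
        this.bufferCapacityBytes = bufferCapacityBytes;
        this.flushIntervalMillis = flushIntervalMillis;
        this.clock = clock;
        this.lastFlushMillis = clock.getAsLong();
    }

    /** Buffers one serialized event and flushes as soon as the buffer is full. */
    void write(byte[] event) {
        buffer.write(event, 0, event.length);
        if (buffer.size() >= bufferCapacityBytes) {
            flush();
        }
    }

    /** Intended to be called periodically; flushes if the interval has elapsed. */
    void onTimer() {
        if (clock.getAsLong() - lastFlushMillis >= flushIntervalMillis) {
            flush();
        }
    }

    /** Moves the buffered bytes to the (stand-in) external storage. */
    void flush() {
        if (buffer.size() > 0) {
            flushedChunks.add(buffer.toByteArray());
            buffer.reset();
        }
        lastFlushMillis = clock.getAsLong();
    }

    int flushCount() {
        return flushedChunks.size();
    }

    int bufferedBytes() {
        return buffer.size();
    }

    public static void main(String[] args) {
        long[] now = {0L};
        BufferedEventWriterSketch writer = new BufferedEventWriterSketch(8, 1000, () -> now[0]);
        writer.write(new byte[5]); // 5 bytes buffered, below the 8-byte capacity
        writer.write(new byte[5]); // 10 bytes buffered, capacity reached, flushed
        writer.write(new byte[2]); // 2 bytes buffered
        now[0] = 1500;
        writer.onTimer();          // interval elapsed, flushed
        System.out.println(writer.flushCount() + " flushes, " + writer.bufferedBytes() + " bytes buffered");
    }
}
```

A larger buffer or a longer interval reduces the number of writes to the file system, at the cost of losing more recent events if the job master crashes before a flush.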

...

In order to support Job Recovery, we will reuse the SplitEnumerator#snapshotState method to obtain the internal state of the split enumerator. To distinguish it from a normal checkpoint, we will pass -1 (NO_CHECKPOINT) to indicate that this is a snapshot for the no-checkpoint/batch scenarios. Some current split enumerator implementations may rely on a positive checkpoint id, or may not support taking snapshots in no-checkpoint/batch scenarios. Therefore, unless the source developer explicitly specifies that the source can support taking snapshots in no-checkpoint/batch scenarios, the state of the SourceCoordinator/SplitEnumerator will not be recorded and restored (see the JobEvent section for how to handle this case). We will introduce a new interface, SupportsBatchSnapshot, to indicate whether the split enumerator supports taking snapshots in batch scenarios.

Code Block: SupportsBatchSnapshot
/**
 * An identifier interface for {@link SplitEnumerator}, which is used to support taking snapshots in
 * no-checkpoint/batch scenarios (for example to support job recovery in batch jobs). Once a {@link
 * SplitEnumerator} implements this interface, its {@link SplitEnumerator#snapshotState} method
 * needs to accept -1 (NO_CHECKPOINT) as the argument.
 */
@PublicEvolving
public interface SupportsBatchSnapshot {}
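
To illustrate what opting in could look like for a source developer, here is a minimal, self-contained sketch. It does not use Flink's classes: SimpleSplitEnumerator is a simplified stand-in for SplitEnumerator reduced to snapshotState, the nested SupportsBatchSnapshot mirrors the marker interface above, and FileSplitEnumerator and canSnapshotInBatch are hypothetical names invented for this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Illustrative sketch only; every type here is a simplified stand-in, not Flink's API. */
class BatchSnapshotEnumeratorSketch {
    static final long NO_CHECKPOINT = -1L;

    /** Stand-in for the marker interface proposed above. */
    interface SupportsBatchSnapshot {}

    /** Stand-in for SplitEnumerator, reduced to the snapshot method. */
    interface SimpleSplitEnumerator<CheckpointT> {
        CheckpointT snapshotState(long checkpointId) throws Exception;
    }

    /** Hypothetical enumerator whose state is the list of splits not yet handed out. */
    static class FileSplitEnumerator
            implements SimpleSplitEnumerator<List<String>>, SupportsBatchSnapshot {
        private final Queue<String> unassigned;

        FileSplitEnumerator(List<String> splits) {
            this.unassigned = new ArrayDeque<>(splits);
        }

        String assignNext() {
            return unassigned.poll();
        }

        @Override
        public List<String> snapshotState(long checkpointId) {
            // The snapshot never depends on the id, so -1 (NO_CHECKPOINT) is as valid as
            // any positive checkpoint id.
            return new ArrayList<>(unassigned);
        }
    }

    /** Hypothetical check a framework could run before snapshotting in a batch job. */
    static boolean canSnapshotInBatch(SimpleSplitEnumerator<?> enumerator) {
        return enumerator instanceof SupportsBatchSnapshot;
    }

    public static void main(String[] args) throws Exception {
        FileSplitEnumerator enumerator =
                new FileSplitEnumerator(Arrays.asList("part-0", "part-1", "part-2"));
        enumerator.assignNext(); // "part-0" is handed to a reader
        if (canSnapshotInBatch(enumerator)) {
            System.out.println(enumerator.snapshotState(NO_CHECKPOINT));
        }
    }
}
```

An enumerator that validates the checkpoint id (for example, by rejecting non-positive values) would need to relax that check before implementing the marker interface.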

...

  1. ExecutionJobVertexInitializedEvent: This event records the initialization information of an ExecutionJobVertex. Its content includes the decided parallelism of the job vertex and its input information. The event is triggered and written out when an ExecutionJobVertex is initialized.

  2. ExecutionVertexFinishedEvent: This event records the information of a finished task. Our goal is that finished tasks do not need to re-run, so the simple idea is to trigger an event whenever a task finishes. The content of this event contains:

    1. The state of the finished task/ExecutionVertex, including IO metrics, accumulators, etc. These contents can be easily obtained from ExecutionGraph.

    2. If the job vertex to which this task belongs has operator coordinators, the states of the operator coordinators also need to be recorded. See the OperatorCoordinator section for details.
    3. The state of the shuffle master. See the ShuffleMaster section for details.

OperatorCoordinator

In order to obtain the state of operator coordinators, we will extend the OperatorCoordinator#checkpointCoordinator method to accept -1 (NO_CHECKPOINT) as the value of checkpointId, which supports snapshotting the state of operator coordinators in batch jobs. After the JM crashes, an operator coordinator can be restored from the previously recorded state. In addition to a simple restore (via the OperatorCoordinator#resetToCheckpoint method), the JM also needs to call OperatorCoordinator#subtaskReset for the non-finished tasks (which may have been in the running state before the JM crashed), because these tasks will be reset and re-run after the JM crashes.
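
The restore sequence described above (reset the coordinator to the recorded snapshot, then reset every subtask that had not finished) can be sketched as follows. This is an assumption-laden illustration, not the proposed implementation: Coordinator is a reduced stand-in that only mirrors the three OperatorCoordinator methods named in the text, while RecordingCoordinator and recover are hypothetical helpers.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Illustrative sketch only; Coordinator is a reduced stand-in for OperatorCoordinator. */
class CoordinatorRecoverySketch {
    static final long NO_CHECKPOINT = -1L;

    interface Coordinator {
        void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture);

        void resetToCheckpoint(long checkpointId, byte[] checkpointData);

        void subtaskReset(int subtask, long checkpointId);
    }

    /** Toy coordinator whose whole state is one string; it records which subtasks were reset. */
    static class RecordingCoordinator implements Coordinator {
        String state = "";
        final List<Integer> resetSubtasks = new ArrayList<>();

        @Override
        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
            resultFuture.complete(state.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
            state = new String(checkpointData, StandardCharsets.UTF_8);
        }

        @Override
        public void subtaskReset(int subtask, long checkpointId) {
            resetSubtasks.add(subtask);
        }
    }

    /** Hypothetical recovery helper: restore the snapshot, then reset all non-finished subtasks. */
    static void recover(
            Coordinator coordinator, byte[] snapshot, int parallelism, Set<Integer> finishedSubtasks) {
        coordinator.resetToCheckpoint(NO_CHECKPOINT, snapshot);
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (!finishedSubtasks.contains(subtask)) {
                coordinator.subtaskReset(subtask, NO_CHECKPOINT);
            }
        }
    }

    public static void main(String[] args) {
        // Before the crash: snapshot the coordinator with NO_CHECKPOINT as the id.
        RecordingCoordinator before = new RecordingCoordinator();
        before.state = "splits-assigned:0,1";
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        before.checkpointCoordinator(NO_CHECKPOINT, future);
        byte[] snapshot = future.join();

        // After the crash: a fresh coordinator is restored; only subtask 0 had finished.
        RecordingCoordinator after = new RecordingCoordinator();
        recover(after, snapshot, 3, Collections.singleton(0));
        System.out.println(after.state + " reset=" + after.resetSubtasks);
    }
}
```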

...

For the source coordinator, whether it supports batch snapshots depends on whether its SplitEnumerator supports batch snapshots. The SupportsBatchSnapshot interface mentioned above is used to identify this.

ShuffleMaster

In order to obtain the state of the shuffle master, we will add the following methods to the shuffle master. Before the job starts running, we will check whether the shuffle master supports taking snapshots (through the supportsBatchSnapshot method). If it does not, we will disable Job Recovery for the job.

Note that this is mainly designed for external/remote shuffle services. Flink's default Netty/TM shuffle is stateless and only needs an empty implementation.

Code Block: ShuffleMaster
interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

    // other methods

    /**
     * Whether the shuffle master supports taking snapshots in batch scenarios, which is used
     * when Job Recovery is enabled. If it returns true, we will call {@link #snapshotState} to
     * take a snapshot, and call {@link #restoreState} to restore the state of the shuffle master.
     */
    default boolean supportsBatchSnapshot() {
        return false;
    }
 
    default void snapshotState(
            CompletableFuture<ShuffleMasterSnapshot> snapshotFuture,
            ShuffleMasterSnapshotContext context) {
        snapshotFuture.complete(null);
    }
 
    default void restoreState(List<ShuffleMasterSnapshot> snapshots) {}
}

/**
 * This class represents a snapshot of the shuffle master, which can be used to restore the
 * internal state of the shuffle master. A snapshot is either full or incremental.
 */
interface ShuffleMasterSnapshot extends Serializable {

    /** Returns whether the snapshot is incremental. */
    boolean isIncremental();
}

/**
 * Snapshot context used to create a {@link ShuffleMasterSnapshot}, which can provide the
 * necessary information.
 */
interface ShuffleMasterSnapshotContext {
}
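
For a stateful (for example, remote) shuffle service, an implementation of these hooks might look like the sketch below. It is only a sketch under stated assumptions: SnapshotCapableShuffleMaster is a stand-in that contains just the three new methods, the partition-to-worker map is an invented example of shuffle master state, and FullSnapshot, RemoteShuffleMasterSketch and jobRecoveryAllowed are hypothetical names. How incremental snapshots would be merged is not specified above, so this example only produces full snapshots.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Illustrative sketch only; the interfaces are reduced stand-ins for the ones proposed above. */
class ShuffleMasterSnapshotSketch {
    interface ShuffleMasterSnapshot extends Serializable {
        boolean isIncremental();
    }

    interface ShuffleMasterSnapshotContext {}

    interface SnapshotCapableShuffleMaster {
        default boolean supportsBatchSnapshot() {
            return false;
        }

        default void snapshotState(
                CompletableFuture<ShuffleMasterSnapshot> snapshotFuture,
                ShuffleMasterSnapshotContext context) {
            snapshotFuture.complete(null);
        }

        default void restoreState(List<ShuffleMasterSnapshot> snapshots) {}
    }

    /** Hypothetical full snapshot: a copy of the partition-to-worker mapping. */
    static class FullSnapshot implements ShuffleMasterSnapshot {
        private static final long serialVersionUID = 1L;
        final HashMap<String, String> partitionLocations;

        FullSnapshot(Map<String, String> partitionLocations) {
            this.partitionLocations = new HashMap<>(partitionLocations);
        }

        @Override
        public boolean isIncremental() {
            return false;
        }
    }

    /** Hypothetical stateful shuffle master that remembers where each partition is stored. */
    static class RemoteShuffleMasterSketch implements SnapshotCapableShuffleMaster {
        final Map<String, String> partitionLocations = new HashMap<>();

        @Override
        public boolean supportsBatchSnapshot() {
            return true;
        }

        @Override
        public void snapshotState(
                CompletableFuture<ShuffleMasterSnapshot> snapshotFuture,
                ShuffleMasterSnapshotContext context) {
            snapshotFuture.complete(new FullSnapshot(partitionLocations));
        }

        @Override
        public void restoreState(List<ShuffleMasterSnapshot> snapshots) {
            // Assumption: only full snapshots exist here, so each one replaces the state.
            for (ShuffleMasterSnapshot snapshot : snapshots) {
                partitionLocations.clear();
                partitionLocations.putAll(((FullSnapshot) snapshot).partitionLocations);
            }
        }
    }

    /** Hypothetical pre-start check corresponding to the rule described above. */
    static boolean jobRecoveryAllowed(SnapshotCapableShuffleMaster master) {
        return master.supportsBatchSnapshot();
    }

    public static void main(String[] args) {
        RemoteShuffleMasterSketch master = new RemoteShuffleMasterSketch();
        master.partitionLocations.put("partition-1", "worker-a");
        CompletableFuture<ShuffleMasterSnapshot> future = new CompletableFuture<>();
        master.snapshotState(future, new ShuffleMasterSnapshotContext() {});

        RemoteShuffleMasterSketch restored = new RemoteShuffleMasterSketch();
        restored.restoreState(Collections.singletonList(future.join()));
        System.out.println(jobRecoveryAllowed(restored) + " " + restored.partitionLocations);
    }
}
```

A stateless shuffle master can rely on the defaults for snapshotState and restoreState, which is why those two methods are given empty default implementations.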

Persistent JobEventStore

We intend to introduce a persistent JobEventStore to record the JobEvents. The store is based on the file system and has the following features:

...

Code Block: ShuffleMaster
interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

    // other methods

    /**
     * Gets all partitions and their metrics. The metrics mainly include the meta information of
     * a partition (partition bytes, etc.).
     *
     * @param jobId ID of the target job
     * @return All partitions belonging to the target job and their metrics
     */
    CompletableFuture<Collection<PartitionWithMetrics>> getAllPartitionWithMetrics(JobID jobId);

    interface PartitionWithMetrics {

        ShuffleMetrics getPartitionMetrics();

        ShuffleDescriptor getPartition();
    }

    interface ShuffleMetrics {

        ResultPartitionBytes getPartitionBytes();
    }
}
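
Because getAllPartitionWithMetrics returns a future, a caller would typically chain its processing instead of blocking. The sketch below shows one way a caller might total the bytes per partition. It relies on simplified stand-ins that are assumptions of this example: a String replaces ShuffleDescriptor and JobID, a long[] of per-subpartition bytes replaces ResultPartitionBytes, and MetricsShuffleMaster, partition and totalBytesPerPartition are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.LongStream;

/** Illustrative sketch only; all types are simplified stand-ins for the interfaces above. */
class PartitionMetricsSketch {
    interface ShuffleMetrics {
        long[] getPartitionBytes(); // stand-in for ResultPartitionBytes
    }

    interface PartitionWithMetrics {
        ShuffleMetrics getPartitionMetrics();

        String getPartition(); // stand-in for ShuffleDescriptor
    }

    interface MetricsShuffleMaster {
        CompletableFuture<Collection<PartitionWithMetrics>> getAllPartitionWithMetrics(String jobId);
    }

    /** Hypothetical factory for a partition with the given per-subpartition byte counts. */
    static PartitionWithMetrics partition(String id, long... subpartitionBytes) {
        return new PartitionWithMetrics() {
            @Override
            public ShuffleMetrics getPartitionMetrics() {
                return () -> subpartitionBytes;
            }

            @Override
            public String getPartition() {
                return id;
            }
        };
    }

    /** Asynchronously sums the bytes of every partition reported for the job. */
    static CompletableFuture<Map<String, Long>> totalBytesPerPartition(
            MetricsShuffleMaster master, String jobId) {
        return master.getAllPartitionWithMetrics(jobId)
                .thenApply(
                        partitions -> {
                            Map<String, Long> totals = new TreeMap<>();
                            for (PartitionWithMetrics p : partitions) {
                                long sum = LongStream.of(p.getPartitionMetrics().getPartitionBytes()).sum();
                                totals.put(p.getPartition(), sum);
                            }
                            return totals;
                        });
    }

    public static void main(String[] args) {
        Collection<PartitionWithMetrics> reported =
                Arrays.asList(partition("p1", 10L, 20L), partition("p2", 7L));
        // A toy shuffle master that answers immediately with the partitions it knows about.
        MetricsShuffleMaster master = jobId -> CompletableFuture.completedFuture(reported);
        System.out.println(totalBytesPerPartition(master, "job-1").join());
    }
}
```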

Compatibility, Deprecation, and Migration Plan

...