...

In view of this, we think a JM crash can cause a significant regression for batch jobs, especially long-running batch jobs. This FLIP mainly aims to solve this problem so that batch jobs can recover most of their progress after a JM crash. Our goal in this FLIP is that most finished tasks do not need to be re-run.

Public Interfaces

Introduce new configuration options

We intend to introduce the following new configuration parameters.

  • execution.batch.job-recovery.enabled (Boolean, default: false): A flag to enable or disable job recovery.
  • execution.batch.job-recovery.previous-worker.recovery.timeout (Duration, default: 30 s): The timeout for a new job master to wait for the previous workers to reconnect.
  • execution.batch.job-recovery.snapshot.min-pause (Duration, default: 3 min): The minimal pause between operator coordinator snapshots.
  • job-event.store.write-buffer.size (MemorySize, default: 1M): The size of the write buffer of the JobEventStore; the content is flushed to the external file system once the buffer is full.
  • job-event.store.write-buffer.flush-interval (Duration, default: 1 s): The flush interval of the write buffer of the JobEventStore; content buffered longer than this interval is flushed to the external file system.

Introduce SupportsBatchSnapshot interface

In order to support job recovery, we will reuse the SplitEnumerator#snapshotState method to obtain the internal state of the split enumerator. To distinguish this from a normal checkpoint, we will pass -1 (NO_CHECKPOINT) to identify that this is a snapshot for the no-checkpoint/batch scenario. Some current split enumerator implementations may rely on a positive checkpoint id, or may not support taking snapshots in no-checkpoint/batch scenarios. Therefore, job recovery will be disabled unless the source developer explicitly indicates that the enumerator can take snapshots in no-checkpoint/batch scenarios. We introduce a new interface, SupportsBatchSnapshot, for this purpose.
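
Based on the description above, the opt-in can be sketched as a marker interface plus a simplified enumerator that implements it. This is an illustration only: SimpleSplitEnumerator and its snapshotState signature are stand-ins, not Flink's real SplitEnumerator API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch (assumption): SupportsBatchSnapshot as a plain marker interface that a
// source's split enumerator implements to declare batch-snapshot support.
interface SupportsBatchSnapshot {}

// Simplified enumerator that opts in; the real SplitEnumerator interface has more methods.
class SimpleSplitEnumerator implements SupportsBatchSnapshot {
    static final long NO_CHECKPOINT = -1L;

    private final List<String> unassignedSplits = new ArrayList<>(List.of("split-0", "split-1"));

    // checkpointId is NO_CHECKPOINT (-1) when invoked for the no-checkpoint/batch snapshot,
    // so the implementation must not assume a positive checkpoint id.
    List<String> snapshotState(long checkpointId) {
        return new ArrayList<>(unassignedSplits);
    }
}
```

The framework can then check `enumerator instanceof SupportsBatchSnapshot` before enabling job recovery for that source.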

...

Considering that operator coordinators may have large state, snapshotting a coordinator every time an execution vertex finishes could cause significant overhead. To solve this problem, we will add a new configuration option "execution.batch.job-recovery.snapshot.min-pause" to control the minimum pause between snapshots. When restoring, we will also reconcile the execution job vertex state with the operator coordinator state so that they are consistent. In other words, we will adjust the execution job vertex to its state at the time of the latest operator coordinator snapshot.
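
The min-pause throttling can be sketched as follows. This is a simplified illustration with hypothetical names, not the actual scheduler code; the real scheduler would track this per operator coordinator.

```java
// Sketch: throttle operator coordinator snapshots so that at least a minimum
// pause elapses between two consecutive snapshots.
class SnapshotThrottleSketch {
    private final long minPauseMillis;
    private long lastSnapshotMillis = Long.MIN_VALUE; // no snapshot taken yet

    SnapshotThrottleSketch(long minPauseMillis) {
        this.minPauseMillis = minPauseMillis;
    }

    /** Called when an execution vertex finishes; snapshots only if the min pause has elapsed. */
    boolean maybeSnapshot(long nowMillis) {
        if (lastSnapshotMillis != Long.MIN_VALUE
                && nowMillis - lastSnapshotMillis < minPauseMillis) {
            return false; // too soon since the last snapshot; skip this one
        }
        lastSnapshotMillis = nowMillis;
        return true; // take a snapshot of the operator coordinator now
    }
}
```

With the default of 3 minutes, a vertex finishing shortly after a snapshot does not trigger another one, bounding the snapshot overhead regardless of how many vertices finish.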

However, some operator coordinators currently may not support taking snapshots in no-checkpoint/batch scenarios (or do not accept -1 as the checkpoint id), so we need to add the following method to the operator coordinator:

Code Block
titleOperatorCoordinator
public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {

    // other methods
    
    /**
     * Whether the operator coordinator supports taking snapshots in no-checkpoint/batch
     * scenarios. If it returns true, its {@link OperatorCoordinator#checkpointCoordinator} and
     * {@link OperatorCoordinator#resetToCheckpoint} methods need to accept -1 (NO_CHECKPOINT)
     * as the value of the checkpoint id. Otherwise it returns false.
     */
    default boolean supportsBatchSnapshot() {
        return false;
    }
}
The method OperatorCoordinator#supportsBatchSnapshot identifies whether the current operator coordinator supports taking snapshots in no-checkpoint/batch scenarios. If an operator coordinator does not support snapshots, which means it will return to its initial state after the JM restarts, we will handle it as follows:
1. If not all tasks corresponding to this operator coordinator are finished, we will reset all of these tasks and re-run them.
2. If all tasks corresponding to this operator coordinator are finished, we do not need to do anything and the job can continue to run. However, once one of these tasks needs to be restarted at some point in the future (for example, due to a PartitionNotFound exception), we need to reset all of these tasks and re-run them.
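
The two cases above can be sketched as a single decision function. The names below are illustrative, not the actual Flink scheduler code.

```java
import java.util.List;

// Hypothetical sketch of the recovery decision for an operator coordinator
// that does not support batch snapshots, after a JM restart.
class CoordinatorRecoverySketch {

    /** Returns true if all tasks of this coordinator must be reset and re-run now. */
    static boolean mustResetNow(boolean supportsBatchSnapshot, List<Boolean> taskFinished) {
        if (supportsBatchSnapshot) {
            // Coordinator state can be restored from the latest snapshot; no reset needed.
            return false;
        }
        // The coordinator restarts with its initial state: it is only safe to continue
        // if every corresponding task has already finished (case 2); otherwise all
        // tasks must be reset and re-run (case 1).
        return taskFinished.stream().anyMatch(finished -> !finished);
    }
}
```

Note that even in case 2, a later restart of any of these tasks (for example due to a PartitionNotFound exception) still forces the full reset.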

Persistent JobEventStore

We intend to introduce a persistent JobEventStore to record JobEvents. The store is based on the file system and has the following features:

...
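
The interplay of the two job-event.store write-buffer options can be sketched as follows. JobEventWriteBufferSketch is a hypothetical class for illustration; the real store writes the flushed content to an external file system.

```java
import java.io.ByteArrayOutputStream;

// Sketch: a write buffer that flushes when it reaches its configured size
// (job-event.store.write-buffer.size) or when the configured flush interval
// (job-event.store.write-buffer.flush-interval) has elapsed.
class JobEventWriteBufferSketch {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final int maxBytes;
    private final long flushIntervalMillis;
    private long lastFlushMillis;
    private int flushCount = 0;

    JobEventWriteBufferSketch(int maxBytes, long flushIntervalMillis, long nowMillis) {
        this.maxBytes = maxBytes;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Appends a serialized event; flushes when the buffer is full or the interval elapsed. */
    void append(byte[] event, long nowMillis) {
        buffer.writeBytes(event);
        if (buffer.size() >= maxBytes || nowMillis - lastFlushMillis >= flushIntervalMillis) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        // In the real store this would write the buffered bytes to the external file system.
        buffer.reset();
        lastFlushMillis = nowMillis;
        flushCount++;
    }

    int flushCount() {
        return flushCount;
    }
}
```

The size bound limits memory usage on the JM, while the interval bound limits how much job progress could be lost if the JM crashes before the next flush.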