Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

In view of this, we think the JM crash may cause great regression for batch jobs, especially long running batch jobs. This FLIP is mainly to solve this problem so that batch jobs can recover most job progress after JM crashes. In this FLIP, our goal is to let most finished tasks not need to be re-run.

Public Interfaces

Introduce new configuration options

We intend to introduce the following new configuration parameters.

operator-coordinator-

Key

Type

Default Value

Description

execution.batch.job-recovery.enabled

Boolean

false

A flag to enable or disable the job recovery

execution.batch.job-recovery.previous-worker.recovery.timeout

Duration

30 s

The timeout for a new job master to wait for the previous worker to reconnect

execution.batch.job-recovery.

snapshot.min-pause

Duration

3 min

The minimal pause between operator coordinator snapshots

job-event.store.write-buffer.size

MemorySize

1M

The size of the write buffer of JobEventStore, the content will be flushed to external file system once it's full

job-event.store.write-buffer.flush-interval

Duration

1 s

The flush interval of write buffer of JobEventStore, over this time, the content will be flushed to external file system

Introduce SupportsBatchSnapshot interface

In order to support Job Recovery, we will reuse the SplitEnumerator#snapshotState method to obtain the internal state of the split enumerator. In order to distinguish it from a normal checkpoint, we will pass -1(NO_CHECKPOINT) to identify that this is a snapshot for the no-checkpoint/batch scenarios. Some current split enumerator implementations may rely on an postive checkpoint id, or does not support taking snapshot in no-checkpoint/batch scenarios. Therefore, job recovery will be disabled unless the source developer explicitly indicates that it can support taking snapshot in no-checkpoint/batch scenarios. We introduced a new interface SupportsBatchSnapshot to do this.

Code Block
titleSupportsBatchSnapshot
/**
 * An identifier interface for {@link SplitEnumerator}, which is used to support taking snapshot in
 * no-checkpoint/batch scenarios (for example to support job recovery in batch jobs). Once a {@link
 * SplitEnumerator} implements this interface, it's {@link SplitEnumerator#snapshotState} method
 * needs to accept -1 (NO_CHECKPOINT) as argument.
 */
@PublicEvolving
public interface SupportsBatchSnapshot {}


Proposed Changes

The Whole Workflow of Job Recovery

...

  1. Initialize all ExecutionJobVertex whose parallelism has been decided. We can obtain the initialization information from the replayed events (ExecutionJobVertexInitializedEvent).
  2. According to the information in JobMasterPartitionTracker, the execution vertices whose produced partitions are all tracked will be marked as finished. 
  3. For execution vertices that are not marked as finished, as mentioned above, if its corresponding job vertex has operator coordinators, we need to call OperatorCoordinator#subtaskReset for them.
  4. Find all sink/leaf execution vertices in ExecutionGraph. For each sink/leaf execution vertex in the non-finish state, recursively find all its upstream vertices that need to be restarted (which are in unfinished state), and then start scheduling based on this.
Code Block
titleShuffleMaster
interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

...

    //… other methods  

...



    // other methods  

    /**

...


     * Get all partitions and their metrics, the metrics mainly includes the meta information of partition(partition bytes, etc).

...


     * @param jobId ID of the target job

...


     * @return All partitions belongs to the target job and their metrics

...


     */

...


    Collection<PartitionWithMetrics> getAllPartitionWithMetrics(JobID jobId);

...



    interface PartitionWithMetrics {

...



        ShuffleMetrics getPartitionMetrics();

...



        ShuffleDescriptor getPartition();

...


    }

...

    interface ShuffleMetrics {

...



    interface ShuffleMetrics {

        ResultPartitionBytes getPartitionBytes();

...

    }


    }
}

Compatibility, Deprecation, and Migration Plan

...