...

We intend to introduce the following new configuration parameters.

Key | Type | Default Value | Description
--- | --- | --- | ---
execution.batch.job-recovery.enabled | Boolean | false | A flag to enable or disable job recovery.
execution.batch.job-recovery.previous-worker.recovery.timeout | Duration | 30 s | The timeout for a new job master to wait for the previous worker to reconnect.
execution.batch.job-recovery.operator-coordinator-snapshot.min-pause | Duration | 3 min | The minimum pause between operator coordinator snapshots.
job-event.store.write-buffer.size | MemorySize | 1M | The size of the JobEventStore write buffer. The content is flushed to the external file system once the buffer is full.
job-event.store.write-buffer.flush-interval | Duration | 1 s | The flush interval of the JobEventStore write buffer. Buffered content is flushed to the external file system at this interval.
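For illustration, the snippet below sets the proposed options programmatically. This is a minimal sketch assuming Flink's generic string-based Configuration API; dedicated ConfigOption constants for these keys would be introduced as part of this proposal, and the values shown simply mirror the defaults from the table (with recovery switched on).

import org.apache.flink.configuration.Configuration;

// Sketch: enabling job recovery and tuning the proposed options via the
// generic string-based Configuration API. The values mirror the defaults
// in the table above, except that job recovery itself is switched on.
Configuration conf = new Configuration();
conf.setString("execution.batch.job-recovery.enabled", "true");
conf.setString("execution.batch.job-recovery.previous-worker.recovery.timeout", "30 s");
conf.setString("execution.batch.job-recovery.operator-coordinator-snapshot.min-pause", "3 min");
conf.setString("job-event.store.write-buffer.size", "1m");
conf.setString("job-event.store.write-buffer.flush-interval", "1 s");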

Proposed Changes

The Whole Workflow of Job Recovery

...

  1. Initialize all ExecutionJobVertex whose parallelism has been decided. We can obtain the initialization information from the replayed events (ExecutionJobVertexInitializedEvent).
  2. According to the information in JobMasterPartitionTracker, mark as finished all execution vertices whose produced partitions are all tracked.
  3. For execution vertices that are not marked as finished, as mentioned above, if the corresponding job vertex has operator coordinators, we need to call subtaskReset for them.
  4. Find all sink/leaf execution vertices in the ExecutionGraph. For each sink/leaf execution vertex in an unfinished state, recursively find all of its upstream vertices that need to be restarted (i.e., those in an unfinished state), and then start scheduling based on this set (a sketch of this traversal follows the list).
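The following is a minimal sketch of the backward traversal in step 4. ExecutionVertexStub and its accessors (isFinished, getUpstreamVertices) are hypothetical stand-ins for the actual ExecutionGraph APIs; the traversal itself is a plain iterative depth-first search.

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the real ExecutionVertex.
interface ExecutionVertexStub {
    boolean isFinished();
    Collection<ExecutionVertexStub> getUpstreamVertices();
}

class RestartSetCalculator {
    /** Collects every unfinished vertex reachable upstream from the unfinished sinks. */
    static Set<ExecutionVertexStub> collectVerticesToRestart(
            Collection<ExecutionVertexStub> sinkVertices) {
        Set<ExecutionVertexStub> toRestart = new HashSet<>();
        Deque<ExecutionVertexStub> stack = new ArrayDeque<>();
        for (ExecutionVertexStub sink : sinkVertices) {
            if (!sink.isFinished()) {
                stack.push(sink);
            }
        }
        while (!stack.isEmpty()) {
            ExecutionVertexStub vertex = stack.pop();
            if (!toRestart.add(vertex)) {
                continue; // already visited
            }
            // Recursively include unfinished upstream vertices.
            for (ExecutionVertexStub upstream : vertex.getUpstreamVertices()) {
                if (!upstream.isFinished()) {
                    stack.push(upstream);
                }
            }
        }
        return toRestart;
    }
}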

interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

    // ... other methods

    /**
     * Gets all partitions and their metrics. The metrics mainly include the
     * meta-information of the partition (partition bytes, etc.).
     *
     * @param jobId ID of the target job
     * @return all partitions belonging to the target job, together with their metrics
     */
    Collection<PartitionWithMetrics> getAllPartitionWithMetrics(JobID jobId);

    interface PartitionWithMetrics {

        ShuffleMetrics getPartitionMetrics();

        ShuffleDescriptor getPartition();
    }

    interface ShuffleMetrics {

        ResultPartitionBytes getPartitionBytes();
    }
}
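As a usage illustration, the sketch below shows how a recovered job master might call getAllPartitionWithMetrics to rebuild partition metrics after a failover. The class PartitionRecovery and its method are hypothetical, and the imports assume the standard Flink packages for JobID, ShuffleDescriptor, and ResultPartitionBytes.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;

// Hypothetical usage sketch: after failover, the new job master asks the
// ShuffleMaster for all partitions that survived and rebuilds the
// partition-bytes metrics. Execution vertices whose produced partitions all
// appear in the returned map can then be marked as finished (step 2 above).
class PartitionRecovery {
    static Map<ShuffleDescriptor, ResultPartitionBytes> recoverPartitionMetrics(
            ShuffleMaster<?> shuffleMaster, JobID jobId) {
        Map<ShuffleDescriptor, ResultPartitionBytes> recovered = new HashMap<>();
        for (ShuffleMaster.PartitionWithMetrics p :
                shuffleMaster.getAllPartitionWithMetrics(jobId)) {
            recovered.put(p.getPartition(), p.getPartitionMetrics().getPartitionBytes());
        }
        return recovered;
    }
}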

Compatibility, Deprecation, and Migration Plan

...