Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, when Flink’s job manager crashes or is killed, whether due to an unexpected error or a planned node decommission, one of two things happens to a running job:

  1. The job fails, if HA is not enabled.
  2. The job restarts, if HA is enabled. A streaming job resumes from the last successful checkpoint. A batch job has to run from the beginning, and all previous progress is lost.

In view of this, we think a JM crash can cause a significant regression for batch jobs, especially long-running ones. This FLIP addresses the problem so that batch jobs can recover most of their progress after a JM crash. Our goal is that most finished tasks do not need to be re-run.

Public Interfaces

We intend to introduce the following new configuration parameters.

Key | Type | Default Value | Description
execution.batch.job-recovery.enabled | Boolean | false | A flag to enable or disable the job recovery
execution.batch.job-recovery.previous-worker.recovery.timeout | Duration | 30 s | The timeout for a new job master to wait for the previous worker to reconnect
execution.batch.job-recovery.operator-coordinator-snapshot.min-pause | Duration | 3 min | The minimal pause between operator coordinator snapshots
job-event.store.write-buffer.size | MemorySize | 1M | The size of the write buffer of JobEventStore, the content will be flushed to external file system once it's full
job-event.store.write-buffer.flush-interval | Duration | 1 s | The flush interval of write buffer of JobEventStore, over this time, the content will be flushed to external file system
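
The two JobEventStore write-buffer options describe a common policy: flush when the buffer is full, or when the flush interval has elapsed. The following is a minimal sketch of that policy only, not the JobEventStore implementation. The class BufferedEventWriter, its methods, and the in-memory list standing in for the external file system are all hypothetical, and "full" is assumed to mean that the buffered bytes have reached the configured size.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a size-or-interval flush policy; not Flink's JobEventStore. */
final class BufferedEventWriter {
    private final int bufferSizeBytes;       // models job-event.store.write-buffer.size
    private final long flushIntervalMillis;  // models job-event.store.write-buffer.flush-interval
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final List<byte[]> flushed = new ArrayList<>(); // stands in for the external file system
    private long lastFlushMillis;

    BufferedEventWriter(int bufferSizeBytes, long flushIntervalMillis, long nowMillis) {
        this.bufferSizeBytes = bufferSizeBytes;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Appends an event and flushes if the buffer has reached its configured size. */
    void write(byte[] event, long nowMillis) {
        buffer.write(event, 0, event.length);
        if (buffer.size() >= bufferSizeBytes) {
            flush(nowMillis);
        }
    }

    /** Meant to be called periodically; flushes pending bytes once the interval has elapsed. */
    void onTimer(long nowMillis) {
        if (buffer.size() > 0 && nowMillis - lastFlushMillis >= flushIntervalMillis) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        flushed.add(buffer.toByteArray());
        buffer.reset();
        lastFlushMillis = nowMillis;
    }

    int flushCount() {
        return flushed.size();
    }

    int bufferedBytes() {
        return buffer.size();
    }
}
```

Time is passed in explicitly so the policy can be exercised without a real clock. A smaller buffer or a shorter interval means fewer events are lost if the job master crashes, at the cost of more frequent writes.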

In the DataStream API, a DataStream can be transformed into a KeyedStream through keyBy, and then further transformed into a WindowedStream through window operations. WindowedStream enables window processing on records with the same key. With the improvement in FLIP-331, WindowedStream will support full window processing through the window assigner EndOfStreamWindows, whose window is triggered only at the end of inputs.

However, full window processing is not supported directly by DataStream. This means a DataStream cannot collect all records of each subtask (these records have no keys) separately into a full window and process them at the end of inputs. The DataSet API already supports processing and sorting all records within each subtask through the mapPartition and sortPartition APIs. As the DataSet API has been deprecated in Flink 1.18, it is necessary to enhance DataStream to support full window processing on individual subtasks.

In this FLIP, we propose two main enhancements. Firstly, we propose enabling DataStream to directly transform into a PartitionWindowedStream. The PartitionWindowedStream represents collecting all records of each subtask separately into a full window. Secondly, we propose supporting four APIs on PartitionWindowedStream: mapPartition, sortPartition, aggregate and reduce.

Public Interfaces

1. We introduce the fullWindowPartition method to the DataStream class.

2. We introduce the PartitionWindowedStream that extends the DataStream. 

3. We add four APIs to PartitionWindowedStream,  including mapPartition, sortPartition, aggregate and reduce.
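
To illustrate the intended semantics, the following plain-Java sketch models a single subtask that collects all of its records into one full window and processes them only once the input has ended. It does not use the Flink API. The class SubtaskFullWindow and its method signatures are hypothetical, and only three of the four operations are modelled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Hypothetical stand-in for the full window of one subtask; not the Flink API. */
final class SubtaskFullWindow<T> {
    private final List<T> records = new ArrayList<>();

    /** Collects a record; nothing is processed until the input has ended. */
    void add(T record) {
        records.add(record);
    }

    /** mapPartition-like: the function sees all records of this subtask at once. */
    <R> List<R> mapPartition(Function<List<T>, List<R>> fn) {
        return fn.apply(new ArrayList<>(records));
    }

    /** sortPartition-like: returns all records of this subtask in the given order. */
    List<T> sortPartition(Comparator<? super T> order) {
        List<T> sorted = new ArrayList<>(records);
        sorted.sort(order);
        return sorted;
    }

    /** reduce-like: combines all records of this subtask; empty if there were none. */
    Optional<T> reduce(BinaryOperator<T> fn) {
        return records.stream().reduce(fn);
    }
}
```

Each parallel subtask would own one such window, so results are per subtask rather than per key or global.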

Proposed Changes

Why not support arbitrary window processing on DataStream

If DataStream supported arbitrary window types for processing data on each subtask, it would support not only full windows but also other window types such as count windows, sliding windows, and session windows. However, we have chosen to support only full window processing on DataStream. The main reason is that DataStream is non-keyed and supports neither the keyed state backend nor keyed raw state. This leads to two conflicts with how windows are used:

1. The storage of window state relies on the InternalKvState provided by KeyedStateBackend. The storage of timers in time service also relies on keyed raw state.

2. The recovery of window state and timers relies on the assumption that every record has a key. In fault tolerance and rescaling scenarios, each subtask must be assigned a key range to get the window state and timers correctly.

Proposed Changes

...



The overall job recovery workflow mainly consists of 3 parts:

...

  1. Initialize all ExecutionJobVertex whose parallelism has been decided. We can obtain the initialization information from the replayed events (ExecutionJobVertexInitializedEvent).
  2. According to the information in JobMasterPartitionTracker, the execution vertices whose produced partitions are all tracked will be marked as finished.
  3. For execution vertices that are not marked as finished, as mentioned above, if the corresponding job vertex has operator coordinators, we need to call OperatorCoordinator#subtaskReset for them.
  4. Find all sink/leaf execution vertices in ExecutionGraph. For each sink/leaf execution vertex in the unfinished state, recursively find all its upstream vertices that need to be restarted (those in the unfinished state), and then start scheduling based on this.
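
Steps 2 and 4 above can be sketched on a simplified graph as follows. This is an illustration under stated assumptions, not the scheduler implementation. Vertices and partitions are plain strings, and the class RecoveryPlanSketch and its methods are hypothetical. The sketch also assumes that a vertex producing no partitions (for example a sink) is never marked as finished, and that the upstream search stops at finished vertices because their results are still available. The recursion is written with an explicit stack.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of recovery steps 2 and 4 on a simplified graph. */
final class RecoveryPlanSketch {

    /** Step 2: a vertex is finished if it produces partitions and all of them are tracked. */
    static Set<String> finishedVertices(
            Map<String, List<String>> producedPartitions, Set<String> trackedPartitions) {
        Set<String> finished = new HashSet<>();
        for (Map.Entry<String, List<String>> e : producedPartitions.entrySet()) {
            List<String> produced = e.getValue();
            // Assumption: vertices without produced partitions are never marked finished.
            if (!produced.isEmpty() && trackedPartitions.containsAll(produced)) {
                finished.add(e.getKey());
            }
        }
        return finished;
    }

    /**
     * Step 4: starting from unfinished sinks, collect every unfinished vertex reachable
     * upstream. The upstream map must contain every vertex as a key.
     */
    static Set<String> verticesToRestart(
            Map<String, List<String>> upstream, Set<String> finished) {
        // A sink is a vertex that is not an input of any other vertex.
        Set<String> nonSinks = new HashSet<>();
        for (List<String> inputs : upstream.values()) {
            nonSinks.addAll(inputs);
        }
        Deque<String> stack = new ArrayDeque<>();
        for (String vertex : upstream.keySet()) {
            if (!nonSinks.contains(vertex) && !finished.contains(vertex)) {
                stack.push(vertex);
            }
        }
        Set<String> toRestart = new TreeSet<>();
        while (!stack.isEmpty()) {
            String vertex = stack.pop();
            // Skip finished vertices and vertices that were already collected.
            if (finished.contains(vertex) || !toRestart.add(vertex)) {
                continue;
            }
            for (String input : upstream.getOrDefault(vertex, Collections.<String>emptyList())) {
                stack.push(input);
            }
        }
        return toRestart;
    }
}
```

For a graph source1 -> map1 -> sink with a second input source2 -> sink, where only the partitions of source1 and source2 are tracked, this yields map1 and sink as the vertices to restart.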

interface ShuffleMaster<T extends ShuffleDescriptor> extends AutoCloseable {

    //… other methods

    /**
     * Get all partitions and their metrics. The metrics mainly include the meta information of a partition (partition bytes, etc.).
     *
     * @param jobId ID of the target job
     * @return All partitions that belong to the target job and their metrics
     */
    Collection<PartitionWithMetrics> getAllPartitionWithMetrics(JobID jobId);

    interface PartitionWithMetrics {
        ShuffleMetrics getPartitionMetrics();

        ShuffleDescriptor getPartition();
    }

    interface ShuffleMetrics {
        ResultPartitionBytes getPartitionBytes();
    }
}

Compatibility, Deprecation, and Migration Plan

...