Page properties

Discussion thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote thread: TBD
JIRA: TBD
Release: <Flink Version>


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In DataStream API, a DataStream can be transformed into a KeyedStream through keyBy, and then further transformed into a WindowedStream through window operations. WindowedStream enables window processing on records with the same key. With the improvement of FLIP-331, WindowedStream will support full window processing via the window assigner EndOfStreamWindows, whose windows are only triggered at the end of inputs.

However, full window processing is not supported directly by DataStream. This means the DataStream cannot collect all records of each subtask (these records have no keys) separately into a full window and process them at the end of inputs. The DataSet API already supports processing and sorting all records within each subtask through the mapPartition and sortPartition APIs. As the DataSet API has been deprecated in Flink 1.18, it is necessary to enhance the DataStream to support full window processing on individual subtasks.

In this FLIP, we propose two main enhancements. Firstly, we propose enabling DataStream to directly transform into a PartitionWindowedStream. The PartitionWindowedStream represents collecting all records of each subtask separately into a full window. Secondly, we propose supporting four APIs on PartitionWindowedStream: mapPartition, sortPartition, aggregate and reduce.

Public Interfaces

1. We introduce the fullWindowPartition method to the DataStream class.

2. We introduce the PartitionWindowedStream that extends the DataStream. 

3. We add four APIs to PartitionWindowedStream, including mapPartition, sortPartition, aggregate and reduce.

Proposed Changes

1. Why not support arbitrary window processing on DataStream

If DataStream supported arbitrary window types for processing data on each subtask, it would support not only full windows but also other window types such as count windows, sliding windows, and session windows. However, we have chosen to support only full window processing on DataStream. The main reason is that the DataStream is non-keyed and supports neither a keyed state backend nor keyed raw state. This results in two conflicts regarding the usage of windows:

1. The storage of window state relies on the InternalKvState provided by the KeyedStateBackend. The storage of timers in the time service also relies on keyed raw state.

2. The recovery of window state and timers relies on the assumption that every record has a key. The window state and timers are stored and distinguished by key. In fault tolerance and rescaling scenarios, each subtask must be assigned a key range to recover the window state and timers correctly.

Therefore, we propose to support only full window processing on DataStream. This feature is designed to work only in batch mode and does not support checkpointing. If the user specifies streaming mode, the job will fail to submit. Given this premise, the underlying implementation can avoid the aforementioned conflicts by not relying on keyed state.

The Definition Of PartitionWindowedStream

We define the PartitionWindowedStream as the result of transforming in a manner similar to dataStream.window(EndOfStreamWindows.get()). We add the fullWindowPartition method to the DataStream class to replace the window(EndOfStreamWindows.get()) step and return the PartitionWindowedStream from that method.

The PartitionWindowedStream does not support setting the parallelism and has the same parallelism as the previous operator.

The PartitionWindowedStream class will be annotated with @PublicEvolving as more APIs may be added in the future.

Here are the fullWindowPartition method and the PartitionWindowedStream class.

Code Block
languagejava
public class DataStream<T> {
  
  ...
  
  /**
   * Collect records in each subtask of this data stream separately into a full 
   * window. The window emission will be triggered at the end of inputs.
   *
   * @return The data stream full windowed on each subtask.
   */
  public PartitionWindowedStream<T> fullWindowPartition() {
    ...
  }
}


Code Block
languagejava
/**
 * {@link PartitionWindowedStream} represents a data stream that collects all 
 * records of each subtask separately into a full window. Window emission will 
 * be triggered at the end of inputs.
 *
 * @param <T> The type of the elements in this stream.
 */
public class PartitionWindowedStream<T> extends DataStream<T> {
  ...
}
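
For illustration, here is a hypothetical end-to-end usage sketch of the proposed APIs. It assumes a job explicitly configured in batch mode, as required above; the source and the counting logic are our own example, not part of the proposal.

Code Block
languagejava
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class FullWindowPartitionExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The proposed feature only works in batch mode; streaming jobs will fail to submit.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<Long> source = env.fromSequence(0, 999);

        // Collect all records of each subtask into a full window and count them.
        source.fullWindowPartition()
                .mapPartition(
                        new MapPartitionFunction<Long, Long>() {
                            @Override
                            public void mapPartition(Iterable<Long> values, Collector<Long> out) {
                                long count = 0;
                                for (Long ignored : values) {
                                    count++;
                                }
                                out.collect(count);
                            }
                        })
                .print();

        env.execute("Full window partition example");
    }
}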

API Implementation

MapPartition

We introduce the mapPartition API in the PartitionWindowedStream.

Code Block
languagejava
public class PartitionWindowedStream<T> extends DataStream<T> {

    /**
     * Processes the records of the window with the {@link MapPartitionFunction}. The
     * records will be available through the {@link Iterable} parameter of the
     * {@link MapPartitionFunction}.
     *
     * @param mapPartitionFunction The {@link MapPartitionFunction} that is 
     * called for the records in the full window.
     * @return The resulting data stream.
     * @param <R> The type of the elements in the resulting stream, equal to the
     *     MapPartitionFunction's result type.
     */
    public <R> DataStream<R> mapPartition(MapPartitionFunction<T, R> mapPartitionFunction) {
      ...
    }
}

We also show the definition of MapPartitionFunction.

Code Block
languagejava
public interface MapPartitionFunction<T, O> extends Function, Serializable {

    /**
     * A user-implemented function that modifies or transforms an incoming 
     * object.
     *
     * @param values All records for the mapper
     * @param out The collector to hand results to.
     * @throws Exception This method may throw exceptions. Throwing an 
     * exception will cause the operation to fail and may trigger recovery.
     */
    void mapPartition(Iterable<T> values, Collector<O> out) throws Exception;
}
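
As a further illustration, here is a hypothetical MapPartitionFunction that attaches a per-subtask sequence number to each record of the full window; the class name and logic are our own example.

Code Block
languagejava
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/** Attaches a per-subtask sequence number to each record in the full window. */
public class IndexAssigner<T> implements MapPartitionFunction<T, Tuple2<Long, T>> {

    @Override
    public void mapPartition(Iterable<T> values, Collector<Tuple2<Long, T>> out) {
        long index = 0;
        for (T value : values) {
            // The index is only meaningful within one subtask's full window.
            out.collect(Tuple2.of(index++, value));
        }
    }
}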

Next, we describe the operator implementation for the mapPartition API.

In our implementation, the operator will execute the MapPartitionFunction while receiving records, rather than waiting for the entire window of records to be collected. To achieve this, we add a separate UDFExecutionThread inside the operator.

The TaskMainThread will cyclically add records to a fixed-size queue. The UDFExecutionThread will invoke the user-defined MapPartitionFunction and cyclically poll records from the queue through the Iterator of the function's Iterable parameter. If there are no records in the queue, the UDFExecutionThread blocks and waits on the hasNext() and next() methods of that Iterator. Once the UDFExecutionThread has processed all the data, the operator completes its execution.
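
The following is a minimal, hypothetical sketch of this handoff, assuming a bounded BlockingQueue and a sentinel end-of-input marker; the class and method names are ours and do not reflect the actual operator code.

Code Block
languagejava
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Simplified handoff: the TaskMainThread produces, the UDFExecutionThread consumes. */
public class RecordHandoff<T> {

    /** Sentinel object marking the end of inputs. */
    private static final Object END_OF_INPUT = new Object();

    /** Fixed-size queue: the producer blocks when the consumer falls behind. */
    private final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);

    /** Called by the TaskMainThread for each incoming record. */
    public void add(T record) throws InterruptedException {
        queue.put(record);
    }

    /** Called by the TaskMainThread at the end of inputs. */
    public void finish() throws InterruptedException {
        queue.put(END_OF_INPUT);
    }

    /** The Iterator handed to the MapPartitionFunction in the UDFExecutionThread. */
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private Object peeked;

            @Override
            public boolean hasNext() {
                if (peeked == null) {
                    try {
                        // Blocks while the queue is empty, as described above.
                        peeked = queue.take();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
                return peeked != END_OF_INPUT;
            }

            @SuppressWarnings("unchecked")
            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T record = (T) peeked;
                peeked = null;
                return record;
            }
        };
    }
}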

The following diagram illustrates the interaction between the TaskMainThread and the UDFExecutionThread:

[Diagram: interaction between the TaskMainThread and the UDFExecutionThread]

SortPartition

We introduce the sortPartition API in the PartitionWindowedStream, including three methods that sort records by different key extraction logic.

Code Block
languagejava
public class PartitionWindowedStream<T> extends DataStream<T> {

    /**
     * Sorts the records of the window on the specified field in the
     * specified order. The type of records must be {@link Tuple}.
     *
     * @param field The field index by which records are sorted.
     * @param order The order in which records are sorted.
     * @return The resulting data stream with sorted records in each subtask.
     */
    public DataStream<T> sortPartition(int field, Order order) {
      ...
    }

    /**
     * Sorts the records of the window on the specified field in the
     * specified order. The type of records must be {@link Tuple} or a POJO
     * class. The POJO class must be public and have getter and setter methods
     * for each field. It must not implement any interfaces or extend any
     * classes.
     *
     * @param field The field expression referring to the field by which
     * records are sorted.
     * @param order The order in which records are sorted.
     * @return The resulting data stream with sorted records in each subtask.
     */
    public DataStream<T> sortPartition(String field, Order order) {
      ...
    }

    /**
     * Sorts the records of the window on the extracted key in the specified
     * order.
     *
     * @param keySelector The KeySelector function which extracts the key
     * from records.
     * @param order The order in which records are sorted.
     * @return The resulting data stream with sorted records in each subtask.
     */
    public <K> DataStream<T> sortPartition(KeySelector<T, K> keySelector, Order order) {
      ...
    }
}

We also show the definition of Order, which defines the rules for sorting the records.

Code Block
languagejava
public enum Order {
    /** Indicates an ascending order. */
    ASCENDING,

    /** Indicates a descending order. */
    DESCENDING
}
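
For illustration, a hypothetical usage of the three variants might look as follows; the tuple stream and the Event POJO with a timestamp field are our own examples.

Code Block
languagejava
DataStream<Tuple2<String, Integer>> tuples = ...;

// 1. Sort by tuple field index.
tuples.fullWindowPartition()
        .sortPartition(1, Order.ASCENDING);

// 2. Sort by POJO field expression (Event is a hypothetical POJO).
DataStream<Event> events = ...;
events.fullWindowPartition()
        .sortPartition("timestamp", Order.DESCENDING);

// 3. Sort by a key extracted with a KeySelector.
tuples.fullWindowPartition()
        .sortPartition(record -> record.f0, Order.ASCENDING);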

Next, we describe the operator implementation for the sortPartition API.

The TaskMainThread will add records to the ExternalSorter, which is a multi-way merge sorter for sorting large amounts of data that cannot fit entirely into memory. The ExternalSorter will sort the records according to the Order and send the sorted records to the output at the end of inputs. The following diagram illustrates the interaction between the TaskMainThread and the ExternalSorter:

[Diagram: interaction between the TaskMainThread and the ExternalSorter]

Aggregate

We introduce the aggregate API in the PartitionWindowedStream.

Code Block
languagejava
public class PartitionWindowedStream<T> extends DataStream<T> {
    /**
     * Applies the given aggregate function to the records of the window. The 
     * aggregate function is called for each element, aggregating values 
     * incrementally in the window.
     *
     * @param aggregateFunction The aggregation function.
     * @return The resulting data stream.
     * @param <ACC> The type of the AggregateFunction's accumulator.
     * @param <R> The type of the elements in the resulting stream, equal to 
     * the AggregateFunction's result type.
     */
    public <ACC, R> DataStream<R> aggregate(AggregateFunction<T, ACC, R> aggregateFunction) {
      ...
    }
  
}

Next, we describe the operator implementation for the aggregate API.

The TaskMainThread will first create an accumulator by invoking AggregateFunction#createAccumulator, and then aggregate each record into the accumulator via AggregateFunction#add. At the end of inputs, the TaskMainThread will get the result from the accumulator and send it to the output.
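
As an illustration, here is a hypothetical AggregateFunction that incrementally computes the average of all records in a subtask's full window; the class name and the accumulator layout are our own example.

Code Block
languagejava
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

/** Incrementally averages all records of one subtask's full window. */
public class AverageAggregate implements AggregateFunction<Long, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return Tuple2.of(0L, 0L); // (sum, count)
    }

    @Override
    public Tuple2<Long, Long> add(Long value, Tuple2<Long, Long> accumulator) {
        return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        return accumulator.f1 == 0 ? 0.0 : (double) accumulator.f0 / accumulator.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        // Required by the interface; combines two partial accumulators.
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}

// Hypothetical usage:
// DataStream<Double> averages = stream.fullWindowPartition().aggregate(new AverageAggregate());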



Reduce

We introduce the reduce API in the PartitionWindowedStream.

Code Block
languagejava
public class PartitionWindowedStream<T> extends DataStream<T> {

    /**
     * Applies a reduce transformation on the records of the window. The 
     * {@link ReduceFunction} will be called for every record in the window.
     *
     * @param reduceFunction The reduce function.
     * @return The resulting data stream.
     */
    public DataStream<T> reduce(ReduceFunction<T> reduceFunction) {
      ...
    }
}

Next, we describe the operator implementation for the reduce API.

The TaskMainThread will invoke ReduceFunction#reduce for each record in the window and only send the final result to the output at the end of inputs.
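
A hypothetical usage sketch, assuming a stream of integers, might look as follows.

Code Block
languagejava
DataStream<Integer> numbers = ...;

// Sum all records of each subtask; one result per subtask at the end of inputs.
DataStream<Integer> sums =
        numbers.fullWindowPartition()
                .reduce((a, b) -> a + b);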


Rejected Alternatives

1. In the implementation of the mapPartition API, make the operator not cache records. The UDFExecutionThread would then have to execute synchronously with the TaskMainThread, getting each record and processing it in the MapPartitionFunction one at a time.

We choose not to use this approach because it would result in inefficient execution due to frequent thread context switching between the TaskMainThread and the UDFExecutionThread. Caching records enables concurrent execution of the two threads and reduces the frequency of context switching.

Compatibility, Deprecation, and Migration Plan

This FLIP proposes a new feature in Flink. It is fully backward compatible.

Test Plan

We will provide unit and integration tests to validate the proposed changes.
