...

Because operator coordinators may hold large state, snapshotting them every time an execution vertex finishes could introduce significant overhead. To address this, we will add a new configuration option, "execution.batch.job-recovery.operator-coordinator-snapshot.min-pause", which controls the minimum pause between two consecutive snapshots. When restoring, we will reconcile the execution job vertex state with the operator coordinator state so that the two are consistent. In other words, we will roll the execution job vertices back to the state they were in when the latest operator coordinator snapshot was taken.
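The throttling and reconciliation described above can be sketched as follows. This is a minimal illustration, not the actual implementation: the class `CoordinatorSnapshotThrottler`, its methods, and the use of plain string vertex IDs are hypothetical, and the `minPauseMillis` constructor argument merely stands in for the value of `execution.batch.job-recovery.operator-coordinator-snapshot.min-pause`. It assumes a snapshot attempt is made whenever a vertex finishes, and that on recovery every vertex that finished after the latest snapshot must be reset.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: throttles coordinator snapshots and remembers which
// vertices had finished at the time of the latest snapshot.
final class CoordinatorSnapshotThrottler {
    private final long minPauseMillis;          // stands in for the min-pause option
    private boolean hasSnapshot = false;
    private long lastSnapshotMillis = 0L;
    private final Set<String> finishedSoFar = new HashSet<>();
    private Set<String> finishedAtLastSnapshot = new HashSet<>();

    CoordinatorSnapshotThrottler(long minPauseMillis) {
        this.minPauseMillis = minPauseMillis;
    }

    // Called when an execution vertex finishes; returns true if a snapshot is taken.
    boolean onVertexFinished(String vertexId, long nowMillis) {
        finishedSoFar.add(vertexId);
        if (hasSnapshot && nowMillis - lastSnapshotMillis < minPauseMillis) {
            return false; // too soon after the previous snapshot, skip it
        }
        hasSnapshot = true;
        lastSnapshotMillis = nowMillis;
        finishedAtLastSnapshot = new HashSet<>(finishedSoFar);
        return true;
    }

    // On recovery: vertices that finished after the latest snapshot are not
    // reflected in the restored coordinator state, so they must be reset.
    Set<String> verticesToReset() {
        Set<String> reset = new HashSet<>(finishedSoFar);
        reset.removeAll(finishedAtLastSnapshot);
        return reset;
    }
}
```

For example, with a one-second minimum pause, a vertex finishing 500 ms after the previous snapshot does not trigger a new one; if the JobMaster fails before the next snapshot, that vertex is reported by `verticesToReset()` and is re-run.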

...


The method OperatorCoordinator#supportsBatchSnapshot indicates whether an operator coordinator supports taking snapshots in no-checkpoint/batch scenarios. If an operator coordinator does not support snapshots, it will be restored to its initial state after the JM restarts, and we will handle it as follows:
1. If not all tasks corresponding to this operator coordinator have finished, we will reset all of these tasks and re-run them.
2. If all tasks corresponding to this operator coordinator have finished, no action is needed and the job can continue to run. However, if any of these tasks later needs to be restarted (for example, due to a PartitionNotFound exception), we will reset all of these tasks and re-run them.
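The two rules above can be expressed as a small decision function. This is a hypothetical sketch for a coordinator that does not support batch snapshots: the class `NoSnapshotCoordinatorPolicy`, the `Action` enum, and the representation of task status as a list of booleans are illustrative assumptions, not part of Flink.

```java
import java.util.List;

// Hypothetical policy for an operator coordinator whose supportsBatchSnapshot()
// returns false, i.e. it comes back with its initial state after a JM restart.
final class NoSnapshotCoordinatorPolicy {
    enum Action { CONTINUE, RESET_ALL_TASKS }

    // Rules 1 and 2, evaluated right after the JobMaster restarts.
    static Action afterJobMasterRestart(List<Boolean> tasksFinished) {
        boolean allFinished = tasksFinished.stream().allMatch(finished -> finished);
        return allFinished ? Action.CONTINUE : Action.RESET_ALL_TASKS;
    }

    // Deferred part of rule 2: the coordinator only has its initial state, so
    // re-running any one finished task (e.g. after PartitionNotFound) requires
    // re-running all tasks of this coordinator.
    static Action whenFinishedTaskMustRestart() {
        return Action.RESET_ALL_TASKS;
    }
}
```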

For the source coordinator, whether it supports batch snapshots depends on whether its SplitEnumerator supports them. To identify this, we introduce the SupportsBatchSnapshot interface mentioned above.
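A minimal sketch of how the source coordinator could derive its answer from the enumerator, assuming SupportsBatchSnapshot is a marker interface implemented by the SplitEnumerator. The types below are simplified, self-contained stand-ins that borrow the names from the text; they are not the real Flink classes, and their shapes (no type parameters, no other methods) are assumptions made for illustration.

```java
// Simplified stand-ins for the real interfaces (assumption: no generics, no other methods).
interface SplitEnumerator { }

// Marker interface: an enumerator implementing it can be snapshotted in batch jobs.
interface SupportsBatchSnapshot { }

interface OperatorCoordinator {
    // Conservative default: coordinators do not support batch snapshots unless they say so.
    default boolean supportsBatchSnapshot() {
        return false;
    }
}

final class SourceCoordinator implements OperatorCoordinator {
    private final SplitEnumerator enumerator;

    SourceCoordinator(SplitEnumerator enumerator) {
        this.enumerator = enumerator;
    }

    // Delegates the decision to the enumerator via the marker interface.
    @Override
    public boolean supportsBatchSnapshot() {
        return enumerator instanceof SupportsBatchSnapshot;
    }
}

// Example enumerators: one opts in, one does not.
final class SnapshotCapableEnumerator implements SplitEnumerator, SupportsBatchSnapshot { }
final class PlainEnumerator implements SplitEnumerator { }
```

Using a marker interface keeps existing enumerators unchanged: they are treated as not supporting batch snapshots and fall back to the reset rules described above.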

Persistent JobEventStore

We intend to introduce a persistent JobEventStore to record JobEvents. The store is based on the file system and has the following features:

...