...

  1. During normal execution, Flink will record the states of the JM (ExecutionGraph, OperatorCoordinator, etc.) to persistent storage so that the job can be recovered from these states after a JM crash. We will introduce an event-based method to record the state of the JM (a sketch of such an event store follows this list).
  2. While the JM is down and being restarted (generally HA is responsible for restarting the JM), the shuffle service and the TMs will retain the partitions related to the target job and keep trying to reconnect.
  3. After the JM restarts, the connections with the shuffle service and the TMs will be re-established. The JM will then recover the job progress based on the previously recorded states and the partitions currently existing in the cluster, and restart the scheduling.
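
A minimal sketch of what such an event-based store could look like is shown below. Only the JobEvent and JobEventStore names come from this proposal; the method signatures are illustrative assumptions.

// Illustrative sketch only: an event-based store that the JM appends to during normal
// execution and replays after a failover. Method signatures are assumptions.
public interface JobEvent {
    // Marker for a recoverable JM state change, e.g. "execution vertex finished" or
    // "ExecutionJobVertex initialized".
}

public interface JobEventStore {
    /** Persist an event so that it survives a JM failover. */
    void writeEvent(JobEvent event) throws Exception;

    /** Read the next persisted event in order, or null if none remain. */
    JobEvent readEvent() throws Exception;

    /** Whether any events have been recorded for this job. */
    boolean isEmpty() throws Exception;
}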

...

In order to obtain the state of the operator coordinators, we will enrich the checkpointCoordinator method to let it accept -1 (NO_CHECKPOINT) as the value of checkpointId, to support snapshotting the state of operator coordinators in batch jobs. After the JM crashes, the operator coordinators can be restored from the previously recorded state. In addition to a simple restore (via the resetToCheckpoint method), we also need to call subtaskReset for the non-finished tasks (which may have been in a running state before the JM crashed), because these tasks will be reset and re-run after the JM crash.
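
As a hedged illustration, the snippet below combines these existing OperatorCoordinator methods on the JM side; the helper class and the unfinishedSubtasks parameter are hypothetical.

// Hedged sketch: wiring together existing OperatorCoordinator methods for batch job
// recovery. The helper class and the unfinishedSubtasks parameter are hypothetical.
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;

final class CoordinatorSnapshotSketch {

    /** Snapshot outside the normal checkpointing mechanism, using NO_CHECKPOINT (-1). */
    static CompletableFuture<byte[]> snapshot(OperatorCoordinator coordinator) throws Exception {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        coordinator.checkpointCoordinator(OperatorCoordinator.NO_CHECKPOINT, result);
        return result;
    }

    /** Restore after a JM failover, then reset the subtasks that had not finished. */
    static void restore(
            OperatorCoordinator coordinator, byte[] snapshotData, int[] unfinishedSubtasks)
            throws Exception {
        coordinator.resetToCheckpoint(OperatorCoordinator.NO_CHECKPOINT, snapshotData);
        for (int subtask : unfinishedSubtasks) {
            // These subtasks may have been running when the JM crashed and will be re-run.
            coordinator.subtaskReset(subtask, OperatorCoordinator.NO_CHECKPOINT);
        }
    }
}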

Considering that the operator coordinators may have large state, snapshotting them every time an execution vertex finishes could cause significant overhead. To solve this problem, we will add a new configuration option "execution.batch.job-recovery.operator-coordinator-snapshot.min-pause" to control the minimum interval between snapshots. When restoring, we will also reconcile the execution job vertex state with the operator coordinator state to keep them consistent. In other words, we will adjust the execution job vertex to the state at the time of the latest operator coordinator snapshot.
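
For illustration, a possible shape of the new option and the throttling check around it is sketched below; only the option key comes from this proposal, the default value and the helper class are assumptions.

// Illustrative only: a possible declaration of the proposed option and the throttling
// check around it. The default value and the helper class are assumptions.
import java.time.Duration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

final class CoordinatorSnapshotThrottle {

    // Hypothetical declaration; only the key itself comes from this proposal.
    static final ConfigOption<Duration> SNAPSHOT_MIN_PAUSE =
            ConfigOptions.key("execution.batch.job-recovery.operator-coordinator-snapshot.min-pause")
                    .durationType()
                    .defaultValue(Duration.ofMinutes(3));

    private final long minPauseMillis;
    private long lastSnapshotMillis = 0L;

    CoordinatorSnapshotThrottle(Duration minPause) {
        this.minPauseMillis = minPause.toMillis();
    }

    /** Snapshot the operator coordinators only if the minimum pause has elapsed. */
    boolean shouldSnapshot(long nowMillis) {
        if (nowMillis - lastSnapshotMillis < minPauseMillis) {
            return false;
        }
        lastSnapshotMillis = nowMillis;
        return true;
    }
}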

...

Currently, when a JM crashes, the TM senses it through HA or a heartbeat timeout, releases all resources related to this job (including slots, and partitions when using TM shuffle), and then disconnects from the JM.

To support job recovery, we need to make the following changes:

...

After the JM restarts and becomes leader again, it will wait for a period of time for the TMs to re-establish connections with it. The length of the waiting time is controlled by "execution.batch.job-recovery.previous-worker.recovery.timeout"; only TMs that connect within this period will be accepted, and those that time out will be rejected. Once we have enough partitions (all partitions required to continue running are registered), we can end this wait early and continue to the next step.
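
A rough sketch of this waiting logic is given below, under the assumption that the scheduler can query the partitions that have been re-registered so far; all names are hypothetical.

// Rough sketch of the waiting logic, assuming the scheduler can query the partitions
// that have been re-registered so far. All names below are hypothetical.
import java.time.Duration;
import java.util.Set;
import java.util.function.Supplier;

final class PreviousWorkerRecoverySketch {

    /**
     * Wait up to the configured timeout for previous TMs to reconnect, but end the wait
     * early once all partitions required to continue the job are registered again.
     */
    static void waitForPreviousWorkers(
            Duration timeout,
            Supplier<Set<String>> registeredPartitionIds,
            Set<String> requiredPartitionIds)
            throws InterruptedException {

        long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (System.currentTimeMillis() < deadline) {
            if (registeredPartitionIds.get().containsAll(requiredPartitionIds)) {
                return; // enough partitions, continue to the next step early
            }
            Thread.sleep(100); // a real implementation would be event-driven, not polling
        }
        // Timed out: proceed with whatever was re-registered; producers of missing
        // partitions will simply be re-run.
    }
}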

After re-establishing connections with the TMs, the JM will try to obtain all partitions existing in the cluster through the ShuffleMaster and rebuild the partition information in the JobMasterPartitionTracker. To do that, we need to add a new method, getAllPartitionWithMetrics, to the ShuffleMaster.
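
A sketch of the new method is shown below; the exact signature and the PartitionWithMetrics type are illustrative and may differ in the final interface.

// Sketch of the proposed addition to ShuffleMaster; signature and result type may differ.
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;

interface ShuffleMasterRecoveryExtension {

    /**
     * Return all partitions (together with metrics such as the number of bytes they hold)
     * that still exist in the cluster for the given job, so that the restarted JM can
     * rebuild the JobMasterPartitionTracker from them.
     */
    CompletableFuture<Collection<PartitionWithMetrics>> getAllPartitionWithMetrics(JobID jobId);

    /** Placeholder standing in for the real result type. */
    interface PartitionWithMetrics {}
}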

After the JobMasterPartitionTracker is re-established, the JM begins to replay the JobEvents from the JobEventStore, recovers the execution graph state, and then starts rescheduling based on the execution graph state and the partitions currently existing in the cluster:

  1. Initialize all ExecutionJobVertex whose parallelism has been decided. We can obtain the initialization information from the replayed events (ExecutionJobVertexInitializedEvent).
  2. According to the information in JobMasterPartitionTracker, the execution vertices whose produced partitions are all tracked will be marked as finished. 
  3. For execution vertices that are not marked as finished, as mentioned above, if their corresponding job vertex has operator coordinators, we need to call subtaskReset for them.
  4. Find all sink/leaf execution vertices in the ExecutionGraph. For each sink/leaf execution vertex in a non-finished state, recursively find all of its upstream vertices that need to be restarted (those in an unfinished state), and then start scheduling based on this (see the sketch after this list).
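
As a hedged sketch of step 4, the following shows one way to compute the set of vertices to restart; ExecutionVertexNode and its accessors are placeholders rather than Flink classes.

// Hedged sketch of step 4: starting from each unfinished sink/leaf vertex, walk its
// inputs and collect every unfinished upstream vertex. ExecutionVertexNode and its
// accessors are placeholders, not Flink classes.
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class RestartSetSketch {

    interface ExecutionVertexNode {
        boolean isFinished();
        List<ExecutionVertexNode> getUpstreams();
    }

    /** Compute the set of execution vertices that need to be restarted and rescheduled. */
    static Set<ExecutionVertexNode> verticesToRestart(List<ExecutionVertexNode> sinkVertices) {
        Set<ExecutionVertexNode> toRestart = new HashSet<>();
        for (ExecutionVertexNode sink : sinkVertices) {
            collect(sink, toRestart);
        }
        return toRestart;
    }

    private static void collect(ExecutionVertexNode vertex, Set<ExecutionVertexNode> toRestart) {
        // Finished vertices keep their partitions, so the traversal stops at them; a vertex
        // already in the set has had its upstreams visited before.
        if (vertex.isFinished() || !toRestart.add(vertex)) {
            return;
        }
        for (ExecutionVertexNode upstream : vertex.getUpstreams()) {
            collect(upstream, toRestart);
        }
    }
}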

...

Compatibility, Deprecation, and Migration Plan

In the first version, job recovery will be an optional optimization which the user has to activate explicitly by setting the config option execution.batch.job-recovery.enabled: true. This entails that Flink's default behavior won't change.
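
For example, assuming the option name above, a user could enable it programmatically (the same key can also be set in the Flink configuration file):

// Example of enabling the feature, assuming the proposed option name.
import org.apache.flink.configuration.Configuration;

final class EnableBatchJobRecovery {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("execution.batch.job-recovery.enabled", "true");
        // Pass `conf` when creating the cluster or the execution environment.
    }
}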

Limitations

Only supports the new source API

...