...

[draw.io diagram "Figure.1" omitted]

Figure 12. An example execution graph with one task finished. 

It might be argued that we could simply wait for Task C to finish during this checkpoint, so that we would never need to trigger it. However, this does not work if C is a two-phase commit sink that requires one successful checkpoint before it can finish: the checkpoint waits for C to finish while C waits for the checkpoint to complete, which causes a deadlock.
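To make the cycle concrete, here is a minimal sketch of such a sink (hypothetical names; Flink's actual two-phase commit sinks commit their pending transactions in notifyCheckpointComplete):

```java
// Hypothetical two-phase-commit sink, for illustration only.
class TwoPhaseCommitSink {
    private boolean pendingCommit = true;

    // Delivered only after checkpoint n completes on the coordinator side.
    void notifyCheckpointComplete(long checkpointId) {
        pendingCommit = false; // commit the pending transaction
    }

    // The task may only finish once its last transaction is committed.
    boolean canFinish() {
        return !pendingCommit;
    }
}
// Deadlock: the checkpoint waits for canFinish() == true before completing,
// but canFinish() only becomes true after the checkpoint completes and
// notifyCheckpointComplete is delivered.
```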

Therefore, for checkpoints taken after some tasks have finished, we need to trigger all the new root tasks, namely the running tasks whose precedent tasks have all finished. Currently the CheckpointCoordinator computes the tasks to trigger in a separate timer thread, which runs concurrently with the JobMaster's main thread. This mechanism works well today since it only needs to pick the current execution attempt for each execution vertex. However, finding all the new root tasks requires iterating over the whole graph, and if the JM changes tasks' states during the computation, it would be hard to guarantee the correctness of the algorithm, especially since the JM might restart finished tasks on failover. To simplify the implementation, we would proxy the computation of the tasks to trigger to the main thread.
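As a rough sketch of that proxying, assuming a handle to the JobMaster's main-thread executor and a hypothetical findNewRootRunningTasks traversal:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Sketch: proxy the computation of tasks to trigger to the main thread. */
class CheckpointTriggerPlanner {

    /** Placeholder for an execution attempt (Flink: Execution). */
    static class Execution {}

    // Assumed handle to the JobMaster's single main-thread executor;
    // work submitted here runs serially with other JM state changes.
    private final Executor mainThreadExecutor;

    CheckpointTriggerPlanner(Executor mainThreadExecutor) {
        this.mainThreadExecutor = mainThreadExecutor;
    }

    // The timer thread only schedules the work; the traversal itself runs
    // on the main thread, so task states cannot change underneath it.
    CompletableFuture<List<Execution>> computeTasksToTrigger() {
        return CompletableFuture.supplyAsync(
                this::findNewRootRunningTasks, mainThreadExecutor);
    }

    // Hypothetical traversal: collect running tasks whose precedent
    // tasks have all finished (the "new root" tasks).
    private List<Execution> findNewRootRunningTasks() {
        return Collections.emptyList(); // graph traversal elided
    }
}
```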

The basic algorithm to compute the tasks to trigger would be to iterate over the ExecutionGraph to find the new root running tasks. However, the algorithm could be optimized by iterating over the JobGraph instead. The detailed algorithm is shown in the Appendix.
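For illustration, a simplified sketch of the basic ExecutionGraph variant, using placeholder vertex and state types rather than Flink's actual classes:

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-ins for execution vertices and their state. */
class TriggerComputation {

    enum State { RUNNING, FINISHED }

    static class Vertex {
        State state;
        List<Vertex> predecessors = new ArrayList<>();
    }

    // A task is a "new root" if it is still running and every one of its
    // precedent tasks has finished (or it has none).
    static List<Vertex> tasksToTrigger(List<Vertex> allVertices) {
        List<Vertex> toTrigger = new ArrayList<>();
        for (Vertex v : allVertices) {
            if (v.state == State.RUNNING
                    && v.predecessors.stream()
                            .allMatch(p -> p.state == State.FINISHED)) {
                toTrigger.add(v);
            }
        }
        return toTrigger;
    }
}
```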



[draw.io diagram "task_lifecycle" omitted]


A core design choice for this part is which tasks are considered FINISHED. Overall, there are two possible options:

Option 1: Reuse tasks' FINISHED status

The first option is to reuse the tasks' FINISHED status managed by the Scheduler. This option avoids the additional overhead of maintaining tasks' status separately. Since the tasks' status is modified in the JobMaster's main thread, to avoid data inconsistency we would need to proxy the computation of the tasks to trigger to the main thread as well.

This option might have one problem: between the time we compute the tasks to trigger and the time the trigger messages reach the TaskExecutors, some tasks might finish, which makes the trigger fail. If we do not deal with this situation, the checkpoint would fail after the timeout since not all tasks report their snapshots. To deal with this problem, we would need to make the CheckpointCoordinator listen to the FINISHED status reports of the tasks; if a task finishes before reporting its snapshot, the CheckpointCoordinator would re-compute the tasks to trigger and re-trigger the new sources. Although this patch could solve the issue, in unfortunate cascading cases where triggers keep failing because tasks keep finishing, the JM might incur high overhead.
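A rough sketch of the re-trigger loop this implies; the class and method names are illustrative, not existing Flink APIs:

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of Option 1's fix: re-trigger when a triggered task finishes early. */
class RetriggerOnFinish {

    interface Task {
        boolean hasReportedSnapshot();
    }

    // Tasks triggered for the current checkpoint but not yet heard from.
    private List<Task> pendingTriggers = new ArrayList<>();

    // Invoked when the Scheduler reports a task switching to FINISHED.
    void onTaskFinished(Task task) {
        if (pendingTriggers.remove(task) && !task.hasReportedSnapshot()) {
            // The trigger was lost: recompute the new root tasks and trigger
            // them again. In cascading cases (tasks keep finishing just
            // before the trigger arrives) this loop may repeat, which is the
            // overhead concern noted above.
            pendingTriggers = recomputeTasksToTrigger();
            pendingTriggers.forEach(this::trigger);
        }
    }

    private List<Task> recomputeTasksToTrigger() {
        return new ArrayList<>(); // graph traversal elided
    }

    private void trigger(Task task) {
        // send the trigger-checkpoint RPC to the task's executor (elided)
    }
}
```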

Option 2: The CheckpointCoordinator maintains the finished status separately

There might also be cases where tasks finish between the time we decide to trigger them and the time we actually trigger them. If triggering a task fails because it has already finished, we need to reconsider triggering the descendant tasks of that task.
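A minimal sketch of this option's bookkeeping, with illustrative names: the coordinator records finished tasks itself and, on a failed trigger, reconsiders the descendants:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch of Option 2: the coordinator tracks finished tasks itself. */
class CoordinatorFinishTracking {

    static class Task {
        final String id;
        final List<Task> predecessors = new ArrayList<>();
        final List<Task> descendants = new ArrayList<>();
        boolean running = true;
        Task(String id) { this.id = id; }
    }

    // Finished status as seen by this checkpoint, maintained by the
    // coordinator independently of the Scheduler's task states.
    private final Set<String> finished = new HashSet<>();

    // Invoked when a trigger fails because the task already finished.
    void onTriggerFailed(Task task) {
        finished.add(task.id);
        // Reconsider the descendants: any of them whose precedent tasks
        // are now all finished becomes a new root and must be triggered.
        for (Task d : task.descendants) {
            if (d.running && d.predecessors.stream()
                    .allMatch(p -> finished.contains(p.id))) {
                trigger(d);
            }
        }
    }

    private void trigger(Task task) {
        // send the trigger-checkpoint RPC (elided)
    }
}
```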


Extend the Task Barrier Alignment

...

The FLIP would also modify the CompletedCheckpoint serialization format to record which operators are fully finished. However, this would not cause compatibility issues, since Java's serialization mechanism supports adding or deleting fields.
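As a small illustration of that compatibility behavior (the class and field names here are placeholders, not the actual CompletedCheckpoint definition):

```java
import java.io.Serializable;
import java.util.Set;

// Illustrative only. Adding a field to a Serializable class keeps old
// serialized instances readable as long as serialVersionUID is unchanged.
class CompletedCheckpointLike implements Serializable {
    private static final long serialVersionUID = 1L;

    private long checkpointId;

    // Newly added field recording which operators are fully finished. When
    // a checkpoint serialized before this field existed is deserialized,
    // Java serialization leaves it at its default value (null), which can
    // be interpreted as "no operators were finished".
    private Set<String> fullyFinishedOperatorIds;
}
```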

Appendix

1.  Computing Tasks to Trigger

The target tasks to trigger are those that are still running but do not have any running precedent tasks. To avoid introducing large overhead when computing the tasks to trigger for large jobs, we traverse the JobGraph instead of traversing the ExecutionGraph directly. The algorithm is based on the following observation:

...