...

Based on the above discussion, discarding the final states of the finished tasks would only change the behavior for a very small fraction of the possible existing jobs, namely those whose non-source operators have special logic on initialization or endOfInput, and these jobs could also be modified to keep the current behavior. Therefore, we lean towards option 3, which simplifies the implementation and leaves more room for optimization in the future.

Proposed Changes

This section details the proposed changes for option 3, chosen in the previous section.

Triggering Checkpoints After Tasks Finished

Currently the CheckpointCoordinator only triggers the sources for new checkpoints, and the other tasks receive barriers from their input channels. However, it would not work to only trigger the running sources after some tasks have finished. As exemplified in Figure 1, suppose Task A has finished: if we only triggered Task B for a new checkpoint, then Task C would not be aware of the new checkpoint and would not report its snapshot.

Figure 1. An example execution graph with one task finished. 

It might be argued that we could wait until Task C finishes processing the remaining records and reports its final snapshot, and then include that final snapshot directly in the checkpoint. However, this does not work if C is a two-phase-commit sink that requires one successful checkpoint before finishing, which would cause a deadlock.

Therefore, for checkpoints taken after some tasks have finished, we need to trigger all the leaf tasks whose precedent tasks have all finished. Currently the CheckpointCoordinator computes the tasks to trigger in a separate timer thread, which runs concurrently with the JobMaster's main thread. This mechanism works well today since it only needs to pick the current execution attempt for each execution vertex. However, finding all the leaf tasks requires iterating over the whole graph, and if the JobMaster changes task states during the computation it would be hard to guarantee the correctness of the algorithm, especially since the JobMaster might restart the finished tasks on failover. To simplify the implementation, we would proxy the computation of the tasks to trigger to the main thread.
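As a minimal sketch of this hand-over, the snippet below runs the trigger-plan computation inside the main thread and continues in the timer thread afterwards. The names (CheckpointTriggerPlanner, ExecutionGraphView, computeRunningTasksWithoutRunningPredecessors) are illustrative placeholders, not Flink's actual classes.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Illustrative sketch only: the names below are placeholders, not Flink's real classes.
final class CheckpointTriggerPlanner {

    private final Executor mainThreadExecutor; // executes runnables in the JobMaster main thread
    private final Executor timerExecutor;      // the CheckpointCoordinator timer thread

    CheckpointTriggerPlanner(Executor mainThreadExecutor, Executor timerExecutor) {
        this.mainThreadExecutor = mainThreadExecutor;
        this.timerExecutor = timerExecutor;
    }

    /**
     * Computes the tasks to trigger inside the main thread, so that the execution graph
     * cannot change concurrently, and hands the result back to the timer thread.
     */
    CompletableFuture<List<String>> planTrigger(ExecutionGraphView graph) {
        return CompletableFuture
                .supplyAsync(graph::computeRunningTasksWithoutRunningPredecessors, mainThreadExecutor)
                .thenApplyAsync(tasks -> tasks, timerExecutor); // continue triggering in the timer thread
    }

    /** Placeholder for a read-only view of the execution graph. */
    interface ExecutionGraphView {
        List<String> computeRunningTasksWithoutRunningPredecessors();
    }
}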

The basic algorithm to compute the tasks to trigger would be to iterate over the ExecutionGraph and find the leaf running tasks. However, the algorithm can be optimized by iterating over the JobGraph instead. The detailed algorithm is shown in the Appendix.

There might also be cases where a task finishes between the time we decide to trigger it and the time we actually trigger it. In this case, we need to reconsider triggering the descendant tasks of that task.
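A rough illustration of this retry path is shown below; the Gateway and Planner interfaces and their methods are made up for this example and do not correspond to Flink's actual trigger RPC. If the trigger fails because the task has already finished, the descendants of that task are re-evaluated and triggered instead.

import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative only: placeholder names, not Flink's actual trigger RPC.
final class TriggerWithRetry {

    interface Gateway {
        /** Completes exceptionally if the task finished between planning and triggering. */
        CompletableFuture<Void> triggerCheckpoint(String taskId, long checkpointId);
    }

    interface Planner {
        /** Re-computes the tasks to trigger among the descendants of the finished task. */
        List<String> recomputeDescendantsToTrigger(String finishedTaskId);
    }

    static CompletableFuture<Void> trigger(
            Gateway gateway, Planner planner, String taskId, long checkpointId) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        gateway.triggerCheckpoint(taskId, checkpointId).whenComplete((ok, failure) -> {
            if (failure == null) {
                result.complete(null);
                return;
            }
            // The task finished in between: fall back to triggering its descendants.
            CompletableFuture<?>[] retries = planner.recomputeDescendantsToTrigger(taskId).stream()
                    .map(t -> trigger(gateway, planner, t, checkpointId))
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(retries).whenComplete((v, f) -> {
                if (f == null) { result.complete(null); } else { result.completeExceptionally(f); }
            });
        });
        return result;
    }
}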

Extending the Task Barrier Alignment

On the task side, we also need to modify the barrier alignment mechanism to support the case where some precedent tasks have finished. In that case, the downstream task would receive EndOfPartition messages from the corresponding input channels.

If a non-source task receives the trigger for checkpoint n, then all its preceding tasks must have finished before checkpoint n and will not send it the corresponding barriers. Therefore, the task would first bookkeep checkpoint n, and after it has received EndOfPartition from all the channels it would report the snapshot for checkpoint n. If the task receives further checkpoint triggers after it has received EndOfPartition from all the channels, it would report the snapshots directly.

A non-source task that does not receive the direct trigger for the checkpoint will receive barriers from some channels, but may receive EndOfPartition from the other channels. In this case, a channel from which EndOfPartition has been received is treated as if it had received the corresponding barrier. Specifically, for aligned checkpoints the channels that have received EndOfPartition are viewed as aligned, and for unaligned checkpoints the buffers before the EndOfPartition message are snapshotted.
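As a rough illustration of the channel-side bookkeeping described above, the sketch below treats EndOfPartition the same as receiving the pending barrier for a channel, and also covers the directly-triggered case. The class and method names are invented for this example and do not correspond to Flink's actual barrier handler.

import java.util.BitSet;

// Illustrative sketch: not Flink's real barrier handler, just the alignment bookkeeping idea.
final class AlignmentTracker {

    private final int numChannels;
    private final BitSet alignedOrClosed = new BitSet(); // channels that delivered the barrier or EndOfPartition
    private long pendingCheckpointId = -1;

    AlignmentTracker(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Called when the task itself is triggered directly (all predecessors finished before checkpoint n). */
    boolean onDirectTrigger(long checkpointId) {
        pendingCheckpointId = checkpointId;
        return isAlignmentComplete(); // true if EndOfPartition already arrived on every channel
    }

    /** Called when a checkpoint barrier for the given checkpoint arrives on a channel. */
    boolean onBarrier(long checkpointId, int channel) {
        pendingCheckpointId = checkpointId;
        alignedOrClosed.set(channel);
        return isAlignmentComplete();
    }

    /** Called when EndOfPartition arrives; the channel counts as aligned for the pending checkpoint. */
    boolean onEndOfPartition(int channel) {
        alignedOrClosed.set(channel);
        return pendingCheckpointId >= 0 && isAlignmentComplete();
    }

    /** The snapshot can be taken once every channel delivered either the barrier or EndOfPartition. */
    private boolean isAlignmentComplete() {
        return alignedOrClosed.cardinality() == numChannels;
    }
}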



Figure 2. A task either receives the checkpoint trigger and EndOfPartition from all the channels, or does not receive the checkpoint trigger and receives barriers from some channels.

A summary of all the cases is listed below:

Received trigger + EndOfPartition from all channels
  • Aligned checkpoint: wait until EndOfPartition has been processed from all the channels, then report the snapshot.
  • Unaligned checkpoint: wait until EndOfPartition has been processed from all the channels, then report the snapshot. In this case the snapshot is taken in the same way as the aligned method.

Did not receive trigger + barriers from some channels
  • Aligned checkpoint: wait until a barrier or EndOfPartition has been received from each channel, then report the snapshot.
  • Unaligned checkpoint: when the first barrier is received, start spilling the buffers that arrive before the barrier or EndOfPartition for each channel.

Tasks Waiting For Checkpoint Complete Before Finish

If some operators of a task have logic that runs when a checkpoint completes, the task would need to wait for a completed checkpoint before it finishes. For example, a sink operator would need to wait for one completed checkpoint to commit all the pending transactions.

To support these operators, a task would wait for a completed checkpoint before finishing if any of its operators or UDFs implement the notifyCheckpointComplete method.
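A minimal sketch of this decision and the wait is shown below. In Flink, the relevant hook is CheckpointListener#notifyCheckpointComplete; the wrapper types and class names here are placeholders introduced for the example.

import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of the "wait for a completed checkpoint before finishing" decision.
// The listener and wrapper interfaces below are stand-ins for Flink's CheckpointListener
// and the operator chain, not the real classes.
final class FinishBarrier {

    /** Stand-in for Flink's CheckpointListener. */
    interface CheckpointCompletionListener {
        void notifyCheckpointComplete(long checkpointId) throws Exception;
    }

    /** Placeholder for an operator (or its UDF) in the operator chain. */
    interface OperatorWrapper {
        Object operatorOrUdf();
    }

    private final CompletableFuture<Long> nextCompletedCheckpoint = new CompletableFuture<>();

    /** A task needs to wait before finishing iff any operator or UDF listens for checkpoint completion. */
    static boolean requiresFinalCheckpoint(List<OperatorWrapper> chain) {
        return chain.stream().anyMatch(op -> op.operatorOrUdf() instanceof CheckpointCompletionListener);
    }

    /** Called from the checkpoint-complete notification; releases the waiting task. */
    void onCheckpointComplete(long checkpointId) {
        nextCompletedCheckpoint.complete(checkpointId);
    }

    /** Blocks the finishing path until one checkpoint has completed. */
    long awaitFinalCheckpoint() throws Exception {
        return nextCompletedCheckpoint.get();
    }
}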

Failover and Recovery

Logically we cannot restart the finished tasks on failover; we could only restart the tasks that still have operators not fully finished. In particular, if all the source operators of a task have finished, the remaining operators would directly start the process of finishing.

For the first version, we would still restart all the tasks on failover, and the previously finished tasks would simply go through the normal finishing process again. This simplifies the implementation on the task side and in the scheduler.

Compatibility, Deprecation and Migration Plan

As discussed in the second section, this FLIP might change the behavior after failover if some operators have logic in initialization or endOfInput that relies on the state of the finished subtasks. However, such jobs should be rare, and they would need to be modified manually to keep the current behavior.

The FLIP would also modify the CompletedCheckpoint serialization format by additionally recording which operators are fully finished. However, this would not cause compatibility issues, since Java's serialization mechanism supports adding or deleting fields.
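To illustrate why adding a field is compatible, here is a toy example (not Flink's actual CompletedCheckpoint class): as long as serialVersionUID stays the same, checkpoints serialized by the old class still deserialize with the new class, and the added field is simply left at its default value.

import java.io.Serializable;
import java.util.Collections;
import java.util.Set;

// Toy example, not Flink's actual class: adding a field to a Serializable class keeps
// old serialized instances readable as long as serialVersionUID is unchanged; the new
// field is simply left at its default (null here) when reading old data.
final class CompletedCheckpointExample implements Serializable {

    private static final long serialVersionUID = 1L;

    private final long checkpointId;

    // Newly added field: the IDs of operators that were fully finished at checkpoint time.
    // Old serialized checkpoints deserialize with this field set to null.
    private final Set<String> fullyFinishedOperatorIds;

    CompletedCheckpointExample(long checkpointId, Set<String> fullyFinishedOperatorIds) {
        this.checkpointId = checkpointId;
        this.fullyFinishedOperatorIds = fullyFinishedOperatorIds;
    }

    Set<String> fullyFinishedOperatorIds() {
        return fullyFinishedOperatorIds == null ? Collections.emptySet() : fullyFinishedOperatorIds;
    }

    long checkpointId() {
        return checkpointId;
    }
}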

Appendix

Computing Tasks to Trigger

The target tasks to trigger are those that are still running but do not have any running precedent tasks. To avoid introducing a large overhead when computing the tasks to trigger for large jobs, we traverse the JobGraph instead of traversing the ExecutionGraph directly. The algorithm is based on the following observations:

  1. If JobVertex A is connected to JobVertex B and not all tasks of JobVertex A have finished (i.e., some tasks of A are still running), then no tasks of JobVertex B need to be triggered if A -> B is ALL_TO_ALL, and only the tasks of B connected solely from finished tasks of A might need to be triggered if A -> B is POINTWISE.
  2. If a task is known to not need triggering, it must be RUNNING, since it is connected from some running task. This implies that, for each JobVertex, the FINISHED tasks are always a subset of the tasks we still need to check for triggering.
  3. If one JobVertex is connected from multiple precedent JobVertices, then the tasks that can be skipped are the union of the tasks ruled out by each precedent JobVertex; equivalently, a task still needs to be checked only if none of its precedent JobVertices has ruled it out.


Based on the above observations, we traverse the JobVertices in topological order, and for each JobVertex we narrow down the tasks to check for all its descendant JobVertices. Among the tasks that remain to be checked, the running ones are the tasks to trigger. The pseudo-code of the algorithm is as follows:

Initialize V.tasks_to_check for each JobVertex V to be the set of all its tasks;

for each JobVertex V in topological order;do
    running, finished = split the tasks of V by their state

    # The running tasks that no precedent JobVertex has ruled out have only
    # finished precedent tasks, so they are the tasks to trigger.
    add the running tasks that are still in V.tasks_to_check to the tasks to trigger

    # If V still has running tasks, the descendant tasks connected to them will
    # receive barriers and must not be triggered directly.
    if running is not empty;then
        for each descendant JobVertex SubV;do
            if V -> SubV is ALL_TO_ALL;then
                SubV.tasks_to_check = {}
            elif V -> SubV is POINTWISE;then
                SubV.tasks_to_check = SubV.tasks_to_check minus {tasks of SubV connected from running tasks of V}
            endif
        done
    endif
done
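For concreteness, here is a compact Java sketch of the same traversal. The Vertex and Edge types below are simplified stand-ins introduced for this example, not Flink's real JobGraph or ExecutionGraph classes.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.IntFunction;

// Compact Java rendering of the pseudo-code above, using simplified placeholder types.
final class TasksToTriggerCalculator {

    enum Pattern { ALL_TO_ALL, POINTWISE }

    static final class Edge {
        final Vertex target;
        final Pattern pattern;
        // For POINTWISE edges: the target subtasks each source subtask is connected to.
        final IntFunction<Set<Integer>> targetsOfSourceSubtask;

        Edge(Vertex target, Pattern pattern, IntFunction<Set<Integer>> targetsOfSourceSubtask) {
            this.target = target;
            this.pattern = pattern;
            this.targetsOfSourceSubtask = targetsOfSourceSubtask;
        }
    }

    static final class Vertex {
        final String name;
        final boolean[] finished; // finished[i] == true iff subtask i is FINISHED, otherwise RUNNING
        final List<Edge> outputs = new ArrayList<>();
        final Set<Integer> tasksToCheck = new HashSet<>();

        Vertex(String name, boolean[] finished) {
            this.name = name;
            this.finished = finished;
            for (int i = 0; i < finished.length; i++) {
                tasksToCheck.add(i); // initially every subtask is a candidate for triggering
            }
        }
    }

    /** @param verticesInTopologicalOrder the JobVertices sorted topologically. */
    static Map<Vertex, Set<Integer>> computeTasksToTrigger(List<Vertex> verticesInTopologicalOrder) {
        Map<Vertex, Set<Integer>> tasksToTrigger = new HashMap<>();

        for (Vertex v : verticesInTopologicalOrder) {
            Set<Integer> trigger = new HashSet<>();
            boolean hasRunningSubtask = false;

            for (int i = 0; i < v.finished.length; i++) {
                if (!v.finished[i]) {
                    hasRunningSubtask = true;
                    // A running subtask that no predecessor ruled out has only finished predecessors.
                    if (v.tasksToCheck.contains(i)) {
                        trigger.add(i);
                    }
                }
            }
            if (!trigger.isEmpty()) {
                tasksToTrigger.put(v, trigger);
            }

            if (!hasRunningSubtask) {
                continue; // all subtasks finished: no constraint on the descendants
            }
            for (Edge edge : v.outputs) {
                if (edge.pattern == Pattern.ALL_TO_ALL) {
                    // Every target subtask has at least one running predecessor.
                    edge.target.tasksToCheck.clear();
                } else {
                    // POINTWISE: rule out the target subtasks connected from running subtasks.
                    for (int i = 0; i < v.finished.length; i++) {
                        if (!v.finished[i]) {
                            edge.target.tasksToCheck.removeAll(edge.targetsOfSourceSubtask.apply(i));
                        }
                    }
                }
            }
        }
        return tasksToTrigger;
    }
}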