
...

This section details the proposed changes for option 3 chosen in the last section.


Checkpoint Format with Finished Tasks

For checkpoints involving finished tasks:

  1. If all the subtasks of an operator are finished, the checkpoint would store a flag "fully finished = true" so that we could skip the execution of this operator after recovery. To avoid compatibility problems, we would store a special state for this operator instead of directly adding a boolean flag (see the sketch below).
  2. If only some subtasks of an operator are finished, the checkpoint would store the states collected from the other, still-running subtasks of this operator. After recovery, the remaining states would be re-assigned among the restarted subtasks. 
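To make the format change concrete, a minimal sketch of how a fully finished operator could be recorded as a special marker state (rather than a new boolean field) is shown below. The class and method names are hypothetical stand-ins for illustration only and are not existing Flink classes.

Code Block
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: a per-operator entry in the checkpoint metadata. An operator whose
// subtasks have all finished is stored as a special marker state instead of adding a new
// boolean field, so that existing metadata (de)serialization keeps working.
public final class OperatorCheckpointEntry implements Serializable {

    /** Marker stored for operators whose subtasks are all finished. */
    public static final OperatorCheckpointEntry FULLY_FINISHED =
            new OperatorCheckpointEntry(Collections.emptyList(), true);

    private final List<byte[]> remainingSubtaskStates; // states of the still-running subtasks
    private final boolean fullyFinished;

    private OperatorCheckpointEntry(List<byte[]> remainingSubtaskStates, boolean fullyFinished) {
        this.remainingSubtaskStates = remainingSubtaskStates;
        this.fullyFinished = fullyFinished;
    }

    /** Entry for an operator that still has running subtasks at checkpoint time. */
    public static OperatorCheckpointEntry ofRemainingSubtasks(List<byte[]> states) {
        return new OperatorCheckpointEntry(states, false);
    }

    /** On recovery, a fully finished operator can be skipped entirely. */
    public boolean isFullyFinished() {
        return fullyFinished;
    }

    public List<byte[]> getRemainingSubtaskStates() {
        return remainingSubtaskStates;
    }
}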

Triggering Checkpoints After Tasks Finished

Currently the CheckpointCoordinator only triggers the sources for new checkpoints, and the other tasks receive barriers from their input channels. However, only triggering the running sources no longer works after some tasks have finished. As exemplified in Figure 1, suppose Task A has finished: if we only trigger Task B for a new checkpoint, Task C would never learn about the new checkpoint and would not report its snapshot.


Figure 1. An example execution graph with one task finished. 

It might be argued that we could simply wait for Task C to finish during this checkpoint, so that we would not need to trigger it at all. However, this does not work if C is a two-phase-commit sink that requires one successful checkpoint before finishing, which would cause a deadlock.

To find the new "root" tasks of the current execution graph, we iterate over all the tasks to find those that are still running but have no running precedent tasks. A direct implementation that checks all the precedent tasks of each task would have O(n²) time complexity. However, we could reduce the complexity to O(n) by iterating over the JobGraph instead of the ExecutionGraph and exploiting the isomorphism of ALL_TO_ALL edges. The details are described in Appendix 1. 

To avoid tasks finishing while the computation is in progress, the computation is done in the JobMaster's main thread. However, there might still be inconsistencies due to the following:

  1. For tasks running on different TaskManagers, the order in which their FINISHED status reports arrive at the JobMaster is not guaranteed. That is to say, some tasks might report FINISHED after their descendant tasks. 
  2. Tasks might finish between the computation and the time the trigger messages reach them. 

...

Make CheckpointBarrierHandler Support EndOfPartition on TM Side

As a whole, for the task side there are two cases regarding checkpoints with finished precedent tasks: 

  1. All the precedent tasks are finished: the checkpoint is triggered after the task has received all the EndOfPartition events but has not finished yet (because it is waiting for the last checkpoint, or waiting for the downstream tasks to process all the pending records, as introduced in the following). 
  2. Some precedent tasks are finished and the task has received EndOfPartition from these tasks.

All the Precedent Tasks are Finished

Currently tasks can be classified into two types according to how a checkpoint is notified: the source tasks are triggered via RPC from the JobManager, while the non-source tasks are triggered via barriers received from their precedent tasks. Once checkpoints after some tasks have finished are considered, the non-source tasks might also get triggered via RPC if they become the new "root" tasks. The implementation for non-source tasks would differ from that of the source tasks: it would notify the CheckpointBarrierHandler instead of directly performing the snapshot, so that the CheckpointBarrierHandler could deal with checkpoint subsuming correctly. Since triggerCheckpoint is currently implemented uniformly in the base StreamTask class and is only called for source StreamTask classes, we would refactor the class hierarchy to support different implementations of the triggerCheckpoint method for the two types of tasks.
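A minimal sketch of the intended split is shown below, assuming a simplified task hierarchy. The names (AbstractTask, SourceTask, NonSourceTask, BarrierHandler) are illustrative stand-ins and do not mirror the actual StreamTask or CheckpointBarrierHandler APIs.

Code Block
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the refactored hierarchy: the base class no longer fixes how an
// RPC-triggered checkpoint is handled; each task type supplies its own implementation.
abstract class AbstractTask {
    /** Invoked when the JobManager triggers a checkpoint on this task via RPC. */
    public abstract CompletableFuture<Boolean> triggerCheckpointAsync(long checkpointId);
}

class SourceTask extends AbstractTask {
    @Override
    public CompletableFuture<Boolean> triggerCheckpointAsync(long checkpointId) {
        // Source tasks keep the current behavior: perform the snapshot directly and
        // broadcast the barrier downstream afterwards.
        return CompletableFuture.completedFuture(performSnapshotAndBroadcastBarrier(checkpointId));
    }

    private boolean performSnapshotAndBroadcastBarrier(long checkpointId) {
        // ... snapshot the operators, emit the CheckpointBarrier to the outputs ...
        return true;
    }
}

class NonSourceTask extends AbstractTask {
    private final BarrierHandler barrierHandler = new BarrierHandler();

    @Override
    public CompletableFuture<Boolean> triggerCheckpointAsync(long checkpointId) {
        // A non-source task that became a new "root" does not snapshot directly; it informs
        // the barrier handler so that subsuming of older pending checkpoints stays correct.
        barrierHandler.onCheckpointTriggered(checkpointId);
        return CompletableFuture.completedFuture(true);
    }
}

class BarrierHandler {
    void onCheckpointTriggered(long checkpointId) {
        // ... mark older pending checkpoints as subsumed, then perform the snapshot ...
    }
}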

Some of the Precedent Tasks are Finished

If for one task not all of its precedent tasks are finished, the task would still get notified via the checkpoint barriers from the running precedent tasks. However, some of the precedent tasks might already be finished, and the CheckpointBarrierHandler must consider this case. As a whole, EndOfPartition from one input channel marks this channel as aligned for all the pending and following checkpoints. Therefore, when receiving EndOfPartition, we will insert a manually created CheckpointBarrier for all the pending checkpoints and exclude the channel from the following checkpoints.


This is enough for aligned checkpoints, but unaligned checkpoints would introduce additional complexity. Since unaligned checkpoint barriers can jump over the pending records while EndOfPartition cannot, simply waiting for the EndOfPartition would mean the CheckpointCoordinator could not get notified in time, and we might incur longer checkpoint durations during the finishing phase. The same holds for aligned checkpoints with an alignment timeout. To cope with this issue, the upstream tasks would wait until the downstream tasks have processed all the pending records before exiting: the upstream tasks emit a special event after all the records, the downstream tasks respond with another special event, and the upstream tasks only exit after all the response events have been received. During this period, unaligned checkpoints or checkpoints with timeout can still be taken normally. Afterwards, the EndOfPartition can reach the downstream CheckpointAligner quickly since there are no pending records.
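The sketch below illustrates the shape of this handshake under simplifying assumptions; the event names (EndOfUserRecordsEvent, ProcessedAllRecordsAck) and the OutputPartition interface are invented for illustration and are not the actual Flink events or classes.

Code Block
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the finish handshake: the upstream task announces that it has emitted
// all user records, waits for every downstream task to confirm it has processed them, and only
// then emits EndOfPartition and exits. Checkpoints can still be taken while waiting.
final class FinishHandshakeSketch {

    /** Marker event sent downstream after the last user record (illustrative only). */
    static final class EndOfUserRecordsEvent {}

    /** Acknowledgement sent back once a downstream task has processed all pending records. */
    static final class ProcessedAllRecordsAck {}

    interface OutputPartition {
        void broadcast(Object event);
        /** Completes when the downstream task answers with ProcessedAllRecordsAck. */
        CompletableFuture<ProcessedAllRecordsAck> awaitAck();
        void emitEndOfPartition();
    }

    static void finishUpstreamTask(List<OutputPartition> partitions) throws Exception {
        // 1. Announce that all user records have been emitted.
        for (OutputPartition partition : partitions) {
            partition.broadcast(new EndOfUserRecordsEvent());
        }
        // 2. Wait until every downstream task has drained its pending records. Unaligned
        //    checkpoints (or aligned ones with timeout) can still complete during this wait.
        CompletableFuture
                .allOf(partitions.stream().map(OutputPartition::awaitAck)
                        .toArray(CompletableFuture[]::new))
                .get();
        // 3. Now EndOfPartition will reach the downstream barrier handlers quickly.
        for (OutputPartition partition : partitions) {
            partition.emitEndOfPartition();
        }
    }
}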

Do Not Cancel Pending Checkpoint on Finish

Currently when a task finishes, all the pending checkpoints are canceled. Since we now support checkpoints after tasks have finished, a finishing task should no longer fail the checkpoints; instead, we would change the behavior to wait for all the pending checkpoints to complete before the task finishes.
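A minimal sketch of the changed finishing path, assuming a hypothetical collection of pending checkpoint acknowledgement futures tracked by the task:

Code Block
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: instead of cancelling pending checkpoints when a task reaches the end of
// its input, the task first waits for every pending checkpoint it participates in to complete.
final class FinishAfterPendingCheckpointsSketch {

    static void finishTask(Collection<CompletableFuture<Void>> pendingCheckpointAcks)
            throws Exception {
        // Old behavior (removed): pendingCheckpointAcks.forEach(f -> f.cancel(true));

        // New behavior: block the finishing path until all pending checkpoints are acknowledged,
        // so that a finishing task no longer fails in-flight checkpoints.
        CompletableFuture
                .allOf(pendingCheckpointAcks.toArray(new CompletableFuture[0]))
                .get();

        // ... continue with the normal finishing logic and EndOfPartition emission ...
    }
}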

Failover and Recovery

Logically we could not restart the finished tasks on failover; we could only restart those tasks that contain operators which are not fully finished. Specifically, if all the source operators have finished, the remaining operators would directly start the finishing process. 

For the first version, we would still restart all the tasks, and the finished tasks would directly start the finishing process. This simplifies the implementation on the task side and in the scheduler. 
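For illustration, the sketch below shows the selection of tasks to restart over a simplified, hypothetical model of the execution plan; the types here are not the actual Flink classes.

Code Block
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: on failover, only tasks that contain at least one operator which is not
// fully finished in the checkpoint would need to be restarted.
final class RestartSelectionSketch {

    record OperatorState(String operatorId, boolean fullyFinished) {}

    record TaskPlan(String taskId, List<OperatorState> operators) {}

    static List<TaskPlan> tasksToRestart(List<TaskPlan> allTasks) {
        return allTasks.stream()
                .filter(task -> task.operators().stream()
                        .anyMatch(op -> !op.fullyFinished()))
                .collect(Collectors.toList());
    }
}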

Rejected Alternatives

CheckpointCoordinator Maintains the Finished Status of Tasks Separately

Currently we have to deal with the case that tasks finish during triggering. Another option is to let the tasks synchronize with the CheckpointCoordinator on finishing: the tasks report their finish status to the CheckpointCoordinator, and the CheckpointCoordinator either cancels the pending checkpoints or replies with the pending checkpoints to wait for. The CheckpointCoordinator would maintain the finished status separately and use this status when computing the tasks to trigger for the following checkpoints.

Although this method could solve the inconsistency caused by tasks finishing before they get triggered, it would introduce additional complexity, so we do not apply this method at present. 

Insert Barriers On Received EndOfPartition at the Task Side

Another option to make the CheckpointBarrierHandler support finished tasks is that, when EndOfPartition is received from one channel, the CheckpointBarrierHandler inserts the barriers for the pending checkpoints into this channel, right before the EndOfPartition event. These barriers would be processed the same way as normally received barriers, so we would not need to modify the CheckpointBarrierHandler logic. 

However, this option could not support unaligned checkpoints before the EndOfPartition event has been transferred to the downstream tasks, and it would require complex modifications to the input channels, so we do not use this option.

Compatibility, Deprecation and Migration Plan

As discussed in the second section, the FLIP might change the behavior after failover if some operators have logic in initialization or endOfInput that relies on the state from the finished subtasks. However, such jobs should be rare, and they would need to be modified manually. 

The FLIP would also modify the CompletedCheckpoint serialization format by recording which operators are fully finished. However, this would not cause compatibility issues, since Java's serialization mechanism supports adding or deleting fields. 

Appendix

1.  Computing Tasks to Trigger

The target tasks to trigger are those that are still running but do not have any running precedent tasks. To avoid introducing large overhead when computing the tasks to trigger for large jobs, we traverse the JobGraph instead of traversing the ExecutionGraph directly. The algorithm is based on the following observations: 

  1. If the job edge A → B is ALL_TO_ALL and some tasks of JobVertex A are running, then all the tasks of JobVertex B must be running and do not need to be triggered.
  2. If the job edge A → B is POINTWISE and all the tasks of JobVertex A are running, then all the tasks of JobVertex B must be running and do not need to be triggered.

Based on the above observations, we could first do a fast check for each job vertex to see whether it may contain tasks to trigger at all. If the fast check does not rule the vertex out, we know that the upstream tasks of all its ALL_TO_ALL input edges must be finished, so we only need to check whether each task has running precedent tasks via the POINTWISE edges. Therefore, the overall time complexity would be O(n).

Code Block
linenumberstrue
each_jv:
for each job vertex JV;do
    if all tasks of JV finished;then
        continue;
    endif

    // The fast check
    for each input job edge IJE;do
        if (IJE is ALL_TO_ALL and some upstream tasks are running) or (IJE is POINTWISE and all upstream tasks are running);then
            continue each_jv;
        endif
    endfor

    for each task of JV;do
        if task is running and no precedent tasks are running;then
            add this task to the tasks to trigger;
        endif
    endfor
endfor
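For illustration, the following is a Java-style rendering of the same fast-check idea over a simplified, hypothetical graph model; the JobVertex, JobEdge and Task types below are stand-ins rather than the actual Flink classes.

Code Block
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the O(n) fast-check computation over a simplified job graph model.
final class TasksToTriggerSketch {

    enum Distribution { ALL_TO_ALL, POINTWISE }

    interface Task {
        boolean isRunning();
        List<Task> precedentTasks(); // precedents reachable via POINTWISE edges
    }

    interface JobEdge {
        Distribution distribution();
        boolean someUpstreamTasksRunning();
        boolean allUpstreamTasksRunning();
    }

    interface JobVertex {
        List<JobEdge> inputEdges();
        List<Task> tasks();
        boolean allTasksFinished();
    }

    static List<Task> computeTasksToTrigger(List<JobVertex> vertices) {
        List<Task> tasksToTrigger = new ArrayList<>();

        nextVertex:
        for (JobVertex vertex : vertices) {
            if (vertex.allTasksFinished()) {
                continue;
            }

            // Fast check: if any input edge guarantees a running precedent for every task,
            // this vertex cannot contain new "root" tasks.
            for (JobEdge edge : vertex.inputEdges()) {
                boolean allToAllWithRunningUpstream =
                        edge.distribution() == Distribution.ALL_TO_ALL && edge.someUpstreamTasksRunning();
                boolean pointwiseWithAllRunningUpstream =
                        edge.distribution() == Distribution.POINTWISE && edge.allUpstreamTasksRunning();
                if (allToAllWithRunningUpstream || pointwiseWithAllRunningUpstream) {
                    continue nextVertex;
                }
            }

            // Otherwise only the POINTWISE precedents of each task still need to be checked.
            for (Task task : vertex.tasks()) {
                boolean hasRunningPrecedent =
                        task.precedentTasks().stream().anyMatch(Task::isRunning);
                if (task.isRunning() && !hasRunningPrecedent) {
                    tasksToTrigger.add(task);
                }
            }
        }
        return tasksToTrigger;
    }
}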

Figure 3. Different options to manage the (Stream)Task's Finished status.

A core divergence for this part is what kind of tasks are considered as FINISHED. As a whole, there are two possible options:

Option 1: Reuse Tasks' FINISH status

The first option is to reuse the tasks' FINISH status managed by the Scheduler. This option avoids additional overhead for maintaining the tasks' status. Since the tasks' status is modified in the JobMaster's main thread, to avoid data inconsistencies we would also need to proxy the computation of the tasks to trigger to the main thread. 

This option has one problem: between the time we compute the tasks to trigger and the time the trigger messages reach the TaskExecutors, some tasks might finish, which makes the trigger fail. If we do not deal with this situation, the checkpoint would fail after a timeout since not all tasks report their snapshots. To deal with this problem, we would need to make the CheckpointCoordinator listen to the FINISHED status reports of the tasks, and if one task finishes before reporting its snapshot, the CheckpointCoordinator would re-compute the tasks to trigger and re-trigger the new "root" tasks. Although this patch could solve the issue, in unfortunate cascading cases where the triggers keep failing because tasks keep finishing, the JM might incur high overhead. 
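A rough sketch of this re-trigger handling is shown below, using hypothetical coordinator-side helpers; none of the method names are existing Flink APIs.

Code Block
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical sketch of Option 1: the coordinator computes the tasks to trigger in the
// JobMaster main thread, and re-computes / re-triggers whenever a chosen task finishes
// before its snapshot was reported.
final class RetriggerOnFinishSketch {

    interface TaskHandle {
        boolean isStillRunning();
        void triggerCheckpoint(long checkpointId);
    }

    private final Executor mainThreadExecutor; // the single-threaded JobMaster main thread
    private long pendingCheckpointId;

    RetriggerOnFinishSketch(Executor mainThreadExecutor) {
        this.mainThreadExecutor = mainThreadExecutor;
    }

    void triggerCheckpoint(long checkpointId, List<TaskHandle> executionGraphRoots) {
        this.pendingCheckpointId = checkpointId;
        // Compute and trigger inside the main thread so task status cannot change concurrently.
        mainThreadExecutor.execute(() -> executionGraphRoots.forEach(
                task -> task.triggerCheckpoint(checkpointId)));
    }

    /** Called on the main thread when a task reports FINISHED before acknowledging a snapshot. */
    void onTaskFinishedWithoutSnapshot(List<TaskHandle> recomputedRoots) {
        // Re-trigger the new "root" tasks of the still-pending checkpoint. In cascading cases
        // (tasks keep finishing right before being triggered) this may repeat several times.
        recomputedRoots.stream()
                .filter(TaskHandle::isStillRunning)
                .forEach(task -> task.triggerCheckpoint(pendingCheckpointId));
    }
}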

Option 2: Checkpoint Coordinator maintains the finish status separately

If triggering one task fails because the task finished in advance, what we need to do is to report an empty state for this task and re-trigger its descendant tasks. The first action is simple to achieve since the state is empty; the core question is how to notify the descendant tasks. For option 1 we would have to go via the JobMaster, since by the time the task reports FINISHED the channels to the descendant tasks have already been closed by emitting EndOfPartition.

Therefore, another option is to not reuse the tasks' FINISH status and instead introduce another report before emitting EndOfPartition. As shown in Figure 2, we could make the StreamTask report to the CheckpointCoordinator that it is going to finish. The CheckpointCoordinator would mark the task as finished immediately upon receiving the report and reply to the task with the pending checkpoints that did not manage to trigger it. The StreamTask would then broadcast the corresponding barriers to the descendant tasks as for normal checkpoints. 

Compared to the first option, this option introduces a new RPC and asks the CheckpointCoordinator to maintain a set of finished tasks. The CheckpointCoordinator also needs to remove tasks from the set if they are restarted. However, the maintained set could be discarded on JobMaster failover, which decreases the complexity. Besides, this option avoids the cascading re-trigger problem described for option 1. 

For the first version we would go with option 1 to avoid a more complex implementation involving a new RPC and duplicated finished state.

Extend the task Barrier Alignment

On the task side, we also need to modify the barrier alignment mechanism to support the cases where some precedent tasks have finished. In these cases, the downstream task would receive EndOfPartition messages from the corresponding input channels.

...

Figure 4. A task would either receive the checkpoint trigger and EndOfPartition from all the channels, or not receive the checkpoint trigger and receive barriers from some of the channels.

A task might be notified of a checkpoint in one of two ways:

  1. If all its precedent tasks have finished, it would receive the checkpoint trigger message via RPC.
  2. If some of its precedent tasks are still running, it would receive the checkpoint barrier from the corresponding channels.

For both cases, to avoid interfering with the checkpoint alignment logic, we would insert barriers into the channels right before EndOfPartition (a sketch of this handling follows the list below):

  1. Whenever an input channel enqueues EndOfPartition from the network or from a local precedent result partition, it knows the corresponding precedent task has finished. It would then insert a prioritized message into the channel to notify the CheckpointBarrierHandler about it. 
  2. Whenever the CheckpointBarrierHandler receives a new barrier or checkpoint trigger for the first time, it would insert the corresponding barriers into all the input channels that have received EndOfPartition, right before the EndOfPartition. The inserted barriers would respect the options of the checkpoint, such as unaligned checkpoints being enabled or the checkpoint alignment timeout.
  3. During step 1, when the CheckpointBarrierHandler learns that an input channel has enqueued EndOfPartition, it would also check the pending checkpoints and insert the corresponding barriers into this input channel. 
  4. After the CheckpointBarrierHandler has actually processed EndOfPartition from one channel, it would stop watching this channel and decrease the total number of barriers required for each checkpoint.
  5. After the CheckpointBarrierHandler has processed EndOfPartition from all the channels, it would only receive checkpoint notifications of case 1 and would directly trigger the snapshot. 
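The following sketch condenses steps 1–5 into code, using a deliberately simplified model of the barrier handler; the types and method names are illustrative only and do not correspond to the actual CheckpointBarrierHandler implementation.

Code Block
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the EndOfPartition handling in a simplified barrier handler:
// channels that already delivered EndOfPartition get artificial barriers inserted for every
// pending checkpoint and are excluded from the alignment of the following checkpoints.
final class EndOfPartitionAwareAlignerSketch {

    interface Channel {
        /** Inserts an artificial barrier right before the queued EndOfPartition event. */
        void insertBarrierBeforeEndOfPartition(long checkpointId);
    }

    private final Set<Long> pendingCheckpoints = new HashSet<>();
    private final Set<Channel> channelsWithEndOfPartition = new HashSet<>();
    private int alignedChannelsRequired;

    EndOfPartitionAwareAlignerSketch(int numberOfChannels) {
        this.alignedChannelsRequired = numberOfChannels;
    }

    /** Step 2: a first barrier or an RPC trigger arrived for a new checkpoint. */
    void onCheckpointStarted(long checkpointId) {
        pendingCheckpoints.add(checkpointId);
        // Channels that already saw EndOfPartition receive the barrier artificially.
        channelsWithEndOfPartition.forEach(c -> c.insertBarrierBeforeEndOfPartition(checkpointId));
    }

    /** Steps 1 and 3: a channel enqueued EndOfPartition. */
    void onEndOfPartitionEnqueued(Channel channel) {
        channelsWithEndOfPartition.add(channel);
        pendingCheckpoints.forEach(channel::insertBarrierBeforeEndOfPartition);
    }

    /** Steps 4 and 5: EndOfPartition was actually processed from this channel. */
    void onEndOfPartitionProcessed(Channel channel) {
        channelsWithEndOfPartition.remove(channel);
        // Following checkpoints need one aligned channel fewer; once this reaches zero, the
        // task is only notified via RPC (case 1) and snapshots directly.
        alignedChannelsRequired--;
    }
}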

One pending problem here is that for unaligned checkpoints, the barriers are inserted only after the downstream tasks have received EndOfPartition. Compared with a normal unaligned checkpoint, the checkpoint here can only skip the buffers cached in the input channels, but not the buffers in the upstream tasks' result partitions. This might enlarge the time required. We could not allow the barriers to skip the result partitions of the upstream tasks, because the upstream tasks would have finished by then and could not recover the snapshotted result partition data after a failover. To avoid introducing slow checkpoints, we might make the upstream tasks wait until all the buffers in the result partition are flushed into the network; during this period, normal unaligned checkpoints could still be taken. But since it does not affect correctness, we would consider this optimization in a future version. 

