...

[Diagram: task_lifecycle]

Figure 3. Different options to manage the (Stream)Task's Finished status.

A key design question here is which tasks are considered FINISHED. Overall, there are two possible options:

...

If triggering a task fails because the task has already finished, we need to report an empty state for this task and re-trigger its descendant tasks. The first action is simple since the state is empty; the core problem is how to notify the descendant tasks. For option 1, we have to go through the JobMaster, since by the time the task reports FINISHED, the channels to its descendant tasks have already been closed by emitting EndOfPartition.

Therefore, another option is to not reuse the tasks' FINISHED status and instead introduce another report before emitting EndOfPartition. As shown in Figure 2, we could make the StreamTask report to the CheckpointCoordinator that it is about to finish. The CheckpointCoordinator would mark the task as finished immediately upon receiving the report and reply to the task with the pending checkpoints that failed to trigger it. The StreamTask would then broadcast the corresponding barriers to its descendant tasks, as in normal checkpoints.

Compared to the first option, this option introduces a new RPC and requires the CheckpointCoordinator to maintain a set of finished tasks. The CheckpointCoordinator also needs to remove a task from the set if the task is restarted. However, the maintained set can be discarded on JobMaster failover, which reduces the complexity. Besides, this option avoids the cascade finish problem.

Extend the task Barrier Alignment

On the task side, we also need to modify the task barrier alignment mechanism to support cases where some preceding tasks have finished. In this case, the downstream task would receive EndOfPartition messages from the corresponding input channels.

If a non-source task receives the trigger for checkpoint n, all its preceding tasks must have finished before checkpoint n and will not send it the corresponding barriers. Therefore, the task first records checkpoint n, and once it has received EndOfPartition from all the channels, it reports the snapshot for checkpoint n. Checkpoint triggers received after EndOfPartition has arrived from all the channels are reported directly.

A non-source task that does not receive a direct checkpoint trigger will receive barriers from some channels, but may receive EndOfPartition from others. In this case, a channel that has delivered EndOfPartition is treated as having delivered the corresponding barrier. Specifically, for aligned checkpoints, channels that have delivered EndOfPartition are treated as aligned; for unaligned checkpoints, the buffers before the EndOfPartition message are snapshotted.
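The bookkeeping for non-source tasks described above can be sketched as a small Java class. This is a simplified model under stated assumptions, not Flink's actual CheckpointBarrierHandler: the class `FinishedAwareAligner`, its methods, and the integer channel indices are hypothetical, only aligned checkpoints are modeled, and "reporting a snapshot" is reduced to appending the checkpoint ID to a list.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical model of aligned-checkpoint bookkeeping when some upstream tasks have finished. */
class FinishedAwareAligner {
    private final int numChannels;
    private final BitSet finishedChannels = new BitSet(); // channels that delivered EndOfPartition
    private final BitSet alignedChannels = new BitSet();  // channels that delivered the pending barrier
    private long pendingCheckpoint = -1;                   // checkpoint being aligned, -1 if none
    private final TreeSet<Long> deferredTriggers = new TreeSet<>(); // RPC triggers awaiting all EndOfPartition
    final List<Long> reportedSnapshots = new ArrayList<>();

    FinishedAwareAligner(int numChannels) {
        this.numChannels = numChannels;
    }

    /** RPC trigger: all upstream tasks finished before this checkpoint, so no barrier will arrive. */
    void onTrigger(long checkpointId) {
        if (finishedChannels.cardinality() == numChannels) {
            reportedSnapshots.add(checkpointId); // EndOfPartition already seen everywhere: report directly
        } else {
            deferredTriggers.add(checkpointId);  // bookkeep until EndOfPartition arrives on every channel
        }
    }

    /** Barrier from an upstream task that is still running. */
    void onBarrier(int channel, long checkpointId) {
        if (pendingCheckpoint != checkpointId) {  // first barrier of a new checkpoint
            pendingCheckpoint = checkpointId;
            alignedChannels.clear();
            alignedChannels.or(finishedChannels); // finished channels count as already aligned
        }
        alignedChannels.set(channel);
        maybeCompleteAlignment();
    }

    /** EndOfPartition: the channel is treated as having delivered the pending barrier. */
    void onEndOfPartition(int channel) {
        finishedChannels.set(channel);
        if (pendingCheckpoint >= 0) {
            alignedChannels.set(channel);
            maybeCompleteAlignment();
        }
        if (finishedChannels.cardinality() == numChannels) {
            reportedSnapshots.addAll(deferredTriggers); // report bookkept triggers in ID order
            deferredTriggers.clear();
        }
    }

    private void maybeCompleteAlignment() {
        if (alignedChannels.cardinality() == numChannels) {
            reportedSnapshots.add(pendingCheckpoint);
            pendingCheckpoint = -1;
            alignedChannels.clear();
        }
    }
}
```

In this sketch a finished channel is simply pre-marked as aligned when a new checkpoint starts, so the alignment check itself needs no special case for finished upstream tasks.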

[Diagram: Figure.2]

Figure 2. A task either receives the checkpoint trigger and EndOfPartition from all the channels, or does not receive the checkpoint trigger and receives barriers from some channels.

All the cases are summarized below:

| Case | Aligned Checkpoint | Unaligned Checkpoint |
|---|---|---|
| Received Trigger + all EndOfPartition | Wait till all barriers have been processed, then report the snapshot. | Wait till all EndOfPartition messages have been processed, then report the snapshot. In this case the snapshot is taken the same way as in the aligned method. |
| No Trigger received + some barriers | Wait till a barrier or EndOfPartition has been received from each channel, then report the snapshot. | On receiving the first barrier, start spilling the buffers before the barrier for each channel. |

A task may be notified of a checkpoint in two ways:

  1. If all its preceding tasks have finished, it receives the checkpoint trigger message via RPC.
  2. If some of its preceding tasks are still running, it receives the checkpoint barrier from the corresponding channels.

In both cases, to avoid interfering with the checkpoint alignment logic, we would insert barriers into the channels right before EndOfPartition:

  1. Whenever an input channel enqueues EndOfPartition from the network or from a local preceding result partition, it knows that the corresponding preceding task has finished. It then inserts a prioritized message into the channel to notify the CheckpointBarrierHandler.
  2. Whenever the CheckpointBarrierHandler receives a new barrier or checkpoint trigger for the first time, it inserts the corresponding barriers into all the input channels that have received EndOfPartition, right before the EndOfPartition. The inserted barriers respect the options of the checkpoint, such as whether unaligned checkpoints are enabled or the checkpoint alignment timeout.
  3. In step 1, when the CheckpointBarrierHandler learns that an input channel has enqueued EndOfPartition, it also checks the pending checkpoints and inserts the corresponding barriers into this input channel.
  4. After the CheckpointBarrierHandler has actually processed EndOfPartition from a channel, it stops watching this channel and decreases the total number of barriers required for each checkpoint.
  5. After the CheckpointBarrierHandler has processed EndOfPartition from all the channels, it only receives checkpoint notifications of the first kind (the RPC trigger) and triggers the snapshot directly.
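The five steps above can be sketched by modeling each input channel as a queue of events. In this hypothetical model, the class `BarrierInserter`, its method names, and the string-encoded events are illustrative assumptions rather than Flink's API; barriers are inserted directly in front of EndOfPartition, and each pending checkpoint remembers which channels already hold its barrier so that no channel receives it twice.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model: barriers are inserted right before EndOfPartition in the input channel queues. */
class BarrierInserter {
    static final String EOP = "EndOfPartition";

    final List<Deque<String>> channels = new ArrayList<>();          // events queued per input channel
    private final Set<Integer> eopEnqueued = new TreeSet<>();        // EOP enqueued but not yet processed
    private final Map<Long, Set<Integer>> pending = new TreeMap<>(); // checkpoint -> channels holding its barrier
    private int unfinishedChannels;                                  // channels whose EOP is not yet processed
    final List<Long> directSnapshots = new ArrayList<>();

    BarrierInserter(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayDeque<>());
        }
        unfinishedChannels = numChannels;
    }

    /** Steps 1 and 3: a channel enqueues EOP; barriers of pending checkpoints it lacks go before it. */
    void enqueueEndOfPartition(int channel) {
        channels.get(channel).addLast(EOP);
        eopEnqueued.add(channel);
        for (Map.Entry<Long, Set<Integer>> entry : pending.entrySet()) {
            if (entry.getValue().add(channel)) {
                insertBeforeEop(channel, entry.getKey());
            }
        }
    }

    /** Step 2: a barrier arrives from a channel whose upstream task is still running. */
    void onBarrier(int channel, long checkpointId) {
        Set<Integer> holding = pending.computeIfAbsent(checkpointId, id -> new TreeSet<>());
        holding.add(channel);
        insertIntoFinishedChannels(checkpointId, holding);
    }

    /** Step 2 for RPC triggers; step 5 once EOP has been processed on every channel. */
    void onTrigger(long checkpointId) {
        if (unfinishedChannels == 0) {
            directSnapshots.add(checkpointId); // no barriers can arrive any more: snapshot directly
            return;
        }
        insertIntoFinishedChannels(checkpointId, pending.computeIfAbsent(checkpointId, id -> new TreeSet<>()));
    }

    /** Step 4: EOP has been processed, so this channel is no longer watched or waited for. */
    void onEndOfPartitionProcessed(int channel) {
        eopEnqueued.remove(channel);
        unfinishedChannels--;
    }

    /** The checkpoint has been aligned, so later EOPs need no barrier for it. */
    void onCheckpointAligned(long checkpointId) {
        pending.remove(checkpointId);
    }

    private void insertIntoFinishedChannels(long checkpointId, Set<Integer> holding) {
        for (int channel : eopEnqueued) {
            if (holding.add(channel)) { // repeated notifications insert nothing new
                insertBeforeEop(channel, checkpointId);
            }
        }
    }

    private void insertBeforeEop(int channel, long checkpointId) {
        Deque<String> queue = channels.get(channel);
        String eop = queue.pollLast(); // EOP is always the last event of a channel
        queue.addLast("Barrier-" + checkpointId);
        queue.addLast(eop);
    }
}
```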


One open problem concerns unaligned checkpoints: the barriers are inserted only after the downstream tasks have received EndOfPartition. Compared with a normal unaligned checkpoint, the checkpoint here can only skip the buffers cached in the input channels, not the buffers in the upstream tasks' result partitions, which might increase the time the checkpoint takes. We cannot allow the barriers to skip the upstream tasks' result partitions, because the upstream tasks have already finished and could not recover the snapshotted result partition data after failover. To avoid slow checkpoints, we might make the upstream tasks wait until all the buffers in their result partitions have been flushed into the network; during this period, normal unaligned checkpoints could still be taken. Since this does not affect correctness, we would consider this optimization in a future version.

Do Not Cancel Pending Checkpoint on Finish

Currently, when a task finishes, all the pending checkpoints are canceled. Since we now support checkpoints after tasks have finished, a task finishing should no longer fail the checkpoints; instead, we would change the behavior so that a task waits for all its pending checkpoints to finish before it finishes.
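A minimal sketch of this behavior, assuming that each pending checkpoint is represented by a future that is completed once the checkpoint completes or is aborted. The class `PendingCheckpointTracker` and its methods are hypothetical and only illustrate replacing "cancel on finish" with "wait on finish".

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: a finishing task waits for its pending checkpoints instead of canceling them. */
class PendingCheckpointTracker {
    private final Map<Long, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();

    /** Registers a checkpoint that has started but is neither completed nor aborted yet. */
    void onCheckpointStarted(long checkpointId) {
        pending.put(checkpointId, new CompletableFuture<>());
    }

    /** Called when the checkpoint completes or is aborted; either way it is no longer pending. */
    void onCheckpointFinished(long checkpointId) {
        CompletableFuture<Void> future = pending.remove(checkpointId);
        if (future != null) {
            future.complete(null);
        }
    }

    /** Completes once every checkpoint pending at the time of the call has finished. */
    CompletableFuture<Void> finishAfterPendingCheckpoints() {
        return CompletableFuture.allOf(pending.values().toArray(new CompletableFuture<?>[0]));
    }
}
```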

Support Operators Waiting For Checkpoint Complete Before Finish

If some operators of a task have logic that runs when a checkpoint is notified as complete, the task needs to wait for a completed checkpoint before it finishes. For example, a Sink operator needs to wait for one completed checkpoint to commit all the pending transactions.

To support this kind of operator, we could introduce a new interface to mark operators that need to wait for finalization right after endOfInput():

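```java
import java.util.concurrent.CompletableFuture;

/** Marks an operator that needs to wait for finalization right after endOfInput(). */
public interface FinalizationRequired {

	/** Returns a future that is done once the operator has finished finalizing. */
	CompletableFuture<Void> getFinalizeFuture();

}
```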

For such operators, the runtime would keep waiting until the finalize future is done. If the operator runs in the mailbox thread, the waiting is achieved by repeatedly checking whether the future is done and yielding the mailbox thread in between; otherwise, the runtime would block via future.get().
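The two waiting strategies can be sketched as follows. `Mailbox.yieldOnce()` is a hypothetical stand-in for yielding the mailbox thread (assumed to run at most one pending mail), and the `FinalizationRequired` interface is repeated so that the sketch compiles on its own.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Hypothetical stand-in for the task mailbox: runs a pending mail, if any, so other actions progress. */
interface Mailbox {
    void yieldOnce() throws InterruptedException;
}

/** Same shape as the proposed interface, repeated so this sketch is self-contained. */
interface FinalizationRequired {
    CompletableFuture<Void> getFinalizeFuture();
}

final class FinalizationWaiter {
    private FinalizationWaiter() {}

    static void awaitFinalization(FinalizationRequired operator, boolean inMailboxThread, Mailbox mailbox)
            throws InterruptedException, ExecutionException {
        CompletableFuture<Void> future = operator.getFinalizeFuture();
        if (inMailboxThread) {
            // Keep checking the future and yield the mailbox thread in between instead of blocking it.
            while (!future.isDone()) {
                mailbox.yieldOnce();
            }
            future.get(); // already done; rethrows the failure if it completed exceptionally
        } else {
            future.get(); // not on the mailbox thread, so blocking is acceptable
        }
    }
}
```

Blocking with future.get() inside the mailbox thread could deadlock if the future is completed by a mail, such as a checkpoint-complete notification, that can only run on that same thread; yielding keeps such mails running.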

Tasks Waiting For Checkpoint Complete Before Finish

To support operators whose logic runs when a checkpoint is notified as complete, a task would wait for a completed checkpoint before finishing if any of its operators or UDFs implements the notifyCheckpointComplete method.
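A sketch of this rule, assuming a hypothetical `CheckpointCompleteAware` interface that mirrors the notifyCheckpointComplete method and a single-threaded task model: the task only waits if at least one operator implements the interface, and it then waits for the next checkpoint that completes after it starts finishing. The class `FinishingTask` is illustrative only.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical interface mirroring the notifyCheckpointComplete(long) callback. */
interface CheckpointCompleteAware {
    void notifyCheckpointComplete(long checkpointId);
}

/** Hypothetical single-threaded model of a task that may need one completed checkpoint before finishing. */
class FinishingTask {
    private final List<?> operators;
    private CompletableFuture<Long> nextCompleted; // created once the task starts finishing

    FinishingTask(List<?> operators) {
        this.operators = operators;
    }

    private boolean needsCompletedCheckpoint() {
        return operators.stream().anyMatch(op -> op instanceof CheckpointCompleteAware);
    }

    /** Returns a future that is done when the task may finish. */
    CompletableFuture<?> readyToFinish() {
        if (!needsCompletedCheckpoint()) {
            return CompletableFuture.completedFuture(null); // nothing to wait for
        }
        nextCompleted = new CompletableFuture<>();
        return nextCompleted;
    }

    /** Forwards the notification and, if the task is finishing, lets it finish. */
    void notifyCheckpointComplete(long checkpointId) {
        for (Object op : operators) {
            if (op instanceof CheckpointCompleteAware) {
                ((CheckpointCompleteAware) op).notifyCheckpointComplete(checkpointId);
            }
        }
        if (nextCompleted != null) {
            nextCompleted.complete(checkpointId);
        }
    }
}
```

This is a simplification: a real implementation might also need to ensure that the awaited checkpoint covers all the data processed before endOfInput().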

Failover and Recovery

Logically, finished tasks cannot be restarted on failover; we can only restart the tasks that have operators that are not fully finished. In particular, if all the source operators have finished, the remaining operators would directly start the finishing process.
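A minimal sketch of the restart selection, assuming a hypothetical model in which each task is described by the finished flags of its operators. The names `FailoverPlanner`, `tasksToRestart`, and `remainingOperatorsFinishDirectly` are illustrative, not part of any existing API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of choosing which tasks to restart after failover. */
final class FailoverPlanner {
    private FailoverPlanner() {}

    /** Returns the tasks that still have at least one operator that is not fully finished. */
    static List<String> tasksToRestart(Map<String, List<Boolean>> operatorFinishedByTask) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<Boolean>> entry : operatorFinishedByTask.entrySet()) {
            boolean fullyFinished = entry.getValue().stream().allMatch(Boolean::booleanValue);
            if (!fullyFinished) {
                result.add(entry.getKey()); // fully finished tasks are not restarted
            }
        }
        return result;
    }

    /** If every source operator has finished, the restarted operators go straight to finishing. */
    static boolean remainingOperatorsFinishDirectly(List<Boolean> sourceOperatorsFinished) {
        return sourceOperatorsFinished.stream().allMatch(Boolean::booleanValue);
    }
}
```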

...