Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. it causes zombie tasks and waste resources. 
  2. it also requires a new “pre-EndOfPartition” event to indicate all the records are sent. Otherwise if it directly sent EndOfPartition before tasks finished, the communication channel would be destroyed and it would also be weird to have checkpoint barriers after EndOfPartition. However, introducing the “pre-EndOfPartition” event would largely complicate the termination process. 

Option 2. Allow tasks to finish & Checkpoints contain the final states from finished tasks

Another option is allowing tasks to finish normally and checkpoints after tasks finished would only take snapshots for the running tasks. A core issue of this option is whether we need to keep include the final snapshots of the finished tasks and also includes their final snapshots states collected when a tasks finish in the following checkpoints. Currently when failover happens after some tasks are finished, the job will fallback to a checkpoint taken when all the tasks are running. Including the final snapshots states of the finished tasks ensures the behavior unchanged compared with the current one since the finished tasks could be viewed as still running. However it also introduce some problems:

  1. It Including the states from finished tasks in all the following checkpoints requires the states get managed in the master side, which causes additional overhead to taking and managing the final snapshots.
  2. Since the same final snapshots states from the finished tasks would be used in multiple checkpoints, we need to introduce the reference count between checkpoints and snapshots. This complicates the checkpoint management, especially after we already have the reference count between snapshots and state items due to incremental checkpoints.due to incremental checkpoints.
  3. Including the states from the finished tasks implies that even if all the tasks of an operator have finished, we still need to restart the tasks of this operators after failover. For the long run, it limits the possibility that we only restart the running tasks on failover.resume the execution for the operators not fully finished. 

draw.io Diagram
bordertrue
diagramNameFigure.3
simpleViewerfalse
width400
linksauto
tbstyletop
lboxtrue
diagramWidth511
revision3

Figure 1. An illustration of the structure of the Checkpoint. One issue is that whether we need to keep the operator states collected from the finished tasks. 

Option 3. Allow tasks to finish & Checkpoints do not contain the final states from finished tasks

If we do not want to keep the final snapshots of include the states from the finished tasks instead, we need to explore how it changes the current behavior. Although in a checkpoint the state is snapshotted in the unit of task, it is finally reorganized by the unit of operator since there might be topology changes or rescaling. In other words, we need to view the checkpoint as composed of the current working progress of each operator, and tasks are only stateless runtime units to execute the remaining work for the operators. If we do not include the state from the finished tasks, it is equivalent to some operator discarding a part of finished work’s state in the checkpoint. Let ΔR represents the state of running task and ΔF represents the state final states of finished task tasks when taking checkpoints, then the result (e.g., the records sent to the descendant operator) of the operator’s execution after failover is

...

where I is the input after failover, g represents the logic of this operator and the decomposition is due to the fact that the work could be distributed to the different subtasks. Ideally the result should be the same with the execution without the states from the finished tasks, namely g(I, ΔR+ ΔF)=g(IΔR), which is equivalent to 

...