...

  1. It causes zombie tasks and wastes resources.
  2. It also requires a new “pre-EndOfPartition” event to indicate that all records have been sent. Otherwise, if a task sent EndOfPartition directly before it finished, the communication channel would be destroyed, and it would also be odd to have checkpoint barriers arrive after EndOfPartition. However, introducing the “pre-EndOfPartition” event would considerably complicate the termination process.

Option 2. Allow tasks to finish &

...

Include the state from finished tasks

Another option is to allow tasks to finish normally, so that checkpoints taken after some tasks have finished only snapshot the running tasks. A core question for this option is whether we need to keep the final snapshots of the finished tasks and include them in subsequent checkpoints. Currently, when a failover happens after some tasks have finished, the job falls back to a checkpoint taken when all tasks were still running. Including the final snapshots of the finished tasks keeps this behavior unchanged, since the finished tasks can be viewed as still running. However, it also introduces some problems:

  1. It adds overhead for taking and managing the final snapshots.
  2. Since the same final snapshots would be used in multiple checkpoints, we need to introduce reference counting between checkpoints and snapshots. This complicates checkpoint management, especially since we already have reference counting between snapshots and state items due to incremental checkpoints.
  3. In the long run, it limits the possibility of restarting only the running tasks on failover.
    Figure 3 (draw.io diagram)
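
To make the reference-counting concern in item 2 more concrete, the following is a minimal sketch of how final snapshots shared by several checkpoints might be tracked. It is an illustration only, not Flink's implementation: the class `SharedSnapshotRegistry`, its methods, and the snapshot ids are all hypothetical names introduced here.

```python
from collections import defaultdict


class SharedSnapshotRegistry:
    """Hypothetical registry counting how many retained checkpoints use each snapshot."""

    def __init__(self):
        self._ref_counts = defaultdict(int)  # snapshot id -> number of checkpoints using it
        self._checkpoints = {}               # checkpoint id -> snapshot ids it references
        self.discarded = []                  # snapshot ids released so far, in order

    def register_checkpoint(self, checkpoint_id, snapshot_ids):
        """Record a completed checkpoint and bump the count of every snapshot it uses."""
        if checkpoint_id in self._checkpoints:
            raise ValueError(f"checkpoint {checkpoint_id} is already registered")
        ids = frozenset(snapshot_ids)
        self._checkpoints[checkpoint_id] = ids
        for snapshot_id in ids:
            self._ref_counts[snapshot_id] += 1

    def discard_checkpoint(self, checkpoint_id):
        """Drop a checkpoint; release only snapshots no other checkpoint still uses."""
        for snapshot_id in sorted(self._checkpoints.pop(checkpoint_id)):
            self._ref_counts[snapshot_id] -= 1
            if self._ref_counts[snapshot_id] == 0:
                del self._ref_counts[snapshot_id]
                self.discarded.append(snapshot_id)


# Task A has finished, so its final snapshot is reused by both checkpoints.
registry = SharedSnapshotRegistry()
registry.register_checkpoint(1, ["final-A", "running-B-1"])
registry.register_checkpoint(2, ["final-A", "running-B-2"])
registry.discard_checkpoint(1)  # "final-A" must survive: checkpoint 2 still needs it
```

Under this assumed design, discarding a checkpoint can no longer simply delete all of its snapshots, which is the extra bookkeeping the item above refers to.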

Option 3. Allow tasks to finish & Do not

...

include the state from finished tasks

If we instead do not keep the final snapshots of the finished tasks, we need to explore how this changes the current behavior. Although the state in a checkpoint is snapshotted per task, it is ultimately reorganized per operator, since there might be topology changes or rescaling. In other words, we need to view a checkpoint as composed of the current working progress of each operator, with tasks being only stateless runtime units that execute the remaining work for the operators. If we do not include the state from the finished tasks, it is equivalent to some operators discarding the part of their state that belongs to finished work. Let ΔR represent the state of the running tasks and ΔF the state of the finished tasks when the checkpoint is taken; then the result of the operator’s execution after failover is

...