Therefore, we need to also support checkpoints even after some tasks are finished.  

Based on such an ability, operators that write data to external systems in a two-phase-commit (2PC) style would be able to wait for one more checkpoint after the tasks have emitted all their records, in order to commit the last piece of data. This is similar to a job terminated with stop-with-savepoint --drain, so we would also like to unify the two processes so that the last piece of data can be committed in all the required cases. To achieve this efficiently, we would also like to adjust the operator API and the life cycle of the StreamTask.

Overall Design

There are multiple options to achieve this target. In this section we compare different options.

Option 1. Prevent tasks from finishing

The first option is to prevent tasks from finishing until the whole job is finished. In particular, tasks would never finish if the job also has unbounded sources. This option is not preferred because:

  1. It causes zombie tasks and wastes resources.
  2. It also requires a new “pre-EndOfPartition” event to indicate that all the records have been sent. Otherwise, if a task sent EndOfPartition directly before it finished, the communication channel would be destroyed, and it would also be odd to have checkpoint barriers after EndOfPartition. However, introducing the “pre-EndOfPartition” event would considerably complicate the termination process.

Option 2. Allow tasks to finish & Checkpoints contain the final states from finished tasks

Another option is to allow tasks to finish normally, so that checkpoints taken after some tasks have finished only snapshot the running tasks. A core issue of this option is whether the final states collected when a task finishes need to be included in the following checkpoints. Currently, when failover happens after some tasks have finished, the job falls back to a checkpoint taken when all the tasks were running. Including the final states of the finished tasks keeps the behavior unchanged compared with the current one, since the finished tasks could be viewed as still running. However, it also introduces some problems:

  1. Including the states from finished tasks in all the following checkpoints requires the states to be managed on the master side, which causes additional overhead.
  2. Since the same states from the finished tasks would be used in multiple checkpoints, we need to introduce reference counting between checkpoints and snapshots. This complicates checkpoint management, especially since we already have reference counting between snapshots and state items due to incremental checkpoints.
  3. Including the states from the finished tasks implies that even if all the tasks of an operator have finished, we still need to restart the tasks of this operator after failover. In the long run, this limits the possibility of resuming execution only for the operators that are not fully finished.

Figure 1. An illustration of the structure of the Checkpoint. One issue is whether we need to keep the operator states collected from the finished tasks.

Option 3. Allow tasks to finish & Checkpoints do not contain the final states from finished tasks (The Chosen One)

If instead we do not include the states from the finished tasks, we need to explore how this changes the current behavior. Although the state in a checkpoint is snapshotted per task, it is eventually reorganized per operator, since there might be topology changes or rescaling. In other words, we need to view the checkpoint as composed of the current working progress of each operator, with tasks being only stateless runtime units that execute the remaining work for the operators. Not including the state from the finished tasks is then equivalent to an operator discarding the state of a finished part of its work in the checkpoint. Let ΔR represent the state of the running tasks and ΔF the final states of the finished tasks at the time of the checkpoint; then the result (e.g., the records sent to the descendant operator) of the operator’s execution after failover is

g(I, ΔR + ΔF) = g(I_R, ΔR) + g(I_F, ΔF)

where I is the input after failover and g represents the logic of this operator. The decomposition, which splits I into the parts I_R and I_F handled together with ΔR and ΔF respectively, is possible because the work can be distributed to different subtasks. Ideally the result should be the same as that of the execution without the states from the finished tasks, namely g(I, ΔR + ΔF) = g(I, ΔR), which is equivalent to

g(I_F, ΔF) = Ø

That is, no records should be sent due to ΔF, whether or not we keep it.

Source Operators

The source operator does not have input, so the condition further reduces to g(ΔF) = Ø. The logic of a source operator can be modeled as reading each split from the external system and emitting the records to the pipeline. With the legacy source API, source operators usually discover all the splits on startup and record the progress of each split in a union list state. Unfortunately, with this pattern, if we discarded the state of the finished splits, we would re-read them after failover, which violates the condition when ΔF is not kept. The new source API overcomes this issue, since the splits are now discovered and recorded in the OperatorCoordinator, whose state is still kept after all the tasks have finished.

Considering that we will eventually migrate to the new source API, we could temporarily avoid the repeated-records issue of the legacy source by

  1. Failing checkpoints if some source operators contain both finished and unfinished subtasks.
  2. Skipping the actual execution of the legacy source task (namely StreamSourceTask) if all the subtasks have finished before failover. This requires that we also bookkeep the finished operators in the checkpoints.

As a whole, the source operators can satisfy the condition, and the temporary hack can be removed after we have migrated to the new Source API.
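
The two temporary rules for legacy sources can be read as a small decision function. The following is only a minimal sketch under stated assumptions: the class LegacySourceCheck, the enum Decision and the method decide are hypothetical names for illustration and are not part of Flink.

```java
import java.util.List;

/** Hypothetical helper; all names here are illustrative, not Flink APIs. */
final class LegacySourceCheck {

    enum Decision { TAKE_CHECKPOINT, FAIL_CHECKPOINT, MARK_FULLY_FINISHED }

    /**
     * @param subtaskFinished one entry per subtask of a legacy source operator,
     *     true if that subtask has already finished
     */
    static Decision decide(List<Boolean> subtaskFinished) {
        boolean anyFinished = subtaskFinished.contains(true);
        boolean anyRunning = subtaskFinished.contains(false);
        if (anyFinished && anyRunning) {
            // Rule 1: a mix of finished and unfinished subtasks fails the checkpoint,
            // because dropping the finished subtasks' split progress could cause
            // those splits to be re-read after failover.
            return Decision.FAIL_CHECKPOINT;
        }
        if (anyFinished) {
            // Rule 2: all subtasks finished, so the operator is bookkept as finished
            // and its execution can be skipped after recovery.
            return Decision.MARK_FULLY_FINISHED;
        }
        return Decision.TAKE_CHECKPOINT;
    }
}
```

For example, a source with subtasks (finished, running) yields FAIL_CHECKPOINT, while (finished, finished) yields MARK_FULLY_FINISHED.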


Non-source Operators

The logic of a non-source operator can be split into processing the input records (g_p) and the logic in initialize and endOfInput (g_c); that is, the condition is equivalent to

g(I_F, ΔF) = g_p(I_F, ΔF) + g_c(ΔF) = Ø

For the first part, if some subtask of a non-source operator is finished in a checkpoint, then

  1. All the upstream tasks of its ALL_TO_ALL inputs are finished and I_F = Ø. This is because these precedent tasks send EndOfPartition to all the subtasks of the operator in discussion at the same time, so the running subtasks must also have received all the records by the time of the checkpoint.
  2. For the upstream tasks of POINTWISE inputs, the precedent tasks of the finished subtasks must also be finished. This indicates that none of the remaining input records rely on ΔF; otherwise they could not be computed correctly even without failover. This implies that all the remaining records belong to I_R and I_F = Ø.

Thus, we always have I_F = Ø and therefore g_p(I_F, ΔF) = Ø, whether or not we save ΔF.

The situation of the second part is equivalent to that of the source operators. However, non-source operators rarely have logic similar to the legacy source. Instead, the result related to ΔF is usually sent before the task finishes and does not need to be resent after failover. For example, operators that aggregate and send the final result on endOfInput only need to send the remaining aggregation value after failover. Even if some operators do not satisfy the condition, they could push the states that cannot be discarded to the OperatorCoordinator instead, as the new Source API does.


Based on the above discussion, discarding the final states of the finished tasks would only change the behavior of a very small fraction of existing jobs, namely those whose non-source operators have special logic on initialization or endOfInput, and these jobs could also be modified to keep the current behavior. Therefore, we lean towards option 3, which simplifies the implementation and leaves more room for optimization in the future.

Proposed Changes

This section details the proposed changes for option 3, chosen in the last section.

Checkpoint Format with Finished Tasks

For checkpoints involving finished tasks:

  1. If all the subtasks of an operator are finished, the checkpoint would store a flag "fully finished = true" so that we can skip the execution of this operator after recovery. To avoid compatibility problems, we would store a special state for this operator instead of directly adding a boolean flag.
  2. If only some subtasks of an operator are finished, the checkpoint would store the states collected from the other subtasks of this operator. After recovery, the remaining states would be re-assigned to the subtasks.
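
To make the two cases concrete, here is a minimal sketch of how one operator's entry in a checkpoint could be modeled. It is not Flink's checkpoint metadata format: the class OperatorSnapshotSketch is hypothetical, state handles are modeled as plain strings, a null entry stands for a finished subtask, and the round-robin re-assignment is an assumption chosen only to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of one operator's entry in a checkpoint; not a Flink class. */
final class OperatorSnapshotSketch {
    /** True if every subtask had finished when the checkpoint was taken. */
    final boolean fullyFinished;
    /** States (modeled as strings) of the subtasks that were still running. */
    final List<String> runningSubtaskStates;

    private OperatorSnapshotSketch(boolean fullyFinished, List<String> states) {
        this.fullyFinished = fullyFinished;
        this.runningSubtaskStates = states;
    }

    /** @param statePerSubtask one entry per subtask; null marks a finished subtask */
    static OperatorSnapshotSketch fromSubtasks(String[] statePerSubtask) {
        List<String> running = new ArrayList<>();
        for (String state : statePerSubtask) {
            if (state != null) {
                running.add(state); // the final state of a finished subtask is dropped
            }
        }
        return new OperatorSnapshotSketch(running.isEmpty(), running);
    }

    /** Re-assigns the remaining states to the restarted subtasks (round-robin, as an assumption). */
    List<List<String>> reassign(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<List<String>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < runningSubtaskStates.size(); i++) {
            perSubtask.get(i % parallelism).add(runningSubtaskStates.get(i));
        }
        return perSubtask;
    }
}
```

On recovery, an entry with fullyFinished set would let the operator be skipped, while any other entry only redistributes the states of the subtasks that were still running.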

Triggering Checkpoints After Tasks Finished

Currently the CheckpointCoordinator only triggers the sources for new checkpoints, and the other tasks receive barriers from their input channels. However, triggering only the running sources would not work after some tasks have finished. As exemplified in Figure 2, suppose Task A has finished; if we only trigger Task B for a new checkpoint, then Task C would have no idea about the new checkpoint and would not report its snapshot.

Figure 2. An example execution graph with one task finished. 

It might be argued that we could wait for Task C to finish during this checkpoint, so that Task C would not need to be triggered at all. However, this does not work if C is a two-phase-commit sink that requires one successful checkpoint before it can finish, which would cause a deadlock.

To find the new "root" tasks of the current execution graph, we iterate over all the tasks to find those that are still running but have no running precedent tasks. A direct implementation would have O(n^2) time complexity, since it needs to check all the precedent tasks of each task. However, we could reduce the complexity to O(n) by exploiting the isomorphism of ALL_TO_ALL edges. The details are described in Appendix 1.
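
The direct (unoptimized) search can be sketched as follows. This is only an illustration of the rule "running, with no running precedent task": the class CheckpointRootFinder, the string task ids and the map-based graph are hypothetical, and the O(n) optimization from Appendix 1 is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Direct search for the tasks to trigger; names and types are illustrative only. */
final class CheckpointRootFinder {

    /**
     * @param upstream maps every task to the tasks it directly consumes from
     * @param running  the tasks that have not finished yet
     * @return the running tasks without any running precedent task, in map iteration order
     */
    static List<String> findTasksToTrigger(Map<String, List<String>> upstream, Set<String> running) {
        List<String> roots = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : upstream.entrySet()) {
            String task = entry.getKey();
            if (!running.contains(task)) {
                continue; // finished tasks are never triggered
            }
            boolean hasRunningUpstream = false;
            for (String precedent : entry.getValue()) {
                if (running.contains(precedent)) {
                    hasRunningUpstream = true; // barriers will arrive from this input
                    break;
                }
            }
            if (!hasRunningUpstream) {
                roots.add(task);
            }
        }
        return roots;
    }
}
```

In a hypothetical graph with edges A -> C and B -> D where only A has finished, this returns B and C: B is a running source, and C has no running input left to deliver barriers.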

To avoid tasks finishing during the computation, the computation is done in the JobMaster's main thread. However, there might still be inconsistencies because:

  1. For tasks running on different TaskManagers, the order in which the FINISHED status reports arrive at the JobMaster is not guaranteed. That is, some tasks might report FINISHED after their descendant tasks.
  2. Tasks might finish between the computation and the moment they get triggered.

In both cases, triggering the checkpoint would fail, and the whole checkpoint would then fail due to timeout. Since a checkpoint timeout blocks the next checkpoint and causes failover by default, it would be better to detect the trigger failure as early as possible. The failure can be detected when a task finishes before acknowledging the snapshot for this checkpoint: the CheckpointCoordinator would listen for task-finished events and, when a task finishes, iterate over all the pending checkpoints to perform the detection.
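
The early detection could look roughly like the sketch below. It assumes a simplified bookkeeping in which every pending checkpoint remembers the tasks that have not acknowledged it yet; PendingCheckpointTracker and its methods are hypothetical names and do not mirror the real CheckpointCoordinator.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Simplified bookkeeping of pending checkpoints; illustrative only. */
final class PendingCheckpointTracker {
    // checkpoint id -> tasks that still have to acknowledge this checkpoint
    private final Map<Long, Set<String>> notYetAcknowledged = new TreeMap<>();

    void startCheckpoint(long checkpointId, Set<String> tasksToAcknowledge) {
        notYetAcknowledged.put(checkpointId, new HashSet<>(tasksToAcknowledge));
    }

    void acknowledge(long checkpointId, String task) {
        Set<String> waiting = notYetAcknowledged.get(checkpointId);
        if (waiting == null) {
            return; // unknown or already completed/aborted checkpoint
        }
        waiting.remove(task);
        if (waiting.isEmpty()) {
            notYetAcknowledged.remove(checkpointId); // all acknowledged: completed
        }
    }

    /**
     * Called when a task reports FINISHED. Every pending checkpoint that is still
     * waiting for this task can no longer complete, so it is aborted right away
     * instead of running into the timeout.
     *
     * @return the ids of the aborted checkpoints, in ascending order
     */
    List<Long> onTaskFinished(String task) {
        List<Long> aborted = new ArrayList<>();
        for (Map.Entry<Long, Set<String>> entry : notYetAcknowledged.entrySet()) {
            if (entry.getValue().contains(task)) {
                aborted.add(entry.getKey());
            }
        }
        for (Long checkpointId : aborted) {
            notYetAcknowledged.remove(checkpointId);
        }
        return aborted;
    }
}
```

A task that has already acknowledged a checkpoint does not abort it when it finishes; only checkpoints still waiting for that task are affected.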

Make CheckpointBarrierHandler Support EndOfPartition on TM Side

For the task side, there are two cases regarding checkpoints with finished tasks:

  1. All the precedent tasks are finished and the task has received all the EndOfPartition events, but it has not finished yet (because it is waiting for the last checkpoint, or waiting for the downstream tasks to process all the pending records, as introduced in the following).
  2. Some precedent tasks are finished and the task has received EndOfPartition from these tasks.

All the Precedent Tasks are Finished

Currently tasks can be classified into two types according to how they are notified of a checkpoint: source tasks are triggered via RPC from the JobManager, while non-source tasks are triggered via barriers received from the precedent tasks. Once checkpoints after some tasks have finished are considered, non-source tasks might also get triggered via RPC if they become the new "root" tasks. The implementation for non-source tasks would differ from that for source tasks: it would notify the CheckpointBarrierHandler instead of directly performing the snapshot, so that the CheckpointBarrierHandler can deal with checkpoint subsuming correctly. Since triggerCheckpoint is currently implemented uniformly in the base StreamTask class and is only called for source StreamTask classes, we would refactor the class hierarchy to support different implementations of the triggerCheckpoint method for the two types of tasks.

Some of the Precedent Tasks are Finished

If not all the precedent tasks of a task are finished, the task would still get notified via checkpoint barriers from the running precedent tasks. However, some of the precedent tasks might already be finished, and the CheckpointBarrierHandler must consider this case. As a whole, EndOfPartition from one input channel would mark this channel as aligned for all the pending and following checkpoints. Therefore, when receiving EndOfPartition, we would insert a manually created CheckpointBarrier for all the pending checkpoints, and exclude the channel from the following checkpoints.
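
The alignment rule above can be sketched with a small state machine. This is a simplified model and not the real CheckpointBarrierHandler: AlignmentSketch and its methods are hypothetical, channels are plain integers, and "inserting a manually created barrier" is modeled by no longer requiring a barrier from a channel that has ended.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Simplified barrier alignment that treats EndOfPartition as an implicit barrier. */
final class AlignmentSketch {
    // channels that have not delivered EndOfPartition yet
    private final Set<Integer> openChannels = new HashSet<>();
    // pending checkpoint id -> channels from which its barrier has been received
    private final TreeMap<Long, Set<Integer>> barriersSeen = new TreeMap<>();

    AlignmentSketch(int numChannels) {
        for (int channel = 0; channel < numChannels; channel++) {
            openChannels.add(channel);
        }
    }

    /** @return ids of the checkpoints that became fully aligned by this barrier */
    List<Long> onBarrier(long checkpointId, int channel) {
        barriersSeen.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(channel);
        return collectAligned();
    }

    /**
     * The channel counts as aligned for all pending checkpoints and is excluded
     * from all following ones.
     *
     * @return ids of the pending checkpoints that became fully aligned
     */
    List<Long> onEndOfPartition(int channel) {
        openChannels.remove(channel);
        return collectAligned();
    }

    private List<Long> collectAligned() {
        List<Long> aligned = new ArrayList<>();
        Iterator<Map.Entry<Long, Set<Integer>>> it = barriersSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<Integer>> entry = it.next();
            // aligned once a barrier arrived from every channel that is still open
            if (entry.getValue().containsAll(openChannels)) {
                aligned.add(entry.getKey());
                it.remove();
            }
        }
        return aligned;
    }
}
```

With three channels, a barrier for checkpoint 1 on channel 0 followed by EndOfPartition on channel 1 leaves the checkpoint waiting only for channel 2; later checkpoints never wait for channel 1 again.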

This is enough for aligned checkpoints, but unaligned checkpoints introduce additional complexity. Unaligned checkpoint barriers can jump over pending records, whereas EndOfPartition cannot. If we simply waited for EndOfPartition, the CheckpointCoordinator could not be notified in time, and checkpoints might take longer during the finishing phase. The same holds for aligned checkpoints with timeout. To cope with this issue, the upstream tasks would wait for the downstream tasks to process all the pending records before exiting. The upstream tasks would emit a special event, called EndOfUserRecordsEvent, after all the records, and the downstream tasks would respond with another special event; the upstream tasks only exit after all the response events have been received. During this period, unaligned checkpoints or checkpoints with timeout can be performed normally. Afterwards, EndOfPartition can reach the downstream CheckpointAligner quickly, since there are no pending records.
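
The handshake on the upstream side can be sketched as below. The text does not name the response event or any class involved, so UpstreamFinishHandshake, onResponse and mayEmitEndOfPartition are hypothetical, and sending the event is modeled as setting a flag.

```java
import java.util.HashSet;
import java.util.Set;

/** Upstream side of the finishing handshake; an illustrative model only. */
final class UpstreamFinishHandshake {
    // output channels whose downstream task has not responded yet
    private final Set<Integer> awaitingResponse = new HashSet<>();
    private boolean endOfUserRecordsSent;

    UpstreamFinishHandshake(int numOutputChannels) {
        for (int channel = 0; channel < numOutputChannels; channel++) {
            awaitingResponse.add(channel);
        }
    }

    /** After the last record: EndOfUserRecordsEvent goes out on every output channel. */
    void emitEndOfUserRecords() {
        endOfUserRecordsSent = true;
    }

    /** The downstream task on this channel has processed everything sent before the event. */
    void onResponse(int channel) {
        if (!endOfUserRecordsSent) {
            throw new IllegalStateException("response received before EndOfUserRecordsEvent was sent");
        }
        awaitingResponse.remove(channel);
    }

    /** Only when this returns true may the task emit EndOfPartition and exit. */
    boolean mayEmitEndOfPartition() {
        return endOfUserRecordsSent && awaitingResponse.isEmpty();
    }
}
```

Until mayEmitEndOfPartition() returns true the task keeps running, so unaligned checkpoints and checkpoints with timeout can still pass through it normally.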

Do Not Cancel Pending Checkpoint on Finish

Currently, when a task finishes, all the pending checkpoints are canceled. Since we would now support checkpoints after tasks have finished, a finishing task should not fail the checkpoints; we would change the behavior so that a task waits for all the pending checkpoints to finish before it finishes.

Failover and Recovery

Logically, we would not need to restart the finished tasks on failover; we could restart only those tasks that have operators that are not fully finished. However, for the first version, we would still restart all the tasks and skip the execution of fully finished operators. This simplifies the implementation of the task side and the scheduler.

Changes to the Operator APIs Related to Finish

Based on the ability to take a checkpoint or savepoint after tasks have finished, operators interacting with external systems in 2PC style could commit the last piece of data by waiting for one more checkpoint or savepoint. In general, there are three cases in which a job finishes:

  1. All records are processed and the job finishes normally.
  2. The job is finished due to stop-with-savepoint --drain.
  3. The job is finished due to stop-with-savepoint.

In the first two cases, the job is not expected to be restarted, so we should flush and commit all the records. In the third case, the job is expected to be restarted from the savepoint, so we should not flush all the records. For example, for window operators using event-time triggers, it is reasonable to trigger all the remaining timers if the job is going to finish; in the third case, however, the job would be restarted and process more records, so triggering all the timers would cause wrong results after restarting.

To flush all the records on finish, we need to

  1. Emit a special MAX_WATERMARK to trigger all the event-time timers.
  2. Call the dedicated operator API to flush the other buffered records.
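
The distinction between the three cases, and the two flushing steps, can be summarized in a few lines. This is only a sketch: FinishSketch, the enum Reason and the string-valued actions are hypothetical and merely restate the rules above.

```java
import java.util.ArrayList;
import java.util.List;

/** Restates the finishing rules above; names are illustrative, not Flink APIs. */
final class FinishSketch {

    enum Reason { ALL_RECORDS_PROCESSED, STOP_WITH_SAVEPOINT_DRAIN, STOP_WITH_SAVEPOINT }

    /** Records are flushed only if the job is not expected to be restarted. */
    static boolean shouldFlush(Reason reason) {
        return reason != Reason.STOP_WITH_SAVEPOINT;
    }

    /** The flushing steps to perform on finish, in order. */
    static List<String> flushActions(Reason reason) {
        List<String> actions = new ArrayList<>();
        if (shouldFlush(reason)) {
            actions.add("emit MAX_WATERMARK");     // fires all event-time timers
            actions.add("flush buffered records"); // via the dedicated operator API
        }
        return actions;
    }
}
```

For plain stop-with-savepoint the list is empty, so no timers are fired and the restarted job can continue from the savepoint without wrong results.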

Changes to Public(Evolving) APIs

We lack a clear method in the API to stop processing records and flush any buffered records. We do have the StreamOperator#close method, which is supposed to flush all records, but currently it also closes all resources, including connections to external systems. We need separate methods for flushing and for closing resources, because we might still need the connections when performing the final checkpoint, once all records are flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose method. Lastly, the semantics of RichFunction#close differ from those of StreamOperator#close: having no direct access to any output, RichFunction#close is responsible purely for releasing resources. We suggest using this opportunity to clean up the semi-public StreamOperator API and:

...

Code Block
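@Public
public interface SinkFunction<IN> extends Function, Serializable {
    // Flushes any buffered records once no more records will arrive;
    // resources such as connections are released separately.
    default void finish() throws Exception {}
}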
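
As an illustration of the split between flushing and releasing resources, the following self-contained sketch uses a stand-in interface rather than Flink's SinkFunction. SketchSink and BufferingSink are hypothetical, the external system is modeled as an in-memory list, and only the finish() method corresponds to the interface shown above.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for a sink contract; only finish() mirrors the interface shown above. */
interface SketchSink<IN> {
    void invoke(IN value) throws Exception;

    default void finish() throws Exception {}

    default void close() throws Exception {}
}

/** Hypothetical sink that buffers records and writes them out on finish(). */
final class BufferingSink implements SketchSink<String> {
    private final List<String> buffer = new ArrayList<>();
    /** Stands in for the external system. */
    final List<String> written = new ArrayList<>();
    /** Stands in for a connection to the external system. */
    boolean connectionOpen = true;

    @Override
    public void invoke(String value) {
        buffer.add(value);
    }

    @Override
    public void finish() {
        // Flush everything, but keep the connection: it may still be needed
        // while the final checkpoint is performed.
        written.addAll(buffer);
        buffer.clear();
    }

    @Override
    public void close() {
        // Purely releases resources; no records are emitted here.
        connectionOpen = false;
    }
}
```

After finish() the buffered records are visible in written while connectionOpen is still true; only close() releases the connection.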

Final Checkpoint / Savepoint

As shown in the above interface, to ensure that the last piece of data gets committed, the task needs to wait for one more checkpoint. Without special treatment, the tasks would have to wait for their checkpoints one by one, which would be very inefficient. To cope with this issue, we would like to further change the process of the StreamTask: we would introduce a new event to notify the end of user records, so that all the tasks can first call finish() on the operators in their chain. Each task can then wait for its last checkpoint separately. Once the last checkpoint is notified, the task can emit EndOfPartition to clean up the network resources as usual.

We could also unify stop-with-savepoint into the same process. Currently we have three different savepoint types:

  1. Trigger savepoint without stopping job.
  2. stop-with-savepoint
  3. stop-with-savepoint --drain.

Currently there would be some duplication between endOfInput() and finish() for one-input operators. In the long run, we would like to unify them into the finish() method and keep only endOfInput(int channelIndex) and finish().

Final Checkpoint and Stopping Job With Savepoint

If we directly mapped the original close() to the new finish() method, then, based on the current StreamTask implementation, a task would call finish() after receiving EndOfPartitionEvent and emit EndOfPartitionEvent after the last checkpoint / savepoint is completed. This would cause the tasks to wait for the last checkpoint / savepoint one by one, which is inefficient.

Besides, stop-with-savepoint [--drain] currently uses a different process: it first triggers a savepoint and blocks the tasks until the savepoint is completed, and then finishes the tasks. This would cause duplication if we also wanted to flush the records for stop-with-savepoint --drain. Moreover, there might currently still be records between the savepoint and finishTask, which would cause confusion, especially if these records affect external systems.

To deal with the above issues, we would like to adjust the life cycle of the StreamTask to avoid chained waiting and diverging processes:

  1. We would like to introduce a new event to notify that all the user records are finished, and call finish() immediately after the new event is aligned for all the input channels. Since it shares the same semantics as the EndOfUserRecordsEvent introduced to solve the unaligned checkpoint issue in the previous sections, we could reuse that event.
  2. We would adjust the savepoint process to first finish the task and then trigger the savepoint, so that it can be unified with the final checkpoint process and avoids confusing records after the savepoint.
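
The resulting order of steps for a non-source task can be sketched as follows. This is a simplified model under stated assumptions: TaskLifecycleSketch is hypothetical, forwarding the event downstream right after finish() is an assumption, and any checkpoint or savepoint notified after finish() is treated as the final one.

```java
import java.util.ArrayList;
import java.util.List;

/** Records the order of the finishing steps of one non-source task; illustrative only. */
final class TaskLifecycleSketch {
    final List<String> log = new ArrayList<>();
    private final int numInputChannels;
    private int endOfUserRecordsSeen;
    private boolean operatorsFinished;
    private boolean partitionEnded;

    TaskLifecycleSketch(int numInputChannels) {
        this.numInputChannels = numInputChannels;
    }

    /** Called for each input channel that delivers EndOfUserRecordsEvent. */
    void onEndOfUserRecords() {
        endOfUserRecordsSeen++;
        if (endOfUserRecordsSeen == numInputChannels && !operatorsFinished) {
            // The event is aligned on all inputs: finish immediately, without
            // waiting for any upstream task's final checkpoint.
            log.add("finish() on all operators in the chain");
            log.add("forward EndOfUserRecordsEvent downstream"); // assumption
            operatorsFinished = true;
        }
    }

    /** Called when a checkpoint or savepoint is notified as complete. */
    void onCheckpointComplete(long checkpointId) {
        if (operatorsFinished && !partitionEnded) {
            log.add("emit EndOfPartition after checkpoint " + checkpointId);
            partitionEnded = true;
        }
    }
}
```

Because every task calls finish() as soon as the event is aligned, all tasks can wait for the same final checkpoint or savepoint in parallel instead of one after another.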

The detailed life cycle of the StreamTask and the operators would become

...

Savepoint without stopping the job is more like a normal checkpoint and does not involve tasks finishing, so it is not considered in this FLIP. For stop-with-savepoint [--drain], the job currently first triggers a savepoint and holds all the tasks until the savepoint succeeds, and then finishes all the source tasks to stop the job. One issue is that there might still be some data between the savepoint and the finishing of the tasks. To cope with this issue, we could also unify stop-with-savepoint [--drain] with the final checkpoint: on receiving the savepoint request from the JM, the source tasks would first call finishTask() and emit the event that notifies the following tasks to call finish() on all the operators; then all the tasks wait for the savepoint with the specified id to finish. The new process avoids data after the savepoint and provides a unified treatment for all the scenarios related to committing the last piece of data.



Rejected Alternatives

CheckpointCoordinator Maintains the Finished Status of Tasks Separately

...