Status

...



Discussion thread

...

...

Jira: FLINK-2491

...

Release: 1.14



Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Therefore, we need to support checkpoints even after some tasks have finished. With this ability, operators that write data to external systems in 2PC style could wait for one more checkpoint after the tasks have emitted all their records, and then commit the last piece of data. This is similar to a job terminated with stop-with-savepoint --drain, so we would also like to unify the two processes so that the last piece of data can be committed in all the required cases. To achieve this efficiently, we would also like to adjust the operator API and the lifecycle of the StreamTask.

...

The situation for the second part is equivalent to that of the source operators. However, non-source operators rarely have logic similar to legacy sources. Instead, the result related to ΔF is usually sent before the operator finishes and does not need to be resent after failover. For example, operators that aggregate and send the final result on endOfInput only need to send the remaining aggregation value after failover. If some operators do not satisfy this condition, they could instead push the states that cannot be discarded to the OperatorCoordinator, as the new Source API does.


Operator state

There are a few scenarios in which operator state is used together with an additional implicit contract on data distribution. Those implicit contracts might not hold if we restore state with partially finished operators. Therefore we would like to discuss how operator state should be used in combination with finished tasks.

BroadcastState

In the case of broadcast state, all subtasks snapshot their state. The assumption is that all these copies are equal, so any of them can be used to restore any given subtask. We want to leverage that property: when restoring with some subtasks finished, we would use any of the non-empty states (taken from any running subtask).
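The restore rule above can be sketched as follows. This is a minimal illustration, not Flink's restore code. The class and method names are hypothetical. Each subtask's broadcast-state snapshot is modelled as a plain Map, and a null entry stands for a finished subtask that contributed no state.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper choosing which broadcast-state copy to restore from. */
final class BroadcastStateRestoreSketch {

    private BroadcastStateRestoreSketch() {}

    /**
     * All running subtasks hold an identical copy of the broadcast state, so the first
     * non-empty copy can be used to restore every subtask. Finished subtasks are
     * represented by null or an empty map and are skipped.
     */
    static <K, V> Optional<Map<K, V>> pickStateToRestore(List<Map<K, V>> snapshotsPerSubtask) {
        for (Map<K, V> snapshot : snapshotsPerSubtask) {
            if (snapshot != null && !snapshot.isEmpty()) {
                return Optional.of(snapshot);
            }
        }
        // Every subtask finished (or the state was empty everywhere): nothing to restore.
        return Optional.empty();
    }
}
```

The rule only works because of the equality assumption. For a state type whose copies may differ between subtasks, picking an arbitrary copy would silently lose data.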

UnionListState

The UnionListState is more complex. A common usage pattern is to implement a "broadcast" state by storing the state on a single subtask. Afterwards, when the subtasks are initialized, the single copy of the state is distributed to all of them. Another common pattern is to use a UnionListState to create a global view. For example, it is used to share information about the offsets of the partitions consumed so far. This lets us restart from the given offset if, after a restore, a partition is assigned to a different subtask than originally. Such logic can only be implemented with the merged state of all original subtasks. If some of the subtasks are finished and we only keep the remaining state in the checkpoint, we can lose important bits of information, or even the entire state in the case of the "broadcast" implementation described above. However, the UnionListState is on its way to being deprecated and replaced by the OperatorCoordinator.

For the time being, an operator that checkpoints UnionListState will only be allowed to finish all at once. We will decline checkpoints if not all of its tasks have called finish() and received notifyCheckpointComplete.

ListState

We want to make the contract of ListState more explicit: redistribution may happen even if there is no rescaling. This might have some subtle implications. Imagine a situation where you have the following topology:

src 0 --> op 0 --> sink 0

src 1 --> op 1 --> sink 1

src 2 --> op 2 --> sink 2


We buffer records in the operators op X. If src 1 finishes, its state will be cleared. Then, if after a restore the state of src 2 is assigned to src 1, records from partitions originally assigned to src 2 will end up in both op 1 and op 2. This depends on the processing speed of the two operators: if op 1 unbuffers records faster, later records from such a partition may overtake earlier records in the pipeline. However, Flink has never offered explicit guarantees that, in case of recovery, non-keyed ListState will be assigned to subtasks in the same order before and after recovery, especially across multiple operators. As part of this FLIP, we want to document and clarify this.

Based on the above discussion, discarding the final states of the finished tasks would only change the behavior of a very small fraction of existing jobs: those whose non-source operators have special logic on initialization or endOfInput. These jobs could also be modified to keep their current behavior. Therefore, we lean towards option 3, which simplifies the implementation and leaves more room for optimization in the future.

Proposed Changes

This section details the proposed changes for option 3, chosen in the previous section.

Feature flag

We want to introduce a feature flag to enable or disable the new feature:


Code Block
    @Documentation.ExcludeFromDocumentation("This is a feature toggle")
    public static final ConfigOption<Boolean> ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH =
            ConfigOptions.key("execution.checkpointing.checkpoints-after-tasks-finish.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Feature toggle for enabling checkpointing after tasks finish.");


Checkpoint Format with Finished Tasks

For checkpoints involving finished tasks:

  1. If all the subtasks of an operator are finished, the checkpoint would store a "fully finished = true" flag so that we can skip the execution of this operator after recovery. To avoid compatibility problems, we would store a special state for this operator instead of directly adding a boolean flag.
  2. If only some subtasks of an operator are finished, the checkpoint would store the states collected from the remaining subtasks of this operator. After recovery, the subtasks would re-assign the remaining states.
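The two cases can be sketched as a function from per-subtask reports to a per-operator checkpoint entry. This is a simplified model with hypothetical names, not Flink's checkpoint metadata classes. The "fully finished" marker is modelled as a boolean here, whereas the proposal stores a special state for compatibility.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of what a checkpoint keeps for one operator with possibly finished subtasks. */
final class OperatorCheckpointSketch {

    /** What one subtask reports; finished subtasks report no state. */
    static final class SubtaskReport {
        final boolean finished;
        final String state;

        SubtaskReport(boolean finished, String state) {
            this.finished = finished;
            this.state = state;
        }
    }

    /** The per-operator entry stored in the checkpoint. */
    static final class OperatorEntry {
        final boolean fullyFinished;
        final List<String> remainingStates;

        OperatorEntry(boolean fullyFinished, List<String> remainingStates) {
            this.fullyFinished = fullyFinished;
            this.remainingStates = remainingStates;
        }
    }

    private OperatorCheckpointSketch() {}

    static OperatorEntry snapshotOperator(List<SubtaskReport> reports) {
        boolean allFinished = true;
        List<String> states = new ArrayList<>();
        for (SubtaskReport report : reports) {
            if (!report.finished) {
                allFinished = false;
                states.add(report.state);
            }
        }
        if (allFinished) {
            // Case 1: only the "fully finished" marker is kept, so the operator
            // can be skipped after recovery.
            return new OperatorEntry(true, Collections.emptyList());
        }
        // Case 2: the states of the still-running subtasks are kept and
        // re-assigned among the subtasks on recovery.
        return new OperatorEntry(false, states);
    }
}
```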

Triggering Checkpoints After Tasks Finished

Currently the CheckpointCoordinator only triggers the sources for new checkpoints, and the other tasks receive barriers from their input channels. However, triggering only the running sources does not work once some tasks have finished. As shown in Figure 1, suppose Task A has finished. If we only trigger B for a new checkpoint, Task C would have no idea about the new checkpoint and would never report its snapshot.


Figure 1. An example execution graph with one task finished.

It might be argued that we could wait for Task C to finish during this checkpoint, so that Task C would not need to be triggered. However, this does not work if C is a two-phase commit sink that requires one successful checkpoint before finishing: it would cause a deadlock.

To find the new "root" tasks of the current execution graph, we iterate over all the tasks to find those that are still running but have no running preceding tasks. A direct implementation would have O(n²) time complexity, since it needs to check all the preceding tasks of each task. However, we could reduce the complexity to O(n) by exploiting the isomorphism of ALL_TO_ALL edges. The details are described in Appendix 1.
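The direct approach can be sketched as below. It visits every edge once. Because an ALL_TO_ALL connection between two vertices of parallelism p contains p² edges, this is where the O(n²) cost comes from. The graph representation and names are hypothetical, and the ALL_TO_ALL optimization from Appendix 1 is deliberately not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, naive search for the tasks that need to be triggered via RPC. */
final class TasksToTriggerSketch {

    private TasksToTriggerSketch() {}

    /**
     * @param predecessors for every task, the tasks feeding into it (empty for sources)
     * @param running      the tasks that have not finished yet
     * @return running tasks without any running predecessor, i.e. the new "roots"
     */
    static List<String> findTasksToTrigger(Map<String, List<String>> predecessors, Set<String> running) {
        List<String> toTrigger = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : predecessors.entrySet()) {
            if (!running.contains(entry.getKey())) {
                continue; // finished tasks are never triggered
            }
            boolean hasRunningPredecessor = false;
            for (String predecessor : entry.getValue()) {
                if (running.contains(predecessor)) {
                    hasRunningPredecessor = true;
                    break;
                }
            }
            if (!hasRunningPredecessor) {
                toTrigger.add(entry.getKey());
            }
        }
        return toTrigger;
    }
}
```

For example, take an assumed graph with A → C and B → D in which A has finished. The sketch triggers B and C. While all tasks are running, it triggers only the sources A and B, which matches today's behavior.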

To avoid task statuses changing while the computation is in progress, the computation is done in the JobMaster's main thread. However, there might still be inconsistencies because:

  1. For tasks running on different TaskManagers, the order in which their FINISHED reports arrive at the JobMaster is not guaranteed. That is to say, some tasks might report FINISHED after their descendant tasks.
  2. Tasks might finish between the computation and the moment they get triggered.

In both cases, the checkpoint trigger would fail, and the whole checkpoint would then fail due to timeout. By default, a checkpoint timeout blocks the next checkpoint and causes a failover, so it is better to detect the trigger failure as early as possible. The failure can be detected if a task finishes before acknowledging the snapshot for this checkpoint. The CheckpointCoordinator would listen for task-finished events. When a task finishes, it iterates over all the pending checkpoints to perform the detection.
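A minimal sketch of this early detection, assuming the coordinator tracks, per pending checkpoint, the tasks that have not acknowledged yet. The class and its methods are hypothetical and far simpler than Flink's CheckpointCoordinator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical coordinator-side bookkeeping for detecting doomed checkpoints early. */
final class PendingCheckpointTrackerSketch {

    /** Checkpoint id -> tasks that have not acknowledged this checkpoint yet. */
    private final Map<Long, Set<String>> notYetAcknowledged = new HashMap<>();

    void startCheckpoint(long checkpointId, Set<String> tasksToWaitFor) {
        notYetAcknowledged.put(checkpointId, new HashSet<>(tasksToWaitFor));
    }

    void acknowledge(long checkpointId, String task) {
        Set<String> waiting = notYetAcknowledged.get(checkpointId);
        if (waiting != null && waiting.remove(task) && waiting.isEmpty()) {
            notYetAcknowledged.remove(checkpointId); // all acknowledged: checkpoint completes
        }
    }

    /**
     * Listener for a task reaching FINISHED. Every pending checkpoint that still waits for
     * this task can never complete, so it is failed now instead of after the timeout.
     */
    List<Long> onTaskFinished(String task) {
        List<Long> failed = new ArrayList<>();
        Iterator<Map.Entry<Long, Set<String>>> it = notYetAcknowledged.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<String>> pending = it.next();
            if (pending.getValue().contains(task)) {
                failed.add(pending.getKey());
                it.remove();
            }
        }
        return failed;
    }

    boolean isPending(long checkpointId) {
        return notYetAcknowledged.containsKey(checkpointId);
    }
}
```

A finished task that already acknowledged a checkpoint does not fail it. Only checkpoints still waiting for that task are affected.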

Make CheckpointBarrierHandler Support EndOfPartition on TM Side

For the task side, there are two cases regarding checkpoints with finished tasks:

  1. All the preceding tasks are finished and the task has received EndOfPartition from all of them, but it has not finished yet. This happens because it is waiting for the last checkpoint, or for the downstream tasks to process all the pending records, as introduced below.
  2. Some preceding tasks are finished and the task has received EndOfPartition from them.

All the Preceding Tasks are Finished

Currently tasks can be classified into two types according to how they are notified of a checkpoint. Source tasks are triggered via RPC from the JobManager, while non-source tasks are triggered via barriers received from their preceding tasks. Once checkpoints after some tasks have finished are supported, non-source tasks might also be triggered via RPC if they become the new "root" tasks. For non-source tasks, the implementation would differ from that of source tasks. Instead of directly performing the snapshot, they would notify the CheckpointBarrierHandler, so that it can handle checkpoint subsumption correctly. Currently triggerCheckpoint is implemented uniformly in the base StreamTask class and is only called for source StreamTasks. We would therefore refactor the class hierarchy to support different implementations of the triggerCheckpoint method for the two types of tasks.

Some of the Preceding Tasks are Finished

If not all of a task's preceding tasks are finished, the task would still be notified via checkpoint barriers from the running preceding tasks. However, some of the preceding tasks might already be finished, and the CheckpointBarrierHandler must consider this case. In general, EndOfPartition from an input channel marks this channel as aligned for all the pending and following checkpoints. Therefore, when receiving EndOfPartition, we will insert a manually created CheckpointBarrier for all the pending checkpoints and exclude the channel from the following checkpoints.
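For aligned checkpoints, this rule can be sketched as follows. The sketch assumes channels are identified by integer index and barriers by checkpoint id. BarrierAlignmentSketch and its methods are hypothetical, not Flink's CheckpointBarrierHandler.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical, single-task view of aligned checkpoints with finishing input channels. */
final class BarrierAlignmentSketch {

    /** Input channels that have not delivered EndOfPartition yet. */
    private final Set<Integer> openChannels = new HashSet<>();
    /** Pending checkpoint id -> channels from which its barrier has arrived. */
    private final Map<Long, Set<Integer>> barriersReceived = new HashMap<>();

    BarrierAlignmentSketch(int numChannels) {
        for (int channel = 0; channel < numChannels; channel++) {
            openChannels.add(channel);
        }
    }

    /** Returns true if this barrier completes the alignment of the checkpoint. */
    boolean onBarrier(long checkpointId, int channel) {
        Set<Integer> received = barriersReceived.computeIfAbsent(checkpointId, id -> new HashSet<>());
        received.add(channel);
        if (received.containsAll(openChannels)) {
            barriersReceived.remove(checkpointId);
            return true;
        }
        return false;
    }

    /**
     * EndOfPartition acts like a barrier for every pending checkpoint and removes the channel
     * from all following alignments. Returns the checkpoints that became aligned because of it.
     */
    Set<Long> onEndOfPartition(int channel) {
        openChannels.remove(channel);
        Set<Long> aligned = new HashSet<>();
        Iterator<Map.Entry<Long, Set<Integer>>> it = barriersReceived.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<Integer>> pending = it.next();
            pending.getValue().add(channel); // the "manually created" barrier
            if (pending.getValue().containsAll(openChannels)) {
                aligned.add(pending.getKey());
                it.remove();
            }
        }
        return aligned;
    }
}
```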

This is enough for aligned checkpoints, but unaligned checkpoints introduce additional complexity. Unaligned checkpoint barriers can overtake pending records, but EndOfPartition cannot. If we instead waited for EndOfPartition directly, the CheckpointCoordinator would not be notified in time, and checkpoints might take longer during the finishing phase. The same applies to aligned checkpoints with timeout. To cope with this issue, the upstream tasks would wait until the downstream tasks have processed all the pending records before exiting. After all the records, the upstream tasks would emit a special event, namely EndOfData. The downstream tasks would respond with another special event, and the upstream tasks only exit after all the response events are received. During this period, unaligned checkpoints or checkpoints with timeout can complete normally. Afterwards, the EndOfPartition can reach the downstream CheckpointAligner quickly, since there are no pending records.
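The upstream side of this handshake can be sketched as a small state holder. The proposal does not name the response event, so here it is simply a call to a hypothetical onDownstreamResponse method. The class as a whole is an illustration, not Flink code.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical upstream-side bookkeeping for the EndOfData handshake. */
final class EndOfDataHandshakeSketch {

    private final int numDownstreamChannels;
    private final Set<Integer> responded = new HashSet<>();
    private boolean endOfDataEmitted;

    EndOfDataHandshakeSketch(int numDownstreamChannels) {
        this.numDownstreamChannels = numDownstreamChannels;
    }

    /** Called after the last user record has been written to every output channel. */
    void emitEndOfData() {
        endOfDataEmitted = true;
    }

    /** A downstream task has processed all records that preceded EndOfData on this channel. */
    void onDownstreamResponse(int channel) {
        if (!endOfDataEmitted) {
            throw new IllegalStateException("response received before EndOfData was emitted");
        }
        if (channel < 0 || channel >= numDownstreamChannels) {
            throw new IllegalArgumentException("unknown channel " + channel);
        }
        responded.add(channel);
    }

    /**
     * The task may emit EndOfPartition and exit only once every channel has responded.
     * Until then it stays alive, so unaligned checkpoints (or checkpoints with timeout)
     * can still complete normally.
     */
    boolean mayExit() {
        return endOfDataEmitted && responded.size() == numDownstreamChannels;
    }
}
```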

Do Not Cancel Pending Checkpoint on Finish

Currently, when a task finishes, all the pending checkpoints are canceled. Since we now support checkpoints after tasks finish, a finishing task should not fail the checkpoints. Instead, we would change the behavior so that a task waits for all its pending checkpoints to complete before it finishes.
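With CompletableFuture, "finish only after all pending checkpoints are done" can be sketched as below. The helper name and the markFinished callback are hypothetical. The sketch also treats a checkpoint that completes exceptionally (for example, declined) as settled, so that one failed checkpoint cannot block the task forever. That treatment is an assumption of this sketch, not something the proposal specifies.

```java
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper: a finishing task waits for its pending checkpoints instead of cancelling them. */
final class FinishAfterPendingCheckpointsSketch {

    private FinishAfterPendingCheckpointsSketch() {}

    /** Runs markFinished once every pending checkpoint has either completed or failed. */
    static CompletableFuture<Void> finishAfterPendingCheckpoints(
            Collection<? extends CompletableFuture<?>> pendingCheckpoints, Runnable markFinished) {
        // handle(...) turns both normal and exceptional completion into a normal "settled" signal.
        CompletableFuture<?>[] settled = pendingCheckpoints.stream()
                .map(checkpoint -> checkpoint.handle((result, failure) -> null))
                .toArray(CompletableFuture<?>[]::new);
        return CompletableFuture.allOf(settled).thenRun(markFinished);
    }
}
```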

Failover and Recovery

Logically, we would not need to restart the finished tasks on failover. We could restart only the tasks whose operators are not fully finished. However, for the first version, we would still restart all the tasks and skip the execution of the fully finished operators. This simplifies the implementation on the task side and in the scheduler.

Changes to the Operator APIs Related to Finish

With the ability to take checkpoints or savepoints after tasks have finished, operators interacting with external systems in 2PC style could commit the last piece of data by waiting for one more checkpoint or savepoint. In general, there are three cases in which a job finishes:

  1. All records are processed and the job finishes normally.
  2. The job is stopped with stop-with-savepoint --drain.
  3. The job is stopped with stop-with-savepoint (without --drain).

In the first two cases, the job is not expected to be restarted, so we should flush and commit all the records. In the third case, however, the job is expected to be restarted from the savepoint, so we should not flush the records. For example, window operators using event-time triggers can reasonably fire all the remaining timers if the job is going to finish. In the third case, however, the job would be restarted and process more records, so firing all the timers would produce wrong results after the restart.

To flush all the records on finish, we need to:

  1. Emit a special MAX_WATERMARK to trigger all the event-time timers.
  2. Call the dedicated operator API to flush the other buffered records. 
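Why MAX_WATERMARK flushes event-time timers can be seen in a small timer-queue sketch. A watermark fires every timer whose timestamp is at or below it, and no timer can lie beyond Long.MAX_VALUE, which is the timestamp of Flink's Watermark.MAX_WATERMARK. The class is hypothetical and much simpler than Flink's internal timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical event-time timer queue; not Flink's internal timer service. */
final class EventTimeTimersSketch {

    /** Same timestamp as Flink's Watermark.MAX_WATERMARK. */
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Advances the watermark and returns, in timestamp order, every timer that fires. */
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }
}
```

For stop-with-savepoint without --drain, this step is skipped, so the timers stay registered and are restored with the savepoint.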

However, we currently lack a clear method in the API to stop processing records and flush any buffered records. We do have the StreamOperator#close method, which is supposed to flush all records. At the same time, however, it currently closes all resources, including connections to external systems. We need separate methods for flushing and closing resources, because we might need the connections when performing the final checkpoint, once all records are flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose method. Lastly, the semantics of RichFunction#close differ from those of StreamOperator#close: having no direct access to any output, RichFunction#close is responsible purely for releasing resources. We suggest using this opportunity to clean up the semi-public StreamOperator API and:

  1. remove the dispose method
  2. change the semantics of the close method
  3. introduce a new finish method

Effectively, this would change the termination phase of the operator lifecycle (https://ci.apache.org/projects/flink/flink-docs-master/docs/internals/task_lifecycle/#operator-lifecycle-in-a-nutshell) to:

Code Block
    // termination phase
    OPERATOR::endOfInput(1)
    ...
    OPERATOR::endOfInput(n)

    OPERATOR::finish
        UDF(if available)::finish

    OPERATOR::snapshotState()
    OPERATOR::notifyCheckpointComplete()

    OPERATOR::close() --> call UDF's close method
        UDF::close()

In case of failure, we will call Operator#close, which in turn calls UDF#close.
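The termination phase above can be sketched as a small driver over a trimmed-down operator contract. LifecycleOperator and TerminationPhaseSketch are hypothetical names, not Flink's StreamOperator or StreamTask. The driver runs close() in a finally block, so it executes on both the success path and the failure path.

```java
/** Hypothetical, trimmed-down operator contract mirroring the proposed termination phase. */
interface LifecycleOperator {
    void endInput(int inputId) throws Exception;

    void finish() throws Exception;

    void snapshotState(long checkpointId) throws Exception;

    void notifyCheckpointComplete(long checkpointId) throws Exception;

    void close() throws Exception;
}

/** Hypothetical driver for the termination phase of a single operator. */
final class TerminationPhaseSketch {

    private TerminationPhaseSketch() {}

    static void terminate(LifecycleOperator op, int numInputs, long finalCheckpointId) throws Exception {
        try {
            for (int input = 1; input <= numInputs; input++) {
                op.endInput(input);
            }
            op.finish();                                    // flush buffered records; last chance to emit
            op.snapshotState(finalCheckpointId);            // final checkpoint; connections still open
            op.notifyCheckpointComplete(finalCheckpointId); // e.g. commit the last 2PC transaction
        } finally {
            op.close();                                     // release resources only; must not emit
        }
    }
}
```

If finish() throws, the final checkpoint is skipped but close() still runs, which matches the failure path described above.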


Code Block
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {

    ....

    /**
     * This method is called at the end of data processing.
     *
     * <p>The method is expected to flush all remaining buffered data. Exceptions during this
     * flushing of buffered data should be propagated, in order to cause the operation to be recognized
     * as failed, because the last data items are not processed properly.
     *
     * <p><b>After this method is called, no more records can be produced for the downstream operators.</b>
     *
     * <p><b>NOTE:</b> This method does not need to close any resources. You should release external
     * resources in the {@link #close()} method.
     *
     * @throws java.lang.Exception An exception in this method causes the operator to fail.
     */
    void finish() throws Exception;

    /**
     * This method is called at the very end of the operator's life, both in the case of a
     * successful completion of the operation, and in the case of a failure and canceling.
     *
     * <p>This method is expected to make a thorough effort to release all resources that the
     * operator has acquired.
     *
     * <p><b>NOTE:</b> It should not emit any records! If you need to emit records at the end of
     * processing, do so in the {@link #finish()} method.
     */
    void close() throws Exception;

    ...

}

The UDF that most often buffers data, and thus requires a flushing/finishing phase, is the SinkFunction. For example, it could create transactions in an external system that are committed during the final snapshot. Therefore we suggest introducing a finish method in the SinkFunction:

Code Block
@Public
public interface SinkFunction<IN> extends Function, Serializable {
    default void finish() throws Exception {}
}
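The reason for keeping finish() and close() apart can be illustrated with a toy sink. The final commit, triggered by the last checkpoint, still needs the connection that close() releases. FinishThenCommitSinkSketch is hypothetical and not a real SinkFunction. It has no pre-commit step and ignores regular (non-final) checkpoints for brevity.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sink showing why the final commit must happen between finish() and close(). */
final class FinishThenCommitSinkSketch {

    private final List<String> buffer = new ArrayList<>();      // records not yet written
    private final List<String> transaction = new ArrayList<>(); // written, not yet committed
    private final List<String> committed = new ArrayList<>();   // stands in for the external system
    private boolean connectionOpen = true;

    void invoke(String value) {
        buffer.add(value);
    }

    /** Flushes buffered records into the open transaction; keeps the connection open. */
    void finish() {
        transaction.addAll(buffer);
        buffer.clear();
    }

    /** The final checkpoint completed: commit the transaction over the still-open connection. */
    void notifyCheckpointComplete(long checkpointId) {
        if (!connectionOpen) {
            throw new IllegalStateException("cannot commit checkpoint " + checkpointId + ": connection closed");
        }
        committed.addAll(transaction);
        transaction.clear();
    }

    /** Releases resources only; emits and commits nothing. */
    void close() {
        connectionOpen = false;
    }

    List<String> committedRecords() {
        return new ArrayList<>(committed);
    }
}
```

If close() ran before the final checkpoint completed, as happens when flushing and closing are fused into one method, the last transaction could no longer be committed.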

Currently there would be some duplication between endOfInput() and finish() for one-input operators. In the long run, we would like to unify them and keep only endOfInput(int channelIndex) and finish().

Info

The finish() method is tied to the lifecycle of an Operator; it is NOT tied to the lifecycle of State. After recovery, a state taken after finish() was called might be restored to an operator that will receive more records. The operator implementation must be prepared for that.


In particular, it is generally not safe to write a "finished" flag into the state and assume that no more records will be received once the flag is set.


Final Checkpoint and Stopping Job With Savepoint

If we directly map the original close() to the new finish() method, based on the current StreamTask's implementation, it would call the finish() method after received EndOfPartitionEvent and emit EndOfPartitionEvent after the last checkpoint / savepoint is completed. This would cause the tasks to wait for the last checkpoint / savepoint one-by-one, which is inefficient. 

In addition, stop-with-savepoint [--drain] currently uses a different process: it first triggers a savepoint and blocks the tasks until the savepoint is completed, and only then finishes the tasks. If we also want to flush the records for stop-with-savepoint --drain, this would duplicate the logic of the final checkpoint. Moreover, there might currently still be records emitted between the savepoint and finishTask, which causes confusion, especially if these records affect external systems.

To deal with the above issues, we would like to adjust the lifecycle of the StreamTasks to avoid the chained waiting and the divergent processes:

  1. We would like to introduce a new event that notifies the tasks that all user records have been emitted, and to call finish() immediately after this event has been aligned on all the input channels. Since it has the same semantics as the EndOfUserRecordsEvent we introduced in the previous sections to solve the unaligned checkpoint issues, we could reuse that event.
  2. We would adjust the savepoint process to first finish the task and then trigger the savepoint, so that it is unified with the final checkpoint process and no confusing records are emitted after the savepoint.
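Step 1 can be sketched as follows, assuming a hypothetical `EndOfRecordsAligner` that tracks which input channels have delivered the end-of-user-records event and runs a `finish` callback exactly once, when the last channel does. The class name and the `Runnable` callback are illustrative only; the actual StreamTask input handling is more involved.

```java
import java.util.BitSet;

// Hypothetical helper: tracks EndOfUserRecordsEvent per input channel and runs the
// finish action exactly once, when the event has arrived on every channel.
class EndOfRecordsAligner {
    private final int numChannels;
    private final BitSet received;
    private final Runnable finishAction;
    private boolean finished;

    EndOfRecordsAligner(int numChannels, Runnable finishAction) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("numChannels must be positive");
        }
        this.numChannels = numChannels;
        this.received = new BitSet(numChannels);
        this.finishAction = finishAction;
    }

    // Called when EndOfUserRecordsEvent arrives on the given input channel.
    void onEndOfUserRecords(int channelIndex) {
        if (channelIndex < 0 || channelIndex >= numChannels) {
            throw new IllegalArgumentException("unknown channel " + channelIndex);
        }
        received.set(channelIndex); // a duplicate event on the same channel is harmless
        if (!finished && received.cardinality() == numChannels) {
            finished = true;
            // In the proposal, endInput() and finish() would be called at this point.
            finishAction.run();
        }
    }

    boolean isFinished() {
        return finished;
    }
}
```

Tracking channels in a bitset rather than counting events keeps the alignment correct even if the same channel reports the event more than once.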

The detailed lifecycle of the source stream tasks and their operators would become:

| Event | Stream Task Status | Operator Status | Final Checkpoint | Stop with Savepoint with Drain | Stop with Savepoint |
|---|---|---|---|---|---|
| | RUNNING | RUNNING | - | - | - |
| No more records or received savepoint trigger | | | - | Finish task (cancel the source thread for the legacy source and suspend the mailbox for the new source) | Finish task (cancel the source thread for the legacy source and suspend the mailbox for the new source) |
| | | | Advance to MAX_WATERMARK and trigger all the event timers | Advance to MAX_WATERMARK and trigger all the event timers | - |
| | | | Emit MAX_WATERMARK | Emit MAX_WATERMARK | - |
| | WAITING_FOR_FINAL_CP | FINISHED | Call operator.endInput() & operator.finish() | Call operator.endInput() & operator.finish() | - |
| | | | Emit EndOfData[finished = true] | Emit EndOfData[finished = true] | Emit EndOfData[finished = false] |
| | | | When a checkpoint is triggered, emit checkpoint barrier | Emit checkpoint barrier | Emit checkpoint barrier |
| | | | Wait for checkpoint / savepoint to complete | Wait for checkpoint / savepoint to complete | Wait for checkpoint / savepoint to complete |
| | | | Wait for downstream tasks to acknowledge EndOfData | Wait for downstream tasks to acknowledge EndOfData | Wait for downstream tasks to acknowledge EndOfData |
| Checkpoint completed && EndOfData acknowledged | CLOSED | CLOSED | Call operator.close() | Call operator.close() | Call operator.close() |
| | | | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent |
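The per-mode sequence in the table can be encoded as a small, hypothetical sketch. `StopMode` and `SourceTaskLifecycle` are illustrative names, and the step strings are informal paraphrases of the table cells, not Flink method names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names for the three columns of the table.
enum StopMode { FINAL_CHECKPOINT, STOP_WITH_SAVEPOINT_DRAIN, STOP_WITH_SAVEPOINT }

class SourceTaskLifecycle {

    // Returns the ordered actions of a source task for the given mode, following the table.
    static List<String> actions(StopMode mode) {
        List<String> steps = new ArrayList<>();
        boolean drain = mode != StopMode.STOP_WITH_SAVEPOINT;
        // With FINAL_CHECKPOINT the source simply runs out of records; a savepoint
        // trigger first has to finish the task explicitly.
        if (mode != StopMode.FINAL_CHECKPOINT) {
            steps.add("finish task");
        }
        if (drain) {
            steps.add("advance to MAX_WATERMARK and trigger event timers");
            steps.add("emit MAX_WATERMARK");
            steps.add("endInput() + finish()");
        }
        steps.add("emit EndOfData[finished = " + drain + "]");
        // For FINAL_CHECKPOINT the barrier is emitted whenever the next checkpoint is triggered.
        steps.add("emit checkpoint barrier");
        steps.add("wait for checkpoint/savepoint completion");
        steps.add("wait for downstream EndOfData acknowledgement");
        steps.add("operator.close()");
        steps.add("emit EndOfPartitionEvent");
        return steps;
    }
}
```

Writing the three columns as one code path makes the point of the unification visible: the modes differ only in how the task is finished and whether it drains, while the tail of the lifecycle is shared.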


Similarly, the lifecycle of the non-source tasks would become:

| Event | Stream Task Status | Operator Status | Final Checkpoint | Stop with Savepoint with Drain | Stop with Savepoint |
|---|---|---|---|---|---|
| | RUNNING | RUNNING | - | - | - |
| Aligned on MAX_WATERMARK | | | Advance to MAX_WATERMARK and trigger all the event timers | Advance to MAX_WATERMARK and trigger all the event timers | N/A (MAX_WATERMARK is not emitted in this case) |
| | | | Emit MAX_WATERMARK | Emit MAX_WATERMARK | N/A |
| Aligned on EndOfUserRecordsEvent | WAITING_FOR_FINAL_CP | FINISHED | Call operator.endInput() & operator.finish() | Call operator.endInput() & operator.finish() | - |
| | | | Emit EndOfData[finished = true] | Emit EndOfData[finished = true] | Emit EndOfData[finished = false] |
| Aligned on checkpoint barrier | | | Emit checkpoint barrier | Emit checkpoint barrier | Emit checkpoint barrier |
| | | | Wait for checkpoint / savepoint to complete | Wait for checkpoint / savepoint to complete | Wait for checkpoint / savepoint to complete |
| | | | Wait for downstream tasks to acknowledge EndOfData | Wait for downstream tasks to acknowledge EndOfData | Wait for downstream tasks to acknowledge EndOfData |
| | | | Wait for EndOfPartitionEvent | Wait for EndOfPartitionEvent | Wait for EndOfPartitionEvent |
| Checkpoint completed / EndOfData acknowledged / EndOfPartitionEvent received | CLOSED | CLOSED | Call operator.close() | Call operator.close() | Call operator.close() |
| | | | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent |
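The CLOSED transition of a non-source task depends on three conditions that may be satisfied in any order. A minimal sketch of that rule, using a hypothetical `NonSourceTaskCloser` class whose `actions` list only records what the task would do:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: close only once the final checkpoint has completed, EndOfData has
// been acknowledged downstream, and EndOfPartitionEvent has been received from upstream.
class NonSourceTaskCloser {
    private boolean checkpointCompleted;
    private boolean endOfDataAcknowledged;
    private boolean endOfPartitionReceived;
    private boolean closed;
    // Records the actions the task would perform, for illustration only.
    final List<String> actions = new ArrayList<>();

    void onFinalCheckpointCompleted() {
        checkpointCompleted = true;
        maybeClose();
    }

    void onEndOfDataAcknowledged() {
        endOfDataAcknowledged = true;
        maybeClose();
    }

    void onEndOfPartitionReceived() {
        endOfPartitionReceived = true;
        maybeClose();
    }

    private void maybeClose() {
        if (!closed && checkpointCompleted && endOfDataAcknowledged && endOfPartitionReceived) {
            closed = true;
            actions.add("operator.close()");
            actions.add("emit EndOfPartitionEvent");
        }
    }

    boolean isClosed() {
        return closed;
    }
}
```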


Info
We need to wait for the completion of a checkpoint that was triggered after finish() was called. However, we support concurrent checkpoints. Moreover, there is no guarantee that notifyCheckpointComplete arrives at all, nor about the order in which the notifications arrive. It should nevertheless be enough to wait for the notification of any checkpoint that was triggered after finish().

We should, however, make sure that later checkpoints do not leave lingering resources behind.

Imagine a scenario where:
1. task/operator received `finish()`
2. checkpoint 42 triggered (not yet completed)
3. checkpoint 43 triggered (not yet completed)
4. checkpoint 44 triggered (not yet completed)
5. notifyCheckpointComplete(43)


Our proposal is to shut down the task immediately after receiving the first `notifyCheckpointComplete(X)`, where X is any checkpoint triggered AFTER `finish()`. This should be fine, as:
a) ideally, no new pending transactions should be opened after checkpoint 42
b) even if the operator/function opens some transactions for checkpoints 43 and 44 (e.g. `FlinkKafkaProducer`), those transactions opened after checkpoint 42 should be empty, since no records are processed after `finish()`

After step 5 (notifyCheckpointComplete(43)), it should be enough to:
- commit the transactions from checkpoint 42 (and from 43 if they were created, depending on the user code)
- close the operator, aborting any pending transactions (for checkpoint 44, if they were opened, depending on the user code)

If checkpoint 44 completes afterwards, it will still be valid. Ideally, we would recommend that operators/functions do not open any new transactions after `finish()`, but this should not be required.
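The proposed shutdown rule can be sketched with a hypothetical `FinalCheckpointTracker`: it remembers which checkpoints were triggered after finish() and reports that the task may shut down on the first completion notification for any of them. This is an illustration of the rule described above, not Flink's implementation, which may track this state differently.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical tracker for the rule: shut down on the first notifyCheckpointComplete(X)
// where X was triggered after finish().
class FinalCheckpointTracker {
    private boolean finishCalled;
    // Only checkpoints triggered after finish() can serve as the final checkpoint.
    private final Set<Long> triggeredAfterFinish = new HashSet<>();

    void onFinish() {
        finishCalled = true;
    }

    void onCheckpointTriggered(long checkpointId) {
        if (finishCalled) {
            triggeredAfterFinish.add(checkpointId);
        }
    }

    // Returns true if the task may now commit, close the operator and shut down.
    // Notifications may be missing or arrive out of order, so any qualifying ID is accepted.
    boolean onNotifyCheckpointComplete(long checkpointId) {
        return triggeredAfterFinish.contains(checkpointId);
    }
}
```

Replaying the scenario above: with checkpoints 42, 43 and 44 triggered after finish(), the notification for 43 allows shutdown even though 42 and 44 are still pending, while a late notification for a checkpoint triggered before finish() does not.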



Rejected Alternatives

CheckpointCoordinator Maintains the Finished Status of Tasks Separately

...