Status: ...

Page properties

Discussion thread: ...
JIRA: <pending>
Vote thread: https://lists.apache.org/thread/9v5bqx9nm3k1j06d0svywbbb6hcjcwp3
JIRA: FLINK-2491 (ASF JIRA)
Release: 1.14

...



Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Therefore, we need to also support checkpoints even after some tasks are finished. 

Based on such an ability, operators that write data to external systems in a 2PC (two-phase commit) style would be able to wait for one more checkpoint after emitting all their records, so that the last piece of data can be committed. This is similar to how a job is terminated with stop-with-savepoint --drain, so we would also like to unify the two processes so that the last piece of data is committed in all the required cases. To achieve this efficiently, we would also like to adjust the operator API and the lifecycle of the StreamTask.

Overall Design

There are multiple options to achieve this target. In this section we compare different options.

Option 1. Prevent tasks from finishing

The first option is to prevent the tasks from finishing until the whole job is finished. In particular, tasks would never finish if the job also has unbounded sources. This option is not preferred because:

  1. It causes zombie tasks and wastes resources.
  2. It also requires a new “pre-EndOfPartition” event to indicate that all the records have been sent. If EndOfPartition were sent directly before the tasks finished, the communication channel would be destroyed, and it would also be odd to have checkpoint barriers after EndOfPartition. However, introducing the “pre-EndOfPartition” event would largely complicate the termination process.

Option 2. Allow tasks to finish & Checkpoints contain the final states from finished tasks

Another option is to allow tasks to finish normally, so that checkpoints taken after some tasks have finished only snapshot the still-running tasks. A core issue of this option is whether the following checkpoints need to include the final states collected when a task finishes. Currently, when a failover happens after some tasks have finished, the job falls back to a checkpoint taken while all the tasks were running. Including the final states of the finished tasks keeps the behavior unchanged compared with the current one, since the finished tasks could be viewed as still running. However, it also introduces some problems:

  1. Including the states from finished tasks in all the following checkpoints requires the states to be managed on the master side, which causes additional overhead.
  2. Since the same states from the finished tasks would be used in multiple checkpoints, we would need to introduce reference counting between checkpoints and snapshots. This complicates checkpoint management, especially since we already have reference counting between snapshots and state items due to incremental checkpoints.
  3. Including the states from the finished tasks implies that even if all the tasks of an operator have finished, we would still need to restart the tasks of this operator after a failover. In the long run, this limits the possibility of only resuming execution for the operators that are not fully finished.

[draw.io diagram]

Figure 1. An illustration of the structure of the checkpoint. One issue is whether we need to keep the operator states collected from the finished tasks.

Option 3. Allow tasks to finish & Checkpoints do not contain the final states from finished tasks (The Chosen One)

If instead we do not include the states from the finished tasks, we need to explore how this changes the current behavior. Although in a checkpoint the state is snapshotted per task, it is ultimately reorganized per operator, since there might be topology changes or rescaling. In other words, we need to view the checkpoint as composed of the current working progress of each operator, with tasks only being stateless runtime units that execute the remaining work for the operators. If we do not include the state from the finished tasks, it is equivalent to some operators discarding a part of the finished work’s state in the checkpoint. Let ΔR represent the states of the running tasks and ΔF represent the final states of the finished tasks when taking the checkpoint; then the result (e.g., the records sent to the descendant operators) of the operator’s execution after failover is

...

The situation of the second part is equivalent to that of the source operators. However, non-source operators rarely have logic similar to the legacy sources. Instead, the result related to ΔF is usually emitted before the tasks finish and does not need to be re-sent after failover. For example, operators that aggregate and send the final result on endOfInput only need to send the remaining aggregation value after failover. Even if there are operators that do not satisfy this condition, they could push the states that cannot be discarded to the OperatorCoordinator instead, like the new Source API does.


...

Operator state

There are a few scenarios in which operator state is used in combination with an additional implicit contract on data distribution. Those implicit contracts might not hold if we restore state with partially finished operators. Therefore, we would like to discuss how we want to use operator state in combination with finished tasks.

BroadcastState

In case of broadcast state, all subtasks snapshot their state. The assumption is that every such snapshot is equal and can be used when restoring any given subtask. We want to leverage that property: when restoring with some subtasks finished, we would use any of the non-empty snapshots (taken from any running subtask), as sketched below.
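A minimal sketch of this restore rule (plain Java with illustrative types, not the actual restore code):

Code Block
import java.util.Collections;
import java.util.List;

public final class BroadcastStateRestoreSketch {

    /**
     * Picks the snapshot to assign to every new subtask: since all broadcast snapshots are
     * assumed to be identical, any non-empty one (taken from a running subtask) can be used.
     */
    public static <T> List<T> pickBroadcastSnapshot(List<List<T>> snapshotsPerSubtask) {
        for (List<T> snapshot : snapshotsPerSubtask) {
            if (!snapshot.isEmpty()) {
                return snapshot;
            }
        }
        // All remaining subtasks reported an empty state.
        return Collections.emptyList();
    }
}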

UnionListState

The UnionListState is more complex. A common usage pattern is to implement a "broadcast" state by storing the state on a single subtask; when initializing the subtasks after restore, that single copy of the state is distributed to all subtasks. Another common pattern is to use a UnionListState to create a global view. It is used, for example, to share information about the offsets of the partitions consumed so far (see the sketch below). This lets us restart from the given offset if, after restore, a partition is assigned to a different subtask than originally. Such logic can only be implemented with the merged state of all the original subtasks. If a part of the subtasks are finished and we only keep the remaining state in the checkpoint, we can obviously lose important bits of information, or even the entire state in the case of the described "broadcast" implementation. However, the UnionListState is on the way to be deprecated and replaced by the OperatorCoordinator.
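As a simplified illustration of the global-view pattern mentioned above (the class and the partition-assignment logic are illustrative, not an actual connector implementation):

Code Block
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

/**
 * Illustrative "global view" pattern: every subtask snapshots the offsets of the partitions it
 * owns; on restore every subtask sees the union of all offsets, so a partition can be resumed
 * by a different subtask than before.
 */
public class OffsetTrackingFunction implements CheckpointedFunction {

    // Partition -> offset consumed so far by this subtask.
    private final Map<Integer, Long> localOffsets = new HashMap<>();

    private transient ListState<Tuple2<Integer, Long>> unionOffsetState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        unionOffsetState =
                context.getOperatorStateStore()
                        .getUnionListState(
                                new ListStateDescriptor<>(
                                        "offsets",
                                        TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})));

        if (context.isRestored()) {
            for (Tuple2<Integer, Long> offset : unionOffsetState.get()) {
                // Each subtask sees all offsets and keeps only the partitions assigned to it
                // (the assignment logic is omitted here).
                localOffsets.put(offset.f0, offset.f1);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        unionOffsetState.clear();
        for (Map.Entry<Integer, Long> entry : localOffsets.entrySet()) {
            unionOffsetState.add(Tuple2.of(entry.getKey(), entry.getValue()));
        }
    }
}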

For the time being, if an operator checkpoints UnionListState, we will only allow its subtasks to finish all at once: we will decline checkpoints taken while only some of its tasks have called finish() and received notifyCheckpointComplete.

ListState

We want to make the contract of ListState more explicit: redistribution may happen even if there is no rescaling. This might have some subtle implications. Imagine a situation where you have the topology:

src 0 --> op 0 --> sink 0

src 1 --> op 1 --> sink 1

src 2 --> op 2 --> sink 2


We buffer records in the operators op X. If src 1 finishes, its state will be cleared. Then, after a restore, the state of src 2 might be assigned to src 1. Records from partitions originally assigned to src 2 would then end up in both op 1 and op 2. Depending on the processing speed of the two operators, if op 1 unbuffers records faster, later records from such a partition might overtake earlier records in the pipeline. However, Flink has never offered explicit guarantees that, in case of recovery, non-keyed ListState will be assigned to subtasks in the same order before and after recovery, especially across multiple operators. As part of this FLIP, we want to document and clarify this.

Based on the above discussion, discarding the final states of the finished tasks would only change the behavior for a very small fraction of the possible existing jobs, namely those whose non-source operators have special logic on initialization or endOfInput, and these jobs could also be modified to keep the current behavior. Therefore, we lean towards option 3, which simplifies the implementation and leaves more room for optimization in the future.

Proposed Changes

This section details the proposed changes for option 3, chosen in the last section.

Feature flag

We want to introduce a feature flag to enable or disable the new feature:


Code Block
    @Documentation.ExcludeFromDocumentation("This is a feature toggle")
    public static final ConfigOption<Boolean> ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH =
            ConfigOptions.key("execution.checkpointing.checkpoints-after-tasks-finish.enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Feature toggle for enabling checkpointing after tasks finish.");


Checkpoint Format with Finished Tasks

For checkpoints involving finished tasks:

  1. If all the subtasks of an operator are finished, the checkpoint would store a flag "fully finished = true" so that we could skip the execution of this operator after recovery. To avoid compatibility problems, we would store a special state for this operator instead of directly adding a boolean flag.
  2. If only some subtasks of an operator are finished, the checkpoint would store the states collected from the remaining running subtasks of this operator. After recovery, these remaining states would be re-distributed across the subtasks, as modeled below.
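For illustration, a simplified and hypothetical model of what such a checkpoint records per operator (these are not the actual Flink classes):

Code Block
import java.util.Map;

/** Hypothetical, simplified model of the per-operator entry stored in a checkpoint. */
public final class OperatorCheckpointEntry {

    private final String operatorId;

    // In the actual proposal this is stored as a special marker state rather than a plain
    // boolean, for compatibility; it is modeled as a flag here for simplicity.
    private final boolean fullyFinished;

    // Snapshots of the subtasks that were still running when the checkpoint was taken,
    // keyed by subtask index; re-distributed across the subtasks on recovery.
    private final Map<Integer, byte[]> runningSubtaskStates;

    public OperatorCheckpointEntry(
            String operatorId, boolean fullyFinished, Map<Integer, byte[]> runningSubtaskStates) {
        this.operatorId = operatorId;
        this.fullyFinished = fullyFinished;
        this.runningSubtaskStates = runningSubtaskStates;
    }

    /** If true, the operator's execution can be skipped entirely after recovery. */
    public boolean isFullyFinished() {
        return fullyFinished;
    }

    public String getOperatorId() {
        return operatorId;
    }

    public Map<Integer, byte[]> getRunningSubtaskStates() {
        return runningSubtaskStates;
    }
}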

Triggering Checkpoints After Tasks Finished

Currently the CheckpointCoordinator triggers only the source tasks for new checkpoints, and the other tasks receive barriers from their input channels. However, only triggering the running sources would not work once some tasks have finished. As exemplified in Figure 2, suppose Task A has finished: if we only trigger B for a new checkpoint, then Task C would have no idea about the new checkpoint and would not report its snapshot.

[draw.io diagram]
Figure 2. An example execution graph with one task finished. 

It might be argued that we could simply wait for Task C to finish during this checkpoint, so that we would not need to trigger it at all. However, this does not work if C is a two-phase commit sink that requires one successful checkpoint before finishing, which would cause a deadlock.

To find the new "roots" of the current execution graph, we iterate over all the tasks to find the ones that are still running but have no running precedent tasks. A direct implementation would have O(n²) time complexity since it needs to check all the precedent tasks of each task. However, we could reduce the complexity to O(n) by exploiting the isomorphism of ALL_TO_ALL edges. The details are described in the Appendix.

To avoid tasks finishing during the computation, the computation is done in the JobMaster's main thread. However, there might still be inconsistencies because:

  1. For tasks running on different TaskManagers, the order in which the FINISHED status reports arrive at the JobMaster is not guaranteed. That is to say, some tasks might report FINISHED after their descendant tasks.
  2. The tasks might finish between the computation and the moment they get triggered.

In both cases, the checkpoint trigger would fail and the whole checkpoint would then fail due to timeout. Since a checkpoint timeout blocks the next checkpoint and, by default, causes a failover, it would be better to detect the trigger failure as early as possible. The failure can be detected if a task finishes before acknowledging its snapshot for this checkpoint: the CheckpointCoordinator listens to the task-finished event and, when a task finishes, iterates over all the pending checkpoints to do the detection, as sketched below.
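As a rough sketch (plain Java with hypothetical interfaces; the actual CheckpointCoordinator code is different), the detection could look like:

Code Block
import java.util.Collection;
import java.util.Set;

/** Hypothetical model of re-checking pending checkpoints when a task reports FINISHED. */
final class TriggerFailureDetector {

    interface PendingCheckpoint {
        /** Tasks that were triggered but have not acknowledged their snapshot yet. */
        Set<String> unacknowledgedTriggeredTasks();

        void abort(String reason);
    }

    private final Collection<PendingCheckpoint> pendingCheckpoints;

    TriggerFailureDetector(Collection<PendingCheckpoint> pendingCheckpoints) {
        this.pendingCheckpoints = pendingCheckpoints;
    }

    /** Called in the JobMaster main thread whenever a task reports FINISHED. */
    void onTaskFinished(String taskId) {
        for (PendingCheckpoint checkpoint : pendingCheckpoints) {
            // The finished task can no longer acknowledge this checkpoint: the trigger has
            // effectively failed, so fail the checkpoint early instead of waiting for timeout.
            if (checkpoint.unacknowledgedTriggeredTasks().contains(taskId)) {
                checkpoint.abort("Task " + taskId + " finished before acknowledging its snapshot");
            }
        }
    }
}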

Make CheckpointBarrierHandler Support EndOfPartition on TM Side

On the task side, there are two cases regarding checkpoints with finished tasks:

  1. All the precedent tasks are finished and the task has received all the EndOfPartition events, but it has not finished yet (because it is waiting for the last checkpoint, or waiting for the downstream tasks to process all the pending records, as introduced in the following).
  2. Some of the precedent tasks are finished, and the task has received EndOfPartition from these tasks.

All the Precedent Tasks are Finished

Currently, tasks can be classified into two types according to how a checkpoint is notified: source tasks are triggered via RPC from the JobManager, while non-source tasks are triggered via barriers received from the precedent tasks. Once checkpoints after some tasks have finished are considered, non-source tasks might also get triggered via RPC if they become the new "root" tasks. The implementation for non-source tasks would differ from that of source tasks: it would notify the CheckpointBarrierHandler instead of directly performing the snapshot, so that the CheckpointBarrierHandler could deal with checkpoint subsuming correctly. Since triggerCheckpoint is currently implemented uniformly in the base StreamTask class and is only called for source StreamTask classes, we would refactor the class hierarchy to support different implementations of the triggerCheckpoint method for the two types of tasks (see the sketch below).
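A schematic sketch of this refactoring idea (hypothetical class and method names, not the actual StreamTask hierarchy):

Code Block
/** Hypothetical sketch of splitting the RPC trigger path for source and non-source tasks. */
abstract class AbstractTaskModel {

    /** Called when the JobManager triggers this task via RPC. */
    abstract void triggerCheckpoint(long checkpointId);
}

final class SourceTaskModel extends AbstractTaskModel {
    @Override
    void triggerCheckpoint(long checkpointId) {
        // Source tasks have no input channels: perform the snapshot and broadcast the
        // barrier directly, as today.
        snapshotAndBroadcastBarrier(checkpointId);
    }

    private void snapshotAndBroadcastBarrier(long checkpointId) { /* ... */ }
}

final class NonSourceTaskModel extends AbstractTaskModel {

    private final BarrierHandlerModel barrierHandler = new BarrierHandlerModel();

    @Override
    void triggerCheckpoint(long checkpointId) {
        // A non-source "new root" task forwards the trigger to its barrier handler, so that
        // subsuming / aborting of pending checkpoints keeps working in a single place.
        barrierHandler.processTrigger(checkpointId);
    }

    static final class BarrierHandlerModel {
        void processTrigger(long checkpointId) { /* treated like a received barrier */ }
    }
}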

Some of the Precedent Tasks are Finished

If not all the precedent tasks of a task are finished, the task would still get notified via checkpoint barriers from the running precedent tasks. However, some of the precedent tasks might already be finished, and the CheckpointBarrierHandler must consider this case. Overall, an EndOfPartition from one input channel marks this channel as aligned for all the pending and following checkpoints. Therefore, when receiving EndOfPartition, we will insert a manually created CheckpointBarrier for all the pending checkpoints, and exclude the channel from the following checkpoints (see the sketch below).
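A simplified model of that interaction (a hypothetical class, not the actual CheckpointBarrierHandler):

Code Block
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of how EndOfPartition interacts with barrier alignment. */
final class AlignmentModel {

    private final Set<Integer> channelsWithEndOfPartition = new HashSet<>();
    // Checkpoint ids that have been seen (via barrier or RPC) but are not yet complete.
    private final Set<Long> pendingCheckpoints = new TreeSet<>();

    /** Called when EndOfPartition is received from a channel. */
    void onEndOfPartition(int channelIndex) {
        channelsWithEndOfPartition.add(channelIndex);
        // The channel counts as aligned for every pending checkpoint: behave as if a barrier
        // for each of them had arrived right before the EndOfPartition.
        for (long checkpointId : pendingCheckpoints) {
            markChannelAligned(checkpointId, channelIndex);
        }
    }

    /** Called when a barrier (or RPC trigger) for a new checkpoint is seen. */
    void onNewCheckpoint(long checkpointId) {
        pendingCheckpoints.add(checkpointId);
        // Channels that already delivered EndOfPartition are excluded, i.e. immediately aligned.
        for (int channelIndex : channelsWithEndOfPartition) {
            markChannelAligned(checkpointId, channelIndex);
        }
    }

    private void markChannelAligned(long checkpointId, int channelIndex) { /* ... */ }
}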

This is enough for aligned checkpoints, but unaligned checkpoints introduce additional complexity. Unaligned checkpoint barriers can jump over the pending records, while EndOfPartition cannot. If we simply waited for the EndOfPartition, the CheckpointCoordinator could not get notified in time and we might incur longer checkpoint periods during the finishing phase. The same holds for aligned checkpoints with alignment timeout. To cope with this issue, the upstream tasks would wait until the downstream tasks have processed all the pending records before exiting. The upstream tasks would emit a special event, namely EndOfData, after all the records; the downstream tasks would respond with another special event, and the upstream tasks would only exit after all the response events are received. During this period, unaligned checkpoints or checkpoints with alignment timeout can still be taken normally. Afterwards, the EndOfPartition can reach the downstream CheckpointAligner quickly since there are no pending records.

Do Not Cancel Pending Checkpoint on Finish

Currently, when a task finishes, all the pending checkpoints are canceled. Since we now support checkpoints after tasks have finished, a task finishing should not fail the checkpoints, and we would change the behavior so that the task waits for all the pending checkpoints to finish before it finishes.

Failover and Recovery

Logically we cannot restart the finished tasks on failover; we could only restart those tasks that have operators which are not fully finished. However, for the first version, we would still restart all the tasks and skip the execution of the fully finished operators. This simplifies the implementation on the task side and in the scheduler.

Changes the Operator APIs Related to Finish

Based on the ability to take a checkpoint or savepoint after tasks have finished, operators interacting with external systems in 2PC style could commit the last piece of data by waiting for one more checkpoint or savepoint. In general, there are three cases in which a job finishes:

  1. All records are processed and the job finished normally.
  2. The job is finished due to stop-with-savepoint --drain.
  3. The job is finished due to stop-with-savepoint.

For the first two cases, the job is not expected to be restarted, thus we should flush and commit all the records. But for the third case, the job is expected to be restarted from the savepoint, thus we should not flush all the records. For example, for window operators using event-time triggers, it would be reasonable to fire all the remaining timers if the job is going to finish, but in the third case the job would be restarted and process more records, so firing all the timers would cause wrong results after restarting.

To flush all the records on finishing, we need to:

  1. Emit a special MAX_WATERMARK to trigger all the event time timers.
  2. Call the dedicated operator API to flush the other buffered records. 

However, we currently lack a clear method in the API to stop processing records and flush any buffered records. We do have the StreamOperator#close method, which is supposed to flush all records, but at the same time it currently closes all resources, including connections to external systems. We need separate methods for flushing and for closing resources, because we might still need the connections when performing the final checkpoint once all records are flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose method. Lastly, the semantics of RichFunction#close differ from StreamOperator#close: having no direct access to any output, RichFunction#close is responsible purely for releasing resources. We suggest using this opportunity to clean up the semi-public StreamOperator API and:

  1. remove the dispose method
  2. change the semantics of the close method
  3. introduce a new finish method

Effectively it would modify the Operator  lifecycle (https://ci.apache.org/projects/flink/flink-docs-master/docs/internals/task_lifecycle/#operator-lifecycle-in-a-nutshell) termination phase to be:

Code Block
    // termination phase
    OPERATOR::endOfInput(1)
    ...
    OPERATOR::endOfInput(n)

    OPERATOR::finish
        UDF(if available)::finish

    OPERATOR::snapshotState()
    OPERATOR::notifyCheckpointComplete()

    OPERATOR::close() --> call UDF's close method
        UDF::close()

In case of failure, we will call the Operator#close → UDF#close method.


Code Block
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {

    ....

    /**
     * This method is called at the end of data processing.
     *
     * <p>The method is expected to flush all remaining buffered data. Exceptions during this
     * flushing of buffered data should be propagated, in order to cause the operation to be
     * recognized as failed, because the last data items were not processed properly.
     *
     * <p><b>After this method is called, no more records can be produced for the downstream operators.</b>
     *
     * <p><b>NOTE:</b>This method does not need to close any resources. You should release external
     * resources in the {@link #close()} method.
     *
     * @throws java.lang.Exception An exception in this method causes the operator to fail.
     */
    void finish() throws Exception;

    /**
     * This method is called at the very end of the operator's life, both in the case of a
     * successful completion of the operation, and in the case of a failure and canceling.
     *
     * <p>This method is expected to make a thorough effort to release all resources that the
     * operator has acquired.
     *
     * <p><b>NOTE:</b>It should not emit any records! If you need to emit records at the end of
     * processing, do so in the {@link #finish()} method.
     */
    void close() throws Exception;

    ...

}

The UDF that most often buffers data, and thus requires a flushing/finishing phase, is the SinkFunction, where it could, e.g., create transactions in an external system that can be committed during the final snapshot. Therefore, we suggest introducing a finish method in the SinkFunction:

Code Block
@Public
public interface SinkFunction<IN> extends Function, Serializable {
    default void finish() throws Exception {}
}
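For example, a buffering sink could flush its remaining records in finish() while only releasing its connection in close(). The sketch below is hypothetical (ExternalClient is a stand-in) and assumes the proposed default method above:

Code Block
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Hypothetical buffering sink using the proposed SinkFunction#finish(). */
public class BufferingSink extends RichSinkFunction<String> {

    private transient List<String> buffer;
    private transient ExternalClient client; // stand-in for a real connection

    @Override
    public void open(Configuration parameters) {
        buffer = new ArrayList<>();
        client = new ExternalClient();
    }

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
    }

    @Override
    public void finish() {
        // End of input: flush the remaining buffered records, but keep the connection open
        // because it may still be needed for committing during the final checkpoint.
        client.write(buffer);
        buffer.clear();
    }

    @Override
    public void close() {
        // Only release resources here; no records may be emitted anymore.
        client.shutdown();
    }

    /** Illustrative placeholder for an external system client. */
    private static final class ExternalClient {
        void write(List<String> records) { /* ... */ }
        void shutdown() { /* ... */ }
    }
}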

Currently there would be some duplication between endOfInput() and finish() for one-input operators; in the long run we would like to unify them into the finish() method and keep only endOfInput(int channelIndex) and finish().

Info

The finish() method is tied to the lifecycle of an operator; it is NOT tied to the lifecycle of its state. It is possible that, after recovery, a state snapshot taken after finish() was called is restored to an operator that will receive more records. The operator implementation must be prepared for that.


In particular, it is generally not safe to write a "finished" flag into the state and assert that no records arrive while the flag is set.


Final Checkpoint and Stopping Job With Savepoint

If we directly map the original close() to the new finish() method, then, based on the current StreamTask implementation, a task would call finish() after receiving EndOfPartitionEvent and emit EndOfPartitionEvent only after the last checkpoint / savepoint is completed. This would cause the tasks to wait for the last checkpoint / savepoint one by one, which is inefficient.

Besides, stop-with-savepoint [--drain] currently uses a different process: it first triggers a savepoint and blocks the tasks until the savepoint is completed, then it finishes the tasks. This would cause duplication if we also want to flush the records for stop-with-savepoint --drain. Moreover, there might still be records emitted between the savepoint and finishing the task, which would cause confusion, especially if these records affect external systems.

To deal with the above issues, we would like to adjust the lifecycle of the StreamTasks to avoid chained waiting and diverged processes:

  1. We would like to introduce a new event to signal that all the user records have been emitted, and call finish() immediately after this event is aligned for all the input channels. Since it shares the same semantics as the EndOfUserRecordsEvent we introduced to solve the unaligned checkpoint issues in the previous sections, we could reuse that event.
  2. We would adjust the savepoint process to first finish the task and then trigger the savepoint, so that it can be unified with the final checkpoint process and no confusing records are emitted after the savepoint.

The detailed lifecycle of the source stream tasks and their operators would become:

| Event | Stream Task Status | Operator Status | Final Checkpoint | Stop with Savepoint with Drain | Stop with Savepoint |
|---|---|---|---|---|---|
| | RUNNING | RUNNING | - | - | - |
| No more records or Received Savepoint Trigger | | | - | finish task (cancel source thread for legacy source and suspend the mailbox for the new source) | finish task (cancel source thread for legacy source and suspend the mailbox for the new source) |
| | | | Advance to MAX_WATERMARK and trigger all the event timers | Advance to MAX_WATERMARK and trigger all the event timers | - |
| | | | Emit MAX_WATERMARK | Emit MAX_WATERMARK | - |
| | WAITING_FOR_FINAL_CP | FINISHED | call operator.endInput() & operator.finish() | call operator.endInput() & operator.finish() | - |
| | | | Emit EndOfData[finished = true] | Emit EndOfData[finished = true] | Emit EndOfData[finished = false] |
| | | | When the checkpoint is triggered, emit Checkpoint Barrier | Emit Checkpoint Barrier | Emit Checkpoint Barrier |
| | | | Wait for Checkpoint / Savepoint Completed | Wait for Checkpoint / Savepoint Completed | Wait for Checkpoint / Savepoint Completed |
| | | | Wait for downstream tasks to acknowledge EndOfData | Wait for downstream tasks to acknowledge EndOfData | Wait for downstream tasks to acknowledge EndOfData |
| Checkpoint Completed && EndOfData acknowledged | CLOSED | CLOSED | - | - | - |
| | | | Call operator.close() | Call operator.close() | Call operator.close() |
| | | | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent |


Similarly, the status for the non-source tasks would become

| Event | Stream Task Status | Operator Status | Final Checkpoint | Stop with Savepoint with Drain | Stop with Savepoint |
|---|---|---|---|---|---|
| | RUNNING | RUNNING | - | - | - |
| Aligned on MAX_WATERMARK | | | Advance to MAX_WATERMARK and trigger all the event timers | Advance to MAX_WATERMARK and trigger all the event timers | N/A (MAX_WATERMARK is not emitted in this case) |
| | | | Emit MAX_WATERMARK | Emit MAX_WATERMARK | N/A |
| Aligned on EndOfUserRecordsEvent | WAITING_FOR_FINAL_CP | FINISHED | call operator.endInput() & operator.finish() | call operator.endInput() & operator.finish() | - |
| | | | Emit EndOfData[finished = true] | Emit EndOfData[finished = true] | Emit EndOfData[finished = false] |
| Aligned on Checkpoint Barrier | | | Emit Checkpoint Barrier | Emit Checkpoint Barrier | Emit Checkpoint Barrier |
| | | | Wait for Checkpoint / Savepoint Completed | Wait for Checkpoint / Savepoint Completed | Wait for Checkpoint / Savepoint Completed |
| | | | Wait for downstream tasks to acknowledge EndOfData | Wait for downstream tasks to acknowledge EndOfData | Wait for downstream tasks to acknowledge EndOfData |
| | | | Wait for EndOfPartitionEvent | Wait for EndOfPartitionEvent | Wait for EndOfPartitionEvent |
| Checkpoint completed / EndOfData acknowledged / EndOfPartition received | CLOSED | CLOSED | - | - | - |
| | | | Call operator.close() | Call operator.close() | Call operator.close() |
| | | | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent |


Info
We need to wait for the completion of a checkpoint that started after the finish() method. However, we support concurrent checkpoints, and there is no guarantee that a notifyCheckpointComplete arrives, nor any guarantee of the order in which the notifications arrive. It should be enough, though, to wait for the notification of any checkpoint that started after finish().

We should make sure though later checkpoints do not leave behind lingering resources.

Imagine a scenario where:
1. task/operator received `finish()`
2. checkpoint 42 triggered (not yet completed)
3. checkpoint 43 triggered (not yet completed)
4. checkpoint 44 triggered (not yet completed)
5. notifyCheckpointComplete(43)


Our proposal is to shut down the task immediately after seeing the first `notifyCheckpointComplete(X)`, where X is any checkpoint triggered AFTER `finish()`. This should be fine, as:
a) ideally there should be no new pending transactions opened after checkpoint 42
b) even if operator/function is opening some transactions for checkpoint 43 and checkpoint 44 (`FlinkKafkaProducer`), those transactions after checkpoint 42 should be empty

After seeing 5. (notifyCheckpointComplete(43)) It should be good enough to:
- commit transactions from checkpoint 42, (and 43 if they were created, depends on the user code)
- close operator, aborting any pending transactions (for checkpoint 44 if they were opened, depends on the user code)

If checkpoint 44 completes afterwards, it will still be valid. Ideally we would recommend that after seeing `finish()` operators/functions should not be opening any new transactions, but that shouldn't be required.
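A compact model of this rule (a hypothetical class, not the actual StreamTask code): remember the first checkpoint triggered after finish() and allow shutdown on the first completion notification for it or for any later checkpoint.

Code Block
/** Hypothetical model of waiting for "any checkpoint that started after finish()". */
final class FinalCheckpointWaiter {

    private boolean finished;
    // The smallest checkpoint id that was triggered after finish(); -1 if none yet.
    private long firstCheckpointAfterFinish = -1L;

    void onFinish() {
        finished = true;
    }

    void onCheckpointTriggered(long checkpointId) {
        if (finished && firstCheckpointAfterFinish < 0) {
            firstCheckpointAfterFinish = checkpointId;
        }
    }

    /** Returns true if the task may shut down after this completion notification. */
    boolean onNotifyCheckpointComplete(long checkpointId) {
        // Any completed checkpoint triggered after finish() is enough; notifications may be
        // lost or arrive out of order, so we do not wait for one specific id.
        return firstCheckpointAfterFinish >= 0 && checkpointId >= firstCheckpointAfterFinish;
    }
}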


Rejected Alternatives

CheckpointCoordinator Maintains the Finished Status of Tasks Separately

Currently we have to deal with the case that tasks finish during triggering. Another option is to let the tasks synchronize with the CheckpointCoordinator on finishing: the tasks report their finish status to the CheckpointCoordinator, and the CheckpointCoordinator either cancels the pending checkpoints or replies with the pending checkpoints to wait for. The CheckpointCoordinator would maintain the finished status separately and use this status when computing the following checkpoints.

Although this method could solve the inconsistency caused by tasks finishing before getting triggered, it would introduce additional complexity, and we will not apply this method at present.

Insert Barriers On Received EndOfPartition at the Task Side

Another option for making the CheckpointBarrierHandler support finished tasks is that, when EndOfPartition is received from one channel, the CheckpointBarrierHandler inserts the barriers for the pending checkpoints into this channel, right before the EndOfPartition event. These barriers would be processed the same as normally received barriers. Then we would not need to modify the CheckpointBarrierHandler logic.

However, this option could not support unaligned checkpoints before the EndOfPartition event has been transferred to the downstream tasks, and it requires complex modifications to the input channels, thus we do not use this option.

...




Compatibility, Deprecation and Migration Plan

...

The target tasks to trigger are those that are still running but do not have any running precedent tasks. To avoid introducing a large overhead for computing the tasks to trigger for large jobs, we traverse the JobGraph instead of traversing the ExecutionGraph directly. The algorithm is based on the following observations:

  1. If JobVertex A is connected to JobVertex B and not all tasks of JobVertex A are finished (i.e., some of them are still running), then no tasks of JobVertex B need to be triggered if the job edge A → B is ALL_TO_ALL, and only the tasks connected from finished tasks might need to be triggered if A → B is POINTWISE.
  2. If one task is known to not need triggering, it must be RUNNING, since it is connected from some other running task. This implies that the FINISHED tasks are always a subset of the tasks we need to check for triggering for each JobVertex.
  3. If one JobVertex is connected from multiple precedent JobVertices, then one of its tasks only needs checking if every precedent JobVertex leaves it as a candidate; that is, the tasks to check are the intersection of the candidate tasks decided by each precedent JobVertex.

Based on the above observations, we traverse the JobVertices in topological order, and for each JobVertex we narrow down the tasks to check for all its descendant JobVertices. Among the tasks to check, the ones that are still running are the tasks to trigger. The pseudo-code of the algorithm is as follows:

Code Block
Initialize V.tasks_to_check for each JobVertex V to be all of its tasks;

for each JobVertex V in topological order; do
    if V.tasks_to_check is empty; then
        // All tasks of V are still running, so no descendant task can be a new root.
        clear SubV.tasks_to_check for every descendant JobVertex SubV;
        continue;
    endif

    running, finished = split V.tasks_to_check;
    add the running tasks to the set of tasks to trigger;

    for each descendant JobVertex SubV; do
        if finished != all tasks of V; then  // some tasks of V are still running
            if V -> SubV is ALL_TO_ALL; then
                // Every task of SubV has a running precedent and does not need checking.
                SubV.tasks_to_check = {};
            elif V -> SubV is POINTWISE; then
                // Only the tasks connected solely to finished tasks of V may need checking.
                SubV.tasks_to_check =
                    SubV.tasks_to_check ∩ {tasks of SubV connected only to finished tasks of V};
            endif
        endif
    done
done
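To make the algorithm concrete, below is a self-contained Java model of the computation above (the Task/Edge/Vertex classes are simplified stand-ins, not the actual JobGraph or ExecutionGraph classes):

Code Block
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified model of computing the tasks to trigger; not the actual Flink classes. */
final class TasksToTriggerModel {

    enum Distribution { ALL_TO_ALL, POINTWISE }

    static final class Task {
        final String id;
        final boolean running;
        // Precedent tasks connected via POINTWISE edges (across all upstream vertices).
        final List<Task> pointwiseInputs = new ArrayList<>();

        Task(String id, boolean running) {
            this.id = id;
            this.running = running;
        }
    }

    static final class Edge {
        final Vertex target;
        final Distribution distribution;

        Edge(Vertex target, Distribution distribution) {
            this.target = target;
            this.distribution = distribution;
        }
    }

    static final class Vertex {
        final List<Task> tasks = new ArrayList<>();
        final List<Edge> outputs = new ArrayList<>();
        Set<Task> tasksToCheck;
    }

    static List<Task> computeTasksToTrigger(List<Vertex> verticesInTopologicalOrder) {
        for (Vertex v : verticesInTopologicalOrder) {
            v.tasksToCheck = new HashSet<>(v.tasks);
        }

        List<Task> tasksToTrigger = new ArrayList<>();
        for (Vertex v : verticesInTopologicalOrder) {
            if (v.tasksToCheck.isEmpty()) {
                // All tasks of v are still running: no descendant task can be a new root.
                for (Edge edge : v.outputs) {
                    edge.target.tasksToCheck.clear();
                }
                continue;
            }

            int finishedCount = 0;
            for (Task task : v.tasksToCheck) {
                if (task.running) {
                    // A running task without running precedent tasks: trigger it via RPC.
                    tasksToTrigger.add(task);
                } else {
                    finishedCount++;
                }
            }

            if (finishedCount == v.tasks.size()) {
                // All tasks of v are finished: descendants keep their current candidates.
                continue;
            }

            for (Edge edge : v.outputs) {
                if (edge.distribution == Distribution.ALL_TO_ALL) {
                    // Every downstream task has a running precedent: nothing to check there.
                    edge.target.tasksToCheck.clear();
                } else {
                    // POINTWISE: drop candidates connected to at least one running task of v.
                    edge.target.tasksToCheck.removeIf(
                            t ->
                                    t.pointwiseInputs.stream()
                                            .anyMatch(p -> v.tasks.contains(p) && p.running));
                }
            }
        }
        return tasksToTrigger;
    }
}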