...

This section details the proposed changes for option 3, chosen in the last section.

Changes to Public(Evolving) APIs

We lack a clear method in the API to stop processing records and flush any buffered records. We do have the StreamOperator#close method, which is supposed to flush all records, but at the same time it currently closes all resources, including connections to external systems. We need separate methods for flushing and for closing resources, because we might still need the connections when performing the final checkpoint, once all records are flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose method. Lastly, the semantics of RichFunction#close differ from those of StreamOperator#close: having no direct access to any output, RichFunction#close is responsible purely for releasing resources. We suggest using this opportunity to clean up the semi-public StreamOperator API and:

  1. remove the dispose method
  2. change the semantics of the close method
  3. introduce a new finish method

Effectively, this would modify the termination phase of the Operator lifecycle (https://ci.apache.org/projects/flink/flink-docs-master/docs/internals/task_lifecycle/#operator-lifecycle-in-a-nutshell) to be:

Code Block
    // termination phase
    OPERATOR::endOfInput(1)
    ...
    OPERATOR::endOfInput(n)

    OPERATOR::finish
        UDF(if available)::finish

    OPERATOR::snapshotState()
    OPERATOR::notifyCheckpointComplete()

    OPERATOR::close() --> call UDF's close method
        UDF::close()

In case of failure, we will call Operator#close → UDF#close.

Code Block
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {

    ....

    /**
     * This method is called at the end of data processing.
     *
     * <p>The method is expected to flush all remaining buffered data. Exceptions during this
     * flushing of buffered data should be propagated, in order to cause the operation to be
     * recognized as failed, because the last data items would not be processed properly.
     *
     * <p><b>NOTE:</b> This method does not need to close any resources. You should release
     * external resources in the {@link #close()} method.
     *
     * @throws java.lang.Exception An exception in this method causes the operator to fail.
     */
    void finish() throws Exception;

    /**
     * This method is called at the very end of the operator's life, both in the case of a
     * successful completion of the operation, and in the case of a failure and canceling.
     *
     * <p>This method is expected to make a thorough effort to release all resources that the
     * operator has acquired.
     *
     * <p><b>NOTE:</b> It should not emit any records! If you need to emit records at the end of
     * processing, do so in the {@link #finish()} method.
     */
    void close() throws Exception;

    ...

}

The UDF that most often buffers data, and thus requires a flushing/finishing phase, is the SinkFunction, which could e.g. create transactions in an external system that are committed during the final snapshot. Therefore we suggest introducing a finish method in the SinkFunction:

Code Block
@Public
public interface SinkFunction<IN> extends Function, Serializable {
    default void finish() throws Exception {}
}
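
For illustration only, here is a minimal sketch of how a sink that buffers records in an external transaction might use the new hook; the TransactionalSink class and its Transaction helper are hypothetical and merely stand in for whatever client the external system provides:

Code Block
// Hypothetical example, not part of the proposal itself: a sink that buffers
// records in a transaction, flushes it in finish(), leaves the commit to the
// final checkpoint, and releases the connection in close().
public class TransactionalSink<IN> implements SinkFunction<IN> {

    private transient Transaction transaction;

    @Override
    public void invoke(IN value, Context context) throws Exception {
        if (transaction == null) {
            transaction = beginTransaction();
        }
        transaction.add(value);
    }

    @Override
    public void finish() throws Exception {
        // End of input: flush everything that is still buffered, but keep the
        // connection open so the transaction can be committed on the final checkpoint.
        if (transaction != null) {
            transaction.flush();
        }
    }

    private Transaction beginTransaction() {
        return new Transaction(); // open a transaction against the external system
    }

    /** Placeholder for the external system's transaction handle. */
    private static class Transaction {
        void add(Object record) { /* buffer the record */ }
        void flush() { /* push buffered records to the external system */ }
    }
}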

Checkpoint Format with Finished Tasks

For checkpoints involving finished tasks:

  1. If all the subtasks of an operator are finished, the checkpoint would store a flag "fully finished = true" so that we could skip the execution of this operator after recovery. To avoid compatibility problems, we would store a special state for this operator instead of directly adding a boolean flag.
  2. If only some subtasks of an operator are finished, the checkpoint would store the states collected from the remaining running subtasks of this operator. After recovery, the subtasks would re-assign these remaining states among themselves (see the sketch below).
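
As a rough illustration of the intended format, the sketch below shows the per-operator information a checkpoint would carry; the class and field names are hypothetical and do not correspond to Flink's actual checkpoint metadata classes:

Code Block
import java.util.Map;

// Hypothetical sketch of the per-operator entry in a checkpoint once finished
// tasks are allowed; names are illustrative only.
public class OperatorCheckpointEntry {

    // Case 1: all subtasks of the operator had finished when the checkpoint was
    // taken. Conceptually a boolean, but persisted as a special marker state so
    // the existing checkpoint format stays compatible.
    private final boolean fullyFinished;

    // Case 2: only some subtasks had finished; the states reported by the
    // remaining running subtasks, keyed by subtask index. On recovery these
    // states are re-assigned across the new subtasks.
    private final Map<Integer, byte[]> remainingSubtaskStates;

    public OperatorCheckpointEntry(
            boolean fullyFinished, Map<Integer, byte[]> remainingSubtaskStates) {
        this.fullyFinished = fullyFinished;
        this.remainingSubtaskStates = remainingSubtaskStates;
    }

    public boolean isFullyFinished() {
        return fullyFinished;
    }

    public Map<Integer, byte[]> getRemainingSubtaskStates() {
        return remainingSubtaskStates;
    }
}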

Triggering Checkpoints After Tasks Finished

Currently the CheckpointCoordinator only triggers the sources for new checkpoints, and the other tasks receive barriers from their input channels. However, only triggering the running sources would no longer work once tasks have finished. As exemplified in Figure 1, suppose Task A has finished: if we only triggered Task B for a new checkpoint, then Task C would have no idea about the new checkpoint and would not report its snapshot.

[draw.io diagram: Figure.1]

Figure 1. An example execution graph with one task finished.

It might be argued that we could simply wait for Task C to finish during this checkpoint, so that we would not need to trigger it at all. However, this does not work if C is a two-phase-commit sink that requires one successful checkpoint before it can finish, which would cause a deadlock.

To find the new "roots" of the current execution graph, we iterate over all the tasks to find those that are still running but have no running precedent tasks. A direct implementation would have O(n²) time complexity, since it needs to check all the precedent tasks of each task. However, we can reduce the complexity to O(n) by exploiting the isomorphism of ALL_TO_ALL edges. The details are described in Appendix 1.
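
For reference, the direct O(n²) variant of that search could look like the following sketch; ExecutionTask and its methods are placeholders for the corresponding execution-graph classes, and the O(n) optimization from Appendix 1 is omitted:

Code Block
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch: collect the tasks to trigger, i.e. the tasks that are
// still running but have no running precedent task. Types are placeholders.
class CheckpointTriggerComputation {

    List<ExecutionTask> findTasksToTrigger(Collection<ExecutionTask> allTasks) {
        List<ExecutionTask> tasksToTrigger = new ArrayList<>();
        for (ExecutionTask task : allTasks) {
            if (!task.isRunning()) {
                continue; // finished tasks are never triggered
            }
            boolean hasRunningPrecedent = false;
            for (ExecutionTask precedent : task.getPrecedents()) {
                if (precedent.isRunning()) {
                    hasRunningPrecedent = true;
                    break;
                }
            }
            if (!hasRunningPrecedent) {
                tasksToTrigger.add(task); // a new "root" of the execution graph
            }
        }
        return tasksToTrigger;
    }

    /** Minimal placeholder for an execution-graph vertex, for illustration only. */
    interface ExecutionTask {
        boolean isRunning();
        List<ExecutionTask> getPrecedents();
    }
}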

To avoid tasks finishing while this computation is in progress, the computation is done in the JobMaster's main thread. However, there might still be inconsistencies due to:

  1. For tasks running on different TaskManagers, the order in which the FINISHED status reports arrive at the JobMaster is not guaranteed. That is to say, some tasks might report FINISHED after their descendant tasks do.
  2. Tasks might finish between the computation and the moment the computed tasks actually get triggered.

In both cases the checkpoint trigger would fail, and the whole checkpoint would then fail due to a timeout. Since a checkpoint timeout blocks the next checkpoint and, by default, causes a failover, it would be better to detect the trigger failure as early as possible. The failure can be detected when a task finishes before acknowledging its snapshot for a pending checkpoint. The CheckpointCoordinator would therefore listen for task-finished events; whenever a task finishes, it iterates over all pending checkpoints to perform this detection.
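
Conceptually, this detection could look like the sketch below; PendingCheckpoint, the pendingCheckpoints map, and the method names are illustrative assumptions, not the actual CheckpointCoordinator API:

Code Block
// Hypothetical sketch of the early failure detection, run in the JobMaster's
// main thread whenever a task reports FINISHED; names are illustrative.
void onTaskFinished(ExecutionTask finishedTask) {
    for (PendingCheckpoint checkpoint : pendingCheckpoints.values()) {
        if (checkpoint.isStillWaitingFor(finishedTask)) {
            // The task finished without reporting its snapshot, so this
            // checkpoint can never complete; abort it right away instead of
            // waiting for the checkpoint timeout.
            checkpoint.abort("Task finished before acknowledging the checkpoint");
        }
    }
}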

Make CheckpointBarrierHandler Support EndOfPartition on TM Side

For the task side, there are two cases regarding checkpoints with finished tasks:

  1. All the precedent tasks are finished: the task has received all the EndOfPartition events, but has not finished itself yet (because it is waiting for the final checkpoint, or waiting for the downstream tasks to process all pending records, as introduced below).
  2. Only some precedent tasks are finished: the task has received EndOfPartition from those tasks.

All the Precedent Tasks are Finished

Currently tasks can be classified into two types according to how a checkpoint is notified: source tasks are triggered via RPC from the JobManager, while non-source tasks are triggered via barriers received from their precedent tasks. Once checkpoints after some tasks have finished are considered, non-source tasks might also get triggered via RPC if they become the new "root" tasks. The implementation for non-source tasks would differ from that for source tasks: instead of directly performing the snapshot, they would notify the CheckpointBarrierHandler, so that the CheckpointBarrierHandler can handle checkpoint subsuming correctly. Since triggerCheckpoint is currently implemented uniformly in the base StreamTask class and is only called for source StreamTask classes, we would refactor the class hierarchy to support different implementations of the triggerCheckpoint method for the two types of tasks.
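
One possible shape of that refactoring is sketched below with simplified, illustrative signatures; the actual class and method names in Flink may differ:

Code Block
// Illustrative sketch of the two trigger paths; simplified signatures only.
abstract class TriggerableStreamTask {
    /** Called via RPC from the CheckpointCoordinator. */
    abstract void triggerCheckpoint(long checkpointId) throws Exception;
}

class SourceLikeStreamTask extends TriggerableStreamTask {
    @Override
    void triggerCheckpoint(long checkpointId) throws Exception {
        // Source tasks keep the current behavior and snapshot directly.
        performCheckpoint(checkpointId);
    }

    void performCheckpoint(long checkpointId) { /* existing snapshot logic */ }
}

class NonSourceStreamTask extends TriggerableStreamTask {
    private final BarrierHandlerSketch barrierHandler = new BarrierHandlerSketch();

    @Override
    void triggerCheckpoint(long checkpointId) throws Exception {
        // A non-source task that became a new "root" does not snapshot directly;
        // it notifies the barrier handler so that subsuming of older pending
        // checkpoints works as if a barrier had arrived from upstream.
        barrierHandler.processTriggerRequest(checkpointId);
    }
}

class BarrierHandlerSketch {
    void processTriggerRequest(long checkpointId) { /* treat like a received barrier */ }
}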

Some of the Precedent Tasks are Finished

If not all of a task's precedent tasks are finished, the task is still notified via checkpoint barriers from the running precedent tasks. However, some of the precedent tasks might already be finished, and the CheckpointBarrierHandler must account for this. Overall, an EndOfPartition from one input channel marks that channel as aligned for all pending and subsequent checkpoints. Therefore, when receiving EndOfPartition, we insert a manually created CheckpointBarrier for all pending checkpoints and exclude the channel from subsequent checkpoints.
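
For the aligned case, that handling of EndOfPartition inside the barrier handler could be sketched as follows; pendingCheckpointIds, createBarrierFor, processBarrier, and finishedChannels are assumed members introduced only for illustration, not the actual CheckpointBarrierHandler API:

Code Block
// Hypothetical sketch of EndOfPartition handling in a barrier handler.
// pendingCheckpointIds, createBarrierFor, processBarrier and finishedChannels
// are assumed to exist on the handler.
void processEndOfPartition(int channelIndex) throws Exception {
    // The channel will never deliver another barrier, so for every checkpoint
    // that is still pending alignment, behave as if its barrier had just
    // arrived on this channel.
    for (long checkpointId : pendingCheckpointIds) {
        CheckpointBarrier artificialBarrier = createBarrierFor(checkpointId);
        processBarrier(artificialBarrier, channelIndex);
    }
    // Exclude the channel from the alignment of all later checkpoints.
    finishedChannels.add(channelIndex);
}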

This is enough for aligned checkpoints, but unaligned checkpoints introduce additional complexity. Unaligned checkpoint barriers can jump over pending records, while EndOfPartition cannot. If we simply waited for the EndOfPartition, the CheckpointCoordinator might not get notified in time, and we could see longer checkpoint durations during the finishing phase; the same holds for aligned checkpoints with timeout. To cope with this, the upstream tasks wait for the downstream tasks to process all pending records before exiting: the upstream task emits a special event after all its records, the downstream tasks respond with another special event, and the upstream task only exits after all the responses have been received. During this period, unaligned checkpoints (or checkpoints with timeout) can proceed normally. Afterwards, the EndOfPartition reaches the downstream CheckpointAligner quickly, since there are no more pending records.
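
A conceptual sketch of that handshake on the upstream side follows; the event types (EndOfUserRecordsEvent and its acknowledgement), the SubpartitionSketch type, and the helper methods are illustrative assumptions:

Code Block
// Hypothetical sketch of the finishing handshake of an upstream task.
void finishTask() throws Exception {
    // 1. After the last record, announce the end of user records on every
    //    downstream channel; checkpoint barriers may still follow.
    for (SubpartitionSketch subpartition : subpartitions) {
        subpartition.emit(new EndOfUserRecordsEvent());
    }

    // 2. Wait until every downstream task has processed its pending records and
    //    responded to the event. Unaligned checkpoints (or aligned checkpoints
    //    with timeout) can still complete normally during this period.
    waitForAllDownstreamResponses();

    // 3. Only now emit EndOfPartition and exit; since nothing is pending any
    //    more, it reaches the downstream CheckpointBarrierHandler quickly.
    for (SubpartitionSketch subpartition : subpartitions) {
        subpartition.emitEndOfPartition();
    }
}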


Do Not Cancel Pending Checkpoint on Finish

...