...
This section details the proposed changes for the options 3 chosen in the last section.
Changes to Public(Evolving) APIs
We lack a clear method in the API to stop processing records and flush any buffered records. We do have the StreamOperator#close
method which is supposed to flush all records, but at the same time, currently, it closes all resources, including connections to external systems. We need separate methods for flushing and closing resources because we might need the connections when performing the final checkpoint, once all records are flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose
method. Lastly, the semantic of RichFunction#close
is different from StreamOperator#close
. Having no direct access to any output the RichFunction#close
is responsible purely for releaseing resources. We suggest using this opportunity to clean up the semi-public StreamOperator
API and:
- remove the
dispose
method - change the semantic of
close
method - introduce new
finish
method
Effectively it would modify the Operator
lifecycle (https://ci.apache.org/projects/flink/flink-docs-master/docs/internals/task_lifecycle/#operator-lifecycle-in-a-nutshell) termination phase to be:
Code Block |
---|
// termination phase
OPERATOR::endOfInput(1)
...
OPERATOR::endOfInput(n)
OPERATOR::finish
UDF(if available)::finish
OPERATOR::snapshotState()
OPERATOR::notifyCheckpointComplete()
OPERATOR::close() --> call UDF's close method
UDF::close() |
In case of failure, we will call the Operator#close → UDF#close method.
Code Block |
---|
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
....
/**
* This method is called at the end of data processing.
*
* <p>The method is expected to flush all remaining buffered data. Exceptions during this
* flushing of buffered should be propagated, in order to cause the operation to be recognized
* as failed, because the last data items are not processed properly.
*
* <p><b>NOTE:</b>This method does not need to close any resources. You should release external
* resources in the {@link #close()} method.
*
* @throws java.lang.Exception An exception in this method causes the operator to fail.
*/
void finish() throws Exception;
/**
* This method is called at the very end of the operator's life, both in the case of a
* successful completion of the operation, and in the case of a failure and canceling.
*
* <p>This method is expected to make a thorough effort to release all resources that the
* operator has acquired.
*
* <p><b>NOTE:</b>It should not emit any records! If you need to emit records at the end of
* processing, do so in the {@link #finish()} method.
*/
void close() throws Exception;
...
} |
The UDF that most often buffers data and thus requires a flushing/finishing phase is the SinkFunction
where it could e.g. create transactions in an external system that can be committed during the final snapshot. Therefore we suggest introducing a finish
method in the SinkFunction
:
Code Block |
---|
@Public
public interface SinkFunction<IN> extends Function, Serializable {
default void finish() throws Exception {}
} |
Checkpoint Format with Finished Tasks
For checkpoints involving finished tasks:
- If all the subtasks of an operator are finished, the checkpoint would store a flag "fully finished = true" so that we could skip the execution of this operator after recovery. To not cause compatibility problems, we would store a special state for this operator instead of directly adding one boolean flag.
- If some subtask of an operator are finished, the checkpoint would store the states collected from the other subtasks for this operator. After recovery, the subtasks would re-assign the remaining states.
Triggering Checkpoints After Tasks Finished
Currently CheckpointCoordinator only triggers the sources for new checkpoints and other tasks would receive barriers from the input channels. However, it would not work to only trigger the running sources after tasks finished. As exemplified in Figure 1, suppose Task A has finished if we only trigger B for a new checkpoint, then Task C would have no idea about the new checkpoint and would not report its snapshot.
draw.io Diagram | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
Figure 2. An example execution graph with one task finished.
It might be argued that we could wait till the Task C to finish during this checkpoint, then we could not trigger task C in the checkpoints. However, this does not work if C is a two-phase commit sink and requires one successful checkpoint before finish, which will cause deadlock.
To find the new "root" of the current execution graph, we iterates all the tasks to find the tasks who are still running but have no running precedent tasks. A direct implementation would have O(n2) time complexity since it needs to check all the precedent tasks of each task. However, we could reduce the complexity to O(n) by exploiting the isomorphism of ALL_TO_ALL edges. The detail is described in Appendix 1.
To avoid the case that the tasks finished during the computation, the computation is done in the JobMaster's main thread. However, there might still be inconsistency due to:
- For tasks running on different TaskManagers, the order of the reports of FINISHED status arrived at JobMaster is not guaranteed. That is to say some tasks might report FINISHED after its descendant tasks.
- The tasks might finish between the computation and the tasks get triggered.
In both cases, the checkpoint trigger would fail and the whole checkpoint would then fail due to timeout. Since checkpoint timeout would block the next checkpoint and cause failover by default, it would be better to detect the trigger failure as earlier as possible. The failure could be detected if a task finished before acknowledging the snapshot for this checkpoint. CheckpointCoordinator would listen to the event of task finish, when a task finish, it iterates all the pending checkpoints to do the detection.
Make CheckpointBarrierHandler Support EndOfPartition on TM Side
For the task side, there are two cases regarding checkpoints with finished tasks:
- All the precedent tasks are finished, the task has received all the EndOfPartition, but not finished yet (due to waiting for the last checkpoint, or waiting for the downstream tasks to process all the pending records introduced in the following).
- Some precedent tasks are finished and the task has received EndOfPartition from these tasks.
All the Precedent Tasks are Finished
Currently tasks could be classified into two types according to how checkpoint is notified: the source tasks are triggered via the RPC from the JobManager, while the non-source tasks are triggered via barriers received from the precedent tasks. If checkpoints after some tasks are finished are considered, the non-source tasks might also get triggered via RPC if they become the new "root" tasks. The implementation for non-source tasks would be different from the source tasks: it would notify the CheckpointBarrierHandler instead of directly performing the snapshot so that the CheckpointBarrierHandler could deal with checkpoint subsuming correctly. Since currently triggerCheckpoint is implemented in the base StreamTask class uniformly and only be called for source StreamTask classes, we would refactor the class hierarchy to support different implementations of triggerCheckpoint method for the two types of tasks.
Some of the Precedent Tasks are Finished
If for one task not all its precedent tasks are finished, the task would still get notified via checkpoint barrier from the running precedent tasks. However, some of the preceding tasks might be already finished and the CheckpointBarrierHandler must consider this case. As a whole, EndOfPartition from one input channel would mark this channel as aligned for all the pending and following checkpoints. Therefore, when receiving EndOfPartition, we will insert a manually created CheckpointBarrier for all the pending checkpoints, and exclude the channel from the following checkpoints.
...
Checkpoint Format with Finished Tasks
For checkpoints involving finished tasks:
- If all the subtasks of an operator are finished, the checkpoint would store a flag "fully finished = true" so that we could skip the execution of this operator after recovery. To not cause compatibility problems, we would store a special state for this operator instead of directly adding one boolean flag.
- If some subtask of an operator are finished, the checkpoint would store the states collected from the other subtasks for this operator. After recovery, the subtasks would re-assign the remaining states.
Triggering Checkpoints After Tasks Finished
Currently CheckpointCoordinator only triggers the sources for new checkpoints and other tasks would receive barriers from the input channels. However, it would not work to only trigger the running sources after tasks finished. As exemplified in Figure 1, suppose Task A has finished if we only trigger B for a new checkpoint, then Task C would have no idea about the new checkpoint and would not report its snapshot.
draw.io Diagram | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
Figure 2. An example execution graph with one task finished.
It might be argued that we could wait till the Task C to finish during this checkpoint, then we could not trigger task C in the checkpoints. However, this does not work if C is a two-phase commit sink and requires one successful checkpoint before finish, which will cause deadlock.
To find the new "root" of the current execution graph, we iterates all the tasks to find the tasks who are still running but have no running precedent tasks. A direct implementation would have O(n2) time complexity since it needs to check all the precedent tasks of each task. However, we could reduce the complexity to O(n) by exploiting the isomorphism of ALL_TO_ALL edges. The detail is described in Appendix 1.
To avoid the case that the tasks finished during the computation, the computation is done in the JobMaster's main thread. However, there might still be inconsistency due to:
- For tasks running on different TaskManagers, the order of the reports of FINISHED status arrived at JobMaster is not guaranteed. That is to say some tasks might report FINISHED after its descendant tasks.
- The tasks might finish between the computation and the tasks get triggered.
In both cases, the checkpoint trigger would fail and the whole checkpoint would then fail due to timeout. Since checkpoint timeout would block the next checkpoint and cause failover by default, it would be better to detect the trigger failure as earlier as possible. The failure could be detected if a task finished before acknowledging the snapshot for this checkpoint. CheckpointCoordinator would listen to the event of task finish, when a task finish, it iterates all the pending checkpoints to do the detection.
Make CheckpointBarrierHandler Support EndOfPartition on TM Side
For the task side, there are two cases regarding checkpoints with finished tasks:
- All the precedent tasks are finished, the task has received all the EndOfPartition, but not finished yet (due to waiting for the last checkpoint, or waiting for the downstream tasks to process all the pending records introduced in the following).
- Some precedent tasks are finished and the task has received EndOfPartition from these tasks.
All the Precedent Tasks are Finished
Currently tasks could be classified into two types according to how checkpoint is notified: the source tasks are triggered via the RPC from the JobManager, while the non-source tasks are triggered via barriers received from the precedent tasks. If checkpoints after some tasks are finished are considered, the non-source tasks might also get triggered via RPC if they become the new "root" tasks. The implementation for non-source tasks would be different from the source tasks: it would notify the CheckpointBarrierHandler instead of directly performing the snapshot so that the CheckpointBarrierHandler could deal with checkpoint subsuming correctly. Since currently triggerCheckpoint is implemented in the base StreamTask class uniformly and only be called for source StreamTask classes, we would refactor the class hierarchy to support different implementations of triggerCheckpoint method for the two types of tasks.
Some of the Precedent Tasks are Finished
If for one task not all its precedent tasks are finished, the task would still get notified via checkpoint barrier from the running precedent tasks. However, some of the preceding tasks might be already finished and the CheckpointBarrierHandler must consider this case. As a whole, EndOfPartition from one input channel would mark this channel as aligned for all the pending and following checkpoints. Therefore, when receiving EndOfPartition, we will insert a manually created CheckpointBarrier for all the pending checkpoints, and exclude the channel from the following checkpoints.
This is enough for aligned checkpoints, but unaligned checkpoints would introduce additional complexity. Since unaligned checkpoint barriers could jump over the pending records, if we instead wait for the EndOfPartition directly, since EndOfPartition could not jump over, the CheckpointCoordinator could not get notified in time and we might incur longer checkpoint periods during the finishing phase. This is similar for the aligned checkpoints with timeout. To cope with this issue, the upstream tasks would wait till the downstream tasks to process all the pending records before exit. The upstream tasks would emit a special event after all the records and the downstream tasks would respond with another special event, and the upstream tasks only exit after all the response events are received. During this period, the unaligned checkpoints or checkpoints with timeout could be done normally. Afterwards the EndOfPartition could reach the downstream CheckpointAligner quickly since there are no pending records.
Changes to Public(Evolving) APIs
We lack a clear method in the API to stop processing records and flush any buffered records. We do have the StreamOperator#close
method which is supposed to flush all records, but at the same time, currently, it closes all resources, including connections to external systems. We need separate methods for flushing and closing resources because we might need the connections when performing the final checkpoint, once all records are flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose
method. Lastly, the semantic of RichFunction#close
is different from StreamOperator#close
. Having no direct access to any output the RichFunction#close
is responsible purely for releaseing resources. We suggest using this opportunity to clean up the semi-public StreamOperator
API and:
- remove the
dispose
method - change the semantic of
close
method - introduce new
finish
method
Effectively it would modify the Operator
lifecycle (https://ci.apache.org/projects/flink/flink-docs-master/docs/internals/task_lifecycle/#operator-lifecycle-in-a-nutshell) termination phase to be:
Code Block |
---|
// termination phase
OPERATOR::endOfInput(1)
...
OPERATOR::endOfInput(n)
OPERATOR::finish
UDF(if available)::finish
OPERATOR::snapshotState()
OPERATOR::notifyCheckpointComplete()
OPERATOR::close() --> call UDF's close method
UDF::close() |
In case of failure, we will call the Operator#close → UDF#close method.
Code Block |
---|
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
....
/**
* This method is called at the end of data processing.
*
* <p>The method is expected to flush all remaining buffered data. Exceptions during this
* flushing of buffered should be propagated, in order to cause the operation to be recognized
* as failed, because the last data items are not processed properly.
*
* <p><b>NOTE:</b>This method does not need to close any resources. You should release external
* resources in the {@link #close()} method.
*
* @throws java.lang.Exception An exception in this method causes the operator to fail.
*/
void finish() throws Exception;
/**
* This method is called at the very end of the operator's life, both in the case of a
* successful completion of the operation, and in the case of a failure and canceling.
*
* <p>This method is expected to make a thorough effort to release all resources that the
* operator has acquired.
*
* <p><b>NOTE:</b>It should not emit any records! If you need to emit records at the end of
* processing, do so in the {@link #finish()} method.
*/
void close() throws Exception;
...
} |
The UDF that most often buffers data and thus requires a flushing/finishing phase is the SinkFunction
where it could e.g. create transactions in an external system that can be committed during the final snapshot. Therefore we suggest introducing a finish
method in the SinkFunction
:
Code Block |
---|
@Public
public interface SinkFunction<IN> extends Function, Serializable {
default void finish() throws Exception {}
} |
Do Not Cancel Pending Checkpoint on Finish
...