...

This is enough for aligned checkpoints, but unaligned checkpoints introduce additional complexity. Unaligned checkpoint barriers can jump over pending records, whereas EndOfPartition cannot. If the downstream tasks simply waited for EndOfPartition, the CheckpointCoordinator would not be notified in time and checkpoints could take longer during the finishing phase. The same applies to aligned checkpoints with timeout. To cope with this issue, the upstream tasks would wait until the downstream tasks have processed all the pending records before exiting. The upstream tasks would emit a special event after all the records, the downstream tasks would respond with another special event, and the upstream tasks would exit only after all the response events are received. During this period, unaligned checkpoints and checkpoints with timeout can complete normally. Afterwards, EndOfPartition reaches the downstream CheckpointAligner quickly since there are no pending records.
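
The ordering of this handshake can be illustrated with a minimal single-threaded sketch. All names here (FinishHandshakeSketch, Channel, END_OF_DATA) are hypothetical and are not Flink classes or events. The sketch only models the rule that the upstream may exit once every downstream channel has worked through its pending records and answered the special event.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the exit handshake; none of these names are Flink APIs.
final class FinishHandshakeSketch {

    // Marker standing in for the special event emitted after all records.
    static final String END_OF_DATA = "<end-of-data>";

    // A downstream input channel holding pending elements in FIFO order.
    static final class Channel {
        final Queue<String> pending = new ArrayDeque<>();
        final List<String> processed = new ArrayList<>();
        boolean acked = false;

        // Processes one pending element; the special event triggers the response.
        void processOne() {
            String element = pending.poll();
            if (element == null) {
                return;
            }
            if (element.equals(END_OF_DATA)) {
                acked = true; // stands in for the response event sent upstream
            } else {
                processed.add(element);
            }
        }
    }

    // The upstream may exit only once every channel has answered the special event.
    static boolean upstreamMayExit(List<Channel> channels) {
        for (Channel channel : channels) {
            if (!channel.acked) {
                return false;
            }
        }
        return true;
    }
}
```

Because the marker is queued behind the records, a channel can only respond after its pending records are processed, which is why EndOfPartition sent afterwards finds no backlog.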

Do Not Cancel Pending Checkpoint on Finish

Currently, when a task finishes, all pending checkpoints are canceled. Since checkpoints would now be supported after tasks finish, a finishing task should not fail the checkpoints. We would change the behavior so that the task waits for all pending checkpoints to complete before it finishes.
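
One way to picture the new behavior is a future that completes only after every pending checkpoint has completed. This is a sketch under assumptions: PendingCheckpointsSketch and finishAfter are hypothetical names, and each pending checkpoint is modeled as a plain CompletableFuture rather than any Flink type.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; PendingCheckpointsSketch is not a Flink class.
final class PendingCheckpointsSketch {

    // Returns a future that completes only after every pending checkpoint
    // has completed, instead of cancelling them when the task finishes.
    static CompletableFuture<Void> finishAfter(List<CompletableFuture<Void>> pendingCheckpoints) {
        return CompletableFuture.allOf(pendingCheckpoints.toArray(new CompletableFuture[0]));
    }
}
```

If one of the pending checkpoints fails, the combined future completes exceptionally; how the task should react in that case is outside the scope of this sketch.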

Failover and Recovery

Logically, we would not need to restart finished tasks on failover; only the tasks that still have operators which are not fully finished would have to be restarted. However, for the first version, we would still restart all the tasks and skip the execution of fully finished operators. This simplifies the implementation on both the task side and the scheduler.
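
The first-version behavior can be sketched as a simple filter applied after every task has been restarted. The names RecoverySketch and operatorsToExecute are hypothetical, and operators are identified by plain strings purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; not part of the Flink scheduler or task code.
final class RecoverySketch {

    // All tasks are restarted, but operators recorded as fully finished
    // in the restored state are skipped rather than executed again.
    static List<String> operatorsToExecute(List<String> allOperators, Set<String> finishedOperators) {
        List<String> toRun = new ArrayList<>();
        for (String operator : allOperators) {
            if (!finishedOperators.contains(operator)) {
                toRun.add(operator);
            }
        }
        return toRun;
    }
}
```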

Changes to Public(Evolving) APIs

We lack a clear method in the API to stop processing records and flush any buffered records. We do have the StreamOperator#close method, which is supposed to flush all records, but currently it also closes all resources, including connections to external systems. We need separate methods for flushing and for closing resources because we might still need the connections when performing the final checkpoint, once all records are flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose method. Lastly, the semantics of RichFunction#close differ from those of StreamOperator#close. Having no direct access to any output, RichFunction#close is responsible purely for releasing resources. We suggest using this opportunity to clean up the semi-public StreamOperator API and:

  1. remove the dispose method
  2. change the semantics of the close method
  3. introduce a new finish method
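
The intended split between flushing and releasing resources can be sketched as follows. SketchOperator and BufferingOperator are hypothetical stand-ins, not the Flink StreamOperator types; the connectionOpen flag merely represents a connection to an external system.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; these are not the Flink StreamOperator types.
final class OperatorLifecycleSketch {

    interface SketchOperator {
        void finish() throws Exception; // flush buffered records, keep resources open
        void close() throws Exception;  // release resources, emit nothing
    }

    static final class BufferingOperator implements SketchOperator {
        final List<String> buffer = new ArrayList<>();
        final List<String> output = new ArrayList<>();
        boolean connectionOpen = true;

        void processElement(String record) {
            buffer.add(record);
        }

        // Flushes everything that is still buffered; the connection stays usable
        // so that a final checkpoint can still be performed afterwards.
        @Override
        public void finish() {
            output.addAll(buffer);
            buffer.clear();
        }

        // Only releases resources; no records are emitted here.
        @Override
        public void close() {
            connectionOpen = false;
        }
    }
}
```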

Effectively it would modify the Operator lifecycle (https://ci.apache.org/projects/flink/flink-docs-master/docs/internals/task_lifecycle/#operator-lifecycle-in-a-nutshell) termination phase to be:

...

Code Block
@Public
public interface SinkFunction<IN> extends Function, Serializable {
    // Called at the end of data processing so the sink can flush any buffered records.
    default void finish() throws Exception {}
}

Final Checkpoint / Savepoint

As shown in the above interface, to ensure the last piece of data gets committed, a task needs to wait for one more checkpoint. Without special treatment, the tasks would have to wait for their checkpoints one by one, which would be very inefficient. To cope with this issue, we would like to further change the process of the StreamTask: we would introduce a new event to signal the end of user records, so that all the tasks can first call finish() on the operators in their chains. Each task can then wait for its last checkpoint separately. Once the task is notified of the last checkpoint, it emits EndOfPartition to clean up the network resources as usual.
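
The per-task sequence described here can be sketched as a small state machine. FinalCheckpointSketch and its method names are hypothetical. The sketch also assumes that the first checkpoint completing after finish() counts as the last checkpoint, which is a simplification rather than something stated in this proposal.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one task's termination sequence; not Flink's StreamTask.
final class FinalCheckpointSketch {

    // Records the actions the task takes, in order.
    final List<String> actions = new ArrayList<>();
    private boolean finished = false;
    private boolean endOfPartitionEmitted = false;

    // Called when the event marking the end of user records arrives.
    void onEndOfData() {
        if (!finished) {
            actions.add("finish() on chained operators");
            finished = true;
        }
    }

    // Called when a checkpoint completes. EndOfPartition is emitted only once,
    // and only if finish() has already been called (assumption of this sketch).
    void onCheckpointComplete(long checkpointId) {
        if (finished && !endOfPartitionEmitted) {
            actions.add("EndOfPartition after checkpoint " + checkpointId);
            endOfPartitionEmitted = true;
        }
    }
}
```

Since every task reacts to the end-of-data event independently, all tasks can call finish() up front and then wait for the same checkpoint instead of finishing one after another.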

We could also unify stop-with-savepoint into the same process. Currently we have three different savepoint types:

  1. savepoint without stopping the job
  2. stop-with-savepoint
  3. stop-with-savepoint --drain

A savepoint without stopping the job is more like a normal checkpoint and does not involve tasks finishing, thus it is not considered in this FLIP. For stop-with-savepoint [--drain], the current process first triggers a savepoint and holds all the tasks until the savepoint succeeds, then finishes all the source tasks to stop the job. This can leave some data produced between the savepoint and the finishing of the tasks. To cope with this issue, we could unify stop-with-savepoint [--drain] with the final checkpoint: on receiving the savepoint request from the JM, the source tasks would first call finishTask() and emit the event that notifies the following tasks to call finish() on all the operators; then all the tasks wait for the savepoint with the specified id to finish. The new process avoids data after the savepoint and provides a unified treatment for all the scenarios related to committing the last piece of data.

Rejected Alternatives

CheckpointCoordinator Maintains the Finished Status of Tasks Separately

...