...

Therefore, we need to also support checkpoints even after some tasks are finished. 

Based on such an ability, operators that write data to external systems in a two-phase-commit (2PC) style would be able to wait for one more checkpoint after emitting all their records, in order to commit the last piece of data. This is similar to how a job terminates with stop-with-savepoint --drain, so we would also like to unify the two processes so that the last piece of data can be committed in all the required cases. To achieve this efficiently, we would also like to adjust the operator API and the life cycle of the StreamTask.

Overall Design

There are multiple options for achieving this target. In this section we compare them.

Option 1. Prevent tasks from finishing

The first option is to prevent the tasks from finishing until the whole job is finished. In particular, tasks would never finish if the job also has unbounded sources. This option is not preferred because:

  1. It causes zombie tasks and wastes resources.
  2. It also requires a new “pre-EndOfPartition” event to indicate that all the records have been sent. If EndOfPartition were sent directly before the tasks finished, the communication channel would be destroyed, and it would also be odd to have checkpoint barriers after EndOfPartition. However, introducing the “pre-EndOfPartition” event would largely complicate the termination process.

Option 2. Allow tasks to finish & Checkpoints contain the final states from finished tasks

Another option is to allow tasks to finish normally; checkpoints taken after some tasks have finished would only snapshot the still-running tasks. A core issue of this option is whether the final states collected when a task finishes need to be included in the following checkpoints. Currently, when failover happens after some tasks are finished, the job falls back to a checkpoint taken when all the tasks were running. Including the final states of the finished tasks keeps the behavior unchanged compared with the current one, since the finished tasks could be viewed as still running. However, it also introduces some problems:

  1. Including the states from finished tasks in all the following checkpoints requires that these states be managed on the master side, which causes additional overhead.
  2. Since the same states from the finished tasks would be used in multiple checkpoints, we need to introduce reference counting between checkpoints and snapshots (see the sketch after this list). This complicates the checkpoint management, especially since we already have reference counting between snapshots and state items due to incremental checkpoints.
  3. Including the states from the finished tasks implies that even if all the tasks of an operator have finished, we would still need to restart the tasks of this operator after failover. In the long run, this limits the possibility of resuming execution only for the operators that are not fully finished.
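
To illustrate the second problem, the following is a minimal sketch of the extra bookkeeping this option would require; the class and method names are hypothetical and not Flink's actual checkpoint-management classes.

Code Block
// Hypothetical sketch: a snapshot of a finished task is shared by every later
// checkpoint, so it may only be discarded once no checkpoint references it.
import java.util.HashMap;
import java.util.Map;

class SharedSnapshotRegistry {

    private final Map<String, Integer> refCounts = new HashMap<>();

    /** A newly completed checkpoint references the finished task's snapshot. */
    void retain(String snapshotId) {
        refCounts.merge(snapshotId, 1, Integer::sum);
    }

    /** A checkpoint is subsumed; discard the snapshot once it is unreferenced. */
    void release(String snapshotId) {
        int remaining = refCounts.merge(snapshotId, -1, Integer::sum);
        if (remaining <= 0) {
            refCounts.remove(snapshotId);
            discardState(snapshotId);
        }
    }

    private void discardState(String snapshotId) {
        // Stands in for deleting the underlying state files.
        System.out.println("discarding " + snapshotId);
    }
}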

...

This is enough for aligned checkpoints, but unaligned checkpoints introduce additional complexity. Unaligned checkpoint barriers can jump over pending records, while EndOfPartition cannot; if we simply waited for EndOfPartition, the CheckpointCoordinator could not get notified in time and we might incur longer checkpoint intervals during the finishing phase. The same holds for aligned checkpoints with timeout. To cope with this issue, the upstream tasks would wait until the downstream tasks have processed all the pending records before exiting: the upstream tasks emit a special event, namely EndOfUserRecordsEvent, after all their records, the downstream tasks respond with another special event, and the upstream tasks only exit after all the response events have been received. During this period, unaligned checkpoints or checkpoints with timeout can complete normally. Afterwards, EndOfPartition can reach the downstream CheckpointAligner quickly since there are no pending records.
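
Below is a self-contained toy simulation of this handshake in plain Java; the threads and the latch stand in for Flink's tasks and network stack, which of course work quite differently in reality.

Code Block
import java.util.concurrent.CountDownLatch;

public class ShutdownHandshakeDemo {

    public static void main(String[] args) throws InterruptedException {
        int downstreamChannels = 3;
        CountDownLatch acks = new CountDownLatch(downstreamChannels);

        System.out.println("upstream: last record sent, emitting EndOfUserRecordsEvent");

        for (int i = 0; i < downstreamChannels; i++) {
            final int channel = i;
            new Thread(() -> {
                // The downstream task first drains its pending records; unaligned
                // barriers arriving in this window can still be processed normally.
                System.out.println("channel " + channel + ": pending records drained");
                acks.countDown(); // respond to EndOfUserRecordsEvent
            }).start();
        }

        // The upstream task only exits after all the response events are received,
        // so the channels stay alive while checkpoints are still in flight.
        acks.await();
        System.out.println("upstream: all acks received, emitting EndOfPartition");
    }
}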

...

Logically we could not restart the finished tasks on failover; we could only restart those tasks that have operators which are not fully finished. However, for the first version we would still restart all the tasks and skip the execution of the fully finished operators. This simplifies the implementation on the task side and in the scheduler.

Changes

...

the Operator APIs Related to Finish

Based on the ability to take a checkpoint or savepoint after tasks have finished, operators interacting with external systems in 2PC style can commit the last piece of data by waiting for one more checkpoint or savepoint. In general, there are three cases in which a job finishes:

  1. All records are processed and the job finishes normally.
  2. The job is finished due to stop-with-savepoint --drain.
  3. The job is finished due to stop-with-savepoint.

For the first two cases, the job is not expected to be restarted, thus we should flush and commit all the records. For the third case, however, the job is expected to be restarted from the savepoint, thus we should not flush all the records. For example, for window operators using event-time triggers it is reasonable to trigger all the remaining timers if the job is going to finish, but in the third case the job will be restarted and process more records, so triggering all the timers would produce wrong results after restarting.

To flush all the records on finishing, we need to (a sketch follows the list):

  1. Emit a special MAX_WATERMARK to trigger all the event-time timers.
  2. Call a dedicated operator API to flush the other buffered records.
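
The following is a minimal sketch of these two steps using plain-Java stand-ins rather than Flink's actual operator classes; MAX_WATERMARK here mirrors Flink's existing Watermark.MAX_WATERMARK.

Code Block
// Plain-Java stand-ins for illustration only; not Flink's actual classes.
public class DrainSketch {

    static final long MAX_WATERMARK = Long.MAX_VALUE;

    interface Operator {
        // Fires all event-time timers with timestamp <= watermark.
        void processWatermark(long watermark) throws Exception;

        // The proposed API for flushing the remaining buffered records.
        void finish() throws Exception;
    }

    static void drainOnFinish(Operator operator) throws Exception {
        // Step 1: advance event time to the maximum, triggering every remaining timer.
        operator.processWatermark(MAX_WATERMARK);

        // Step 2: let the operator flush whatever it still buffers
        // (windows, pre-aggregated records, pending transactions, ...).
        operator.finish();
    }
}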

However, we currently lack a clear method in the API to stop processing records and flush any buffered records. We do have the StreamOperator#close method, which is supposed to flush all records, but at the same time it currently closes all resources, including connections to external systems. We need separate methods for flushing and for closing resources, because we might still need the connections when performing the final checkpoint once all the records are flushed. Moreover, the logic for closing resources is duplicated in the StreamOperator#dispose method. Lastly, the semantics of RichFunction#close differ from those of StreamOperator#close: having no direct access to any output, RichFunction#close is responsible purely for releasing resources. We suggest using this opportunity to clean up the semi-public StreamOperator API and:

  1. remove the dispose method
  2. change the semantics of the close method
  3. introduce a new finish method

Effectively this would modify the termination phase of the Operator lifecycle (https://ci.apache.org/projects/flink/flink-docs-master/docs/internals/task_lifecycle/#operator-lifecycle-in-a-nutshell) to be:

Code Block
    // termination phase
    OPERATOR::endOfInput(1)
    ...
    OPERATOR::endOfInput(n)

    OPERATOR::finish
        UDF(if available)::finish

    OPERATOR::snapshotState()
    OPERATOR::notifyCheckpointComplete()

    OPERATOR::close() --> call UDF's close method
        UDF::close()

In case of failure, we will call Operator#close → UDF#close.


Code Block
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {

    ....

    /**
     * This method is called at the end of data processing.
     *
     * <p>The method is expected to flush all remaining buffered data. Exceptions during the
     * flushing of buffered data should be propagated, in order to cause the operation to be
     * recognized as failed, because the last data items would otherwise not be processed properly.
     *
     * <p><b>After this method is called, no more records can be produced for the downstream operators.</b>
     *
     * <p><b>NOTE:</b> This method does not need to close any resources. You should release external
     * resources in the {@link #close()} method.
     *
     * @throws java.lang.Exception An exception in this method causes the operator to fail.
     */
    void finish() throws Exception;

    /**
     * This method is called at the very end of the operator's life, both in the case of a
     * successful completion of the operation, and in the case of a failure and canceling.
     *
     * <p>This method is expected to make a thorough effort to release all resources that the
     * operator has acquired.
     *
     * <p><b>NOTE:</b> It should not emit any records! If you need to emit records at the end of
     * processing, do so in the {@link #finish()} method.
     */
    void close() throws Exception;

    ...

}

The UDF that most often buffers data, and thus requires a flushing/finishing phase, is the SinkFunction, which could e.g. create transactions in an external system that are committed during the final snapshot. Therefore we suggest introducing a finish method in the SinkFunction:

Code Block
@Public
public interface SinkFunction<IN> extends Function, Serializable {
    default void finish() throws Exception {}
}
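
For example, a transactional sink could flush its last transaction in finish() and commit it once the final checkpoint completes. The sketch below assumes the proposed finish() method and the existing CheckpointListener callback; the Transaction interface is a hypothetical stand-in for an external system's transaction handle, and the imports assume the current package layout.

Code Block
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class TransactionalSink implements SinkFunction<String>, CheckpointListener {

    // Hypothetical stand-in for an external system's transaction handle.
    interface Transaction {
        void add(String value);
        void flush();
        void commit();
    }

    private transient Transaction currentTransaction;

    @Override
    public void invoke(String value, Context context) {
        // Buffer the record inside the currently open external transaction.
        currentTransaction.add(value);
    }

    public void finish() {
        // Proposed API: flush the last records, but do NOT commit yet and do NOT
        // close the connection; both are still needed for the final checkpoint.
        currentTransaction.flush();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // The final checkpoint has completed: the last piece of data can be committed.
        currentTransaction.commit();
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        // Intentionally empty in this sketch.
    }
}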

For one-input operators there would currently be some duplication between endOfInput() and finish(); in the long run we would like to unify them, keeping only endOfInput(int channelIndex) and finish().

Final Checkpoint and Stopping Job With Savepoint

If we directly mapped the original close() to the new finish() method then, based on the current StreamTask implementation, a task would call finish() after receiving EndOfPartitionEvent and emit EndOfPartitionEvent only after the last checkpoint / savepoint completes. This would cause the tasks to wait for the last checkpoint / savepoint one by one, which is inefficient.

Besides, stop-with-savepoint [--drain] currently uses a different process: it first triggers a savepoint and blocks the tasks until the savepoint completes, then it finishes the tasks. This would cause duplication if we also wanted to flush the records for stop-with-savepoint --drain. Moreover, records might currently still be emitted between the savepoint and finishing the task, which would cause confusion, especially if these records affect external systems.

To deal with the above issues, we would like to adjust the lifecycle of the StreamTask to avoid chained waiting and divergent processes:

  1. We would like to introduce a new event to signal that all the user records have been emitted, and call finish() immediately after this event is aligned across all the input channels. Since it shares the same semantics as the EndOfUserRecordsEvent we introduced to solve the unaligned checkpoint issues in the previous sections, we can reuse that event.
  2. We would adjust the savepoint process to first finish the task and then trigger the savepoint, so that it can be unified with the final checkpoint process and no confusing records appear after the savepoint.

The detailed life cycle of the source stream tasks and their operators would become:

| Event | Stream Task Status | Operator Status | Final Checkpoint | Stop with Savepoint with Drain | Stop with Savepoint |
| --- | --- | --- | --- | --- | --- |
| | RUNNING | RUNNING | - | - | - |
| No more records, or savepoint trigger received | | | - | Finish task (cancel the source thread for legacy sources, suspend the mailbox for new sources) | Finish task (cancel the source thread for legacy sources, suspend the mailbox for new sources) |
| | | | Advance to MAX_WATERMARK and trigger all the event-time timers | Advance to MAX_WATERMARK and trigger all the event-time timers | - |
| | | | Emit MAX_WATERMARK | Emit MAX_WATERMARK | - |
| | WAITING_FOR_FINAL_CP | FINISHED | Call operator.endInput() & operator.finish() | Call operator.endInput() & operator.finish() | - |
| | | | Emit EndOfUserRecordsEvent[finished = true] | Emit EndOfUserRecordsEvent[finished = true] | Emit EndOfUserRecordsEvent[finished = false] |
| | | | Emit Checkpoint Barrier when a checkpoint is triggered | Emit Checkpoint Barrier | Emit Checkpoint Barrier |
| | | | Wait for the checkpoint / savepoint to complete | Wait for the checkpoint / savepoint to complete | Wait for the checkpoint / savepoint to complete |
| Checkpoint completed | | | Wait for the downstream tasks to acknowledge EndOfUserRecordsEvent | Wait for the downstream tasks to acknowledge EndOfUserRecordsEvent | Wait for the downstream tasks to acknowledge EndOfUserRecordsEvent |
| Downstream tasks acknowledge EndOfUserRecordsEvent | CLOSED | CLOSED | Call operator.close() | Call operator.close() | Call operator.close() |
| | | | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent |


Similarly, the life cycle of the non-source stream tasks and their operators would become:

| Event | Stream Task Status | Operator Status | Final Checkpoint | Stop with Savepoint with Drain | Stop with Savepoint |
| --- | --- | --- | --- | --- | --- |
| | RUNNING | RUNNING | - | - | - |
| Aligned on MAX_WATERMARK | | | Advance to MAX_WATERMARK and trigger all the event-time timers | Advance to MAX_WATERMARK and trigger all the event-time timers | - |
| | | | Emit MAX_WATERMARK | Emit MAX_WATERMARK | - |
| Aligned on EndOfUserRecordsEvent | WAITING_FOR_FINAL_CP | FINISHED | Call operator.endInput() & operator.finish() | Call operator.endInput() & operator.finish() | - |
| | | | Emit EndOfUserRecordsEvent[finished = true] | Emit EndOfUserRecordsEvent[finished = true] | Emit EndOfUserRecordsEvent[finished = false] |
| Aligned on Checkpoint Barrier | | | Emit Checkpoint Barrier | Emit Checkpoint Barrier | Emit Checkpoint Barrier |
| | | | Wait for the checkpoint / savepoint to complete | Wait for the checkpoint / savepoint to complete | Wait for the checkpoint / savepoint to complete |
| Checkpoint completed | | | Wait for the downstream tasks to acknowledge EndOfUserRecordsEvent | Wait for the downstream tasks to acknowledge EndOfUserRecordsEvent | Wait for the downstream tasks to acknowledge EndOfUserRecordsEvent |
| Downstream tasks acknowledge EndOfUserRecordsEvent | CLOSED | CLOSED | Call operator.close() | Call operator.close() | Call operator.close() |
| | | | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent |

...

Final Checkpoint / Savepoint

As shown in the above interface, to ensure the last piece of data gets committed, a task needs to wait for one more checkpoint. Without special treatment the tasks would have to wait for their last checkpoints one by one, which would be very inefficient. To cope with this issue we would like to further change the StreamTask process: we would introduce a new event to signal the end of the user records, after which every task first calls finish() on its chained operators. Each task can then wait for its last checkpoint separately; once that checkpoint is notified as complete, it emits EndOfPartition to clean up the network resources as usual.

We could also unify stop-with-savepoint into the same process. Currently we have three different savepoint types:

  1. Triggering a savepoint without stopping the job.
  2. stop-with-savepoint.
  3. stop-with-savepoint --drain.

A savepoint without stopping the job is more like a normal checkpoint and does not involve task finishing, thus it is not considered in this FLIP. For stop-with-savepoint [--drain], currently a savepoint is first triggered and all the tasks are held until the savepoint succeeds, after which all the source tasks are finished to stop the job. This has the issue that there might still be records emitted between the savepoint and the tasks finishing. To cope with this issue, we could also unify stop-with-savepoint [--drain] with the final checkpoint: upon receiving the savepoint request from the JobManager, the source tasks would first call finishTask() and emit the event that notifies the following tasks to call finish() on all their operators; then all the tasks wait for the savepoint with the specified id to complete. The new process avoids records appearing after the savepoint and provides a unified treatment for all the scenarios that require committing the last piece of data.

Rejected Alternatives

CheckpointCoordinator Maintains the Finished Status of Tasks Separately

...