...

Currently, when a task finishes, all pending checkpoints are canceled. Since we would now support checkpoints after tasks have finished, a finishing task should not fail the checkpoints. We would therefore change the behavior so that a task waits for all pending checkpoints to complete before it finishes.

Support Operators Waiting For Checkpoint Complete Before Finish

If some operators of a task run logic when they are notified that a checkpoint has completed, the task needs to wait for a completed checkpoint before it finishes. For example, the Sink operator needs to wait for one completed checkpoint to commit all its pending transactions.

To support this kind of operator, we could introduce a new interface that marks the operators that need to wait for finalization right after endOfInput():

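import java.util.concurrent.CompletableFuture;

/** Marks an operator that must wait for finalization right after endOfInput(). */
public interface FinalizationRequired {

	/** The runtime keeps waiting until this future is done. */
	CompletableFuture<Void> getFinalizeFuture();

}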

For operators of this kind, the runtime would keep waiting until the finalize future is done. If the operator runs in the mailbox thread, the waiting is achieved by repeatedly checking whether the future is done and yielding the mailbox thread in between; otherwise (e.g., the operator is chained with a legacy source), the runtime would block via future.get().
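The two waiting strategies could look roughly like the sketch below. It is only an illustration under stated assumptions, not the actual runtime code: SimpleMailbox is a hypothetical stand-in for the task mailbox, CommittingSink is a hypothetical sink that finishes its finalization on the first checkpoint completed after endOfInput(), and the helper names waitInMailboxThread and waitBlocking are made up for this example.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;

final class FinalizationWaitSketch {

    /** Same shape as the interface proposed in this section. */
    public interface FinalizationRequired {
        CompletableFuture<Void> getFinalizeFuture();
    }

    /** Hypothetical sink: finalizes on the first checkpoint completed after endOfInput(). */
    public static final class CommittingSink implements FinalizationRequired {
        private final CompletableFuture<Void> finalizeFuture = new CompletableFuture<>();
        private volatile boolean inputEnded;

        public void endOfInput() {
            inputEnded = true;
        }

        public void notifyCheckpointComplete(long checkpointId) {
            if (inputEnded) {
                // A real sink would commit its pending transactions here.
                finalizeFuture.complete(null);
            }
        }

        @Override
        public CompletableFuture<Void> getFinalizeFuture() {
            return finalizeFuture;
        }
    }

    /** Hypothetical stand-in for the task mailbox: a queue of pending actions. */
    public static final class SimpleMailbox {
        private final Queue<Runnable> mails = new ConcurrentLinkedQueue<>();

        public void put(Runnable mail) {
            mails.add(mail);
        }

        /** Runs one pending mail if there is one; returns false if the mailbox was empty. */
        public boolean tryYield() {
            Runnable mail = mails.poll();
            if (mail == null) {
                return false;
            }
            mail.run();
            return true;
        }
    }

    /** Mailbox-thread case: keep checking the future and yield to other mails in between. */
    public static void waitInMailboxThread(FinalizationRequired operator, SimpleMailbox mailbox) {
        CompletableFuture<Void> future = operator.getFinalizeFuture();
        while (!future.isDone()) {
            if (!mailbox.tryYield()) {
                Thread.yield(); // nothing to process right now; let other threads enqueue mails
            }
        }
    }

    /** Non-mailbox case (e.g., chained with a legacy source): block via future.get(). */
    public static void waitBlocking(FinalizationRequired operator) {
        try {
            operator.getFinalizeFuture().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for finalization", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Finalization failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        CommittingSink sink = new CommittingSink();
        SimpleMailbox mailbox = new SimpleMailbox();
        sink.endOfInput();
        // The checkpoint-complete notification arrives as a mail, so yielding lets it run.
        mailbox.put(() -> sink.notifyCheckpointComplete(1L));
        waitInMailboxThread(sink, mailbox);
        System.out.println("finalized: " + sink.getFinalizeFuture().isDone());
    }
}
```

The point of yielding in the mailbox case is that the checkpoint-complete notification is itself processed in the mailbox thread. A plain future.get() there would block the very thread that has to deliver the notification.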

Failover and Recovery

Logically, we cannot restart finished tasks on failover; we can only restart the tasks that have operators that are not yet fully finished. In particular, if all the source operators have finished, the remaining operators would directly start the process of finishing.
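The selection rule could be sketched as follows. This is a minimal illustration, not the scheduler's actual logic: OperatorState, tasksToRestart, and allSourcesFinished are hypothetical names, and the sketch assumes that the "all source operators finished" check is made over all operators that are being restarted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class FailoverSelectionSketch {

    /** Hypothetical per-operator view of the state restored on failover. */
    public static final class OperatorState {
        public final String name;
        public final boolean isSource;
        public final boolean finished;

        public OperatorState(String name, boolean isSource, boolean finished) {
            this.name = name;
            this.isSource = isSource;
            this.finished = finished;
        }
    }

    /** Returns the tasks that still have at least one operator that is not fully finished. */
    public static List<String> tasksToRestart(Map<String, List<OperatorState>> operatorsByTask) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<OperatorState>> entry : operatorsByTask.entrySet()) {
            boolean allFinished = entry.getValue().stream().allMatch(op -> op.finished);
            if (!allFinished) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    /**
     * Returns true if every source operator has finished, in which case the remaining
     * operators would directly start finishing instead of waiting for input.
     */
    public static boolean allSourcesFinished(List<OperatorState> operators) {
        return operators.stream().filter(op -> op.isSource).allMatch(op -> op.finished);
    }
}
```

For example, with a finished source task and a sink task whose sink operator is still unfinished, only the sink task would be restarted, and it would immediately begin finishing because no source can produce further input.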

...