Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Event

Stream Task Status

Operator StatusFinal CheckpointStop with Savepoint with DrainStop with Savepoint

RUNNING


RUNNING---


---

---
Aligned on MAX_WATERMARKAdvanced To MAX_WATERMARK and trigger all the event timersAdvanced To MAX_WATERMARK and trigger all the event timers-

Emit MAX_WATERMARKEmit MAX_WATERMARK-
Aligned On EndOfUserRecordsEventWAITING_FOR_FINAL_CPFINISHED

call operator.endInput() & operator.finish()

call operator.endInput() & operator.finish()-

Emit EndOfUserRecordsEvent[finished = true]Emit EndOfUserRecordsEvent[finished = true]Emit EndOfUserRecordsEvent[finished = false]
Aligned on Checkpoint BarrierEmit CheckpointBarrierEmit CheckpointBarrierEmit CheckpointBarrier

Wait for Checkpoint / Savepoint CompletedWait for Checkpoint / Savepoint CompletedWait for Checkpoint / Savepoint Completed
Checkpoint CompletedWait for downstream tasks acknowledge EndOfUserRecordsEventWait for downstream tasks acknowledge EndOfUserRecordsEventWait for downstream tasks acknowledge EndOfUserRecordsEvent
Downstream Tasks acknowledge EndOfUserRecordsEventCLOSEDCLOSED--

Call operator.close()Call operator.close()Call operator.close()

Emit EndOfPartitionEventEmit EndOfPartitionEventEmit EndOfPartitionEvent

...

Skip Waiting for the Final Checkpoint If possible

For the final checkpoint case, there is one possibility that users in fact do not use the 2pc, thus for the source finished case the following task do not need to wait for the final checkpoint. As an optimization, to allow users specify such logic, we could enhance the StreamOperator and CheckpointListener interface with the following changes:


Code Block
languagejava
interface StreamOperator {
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

interface CheckpointListener {

    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

class AbstractUdfStreamOperator {
    
    @Override
    boolean requiresFinalCheckpoint() {
        return userFunction instanceof CheckpointListener &&
            ((CheckpointListener) userFunction).requiresFinalCheckpoint();
    }
}

Rejected Alternatives

CheckpointCoordinator Maintains the Finished Status of Tasks Separately

...