...

  1. We would like to introduce a new event that signals that all user records are finished, and to call finish() immediately after this event has been aligned on all input channels. Since it has the same semantics as EndOfUserRecordsEvent, which we introduced in the previous sections to solve the unaligned checkpoint issues, we could reuse that event.
  2. We would adjust the savepoint process to first finish the task and then trigger the savepoint. This unifies it with the final checkpoint process and avoids confusing records being emitted after the savepoint.
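The alignment rule in point 1 can be sketched as follows. This is a minimal illustration, not Flink's actual implementation: the class EndOfUserRecordsAligner and its methods are hypothetical, and we assume the task knows its number of input channels up front. The finish action runs exactly once, when EndOfUserRecordsEvent has been seen on every channel.

```java
import java.util.BitSet;

/**
 * Hypothetical sketch (not Flink's API): records on which input channels an
 * EndOfUserRecordsEvent has arrived and runs the finish action exactly once,
 * as soon as the event is aligned on all channels.
 */
class EndOfUserRecordsAligner {
    private final int numberOfChannels;
    private final BitSet received;
    private final Runnable finishAction;
    private boolean finished;

    EndOfUserRecordsAligner(int numberOfChannels, Runnable finishAction) {
        if (numberOfChannels <= 0) {
            throw new IllegalArgumentException("numberOfChannels must be positive");
        }
        this.numberOfChannels = numberOfChannels;
        this.received = new BitSet(numberOfChannels);
        this.finishAction = finishAction;
    }

    /** Called when an EndOfUserRecordsEvent is read from the given input channel. */
    void onEvent(int channelIndex) {
        if (channelIndex < 0 || channelIndex >= numberOfChannels) {
            throw new IndexOutOfBoundsException("channel " + channelIndex);
        }
        received.set(channelIndex); // a duplicate event on the same channel is a no-op
        if (!finished && received.cardinality() == numberOfChannels) {
            finished = true;
            finishAction.run(); // e.g. operator.endInput() followed by operator.finish()
        }
    }

    boolean isAligned() {
        return finished;
    }
}
```

Tracking channels in a BitSet keeps duplicate events harmless, and the finished guard ensures that finish() is never invoked twice.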

The detailed life cycle of the source stream tasks and their operators would become:

...

| Event | Stream Task Status | Operator Status | Final Checkpoint | Stop with Savepoint with Drain | Stop with Savepoint |
|---|---|---|---|---|---|
| | RUNNING | RUNNING | - | - | - |
| No more records or Received Savepoint Trigger | | | - | Finish task (cancel the source thread for legacy sources and suspend the mailbox for new sources) | Finish task (cancel the source thread for legacy sources and suspend the mailbox for new sources) |
| | | | Advance to MAX_WATERMARK and trigger all event timers | Advance to MAX_WATERMARK and trigger all event timers | - |
| | | | Emit MAX_WATERMARK | Emit MAX_WATERMARK | - |
| | WAITING_FOR_FINAL_CP | FINISHED | Call operator.endInput() & operator.finish() | Call operator.endInput() & operator.finish() | - |
| | | | Emit EndOfUserRecordsEvent[finished = true] | Emit EndOfUserRecordsEvent[finished = true] | Emit EndOfUserRecordsEvent[finished = false] |
| | | | When a checkpoint is triggered, emit Checkpoint Barrier | Emit Checkpoint Barrier | Emit Checkpoint Barrier |
| | | | Wait for Checkpoint / Savepoint Completed | Wait for Checkpoint / Savepoint Completed | Wait for Checkpoint / Savepoint Completed |
| Checkpoint Completed | | | Wait for downstream tasks to acknowledge EndOfUserRecordsEvent | Wait for downstream tasks to acknowledge EndOfUserRecordsEvent | Wait for downstream tasks to acknowledge EndOfUserRecordsEvent |
| Downstream Tasks acknowledge EndOfUserRecordsEvent | CLOSED | CLOSED | - | - | - |
| | | | Call operator.close() | Call operator.close() | Call operator.close() |
| | | | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent |
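The source-task table can be read as a small state machine. The sketch below is hypothetical: StopMode, SourceTaskStatus and SourceTaskLifecycle are illustrative names, not Flink classes, and the action strings simply mirror the table cells. It also assumes that the task closes only once both the checkpoint or savepoint has completed and the downstream tasks have acknowledged EndOfUserRecordsEvent, in whichever order those two signals arrive.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical: the three columns of the table above. */
enum StopMode { FINAL_CHECKPOINT, STOP_WITH_SAVEPOINT_DRAIN, STOP_WITH_SAVEPOINT }

/** Hypothetical: the stream task statuses used in the table. */
enum SourceTaskStatus { RUNNING, WAITING_FOR_FINAL_CP, CLOSED }

/** Hypothetical sketch that records, in order, the actions listed in the source-task table. */
class SourceTaskLifecycle {
    private final List<String> actions = new ArrayList<>();
    private SourceTaskStatus status = SourceTaskStatus.RUNNING;
    private boolean barrierEmitted;
    private boolean checkpointCompleted;
    private boolean downstreamAcknowledged;

    /** "No more records" (FINAL_CHECKPOINT) or "Received Savepoint Trigger" (savepoint modes). */
    void stop(StopMode mode) {
        require(SourceTaskStatus.RUNNING);
        // Only plain stop-with-savepoint leaves the operators unfinished.
        boolean finishesOperators = mode != StopMode.STOP_WITH_SAVEPOINT;
        if (mode != StopMode.FINAL_CHECKPOINT) {
            actions.add("finish task"); // cancel legacy source thread / suspend mailbox
        }
        if (finishesOperators) {
            actions.add("advance to MAX_WATERMARK and fire all event timers");
            actions.add("emit MAX_WATERMARK");
            actions.add("operator.endInput() & operator.finish()");
        }
        actions.add("emit EndOfUserRecordsEvent[finished = " + finishesOperators + "]");
        status = SourceTaskStatus.WAITING_FOR_FINAL_CP;
        if (mode != StopMode.FINAL_CHECKPOINT) {
            emitBarrier(); // the savepoint has already been triggered
        }
    }

    /** Final checkpoint case: the barrier is emitted when the next checkpoint is triggered. */
    void checkpointTriggered() {
        require(SourceTaskStatus.WAITING_FOR_FINAL_CP);
        if (!barrierEmitted) {
            emitBarrier();
        }
    }

    void checkpointCompleted() {
        require(SourceTaskStatus.WAITING_FOR_FINAL_CP);
        if (!barrierEmitted) {
            throw new IllegalStateException("no barrier emitted yet");
        }
        checkpointCompleted = true;
        closeIfPossible();
    }

    void downstreamAcknowledgedEndOfUserRecords() {
        require(SourceTaskStatus.WAITING_FOR_FINAL_CP);
        downstreamAcknowledged = true;
        closeIfPossible();
    }

    SourceTaskStatus status() { return status; }

    List<String> actions() { return actions; }

    private void emitBarrier() {
        actions.add("emit Checkpoint Barrier");
        barrierEmitted = true;
    }

    // Assumption: close only after both the checkpoint completed and downstream acknowledged.
    private void closeIfPossible() {
        if (checkpointCompleted && downstreamAcknowledged) {
            actions.add("operator.close()");
            actions.add("emit EndOfPartitionEvent");
            status = SourceTaskStatus.CLOSED;
        }
    }

    private void require(SourceTaskStatus expected) {
        if (status != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + status);
        }
    }
}
```

For example, calling stop(StopMode.STOP_WITH_SAVEPOINT) records only "finish task", the EndOfUserRecordsEvent with finished = false, and the barrier, which matches the rightmost column of the table.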


Similarly, the life cycle of the non-source tasks would become:

| Event | Stream Task Status | Operator Status | Final Checkpoint | Stop with Savepoint with Drain | Stop with Savepoint |
|---|---|---|---|---|---|
| | RUNNING | RUNNING | - | - | - |
| Aligned on MAX_WATERMARK | | | Advance to MAX_WATERMARK and trigger all event timers | Advance to MAX_WATERMARK and trigger all event timers | - |
| | | | Emit MAX_WATERMARK | Emit MAX_WATERMARK | - |
| Aligned on EndOfUserRecordsEvent | WAITING_FOR_FINAL_CP | FINISHED | Call operator.endInput() & operator.finish() | Call operator.endInput() & operator.finish() | - |
| | | | Emit EndOfUserRecordsEvent[finished = true] | Emit EndOfUserRecordsEvent[finished = true] | Emit EndOfUserRecordsEvent[finished = false] |
| Aligned on Checkpoint Barrier | | | Emit Checkpoint Barrier | Emit Checkpoint Barrier | Emit Checkpoint Barrier |
| | | | Wait for Checkpoint / Savepoint Completed | Wait for Checkpoint / Savepoint Completed | Wait for Checkpoint / Savepoint Completed |
| Checkpoint Completed | | | Wait for downstream tasks to acknowledge EndOfUserRecordsEvent | Wait for downstream tasks to acknowledge EndOfUserRecordsEvent | Wait for downstream tasks to acknowledge EndOfUserRecordsEvent |
| Downstream Tasks acknowledge EndOfUserRecordsEvent | CLOSED | CLOSED | - | - | - |
| | | | Call operator.close() | Call operator.close() | Call operator.close() |
| | | | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent | Emit EndOfPartitionEvent |
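The row "Aligned on EndOfUserRecordsEvent" is where the finished flag matters for a non-source task. With drain, the operators are finished and finished = true is forwarded. Without drain, nothing is finished and finished = false is forwarded. The sketch below is hypothetical (NonSourceEndOfUserRecordsHandler is not a Flink class). It assumes that the operators are finished only if every upstream channel reported finished = true; the table does not say how mixed flags would be handled.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Flink's API) of how a non-source task could react to
 * EndOfUserRecordsEvent once it is aligned on all input channels.
 */
class NonSourceEndOfUserRecordsHandler {
    // finishedFlags[i] is null until the event arrives on channel i, then holds its "finished" flag.
    private final Boolean[] finishedFlags;
    private final List<String> actions = new ArrayList<>();
    private int receivedCount;
    private boolean aligned;

    NonSourceEndOfUserRecordsHandler(int numberOfChannels) {
        if (numberOfChannels <= 0) {
            throw new IllegalArgumentException("numberOfChannels must be positive");
        }
        this.finishedFlags = new Boolean[numberOfChannels];
    }

    void onEndOfUserRecords(int channel, boolean finished) {
        if (finishedFlags[channel] != null) {
            return; // ignore a duplicate event on the same channel
        }
        finishedFlags[channel] = finished;
        receivedCount++;
        if (receivedCount < finishedFlags.length) {
            return; // not aligned yet: keep waiting for the remaining channels
        }
        aligned = true;
        // Assumption: operators are finished only if every upstream task sent finished = true.
        boolean allFinished = true;
        for (Boolean flag : finishedFlags) {
            if (!flag) {
                allFinished = false;
            }
        }
        if (allFinished) {
            actions.add("operator.endInput() & operator.finish()");
        }
        // Forward the event downstream with the combined flag.
        actions.add("emit EndOfUserRecordsEvent[finished = " + allFinished + "]");
    }

    boolean isAligned() { return aligned; }

    List<String> actions() { return actions; }
}
```

Forwarding the combined flag lets each downstream task apply the same rule in turn, so the decision to finish or not propagates through the whole job graph.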

  

Rejected Alternatives

CheckpointCoordinator Maintains the Finished Status of Tasks Separately

...