Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Event

Stream Task Status

Operator StatusFinal CheckpointStop with Savepoint with DrainStop with Savepoint

RUNNING


RUNNING---


---

---
Aligned on MAX_WATERMARKAdvanced To MAX_WATERMARK and trigger all the event timersAdvanced To MAX_WATERMARK and trigger all the event timersN/A (MAX_WATERMARK is not emitted in this case)

Emit MAX_WATERMARKEmit MAX_WATERMARKN/A
Aligned On EndOfUserRecordsEventWAITING_FOR_FINAL_CPFINISHED

call operator.endInput() & operator.finish()

call operator.endInput() & operator.finish()-

Emit EndOfUserRecordsEvent[finished = true]Emit EndOfUserRecordsEvent[finished = true]Emit EndOfUserRecordsEvent[finished = false]
Aligned on Checkpoint BarrierEmit CheckpointBarrierEmit CheckpointBarrierEmit CheckpointBarrier

Wait for Checkpoint / Savepoint CompletedWait for Checkpoint / Savepoint CompletedWait for Checkpoint / Savepoint Completed
Checkpoint CompletedWait for downstream tasks acknowledge EndOfUserRecordsEventWait for downstream tasks acknowledge EndOfUserRecordsEventWait for downstream tasks acknowledge EndOfUserRecordsEvent
Downstream Tasks acknowledge EndOfUserRecordsEventCLOSEDCLOSEDWait for EndOfPartitionEventWait for EndOfPartitionEventWait for EndOfPartitionEvent

Call operator.close()Call operator.close()Call operator.close()

Emit EndOfPartitionEventEmit EndOfPartitionEventEmit EndOfPartitionEvent


Info
We need to wait for a checkpoint to complete, that started after the finish() method. However, we support concurrent checkpoints. Moreover there is no guarantee the notifyCheckpointComplete arrives or the order in which they will arrive. It should be enough though to wait for notification for any checkpoint that started after finish().

We should make sure though later checkpoints do not leave behind lingering resources.

Imagine a scenario where:
1. task/operator received `finish()`
2. checkpoint 42 triggered (not yet completed)
3. checkpoint 43 triggered (not yet completed)
4. checkpoint 44 triggered (not yet completed)
5. notifyCheckpointComplete(43)


Our proposal is to shutdown the task immediately after seeing first `notifyCheckpointComplete(X)`, where X is any triggered checkpoint AFTER `finish()`. This should be fine, as:
a) ideally there should be no new pending transactions opened after checkpoint 42
b) even if operator/function is opening some transactions for checkpoint 43 and checkpoint 44 (`FlinkKafkaProducer`), those transactions after checkpoint 42 should be empty

After seeing 5. (notifyCheckpointComplete(43)) It should be good enough to:
- commit transactions from checkpoint 42, (and 43 if they were created, depends on the user code)
- close operator, aborting any pending transactions (for checkpoint 44 if they were opened, depends on the user code)

If checkpoint 44 completes afterwards, it will still be valid. Ideally we would recommend that after seeing `finish()` operators/functions should not be opening any new transactions, but that shouldn't be required.

Skip Waiting for the Final Checkpoint If possible

...