...
Event | Stream Task Status | Operator Status | Final Checkpoint | Stop with Savepoint with Drain | Stop with Savepoint |
---|---|---|---|---|---|
- | RUNNING | RUNNING | - | - | - |
Aligned on MAX_WATERMARK | | | Advance to MAX_WATERMARK and trigger all event timers; emit MAX_WATERMARK | Advance to MAX_WATERMARK and trigger all event timers; emit MAX_WATERMARK | - |
Aligned on EndOfUserRecordsEvent | WAITING_FOR_FINAL_CP | FINISHED | Call operator.endInput() & operator.finish(); emit EndOfUserRecordsEvent[finished = true] | Call operator.endInput() & operator.finish(); emit EndOfUserRecordsEvent[finished = true] | Emit EndOfUserRecordsEvent[finished = false] |
Aligned on checkpoint barrier | | | Emit CheckpointBarrier; wait for the checkpoint / savepoint to complete | Emit CheckpointBarrier; wait for the checkpoint / savepoint to complete | Emit CheckpointBarrier; wait for the checkpoint / savepoint to complete |
Checkpoint completed | | | Wait for downstream tasks to acknowledge EndOfUserRecordsEvent | Wait for downstream tasks to acknowledge EndOfUserRecordsEvent | Wait for downstream tasks to acknowledge EndOfUserRecordsEvent |
Downstream tasks acknowledge EndOfUserRecordsEvent | CLOSED | CLOSED | Call operator.close(); emit EndOfPartitionEvent | Call operator.close(); emit EndOfPartitionEvent | Call operator.close(); emit EndOfPartitionEvent |
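Read column by column, the table gives an ordered sequence of actions for each way a task can finish. The minimal Java sketch below encodes that sequence so the three cases can be compared side by side. The names `TaskFinishingSketch`, `Mode`, and `finishingActions` are hypothetical and are not Flink APIs; actions are plain strings rather than real operator or network calls, and status transitions appear only as comments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the finishing sequence in the table; not Flink code.
public class TaskFinishingSketch {

    // One value per case column in the table (hypothetical names).
    enum Mode { FINAL_CHECKPOINT, STOP_WITH_SAVEPOINT_DRAIN, STOP_WITH_SAVEPOINT }

    // Returns the ordered actions a task performs in the given case.
    static List<String> finishingActions(Mode mode) {
        List<String> actions = new ArrayList<>();
        // Only plain stop-with-savepoint (no drain) skips draining the operators.
        boolean drain = mode != Mode.STOP_WITH_SAVEPOINT;

        // Aligned on MAX_WATERMARK (task and operators still RUNNING).
        if (drain) {
            actions.add("advance to MAX_WATERMARK and trigger all event timers");
            actions.add("emit MAX_WATERMARK");
        }

        // Aligned on EndOfUserRecordsEvent: task -> WAITING_FOR_FINAL_CP, operators -> FINISHED.
        if (drain) {
            actions.add("operator.endInput() & operator.finish()");
        }
        actions.add("emit EndOfUserRecordsEvent[finished = " + drain + "]");

        // Aligned on the checkpoint barrier.
        actions.add("emit CheckpointBarrier");
        actions.add("wait for checkpoint / savepoint completed");

        // Checkpoint completed.
        actions.add("wait for downstream tasks to acknowledge EndOfUserRecordsEvent");

        // Downstream tasks acknowledged: task and operators -> CLOSED.
        actions.add("operator.close()");
        actions.add("emit EndOfPartitionEvent");
        return actions;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + ": " + finishingActions(mode));
        }
    }
}
```

The sketch makes the key difference visible: without drain, the task skips MAX_WATERMARK and `finish()` and announces `finished = false`, but the checkpoint, acknowledgement, and close steps are the same in all three cases.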
...
Skip Waiting for the Final Checkpoint If Possible
In the final checkpoint case, users may not actually rely on two-phase commit (2PC). If so, once the sources have finished, the downstream tasks do not need to wait for the final checkpoint. As an optimization that lets users express this, we could extend the StreamOperator and CheckpointListener interfaces as follows:
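```java
interface StreamOperator {
    // By default, an operator requires the final checkpoint.
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

interface CheckpointListener {
    // By default, a listener requires the final checkpoint; a listener
    // that does not rely on 2PC may override this to return false.
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

abstract class AbstractUdfStreamOperator<F> implements StreamOperator {
    // The wrapped user function (simplified; other members omitted).
    protected F userFunction;

    @Override
    public boolean requiresFinalCheckpoint() {
        // Require the final checkpoint only if the user function is a
        // CheckpointListener that itself requires it.
        return userFunction instanceof CheckpointListener
                && ((CheckpointListener) userFunction).requiresFinalCheckpoint();
    }
}
```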
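To illustrate how these defaults combine, the sketch below restates the two proposed interfaces and a simplified stand-in for `AbstractUdfStreamOperator` so it compiles on its own. The classes `UdfOperator`, `PlainMapper`, `TwoPhaseCommitSink`, and `AtLeastOnceSink` and the method `taskMustWait` are hypothetical. In particular, the rule that a task must wait whenever any operator in its chain requires the final checkpoint is an assumption made for illustration, not something specified in this proposal.

```java
import java.util.List;

// Hypothetical, self-contained illustration of the proposed opt-out; not Flink code.
public class FinalCheckpointOptOutSketch {

    // Proposed interfaces, restated so this sketch compiles on its own.
    interface StreamOperator {
        default boolean requiresFinalCheckpoint() { return true; }
    }

    interface CheckpointListener {
        default boolean requiresFinalCheckpoint() { return true; }
    }

    // Simplified stand-in for AbstractUdfStreamOperator (hypothetical constructor).
    static class UdfOperator implements StreamOperator {
        private final Object userFunction;

        UdfOperator(Object userFunction) { this.userFunction = userFunction; }

        @Override
        public boolean requiresFinalCheckpoint() {
            return userFunction instanceof CheckpointListener
                    && ((CheckpointListener) userFunction).requiresFinalCheckpoint();
        }
    }

    // Hypothetical user functions.
    static class PlainMapper {}                                       // not a CheckpointListener
    static class TwoPhaseCommitSink implements CheckpointListener {} // keeps the default (true)
    static class AtLeastOnceSink implements CheckpointListener {
        @Override
        public boolean requiresFinalCheckpoint() { return false; }   // explicitly opts out
    }

    // Assumption: a task waits if any operator in its chain requires the final checkpoint.
    static boolean taskMustWait(List<? extends StreamOperator> chain) {
        return chain.stream().anyMatch(StreamOperator::requiresFinalCheckpoint);
    }

    public static void main(String[] args) {
        List<StreamOperator> noTwoPc = List.of(
                new UdfOperator(new PlainMapper()), new UdfOperator(new AtLeastOnceSink()));
        List<StreamOperator> withTwoPc = List.of(
                new UdfOperator(new PlainMapper()), new UdfOperator(new TwoPhaseCommitSink()));
        System.out.println(taskMustWait(noTwoPc));   // false
        System.out.println(taskMustWait(withTwoPc)); // true
    }
}
```

Note the design consequence of the proposed `AbstractUdfStreamOperator` logic: a UDF operator whose function is not a `CheckpointListener` opts out automatically, so only functions that listen for checkpoint completion (such as a 2PC sink) keep the task waiting.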
Rejected Alternatives
CheckpointCoordinator Maintains the Finished Status of Tasks Separately
...