...

We need to wait for the completion of a checkpoint that started after the `finish()` method was called. However, we support concurrent checkpoints. Moreover, there is no guarantee that a given `notifyCheckpointComplete` call arrives at all, nor about the order in which such calls arrive. It should, however, be enough to wait for the notification of any checkpoint that started after `finish()`.

We should, however, make sure that later checkpoints do not leave behind lingering resources.

Imagine a scenario where:
1. task/operator received `finish()`
2. checkpoint 42 triggered (not yet completed)
3. checkpoint 43 triggered (not yet completed)
4. checkpoint 44 triggered (not yet completed)
5. notifyCheckpointComplete(43)


Our proposal is to shut down the task immediately after seeing the first `notifyCheckpointComplete(X)`, where X is any checkpoint triggered AFTER `finish()`. This should be fine, because:
a) ideally, no new pending transactions should be opened after checkpoint 42;
b) even if the operator/function opens some transactions for checkpoints 43 and 44 (e.g. `FlinkKafkaProducer`), those transactions were opened after checkpoint 42 and should therefore be empty.
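A minimal sketch of this shutdown rule follows. It assumes that checkpoint IDs increase monotonically, as Flink's checkpoint IDs do, and it uses a hypothetical `FinalCheckpointTracker` class that is not part of Flink. The class only decides *when* the task may shut down. Committing and aborting transactions is left to the operator.

```java
/**
 * Hypothetical helper (not a Flink class) sketching the proposed rule:
 * after finish(), the task may shut down on the first
 * notifyCheckpointComplete(X) for any checkpoint X triggered after finish().
 */
final class FinalCheckpointTracker {
    private boolean finished = false;
    // ID of the first checkpoint triggered after finish(); -1 if none yet.
    private long firstCheckpointAfterFinish = -1;

    void onFinish() {
        finished = true;
    }

    void onCheckpointTriggered(long checkpointId) {
        // Remember only the first checkpoint that started after finish().
        if (finished && firstCheckpointAfterFinish < 0) {
            firstCheckpointAfterFinish = checkpointId;
        }
    }

    /** Returns true if the task may shut down after this notification. */
    boolean onCheckpointComplete(long checkpointId) {
        // Assumption: IDs grow monotonically, so any ID >= the first
        // post-finish ID belongs to a checkpoint triggered after finish().
        // Notifications may be missing or reordered; any single one suffices.
        return firstCheckpointAfterFinish >= 0
                && checkpointId >= firstCheckpointAfterFinish;
    }
}
```

Consider the scenario above: `finish()` is called, checkpoints 42, 43 and 44 are triggered, and then `notifyCheckpointComplete(43)` arrives. Replaying it makes `onCheckpointComplete(43)` return `true`. A late notification for a checkpoint triggered before `finish()` returns `false`.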

After seeing step 5 (`notifyCheckpointComplete(43)`), it should be good enough to:
- commit the transactions from checkpoint 42 (and from checkpoint 43, if any were created; this depends on the user code),
- close the operator, aborting any pending transactions (for checkpoint 44, if any were opened; this also depends on the user code).
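The commit/abort step can be sketched with a hypothetical `PendingTransactions` holder, which is not a Flink API. For simplicity it assumes exactly one pre-committed transaction per checkpoint. On `notifyCheckpointComplete(43)` it commits everything up to and including checkpoint 43, and closing the operator then aborts the transaction left over for checkpoint 44. Per point b) above, the transactions for 43 and 44 are expected to be empty, so committing or aborting them should be harmless.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical two-phase-commit bookkeeping (not a Flink API). */
final class PendingTransactions {
    // Pre-committed transactions keyed by the checkpoint that pre-committed them.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    final List<String> committed = new ArrayList<>();
    final List<String> aborted = new ArrayList<>();

    /** Called while snapshotting: the currently open transaction is pre-committed. */
    void preCommit(long checkpointId, String transaction) {
        pending.put(checkpointId, transaction);
    }

    /** Commits every pending transaction up to and including checkpointId. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, String> upTo = pending.headMap(checkpointId, true);
        committed.addAll(upTo.values());
        upTo.clear(); // headMap is a view, so this also removes them from 'pending'
    }

    /** Closing the operator aborts whatever was not committed. */
    void close() {
        aborted.addAll(pending.values());
        pending.clear();
    }
}
```

Committing every transaction with an ID up to the completed one also covers the case where the notification for checkpoint 42 itself never arrives.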

If checkpoint 44 completes afterwards, it will still be valid. Ideally, we would recommend that operators/functions do not open any new transactions after seeing `finish()`, but this should not be required.

Skip Waiting for the Final Checkpoint If Possible

For the final checkpoint, it is possible that users do not actually use two-phase commit (2PC). In that case, once the sources have finished, the downstream tasks do not need to wait for the final checkpoint. As an optimization that lets users specify this, we could extend the StreamOperator and CheckpointListener interfaces as follows:

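```java
// Proposed additions only; existing members of these types are omitted.
// Function is org.apache.flink.api.common.functions.Function.
interface StreamOperator<OUT> {
    // Conservative default: wait for the final checkpoint before finishing.
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

interface CheckpointListener {
    // Functions that do not rely on two-phase commit may return false.
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT> {

    protected final F userFunction;

    protected AbstractUdfStreamOperator(F userFunction) {
        this.userFunction = userFunction;
    }

    // Only a user function that listens for checkpoint completion can need
    // the final checkpoint, so delegate the decision to it.
    @Override
    public boolean requiresFinalCheckpoint() {
        return userFunction instanceof CheckpointListener
                && ((CheckpointListener) userFunction).requiresFinalCheckpoint();
    }
}
```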
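The following sketch shows how the flag might be consumed. It uses simplified stand-in types that model only the proposed method. As a hypothesis rather than part of the proposal, it assumes that a task runs a chain of operators and may skip waiting only if none of them requires the final checkpoint. `FinalCheckpointPolicy`, `StatelessMapOperator` and `TransactionalSinkOperator` are hypothetical names.

```java
import java.util.List;

// Simplified stand-in: only the proposed method of StreamOperator is modeled.
interface StreamOperator {
    default boolean requiresFinalCheckpoint() {
        return true;
    }
}

final class FinalCheckpointPolicy {
    /**
     * Hypothetical task-level check (assumption, not part of the proposal):
     * the task must wait if any chained operator needs the final checkpoint.
     */
    static boolean mustWaitForFinalCheckpoint(List<? extends StreamOperator> chain) {
        return chain.stream().anyMatch(StreamOperator::requiresFinalCheckpoint);
    }
}

// A stateless operator that never commits externally can opt out.
final class StatelessMapOperator implements StreamOperator {
    @Override
    public boolean requiresFinalCheckpoint() {
        return false;
    }
}

// An operator backed by two-phase commit keeps the conservative default.
final class TransactionalSinkOperator implements StreamOperator {}
```

Keeping `true` as the default means that existing operators and functions behave safely unless they explicitly opt out.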


Rejected Alternatives

CheckpointCoordinator Maintains the Finished Status of Tasks Separately

...