...

Currently, the methods exposed by RichFunction do not allow function implementers to distinguish between logic that should run on normal termination of a job (i.e. end of input stream, pipeline drain, or pipeline suspend) and logic that should run on termination due to a failure or a user-triggered cancellation through the cancel command.

This proposal aims to solve this issue by introducing a new (optional) interface. Any UDF that needs to perform special logic during graceful termination should implement this interface. Its methods are integrated with Flink’s DRAIN and SUSPEND commands, described in FLIP-34.

...

  • DRAIN (stop-with-savepoint with max wm): this is equivalent to sending the EOS and taking a final savepoint. So we want to:
    1. send the MAX_WATERMARK.
    2. close any in-progress part files and get them ready to be committed.
    3. take the savepoint the user requested. This will reflect that there are no in-progress part files as we are draining our pipeline.
    4. let the previously closed in-progress part files be committed as a result of a successful savepoint (as it is done currently).
    5. perform any necessary resource clean-up.
  • SUSPEND (stop-with-savepoint): this stops the job with a final savepoint without "draining" the pipeline, so no EOS or MAX_WATERMARK is sent. In this case we want to:
    1. take the savepoint the user requested as usual.
    2. perform any necessary resource clean-up.
  • ERROR/CANCELLATION
    1. perform any necessary resource clean-up.

...

  1. Keep RichFunction (including the close() method) as is, but state explicitly that close() is called on every kind of termination, both graceful and due to an error. In other words, the semantics of close() are now aligned with those of close() in Java's AutoCloseable interface, which we could make more explicit by having RichFunction implement AutoCloseable. This change is not expected to break backwards compatibility, as close() is already called in every case of termination.
  2. Add a new interface for UDFs to implement if they want to execute application-specific logic on graceful job termination, i.e. batch jobs reaching EOS, and DRAIN and SUSPEND of streaming jobs. The proposed interface could be named WithGracefulShutdown (à la trait) and would include the following methods:

...

          - void shutdown() throws Exception


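/** Optional interface for UDFs that need special logic on graceful job termination. */
public interface WithGracefulShutdown {

    // Invoked only on graceful termination; must be idempotent.
    void prepareToShutdown() throws Exception;

    // Invoked only on graceful termination; must be idempotent.
    void shutdown() throws Exception;

}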


These two methods are relevant ONLY in the case of graceful job termination and are NOT called in case of termination due to an error. In addition, the actions of these methods should be idempotent as it is not guaranteed that they will be called only once.
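
The idempotency requirement can be illustrated with a minimal sketch. The BufferingSink class below is hypothetical and not part of the proposal: the in-memory buffer, the guard flags, and the accessor methods are assumptions made only for illustration, and the split of work between the two methods is likewise assumed. The interface is redeclared so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Redeclared from the proposal so the sketch is self-contained.
interface WithGracefulShutdown {
    void prepareToShutdown() throws Exception;
    void shutdown() throws Exception;
}

// Hypothetical UDF: buffers records and flushes them on graceful termination.
class BufferingSink implements WithGracefulShutdown {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> flushed = new ArrayList<>();
    private boolean prepared = false;
    private boolean shutDown = false;
    private int finalizeCount = 0;

    // Stand-in for normal record processing.
    void invoke(String record) {
        buffer.add(record);
    }

    @Override
    public void prepareToShutdown() {
        if (prepared) {
            return; // repeated calls have no further effect
        }
        flushed.addAll(buffer); // assumed action: flush pending records
        buffer.clear();
        prepared = true;
    }

    @Override
    public void shutdown() {
        if (shutDown) {
            return; // repeated calls have no further effect
        }
        finalizeCount++; // assumed action: finalize output exactly once
        shutDown = true;
    }

    int flushedCount() { return flushed.size(); }
    int bufferedCount() { return buffer.size(); }
    int finalizeCount() { return finalizeCount; }
}
```

Calling prepareToShutdown() or shutdown() a second time on the same instance leaves its state unchanged, which is the property required above.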

...

  • SUSPEND:
    1. snapshotState()
    2. notifyCheckpointComplete()
    3. close()
  • ERROR/CANCELLATION:
    1. close()
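
The two call sequences listed above can be sketched as a small driver. This is an illustration under stated assumptions, not Flink's runtime code: TerminationOrderSketch, RecordingFunction, and terminate() are hypothetical names, and the callbacks are simplified to take no arguments. Only the SUSPEND and ERROR/CANCELLATION cases shown above are modelled.

```java
import java.util.ArrayList;
import java.util.List;

class TerminationOrderSketch {

    enum Mode { SUSPEND, ERROR_OR_CANCELLATION }

    // Hypothetical stand-in for a UDF that records which callbacks were invoked.
    static class RecordingFunction {
        final List<String> calls = new ArrayList<>();

        void snapshotState() { calls.add("snapshotState"); }
        void notifyCheckpointComplete() { calls.add("notifyCheckpointComplete"); }
        void close() { calls.add("close"); }
    }

    // Invokes the callbacks in the order listed for the given termination mode.
    static List<String> terminate(RecordingFunction fn, Mode mode) {
        try {
            if (mode == Mode.SUSPEND) {
                fn.snapshotState();            // final savepoint
                fn.notifyCheckpointComplete(); // savepoint acknowledged
            }
        } finally {
            fn.close(); // runs for every kind of termination
        }
        return fn.calls;
    }
}
```

Placing close() in a finally block mirrors the statement that it is called in every case of termination, whereas the savepoint-related callbacks run only on the graceful path.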

Backwards Compatibility

...