...

  • FINITE SOURCE:
    1. close any in-progress part files and get them ready to be committed,
    2. commit them,
    3. perform any necessary resource clean-up.
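To make the FINITE SOURCE steps concrete, here is a minimal sketch of a sink that writes "part files". The class PartFileSink and its onEndOfInput() hook are hypothetical names invented for illustration. They are not Flink's sink API. Part files are modeled as plain strings moving between lists, which is enough to show the order: close, then commit, then clean up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a sink that writes part files.
// It is NOT Flink's actual sink API; it only illustrates the three steps.
class PartFileSink {
    private final List<String> inProgress = new ArrayList<>();
    private final List<String> pendingCommit = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();
    private boolean resourcesReleased = false;

    // Models opening a new part file that data is being written to.
    void openPartFile(String name) {
        inProgress.add(name);
    }

    // Hypothetical hook, assumed to run once the finite source has emitted all records.
    void onEndOfInput() {
        // 1. close any in-progress part files and get them ready to be committed
        pendingCommit.addAll(inProgress);
        inProgress.clear();
        // 2. commit them
        committed.addAll(pendingCommit);
        pendingCommit.clear();
        // 3. perform any necessary resource clean-up
        resourcesReleased = true;
    }

    List<String> committed() { return committed; }
    boolean hasInProgress() { return !inProgress.isEmpty(); }
    boolean resourcesReleased() { return resourcesReleased; }
}
```

Suppose "part-0" and "part-1" are opened and onEndOfInput() is then called. Both files end up committed, nothing is left in progress, and resources are released. No savepoint is involved, which matches the batch assumption below.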

ASSUMPTION: We assume that batch jobs do not have checkpointing enabled, so "re-using" stop-with-savepoint is not an option. As the fault-tolerance mechanism in batch, we assume that shuffle results are materialized and stored persistently, so that in case of a failure the job resumes from the last successfully persisted shuffle result.


  • DRAIN (stop-with-savepoint with max wm): this is equivalent to sending the EOS and taking a final savepoint. So we want to:
    1. send the MAX_WATERMARK
    2. close any in-progress part files and get them ready to be committed,
    3. take the savepoint the user requested. This will reflect that there are no in-progress part files, as we are draining our pipeline.
    4. let the previously closed in-progress part files be committed as a result of a successful savepoint (as is done currently).
    5. perform any necessary resource clean-up.
  • SUSPEND (stop-with-savepoint): this stops the job with a final savepoint without "draining" the pipeline, so no EOS or MAX_WATERMARK is sent. In this case we want to:
    1. take the savepoint the user requested as usual.
    2. perform any necessary resource clean-up.
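The sketch below contrasts the DRAIN and SUSPEND sequences. StopMode and StopWithSavepointModel are hypothetical names. The "savepoint" is modeled as a copy of whatever part files are still in progress when it is taken. Files that were already pending from earlier checkpoints are deliberately not modeled. The sketch shows that under DRAIN the savepoint captures no in-progress part files, while under SUSPEND they are carried over into the savepoint.

```java
import java.util.ArrayList;
import java.util.List;

enum StopMode { DRAIN, SUSPEND }

// Hypothetical model of the stop-with-savepoint steps; names are illustrative only.
class StopWithSavepointModel {
    final List<String> inProgress = new ArrayList<>();
    final List<String> pendingCommit = new ArrayList<>();
    final List<String> committed = new ArrayList<>();
    final List<String> log = new ArrayList<>();
    List<String> savepointInProgress = null; // in-progress files captured by the savepoint

    void stop(StopMode mode) {
        if (mode == StopMode.DRAIN) {
            log.add("MAX_WATERMARK");          // DRAIN step 1: send the MAX_WATERMARK
            pendingCommit.addAll(inProgress);   // DRAIN step 2: close in-progress part files
            inProgress.clear();
        }
        // Both modes: take the savepoint the user requested.
        savepointInProgress = new ArrayList<>(inProgress);
        log.add("savepoint");
        if (mode == StopMode.DRAIN) {
            // DRAIN step 4: commit the files closed before the (successful) savepoint
            committed.addAll(pendingCommit);
            pendingCommit.clear();
        }
        // Both modes: perform any necessary resource clean-up.
        log.add("cleanup");
    }
}
```

With one in-progress file "part-0", stop(StopMode.DRAIN) leaves the savepoint without in-progress files and commits "part-0". stop(StopMode.SUSPEND) instead records "part-0" in the savepoint, commits nothing, and never emits MAX_WATERMARK.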

...

          - void shutdown() throws Exception


public interface WithGracefulShutdown {

    void prepareToShutdown() throws Exception;

    void shutdown() throws Exception;

}


These two methods are relevant ONLY in the case of graceful job termination and are NOT called in case of termination due to an error. In addition, the actions of these methods should be idempotent as it is not guaranteed that they will be called only once.
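Because the methods may be called more than once, an implementation has to guard its side effects. Below is a minimal sketch of one way to do that using boolean flags. The WithGracefulShutdown interface is copied from the proposal. The implementing class IdempotentPartFileWriter is hypothetical, and so is the split of work assumed here, in which prepareToShutdown() closes in-progress part files and shutdown() releases resources. The point of the sketch is the idempotency, not this particular division.

```java
import java.util.ArrayList;
import java.util.List;

// Interface as given in the proposal.
interface WithGracefulShutdown {
    void prepareToShutdown() throws Exception;
    void shutdown() throws Exception;
}

// Hypothetical UDF; the mapping of work to the two methods is an assumption for illustration.
class IdempotentPartFileWriter implements WithGracefulShutdown {
    private final List<String> inProgress = new ArrayList<>();
    private final List<String> readyToCommit = new ArrayList<>();
    private boolean prepared = false;
    private boolean shutDown = false;
    int releaseCount = 0; // how many times resources were actually released

    void openPartFile(String name) { inProgress.add(name); }

    @Override
    public void prepareToShutdown() {
        if (prepared) {
            return; // repeated calls are no-ops
        }
        // Close in-progress part files so they are ready to be committed.
        readyToCommit.addAll(inProgress);
        inProgress.clear();
        prepared = true;
    }

    @Override
    public void shutdown() {
        if (shutDown) {
            return; // repeated calls are no-ops
        }
        releaseCount++; // stands in for releasing file handles, connections, etc.
        shutDown = true;
    }

    List<String> readyToCommit() { return readyToCommit; }
}
```

Calling prepareToShutdown() and shutdown() twice each leaves exactly one file ready to commit and releases resources exactly once.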

...

  • SUSPEND:
    1. snapshotState()
    2. notifyCheckpointComplete()
    3. close()
  • ERROR:
    1. close()
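The SUSPEND and ERROR call sequences can be checked with a function that simply records which methods are invoked. RecordingFunction, LifecycleDriver and Termination are hypothetical test doubles, not Flink classes. The method names mirror the lists above. The sketch also shows that on ERROR neither of the new graceful-shutdown methods is invoked.

```java
import java.util.ArrayList;
import java.util.List;

enum Termination { SUSPEND, ERROR }

// Hypothetical stand-in for a UDF that records which lifecycle methods are invoked.
class RecordingFunction {
    final List<String> calls = new ArrayList<>();

    void snapshotState() { calls.add("snapshotState"); }
    void notifyCheckpointComplete() { calls.add("notifyCheckpointComplete"); }
    void close() { calls.add("close"); }
}

// Hypothetical driver that replays the call order listed above for each termination type.
class LifecycleDriver {
    static void terminate(RecordingFunction f, Termination t) {
        switch (t) {
            case SUSPEND:
                f.snapshotState();            // 1. the final savepoint is taken
                f.notifyCheckpointComplete(); // 2. the savepoint completed successfully
                f.close();                    // 3. release resources
                break;
            case ERROR:
                f.close();                    // only close(); no graceful-shutdown methods
                break;
        }
    }
}
```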

...

Backwards Compatibility

There is no risk of breaking compatibility, as the RichFunction interface stays as it is and the semantics of its methods do not change. This FLIP simply adds a new, optional interface for UDFs to implement.
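Because the interface is optional, the runtime has to detect whether a function opted in. The sketch below shows this with an instanceof check. GracefulShutdownSupport, LegacyMapper and OptInSink are hypothetical names, and the interface is copied from the proposal. Where exactly the runtime would call these helpers, for example relative to the final savepoint, is not modeled. Existing functions that do not implement the interface are left untouched.

```java
// Interface as given in the proposal.
interface WithGracefulShutdown {
    void prepareToShutdown() throws Exception;
    void shutdown() throws Exception;
}

// A function written without knowledge of the new interface (unchanged existing code).
class LegacyMapper {
    String map(String value) { return value.toUpperCase(); }
}

// A function that opts in by implementing the optional interface.
class OptInSink implements WithGracefulShutdown {
    boolean prepared = false;
    boolean shutDown = false;

    @Override
    public void prepareToShutdown() { prepared = true; }

    @Override
    public void shutdown() { shutDown = true; }
}

// Hypothetical runtime-side helpers: the new methods are invoked only on functions
// that implement WithGracefulShutdown; every other function is skipped.
final class GracefulShutdownSupport {
    private GracefulShutdownSupport() {}

    static boolean prepareIfSupported(Object udf) throws Exception {
        if (udf instanceof WithGracefulShutdown) {
            ((WithGracefulShutdown) udf).prepareToShutdown();
            return true;
        }
        return false;
    }

    static boolean shutdownIfSupported(Object udf) throws Exception {
        if (udf instanceof WithGracefulShutdown) {
            ((WithGracefulShutdown) udf).shutdown();
            return true;
        }
        return false;
    }
}
```

This opt-in check is also why existing UDFs need no changes. A LegacyMapper passed to either helper is simply skipped (the helper returns false), while an OptInSink receives both calls.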

Rejected Alternatives

Adding the methods of the proposed interface directly to the RichFunction interface.

This solution was rejected because:

  • the methods in the RichFunction interface are relevant to most functions, while prepareToShutdown() and shutdown() are relevant only to functions with specific requirements.
  • if the interface needs to change in the future, we will only need to deprecate the separate interface rather than methods in RichFunction, a change that would be visible to all users.