
Status

...



Discussion thread

...

...

...


...

thread/tzhjm0ttpkrc1yd3vo4p72ppkrwgp0sw
Vote thread
JIRA: FLINK-13103

Release


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Currently, the methods exposed by the RichFunction interface do not let function implementers distinguish between the logic to execute on normal termination of a job (i.e. end of input stream, pipeline drain, and pipeline suspend) and the logic to execute on termination due to a failure or a user-triggered cancellation through the cancel command.

This proposal aims to solve this issue by introducing a new, optional interface. The interface is meant to be implemented by any UDF that needs to perform special logic during graceful termination, and its methods are integrated with Flink’s DRAIN and SUSPEND commands, described in FLIP-34.

...

  • FINITE SOURCE:
    1. close any in-progress part files and get them ready to be committed,
    2. commit them,
    3. perform any necessary resource clean-up.

ASSUMPTION: We assume that batch jobs do not have checkpointing enabled, so "re-using" stop-with-savepoint is not an option. As the fault-tolerance mechanism in batch, we assume that the results of shuffles are materialized and stored persistently, and that in the case of a failure, the job resumes from the last successfully persisted shuffle result.
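The FINITE SOURCE steps can be sketched for a file-writing sink. This is a minimal, hypothetical model (`PartFileSinkSketch` and its methods are illustrative names, not Flink's file sink API): part files move from in-progress to pending when closed, and because a bounded job is assumed to run without checkpointing, the pending files are committed directly at end of input instead of waiting for a savepoint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified file sink; not Flink's actual file sink API.
class PartFileSinkSketch {
    final List<String> inProgress = new ArrayList<>();
    final List<String> pending = new ArrayList<>();
    final List<String> committed = new ArrayList<>();
    boolean resourcesReleased = false;
    private int nextPart = 0;

    // Opens a new in-progress part file if none is open (record contents are ignored here).
    void write(String record) {
        if (inProgress.isEmpty()) {
            inProgress.add("part-" + nextPart++);
        }
    }

    // Graceful termination of a finite source, i.e. end of input.
    void onEndOfInput() {
        // 1. Close in-progress part files and get them ready to be committed.
        pending.addAll(inProgress);
        inProgress.clear();
        // 2. Commit them directly: with checkpointing disabled there is no savepoint to wait for.
        committed.addAll(pending);
        pending.clear();
        // 3. Release resources (modelled here as a flag).
        resourcesReleased = true;
    }
}
```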


  • DRAIN (stop-with-savepoint with MAX_WATERMARK): this is equivalent to sending EOS and taking a final savepoint. In this case we want to:
    1. send the MAX_WATERMARK,
    2. close any in-progress part files and get them ready to be committed,
    3. take the savepoint the user requested. This savepoint will contain no in-progress part files, as we are draining the pipeline,
    4. let the part files closed in step 2 be committed as a result of the successful savepoint (as is done currently),
    5. perform any necessary resource clean-up.
  • SUSPEND (stop-with-savepoint): this stops the job with a final savepoint without "draining" the pipeline, so neither EOS nor MAX_WATERMARK is sent. In this case we want to:
    1. take the savepoint the user requested, as usual,
    2. perform any necessary resource clean-up.
  • ERROR/CANCELLATION
    1. perform any necessary resource clean-up.
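For such a sink, the practical difference between DRAIN and SUSPEND is what the final savepoint captures. The hypothetical sketch below (all names are illustrative, not Flink APIs) models a savepoint as a copy of the in-progress and pending part-file lists. DRAIN closes the in-progress files before the savepoint, so the snapshot holds none, and then commits them assuming the savepoint succeeds; SUSPEND snapshots the files as they are. Emitting MAX_WATERMARK is only noted in a comment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical savepoint contents; not Flink's actual state representation.
class SavepointSketch {
    final List<String> inProgress;
    final List<String> pending;

    SavepointSketch(List<String> inProgress, List<String> pending) {
        this.inProgress = new ArrayList<>(inProgress);
        this.pending = new ArrayList<>(pending);
    }
}

// Hypothetical sink state under the two stop-with-savepoint variants.
class StopWithSavepointSketch {
    final List<String> inProgress = new ArrayList<>();
    final List<String> pending = new ArrayList<>();
    final List<String> committed = new ArrayList<>();
    boolean resourcesReleased = false;

    SavepointSketch drain() {
        // 1. MAX_WATERMARK would be emitted here (not modelled).
        // 2. Close in-progress part files so they become pending.
        pending.addAll(inProgress);
        inProgress.clear();
        // 3. Take the savepoint: it contains no in-progress part files.
        SavepointSketch savepoint = new SavepointSketch(inProgress, pending);
        // 4. Assuming the savepoint succeeds, commit the pending files.
        committed.addAll(pending);
        pending.clear();
        // 5. Clean up.
        resourcesReleased = true;
        return savepoint;
    }

    SavepointSketch suspend() {
        // 1. Take the savepoint as usual; in-progress files stay in progress.
        SavepointSketch savepoint = new SavepointSketch(inProgress, pending);
        // 2. Clean up.
        resourcesReleased = true;
        return savepoint;
    }
}
```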

...

  1. Keep the RichFunction interface (including the close() method) as is, but state explicitly that close() is called on every termination, both graceful and due to an error. This aligns the semantics of close() with those of close() in Java's AutoCloseable interface; we could make this explicit by having RichFunction extend AutoCloseable. This change is not expected to break backwards compatibility, as close() is already called on every termination.
  2. Add a new interface for UDFs to implement if they want to execute application-specific logic on graceful job termination, i.e. batch jobs reaching EOS, and DRAIN and SUSPEND of streaming jobs. The proposed interface could be named WithGracefulShutdown (à la trait) and include the following methods:

...

          - void shutdown() throws Exception


public interface WithGracefulShutdown {

    void prepareToShutdown() throws Exception;

    void shutdown() throws Exception;

}


These two methods are relevant ONLY to graceful job termination and are NOT called when the job terminates due to an error. In addition, their actions should be idempotent, as it is not guaranteed that they will be called only once.
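One way for a UDF to meet the idempotency requirement is to remember which step it has already performed. In this hypothetical sketch, `IdempotentShutdownFunction` and its counters are illustrative placeholders for real side effects (what each method actually does is up to the UDF); a repeated call to either method is a no-op. The interface is redeclared so the example compiles on its own.

```java
// Redeclared from the proposal so this sketch is self-contained.
interface WithGracefulShutdown {
    void prepareToShutdown() throws Exception;

    void shutdown() throws Exception;
}

// Hypothetical UDF: duplicate calls must not repeat side effects.
class IdempotentShutdownFunction implements WithGracefulShutdown {
    int prepareEffects = 0;  // placeholder for the real work done in prepareToShutdown()
    int shutdownEffects = 0; // placeholder for the real work done in shutdown()
    private boolean prepared = false;
    private boolean shutDown = false;

    @Override
    public void prepareToShutdown() {
        if (prepared) {
            return; // already done; ignore the duplicate call
        }
        prepareEffects++;
        prepared = true;
    }

    @Override
    public void shutdown() {
        if (shutDown) {
            return; // already done; ignore the duplicate call
        }
        shutdownEffects++;
        shutDown = true;
    }
}
```

Guarding with a flag is the simplest choice; a real sink might instead check external state (for example, whether a file was already committed) so that idempotency also holds across restarts.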

...

  • SUSPEND:
    1. snapshotState()
    2. notifyCheckpointComplete()
    3. close()
  • ERROR/CANCELLATION:
    1. close()
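The two sequences listed above can be reproduced with a hypothetical driver (`RecordingFunction` and `TerminationDriverSketch` are illustrative names, not Flink classes). Only the SUSPEND and ERROR/CANCELLATION orders shown here are modelled. Putting close() in a finally block mirrors the rule that close() runs on every termination path, much like try-with-resources with AutoCloseable: if taking the savepoint fails, the SUSPEND path degrades to the error path and close() still runs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical function that records which lifecycle methods were invoked.
class RecordingFunction {
    final List<String> calls = new ArrayList<>();

    void snapshotState() { calls.add("snapshotState"); }

    void notifyCheckpointComplete() { calls.add("notifyCheckpointComplete"); }

    void close() { calls.add("close"); }
}

// Hypothetical driver reproducing the call orders listed above.
class TerminationDriverSketch {
    static void suspend(RecordingFunction fn) {
        try {
            fn.snapshotState();            // 1. take the savepoint
            fn.notifyCheckpointComplete(); // 2. savepoint confirmed
        } finally {
            fn.close();                    // 3. always release resources
        }
    }

    static void failOrCancel(RecordingFunction fn) {
        fn.close(); // only clean-up, no snapshot
    }
}
```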

Proposed Changes


Compatibility, Deprecation, and Migration Plan


Test Plan


Rejected Alternatives

...

Backwards Compatibility

There is no risk of breaking compatibility: the RichFunction interface remains unchanged, and the semantics of its methods stay the same. This FLIP simply adds a new, optional interface for UDFs to implement.
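Because the interface is optional, the runtime only needs a type check before calling the new methods, and functions that do not implement it behave exactly as before. The sketch below is hypothetical: `GracefulDispatchSketch.onGracefulTermination` and both sample functions are illustrative, not Flink code, and the two hooks are called back to back for brevity, whereas in an actual termination sequence other steps (such as the savepoint) may sit between them.

```java
import java.util.ArrayList;
import java.util.List;

// Redeclared from the proposal so this sketch is self-contained.
interface WithGracefulShutdown {
    void prepareToShutdown() throws Exception;

    void shutdown() throws Exception;
}

// An existing function that knows nothing about the new interface.
class LegacyFunction {
}

// A function that opts in to graceful-shutdown handling.
class GracefulFunction implements WithGracefulShutdown {
    final List<String> calls = new ArrayList<>();

    @Override
    public void prepareToShutdown() { calls.add("prepareToShutdown"); }

    @Override
    public void shutdown() { calls.add("shutdown"); }
}

class GracefulDispatchSketch {
    // Returns true if the function opted in and its hooks were invoked.
    static boolean onGracefulTermination(Object function) throws Exception {
        if (function instanceof WithGracefulShutdown) {
            WithGracefulShutdown graceful = (WithGracefulShutdown) function;
            graceful.prepareToShutdown();
            graceful.shutdown();
            return true;
        }
        return false; // legacy functions are left untouched
    }
}
```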

Rejected Alternatives

Adding the methods of the proposed interface directly to the RichFunction interface.

This solution was rejected because:

  • the methods in the RichFunction interface are relevant to most functions, whereas prepareToShutdown() and shutdown() are only relevant to functions with specific requirements.
  • if the interface needs to change in the future, we will only need to deprecate the separate interface rather than methods of RichFunction, whose deprecation would be visible to all users.