Status

Discussion thread: https://lists.apache.org/thread/tzhjm0ttpkrc1yd3vo4p72ppkrwgp0sw
Vote thread
JIRA


Release

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Introduction

Currently, the methods exposed by the RichFunction do not allow function implementers to distinguish between the logic to be executed on normal termination of a job (i.e. end of input stream, pipeline drain, or pipeline suspend) and the logic to be executed on termination due to a failure or a user-triggered cancellation through the cancel command.

This proposal aims to solve this issue by introducing a new (optional) interface. This interface should be implemented by any UDF that wants to perform special logic during graceful termination. Its methods are integrated with Flink’s DRAIN and SUSPEND commands, described in FLIP-34.

Motivation

In many cases, UDFs (User Defined Functions) need to be able to perform application-specific actions when they stop in an orderly manner, in addition to the required resource clean-up (e.g. closing connections). To clarify, stopping in an orderly manner covers both the case of a batch job that reaches the end of its input stream (EOS from now on) and the case where the user chooses to stop the pipeline through the stop-with-savepoint command (i.e. SUSPEND or DRAIN).

In addition, these actions should be performed only as a result of a graceful shutdown, and not in the case of termination due to an error. In the latter case, the UDF should simply perform the necessary resource clean-up.

Current Situation

Currently, Flink's UDFs, and more specifically the RichFunction, which exposes methods related to a UDF’s lifecycle, only have a close() method which is called in any case of job termination. This includes any form of orderly termination and termination due to an error. This is insufficient as each of the aforementioned termination contexts requires special handling. 

For an illustrative example, we can use the StreamingFileSink which guarantees exactly-once end-to-end semantics when writing to the filesystem by "parking" data in temporary "part-files" between checkpoints, and "committing" these files, or making them visible to the outside world, upon successful checkpoint completion. This guarantees that even in the case of failure, the "committed" files will never be invalidated, as their input data are part of the last successful checkpoint and thus will not be reprocessed. 

A job that uses the StreamingFileSink would want the sink to perform the following actions in each of the cases of job termination:

  • FINITE SOURCE:
    1. close any in-progress part files and get them ready to be committed,
    2. commit them,
    3. perform any necessary resource clean-up.

ASSUMPTION: The assumption here is that batch does not have checkpointing enabled, so "re-using" stop-with-savepoint is not an option. As a fault-tolerance mechanism in batch, we assume that the results of shuffles are materialized and stored persistently, and that in the case of a failure the job resumes from the last successfully persisted shuffle result.


  • DRAIN (stop-with-savepoint with MAX_WATERMARK): this is equivalent to sending the EOS and taking a final savepoint. So we want to:
    1. send the MAX_WATERMARK,
    2. close any in-progress part files and get them ready to be committed,
    3. take the savepoint the user requested. This will reflect that there are no in-progress part files as we are draining our pipeline.
    4. let the previously closed in-progress part files be committed as a result of a successful savepoint (as it is done currently).
    5. perform any necessary resource clean-up.
  • SUSPEND (stop-with-savepoint): this stops the job with a final savepoint without "draining" the pipeline, so no EOS or MAX_WATERMARK is sent. In this case we want to: 
    1.  take the savepoint the user requested as usual.
    2.  perform any necessary resource clean-up.
  • ERROR/CANCELLATION: 
    1. perform any necessary resource clean-up.

Given that currently UDFs only have the close() method which is called in both normal and abrupt termination, the logic implemented by the method only performs the clean-up and closes any open file streams.
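The limitation can be illustrated with a simplified model. This is only a sketch and not Flink's task code: `CloseOnlyFunction` and `CloseOnlyDemo` are hypothetical names, and the "task" is reduced to a try-with-resources block. It shows that close() runs identically after a normal end of input and after a failure, with nothing telling it which of the two happened.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a UDF whose only lifecycle hook is close().
class CloseOnlyFunction implements AutoCloseable {
    final List<String> log = new ArrayList<>();

    void process(String record) {
        if (record == null) {
            // A null record simulates a failure in the middle of the job.
            throw new IllegalStateException("simulated failure");
        }
        log.add("process " + record);
    }

    @Override
    public void close() {
        // No argument or flag tells close() why the job is terminating,
        // so the only safe action here is resource clean-up.
        log.add("close");
    }
}

class CloseOnlyDemo {
    // Simplified model of a task: process all records, then always call close().
    static List<String> run(String[] records) {
        CloseOnlyFunction fn = new CloseOnlyFunction();
        try (CloseOnlyFunction f = fn) {
            for (String r : records) {
                f.process(r);
            }
        } catch (IllegalStateException e) {
            // The resource has already been closed when this handler runs.
            fn.log.add("failed");
        }
        return fn.log;
    }
}
```

In both runs the function records the same "close" entry; only the surrounding task knows whether the termination was graceful.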

Proposed Solution


  1. Keep the RichFunction class (including the close() method) as is, but state explicitly that the close() is called in any case of termination, both graceful and due to an error. This is to say that the semantics of the close() are now aligned with those of the close() in Java's AutoCloseable interface, which we can consider making more explicit by making the RichFunction implement AutoCloseable. This change is not expected to break backwards compatibility, as close() is called in any case of termination already.
  2. Add a new interface for UDFs to implement if they want to execute application-specific logic in case of graceful job termination, i.e. batch jobs reaching EOS, DRAIN and SUSPEND of streaming jobs. This proposed interface can be named WithGracefulShutdown (a la trait) and include the following methods:

          - void prepareToShutdown() throws Exception

          - void shutdown() throws Exception


public interface WithGracefulShutdown {

    void prepareToShutdown() throws Exception;

    void shutdown() throws Exception;

}


These two methods are relevant ONLY in the case of graceful job termination and are NOT called in case of termination due to an error. In addition, the actions of these methods should be idempotent as it is not guaranteed that they will be called only once.
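As a minimal sketch of the idempotency requirement, a UDF could implement the interface along the following lines. The interface is redeclared locally so that the example compiles on its own. `BufferingSink` and its in-memory lists are hypothetical and merely stand in for real part-file handling.

```java
import java.util.ArrayList;
import java.util.List;

// Redeclared here so the sketch is self-contained; mirrors the proposed interface.
interface WithGracefulShutdown {
    void prepareToShutdown() throws Exception;
    void shutdown() throws Exception;
}

// Hypothetical sink: records start "in progress", become "pending" in
// prepareToShutdown(), and are "committed" in shutdown().
class BufferingSink implements WithGracefulShutdown, AutoCloseable {
    private final List<String> inProgress = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();
    private boolean closed;

    void write(String record) {
        inProgress.add(record);
    }

    @Override
    public void prepareToShutdown() {
        // Idempotent: a repeated call finds nothing in progress and changes nothing.
        pending.addAll(inProgress);
        inProgress.clear();
    }

    @Override
    public void shutdown() {
        // Idempotent: a repeated call finds nothing pending and changes nothing.
        committed.addAll(pending);
        pending.clear();
    }

    @Override
    public void close() {
        // Called on every termination path: only releases resources and
        // discards whatever was not committed.
        inProgress.clear();
        pending.clear();
        closed = true;
    }

    List<String> committed() {
        return committed;
    }

    boolean isClosed() {
        return closed;
    }
}
```

Because each method moves data out of its source list before returning, calling either method twice leaves the committed result unchanged.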

The need for two methods can be illustrated by the sequence of required events in the case of DRAIN, as described in the example in the "Current Situation" section. There we see that we want to perform an action (step 2, closing the in-progress part files) before taking the savepoint (step 3), so that the state in the savepoint reflects that action, and then, after step 3, commit the files, i.e. finalize our shutdown (e.g. commit any side-effects).

To illustrate when these methods are called in a more general way, in the remainder of the section we present the sequence of method calls in each of the termination cases separately:

  • FINITE SOURCE (no checkpoints, see ASSUMPTION):
    1. prepareToShutdown()
    2. shutdown()
    3. close()
  • DRAIN:
    1. max-watermark
    2. prepareToShutdown()
    3. snapshotState()
    4. notifyCheckpointComplete()
    5. shutdown()
    6. close()
  • SUSPEND:
    1. snapshotState()
    2. notifyCheckpointComplete()
    3. close()
  • ERROR/CANCELLATION:
    1. close()
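The call sequences above can be summarized in a small model. This is a sketch for illustration only and does not reflect how the Flink runtime would be implemented: `LifecycleSketch`, `Termination`, and `callSequence` are hypothetical names, and the calls are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LifecycleSketch {
    // Hypothetical enum naming the four termination cases discussed above.
    enum Termination { FINITE_SOURCE, DRAIN, SUSPEND, ERROR_OR_CANCELLATION }

    // Returns the ordered calls a UDF would observe for the given termination case.
    static List<String> callSequence(Termination mode) {
        List<String> calls = new ArrayList<>();
        switch (mode) {
            case FINITE_SOURCE:
                calls.addAll(Arrays.asList("prepareToShutdown", "shutdown"));
                break;
            case DRAIN:
                calls.addAll(Arrays.asList(
                        "max-watermark",
                        "prepareToShutdown",
                        "snapshotState",
                        "notifyCheckpointComplete",
                        "shutdown"));
                break;
            case SUSPEND:
                calls.addAll(Arrays.asList("snapshotState", "notifyCheckpointComplete"));
                break;
            case ERROR_OR_CANCELLATION:
                // No graceful-shutdown hooks are invoked.
                break;
        }
        // close() is the last call in every case.
        calls.add("close");
        return calls;
    }

    public static void main(String[] args) {
        for (Termination mode : Termination.values()) {
            System.out.println(mode + ": " + String.join(" -> ", callSequence(mode)));
        }
    }
}
```

The model makes the two invariants explicit: close() always comes last, and prepareToShutdown() and shutdown() appear only for FINITE SOURCE and DRAIN.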

Backwards Compatibility

There is no risk of breaking compatibility as the RichFunction interface stays as it was, with no change in the semantics of the methods. This FLIP simply adds a new, optional interface for UDFs to implement. 

Rejected Alternatives

Adding the methods of the proposed interface directly to the RichFunction interface.

This solution was rejected because:

  • the methods in the RichFunction interface are methods relevant to most functions while prepareToShutdown() and shutdown() are relevant to functions with specific requirements.
  • if something changes in the future and the interface needs to change, we will only need to deprecate the interface and not methods in the RichFunction, which will be visible to all users.