Status
Current state: Under Discussion
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
In many scenarios, we want to finish a job with a savepoint. This can be, for example, because we want to upgrade our cluster, or because we finished processing the important part of our data and we want to simply terminate the job, but make sure that our output is safely stored to our sink storage system, e.g. using the StreamingFileSink.
...
This document aims to fix the end-to-end semantics when stopping jobs. It proposes two new shutdown modes which ensure that no events come after the checkpoint. The remainder of the document clarifies the semantics of the two modes and discusses the implementation of the feature.
Goals
We propose the addition of the following two shutdown modes, namely:
...
The remainder of this section presents the similarities and differences between the two modes.
SIMILARITIES
Both modes shut down with a checkpoint/savepoint and ensure that all side effects go through. This will guarantee exactly-once semantics end-to-end. In addition, both modes ensure that no events come after the checkpoint, which will allow better guarantees for at-least-once sinks. Finally, in both cases the JOB STATE should be FINISHED, to indicate that the operation the user requested completed successfully. This differs from the current “cancel with savepoint”, which ends the job in state CANCELED. If the job ended in state CANCELED, the user could not differentiate between success and failure of the requested operation; for example, someone may cancel the job while a TERMINATE or SUSPEND is in progress.
DIFFERENCES
Event Time Semantics
In SUSPEND, the user just wants to “pause” the job rather than stop it. For a job with event-time timers, this would imply that we do not want the registered (event-time) timers to fire, as the job will resume in the future. In this case, we should not send MAX_WATERMARK (for event-time).
...
In TERMINATE, we want the timers to fire, as this will flush out any buffered state associated with timers, e.g. non-fired windows. In this case, we should emit MAX_WATERMARK so that event-time timers fire. For processing-time timers, there is not much we can do for now.
System Perspective
The following table illustrates the differences between the two new operations from an operator/task perspective.
...
Operation | Source OPS | Task Status | Job Status |
SUSPEND | Checkpoint Barrier, End Of Stream | Finished | Finished |
TERMINATE | MAX_WATERMARK, Checkpoint Barrier, End Of Stream | Finished | Finished |
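The "Source OPS" column above can be read as an ordered list of signals that a source emits for each mode. The following is a minimal sketch of that reading, not Flink code: `ShutdownSignals`, `ShutdownMode`, `Signal` and `sourceSignals` are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustration only: hypothetical names, not part of Flink's API. */
final class ShutdownSignals {
    enum ShutdownMode { SUSPEND, TERMINATE }
    enum Signal { MAX_WATERMARK, CHECKPOINT_BARRIER, END_OF_STREAM }

    /** Returns the signals a source would emit, in order, for the given mode. */
    static List<Signal> sourceSignals(ShutdownMode mode) {
        List<Signal> signals = new ArrayList<>();
        if (mode == ShutdownMode.TERMINATE) {
            // Only TERMINATE advances event time to the end so that pending timers fire.
            signals.add(Signal.MAX_WATERMARK);
        }
        // Both modes take the final checkpoint/savepoint and then end the stream.
        signals.add(Signal.CHECKPOINT_BARRIER);
        signals.add(Signal.END_OF_STREAM);
        return Collections.unmodifiableList(signals);
    }

    public static void main(String[] args) {
        for (ShutdownMode mode : ShutdownMode.values()) {
            System.out.println(mode + " -> " + sourceSignals(mode));
        }
    }
}
```

The only difference between the two modes in this sketch is the leading MAX_WATERMARK; the barrier and the end-of-stream signal are shared, which matches the identical task and job status columns.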
User-Facing Changes
Expose the new SUSPEND and TERMINATE operations through REST to the user
Change event time semantics in accordance with the above table
Open questions
Are there any better suggestions for the names TERMINATE and SUSPEND?
Failure in the TERMINATE case:
We launch a TERMINATE, so MAX_WATERMARK is emitted
Event-time Timers fire
Savepoint (state persistence phase) succeeds
An exception is thrown during notifyCheckpointComplete()
Appendix A: Cancel with Savepoint shortcomings
To understand the problem, first we will explain how an “exactly-once” sink is implemented in Flink in the general case. This holds true for every sink that implements a flavor of a “two-phase commit” protocol. Such a sink follows the pattern:
...
There is no expected change in public interfaces
Initial Implementation Plan
An initial sketch (by no means final) of the implementation can look like this:
...
The Job Manager triggers a synchronous savepoint at the sources, which also indicates one of TERMINATE or SUSPEND.
The sources send a MAX_WATERMARK in case of TERMINATE; nothing is done in case of SUSPEND.
The Task Manager executes the checkpoint in a SYNCHRONOUS way, i.e. it blocks until the state is persisted successfully and notifyCheckpointComplete() is executed.
The Task Manager acknowledges the successful persistence of the state for the savepoint.
The Job Manager sends the notification that the checkpoint is completed.
The Task Manager unblocks the synchronous checkpoint execution.
Finishing the job propagates from the sources, i.e. they shut down and EOS messages propagate through the job.
The Job Manager waits until the job state goes to FINISHED before declaring the operation successful.
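To make the ordering of the steps listed above easier to follow, here is a single-threaded walk-through written as plain Java. It is a sketch under stated assumptions rather than the proposed implementation: `StopWithSavepointSketch`, `Task`, `Mode`, `JobState` and `stop` are hypothetical names, no real Flink classes are used, and the blocking behaviour of the synchronous checkpoint is modelled only by the order of the log entries.

```java
import java.util.ArrayList;
import java.util.List;

/** Single-threaded walk-through of the plan; all names are hypothetical, not Flink classes. */
final class StopWithSavepointSketch {
    enum Mode { SUSPEND, TERMINATE }
    enum JobState { RUNNING, FINISHED }

    /** Stand-in for one task and the flags the plan cares about. */
    static final class Task {
        final String name;
        boolean statePersisted;
        boolean notifiedComplete;
        boolean finished;

        Task(String name) { this.name = name; }
    }

    /** Runs the steps in order, appends a trace to log, and returns the final job state. */
    static JobState stop(Mode mode, List<Task> tasks, List<String> log) {
        log.add("JobManager: trigger synchronous savepoint, mode=" + mode);
        if (mode == Mode.TERMINATE) {
            log.add("sources: emit MAX_WATERMARK");
        }
        // Each task persists its state and acknowledges, but stays blocked.
        for (Task task : tasks) {
            task.statePersisted = true;
            log.add(task.name + ": state persisted, acknowledged, still blocked");
        }
        log.add("JobManager: all acknowledgements received, savepoint complete");
        // The completion notification runs before any task is allowed to continue.
        for (Task task : tasks) {
            task.notifiedComplete = true;
            log.add(task.name + ": notifyCheckpointComplete() done, unblocked");
        }
        // Shutdown propagates from the sources through the rest of the job.
        for (Task task : tasks) {
            task.finished = true;
            log.add(task.name + ": end of stream, finished");
        }
        boolean allFinished = true;
        for (Task task : tasks) {
            allFinished &= task.finished;
        }
        return allFinished ? JobState.FINISHED : JobState.RUNNING;
    }

    public static void main(String[] args) {
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("source"));
        tasks.add(new Task("sink"));
        List<String> log = new ArrayList<>();
        JobState state = stop(Mode.TERMINATE, tasks, log);
        log.forEach(System.out::println);
        System.out.println("job state: " + state);
    }
}
```

The point of the sketch is the ordering: no task proceeds to end of stream until every task has persisted its state and received the completion notification, so no records can be emitted between the savepoint and the shutdown. Failure handling, such as the exception case raised under the open questions, is deliberately left out.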
Proposed Changes
The implementation plan (which will be further updated) roughly describes the components to be touched.