

Status

Current state: Under Discussion

...

Discussion thread: thread/7d8xx1d2qqxp41pwgmw8kdgyk2c8gcv7

Vote thread: ...

JIRA: FLINK-11458

Release: 1.9


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In many scenarios, we want to finish a job with a savepoint. This can be, for example, because we want to upgrade our cluster, or because we have finished processing the important part of our data and simply want to terminate the job while making sure that our output is safely stored in our sink storage system, e.g. when using the StreamingFileSink.

...

This document targets fixing the end-to-end semantics when stopping jobs and proposes the addition of two shutdown modes which ensure that no events come after the checkpoint. In the remainder of the document we clarify the semantics of the two shutdown modes and discuss the implementation of the feature.


Goals


We propose the addition of the following two shutdown modes, namely:

...

The remainder of this section presents the similarities and differences between the two modes.


SIMILARITIES

Both modes shut down with a checkpoint/savepoint and ensure that all side effects go through. This guarantees exactly-once semantics end-to-end. In addition, both modes ensure that no events come after the checkpoint, which also allows for better guarantees for at-least-once sinks. Finally, in both cases, the JOB STATE should be FINISHED to indicate that the operation the user requested completed successfully. This differs from how the current “cancel with savepoint” is implemented, which ends the job in state CANCELED. Having the job end in state CANCELED would make it impossible for the user to differentiate between success and failure of the requested operation, e.g. someone may cancel the job while a TERMINATE or SUSPEND is in progress.


DIFFERENCES


Event Time Semantics

In SUSPEND, the user just wants to “pause” the job rather than stop it. For a job with event-time timers, this would imply that we do not want the registered (event-time) timers to fire, as the job will resume in the future. In this case, we should not send MAX_WATERMARK (for event-time).

...

In TERMINATE, we want the timers to fire, as this will flush out any buffered state associated with timers, e.g. non-fired windows. In this case, we should emit MAX_WATERMARK for event-time timers. For processing time timers, there is not much we can do for now.
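To make this concrete, below is a minimal sketch of a source that flushes event-time timers on termination. SourceFunction, SourceContext and Watermark.MAX_WATERMARK are existing Flink API; the class name TerminatingSource, the terminate() hook and the volatile flag are illustrative assumptions, since how the TERMINATE signal actually reaches the source is part of the implementation discussed later in this document.

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Illustrative only: on a TERMINATE-like signal, emit MAX_WATERMARK so that all
 * downstream event-time timers fire before the final checkpoint barrier and
 * end-of-stream are sent. A SUSPEND-like signal would skip the watermark.
 */
public class TerminatingSource implements SourceFunction<Long> {

    private volatile boolean running = true;
    private volatile boolean terminateRequested = false;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long value = 0;
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collectWithTimestamp(value, value);
                value++;
            }
            if (terminateRequested) {
                // TERMINATE: flush all event-time timers downstream.
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
                running = false;
            } else {
                Thread.sleep(10);
            }
        }
    }

    /** Hypothetical hook; how the runtime delivers the signal is not defined by this sketch. */
    public void terminate() {
        terminateRequested = true;
    }

    @Override
    public void cancel() {
        running = false;
    }
}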


System Perspective

The following table illustrates the differences between the two new operations from an operator/task perspective.



            | Source OPS                                        | Task Status | Job Status
SUSPEND     | Checkpoint Barrier, End Of Stream                 | Finished    | Finished
TERMINATE   | MAX_WATERMARK, Checkpoint Barrier, End Of Stream  | Finished    | Finished


User-Facing Changes


  • Expose the new SUSPEND and TERMINATE operations to the user through REST (an illustrative request is sketched after this list)

  • Change event time semantics in accordance with the above table
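Purely for illustration, a client request for the new operations could look like the sketch below. The endpoint path, the payload fields (targetDirectory, mode) and the job id are placeholders and are not defined by this FLIP; only the standard java.net.http client classes are real API.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client call; the REST endpoint shape is NOT decided by this FLIP. */
public class StopWithSavepointClient {

    public static void main(String[] args) throws Exception {
        String jobId = "ca963d5bcb88a10e20a6dca9f0d94b39";   // example job id (placeholder)
        String payload = "{\"targetDirectory\": \"s3://bucket/savepoints\", \"mode\": \"TERMINATE\"}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jobs/" + jobId + "/stop"))  // placeholder path
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

        System.out.println(response.statusCode() + " " + response.body());
    }
}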


Open questions

  1. Are there any better suggestions for the names TERMINATE and SUSPEND?

  2. How should a failure be handled in the TERMINATE case? For example:

    1. We launch a TERMINATE, so MAX_WATERMARK is emitted

    2. Event-time Timers fire

    3. Savepoint (state persistence phase) succeeds

    4. An exception is thrown during notifyCheckpointComplete()



Appendix A: Cancel with Savepoint shortcomings


To understand the problem, first we will explain how an “exactly-once” sink is implemented in Flink in the general case. This holds true for every sink that implements a flavor of a “two-phase commit” protocol. Such a sink follows the pattern:

...
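As a hedged illustration of this two-phase commit pattern, the sketch below maps each phase to the checkpoint lifecycle. TwoPhaseCommitSinkFunction, VoidSerializer and KryoSerializer are existing Flink classes; the FileTransaction type and the file-based semantics in the comments are hypothetical stand-ins, not part of this FLIP.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

/**
 * Sketch of the generic two-phase commit sink pattern. Only the base class and
 * the serializers are real Flink API; the transaction handling is mocked.
 */
public class TwoPhaseCommitFileSink
        extends TwoPhaseCommitSinkFunction<String, TwoPhaseCommitFileSink.FileTransaction, Void> {

    public TwoPhaseCommitFileSink() {
        super(new KryoSerializer<>(FileTransaction.class, new ExecutionConfig()),
                VoidSerializer.INSTANCE);
    }

    @Override
    protected FileTransaction beginTransaction() {
        // 1. Open a new "transaction", e.g. a temporary / in-progress file.
        return new FileTransaction();
    }

    @Override
    protected void invoke(FileTransaction txn, String value, Context context) {
        // 2. Write records into the open transaction; they are not yet visible downstream.
        txn.buffer.append(value).append('\n');
    }

    @Override
    protected void preCommit(FileTransaction txn) {
        // 3. Called on checkpoint: flush and close the in-progress data so that it can
        //    be committed later; no more records go into this transaction.
    }

    @Override
    protected void commit(FileTransaction txn) {
        // 4. Called on notifyCheckpointComplete(): atomically publish the data,
        //    e.g. move the temporary file to its final, visible location.
    }

    @Override
    protected void abort(FileTransaction txn) {
        // On failure, or on restoring an uncommitted transaction: discard the data.
    }

    /** Hypothetical transaction handle; real sinks track files, Kafka transactions, etc. */
    public static class FileTransaction {
        final StringBuilder buffer = new StringBuilder();
    }
}

The crucial point for this FLIP is step 4: the data only becomes visible when notifyCheckpointComplete() is processed, which is why both new modes block until that notification has gone through (see the Initial Implementation Plan below).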

There is no expected change in public interfaces


Initial Implementation Plan

An initial sketch (by no means final) of the implementation can look like this:

...

  1. The Job Manager triggers a synchronous savepoint at the sources, which also indicates one of TERMINATE or SUSPEND.

  2. The sources send a MAX_WATERMARK in the case of TERMINATE; nothing is done in the case of SUSPEND.

  3. The Task Manager executes the checkpoint in a SYNCHRONOUS way, i.e. it blocks until the state is persisted successfully and notifyCheckpointComplete() is executed (see the sketch after this list).

  4. The Task Manager acknowledges the successful persistence of the state for the savepoint.

  5. The Job Manager sends the notification that the checkpoint is completed.

  6. The Task Manager unblocks the synchronous checkpoint execution.

  7. Finishing the job propagates from the sources, i.e. they shut down and EOS messages propagate through the job.

  8. The Job Manager waits until the job state goes to FINISHED before declaring the operation successful.
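The following is a simplified, self-contained sketch of the control flow in steps 3 to 7. It is not the real Flink task code; the class and method names are placeholders that only illustrate how the task blocks after persisting its state until the checkpoint-complete notification arrives.

import java.util.concurrent.CompletableFuture;

/**
 * Simplified illustration of the synchronous savepoint control flow described above.
 * Not Flink code: names and structure are placeholders.
 */
public class SynchronousSavepointSketch {

    // Completed when the Job Manager's "checkpoint complete" notification arrives (step 5).
    private final CompletableFuture<Void> checkpointCompleted = new CompletableFuture<>();

    /** Runs on the task side: steps 3, 6 and 7 of the plan. */
    public void performSynchronousCheckpoint(long checkpointId) throws Exception {
        persistState(checkpointId);         // snapshot the state and acknowledge it (steps 3-4)
        checkpointCompleted.get();          // block until the notification unblocks us (step 6)
        finishTask();                       // sources finish, EOS propagates downstream (step 7)
    }

    /** Invoked when the Job Manager sends the completion notification (step 5). */
    public void notifyCheckpointComplete(long checkpointId) {
        runSideEffects(checkpointId);       // e.g. commit pending transactions in sinks
        checkpointCompleted.complete(null); // unblock the waiting checkpoint call above
    }

    private void persistState(long checkpointId) { /* snapshot operator state */ }

    private void runSideEffects(long checkpointId) { /* make side effects visible exactly once */ }

    private void finishTask() { /* emit end-of-stream and shut down */ }
}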


Proposed Changes

The implementation plan above (which will be further updated) roughly describes the components to be touched.