Status

Discussion thread
Vote thread
JIRA


Release

Draft: https://github.com/apache/flink/pull/1668

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The existing checkpointing algorithm (asynchronous barrier snapshotting for DAGs) follows a simple and straightforward protocol: it makes sure that checkpoint barriers are aligned in each operator, so that all records arriving before the barriers (the pre-snapshot records) are processed before a snapshot is taken. Since waiting indefinitely for all records in transit within a cycle of an execution graph can violate termination (a crucial liveness property), we instead have to save any unprocessed records for later and complete the snapshot.

Public Interfaces

No public interfaces are affected by the change.

Proposed Changes

In this take of the algorithm for Flink, we assign the role of saving in-transit records to the Iteration Head. The steps in which this version differs from the vanilla algorithm are the following (a sketch of the head's logic follows the list):

(1) An Iteration Head receives a barrier from the system runtime (as before) and:

  • Goes into Logging Mode. That means that, from that point on, every record it receives from its Iteration Sink is buffered in its own operator state (the log) until step (2).

  • Forwards the barrier to its downstream nodes (this guarantees liveness; otherwise we would have a deadlock) and forwards feedback records as usual, since their effects are part of a future checkpoint.

(2) Eventually, the Iteration Head receives a barrier back from its Iteration Sink. At that point:

  • It checkpoints and then clears the backup log.

  • Goes back to normal forwarding mode.

(3) When the Iteration Head starts or restarts, it looks at its initial operator state (the log) and flushes any records that may still be pending.
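To make these steps concrete, the following is a minimal sketch of the Iteration Head's control logic in Java. All names here (IterationHeadSketch, Output, onRuntimeBarrier, and so on) are hypothetical and introduced only for illustration; the real implementation would live in Flink's internal iteration head task and use the runtime's actual interfaces. Whether a logged record is also forwarded immediately or only once the snapshot completes is an implementation detail; the sketch does the latter, following the execution example of Fig. 4 below.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/**
 * Hypothetical sketch of the Iteration Head protocol described above.
 * Names are illustrative and not part of Flink's actual runtime API.
 */
class IterationHeadSketch<T> {

    /** Minimal downstream output abstraction assumed for this sketch. */
    interface Output<T> {
        void emit(T record);
        void broadcastBarrier(long checkpointId);
    }

    private final Queue<T> log = new ArrayDeque<>(); // operator state: the backup log
    private boolean loggingMode = false;             // true between steps (1) and (2)
    private final Output<T> downstream;

    IterationHeadSketch(Output<T> downstream) {
        this.downstream = downstream;
    }

    /** Step (1): a barrier arrives from the runtime (Checkpoint Coordinator). */
    void onRuntimeBarrier(long checkpointId) {
        loggingMode = true;                          // enter Logging Mode
        downstream.broadcastBarrier(checkpointId);   // forward right away: guarantees liveness
    }

    /** A record arrives on the feedback channel from the Iteration Sink. */
    void onFeedbackRecord(T record) {
        if (loggingMode) {
            log.add(record);                         // buffer in operator state until step (2)
        } else {
            downstream.emit(record);                 // normal forwarding mode
        }
    }

    /** Step (2): the barrier has travelled around the loop and returns from the Sink. */
    void onFeedbackBarrier(long checkpointId) {
        snapshotState(checkpointId);                 // checkpoint the backup log ...
        while (!log.isEmpty()) {
            downstream.emit(log.poll());             // ... then release the buffered records,
        }                                            // which also clears the log
        loggingMode = false;                         // back to normal forwarding mode
    }

    /** Step (3): on (re)start, flush any pending records from the restored log. */
    void onRestore(List<T> restoredLog) {
        for (T record : restoredLog) {
            downstream.emit(record);
        }
    }

    private void snapshotState(long checkpointId) {
        // Hand the current log contents to the state backend (details omitted).
    }
}
```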


Explanation and Intuition

Fig. 3 - A simple cyclic dataflow graph


In the simple topology of Fig. 3 we have a single source S and a mapper M inside an iteration with head H and tail T.

This is the bare minimum we need to illustrate how the algorithm works.


Fig. 4 - Execution example for the graph of Fig. 3

The diagram of Fig. 4 shows a possible sequence of events, containing both barrier and record transmissions. For completeness we denote the Runtime as a separate entity; in our case this is the Checkpoint Coordinator, which periodically initiates checkpointing in all tasks that have no inputs. The point this diagram tries to make is the following:

Record R1 (or any record received in normal mode) gets forwarded to M, the only consumer of H, before the checkpoint barrier. On the other hand, R2 is not forwarded to M until H has finished snapshotting (or within the same atomic block anyway). In case of a failure, R2 is not lost but rather saved for later replay.

A very brief description of a consistent/correct snapshot in our context can be summed up in the following sentence:

Every record prior to a snapshot has either been processed by an operator (the operator's state depends on the record) or been included in the snapshot as an event in transit (see the Chandy-Lamport algorithm).

This algorithm guarantees this property and, as explained earlier, it also terminates.
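Stated slightly more formally (the notation below is ours, introduced purely for illustration and not taken from the Chandy-Lamport paper): let a snapshot C consist of the operator states s_i together with the Iteration Head's in-transit log L; the property then reads

```latex
\forall\, r \text{ sent before the barriers of } C:\qquad
\bigl(\exists\, i:\ \mathit{effects}(r) \subseteq s_i\bigr) \;\lor\; r \in L
```

where effects(r) denotes the state updates caused by processing record r.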


Compatibility, Deprecation, and Migration Plan

  • We can now remove the cyclic-graph checks and warnings that are triggered when checkpointing is enabled. The change is transparent at the API level and therefore backwards compatible.

Test Plan

A single integration test that verifies exactly-once state updates upon recovery when loops exist in the graph should suffice.
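A rough sketch of what such a test could look like, using the public DataStream iteration API (the topology mirrors Fig. 3; the failure injection and verification shown here are simplified placeholders rather than the final harness, and the test assumes the cyclic-graph check removal described above):

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LoopCheckpointingITCase {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(100);   // checkpoint frequently to hit the loop mid-flight
        env.setParallelism(1);

        // S -> H -> M -> T loop, as in Fig. 3: values are decremented
        // and fed back until they reach zero.
        DataStream<Long> source = env.generateSequence(1, 1000);
        IterativeStream<Long> iteration = source.iterate(5000);
        DataStream<Long> decremented = iteration.map(new FailingOnceDecrementer());
        iteration.closeWith(decremented.filter(x -> x > 0));  // feedback edge
        decremented.filter(x -> x <= 0).print();              // records leaving the loop

        env.execute("loop checkpointing test");
        // A real test would assert exactly-once state updates after the
        // injected failure, e.g. via accumulators or a verifying sink.
    }

    /** Stateful mapper that fails once mid-run to force recovery from a checkpoint. */
    private static class FailingOnceDecrementer extends RichMapFunction<Long, Long> {
        private static volatile boolean failed = false; // static: survives the restart in a local test JVM
        private long seen;                              // per-attempt counter (simplified "state")

        @Override
        public Long map(Long value) throws Exception {
            if (!failed && ++seen == 500) {
                failed = true;
                throw new RuntimeException("injected failure");
            }
            return value - 1;
        }
    }
}
```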

Rejected Alternatives

None.