Status

Current state: Under Discussion

Discussion thread: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-1-Fine-grained-recovery-from-task-failures-td12510.html

JIRA: -

Released: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


This improvement proposal describes an enhancement that makes recovery more efficient by restarting only what needs to be restarted and building on cached intermediate results.

Original Design Document: For now, the actual design document is available here: https://docs.google.com/document/d/16S584XFzkfFu3MOfVCE0rHZ_JJgQrQuw9SXpanoMiMo/edit#heading=h.39f7p8sd8ymm

Motivation

When a task fails during execution, Flink currently resets the entire execution graph and triggers complete re-execution from the last completed checkpoint. This is more expensive than just re-executing the failed tasks.

Streaming (DataStream) Jobs

For many streaming jobs, this behavior is not critical, because many tasks have all-to-all dependencies (keyBy, event time) with their predecessors (upstream) or successors (downstream). In that case, operators usually cannot make progress anyway as long as one task is not delivering input or accepting output. A full restart only means that those tasks also recompute their state rather than sitting idle and waiting.

Finer-grained recovery can help to reduce the amount of state that needs to be transferred upon recovery. If only 1 out of 100 operators needs to recover its state, then that one operator has the full bandwidth to the persistent store of the checkpoints, rather than sharing that bandwidth with the other operators that recover their state.

For some streaming jobs, full restarts are unnecessarily expensive. In particular for embarrassingly parallel jobs (no keyBy() or redistribute() operations), other parallel subtasks/partitions can keep running, and the streaming program as a whole would make progress.

Batch (DataSet) Jobs

Batch jobs do not perform any checkpoints and are hence completely restarted in case of a task failure. Batch jobs frequently have all-to-all dependencies between operators, but those are not necessarily pipelined, which makes it conceptually possible to have fine-grained restarts.


Proposed Changes

The core change is to only restart the pipelined connected component of the failed task. This should generalize the failure/recovery model.

We can develop this improvement in two steps:

Version (1) - Entire connected component is pipelined

This version assumes that all connections between operators are pipelined. The full connected component of the failed task needs to be restarted.

For jobs that have multiple components (typically embarrassingly parallel jobs) this gives the desired improvement. For jobs with all-to-all dependencies, it will behave like the current failure/recovery model.
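The restart set for Version (1) can be sketched as a plain graph traversal. The sketch below is only an illustration: the class `PipelinedComponents`, its methods, and the string task names are hypothetical and not part of Flink, and every edge is assumed to be a pipelined connection.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not Flink code: tasks are plain strings and every
// edge is assumed to be a pipelined connection, as in Version (1).
class PipelinedComponents {
    private final Map<String, List<String>> neighbors = new HashMap<>();

    // Registers a pipelined connection. It is stored in both directions because
    // a failure affects producers (upstream) as well as consumers (downstream).
    void connect(String producer, String consumer) {
        neighbors.computeIfAbsent(producer, k -> new ArrayList<>()).add(consumer);
        neighbors.computeIfAbsent(consumer, k -> new ArrayList<>()).add(producer);
    }

    // Returns every task reachable from the failed task over pipelined
    // connections, i.e. the connected component that has to be restarted.
    Set<String> tasksToRestart(String failedTask) {
        Set<String> visited = new TreeSet<>();
        Deque<String> queue = new ArrayDeque<>();
        visited.add(failedTask);
        queue.add(failedTask);
        while (!queue.isEmpty()) {
            String task = queue.poll();
            for (String next : neighbors.getOrDefault(task, Collections.emptyList())) {
                if (visited.add(next)) {
                    queue.add(next);
                }
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        // Two independent pipelines: only the failed pipeline is restarted.
        PipelinedComponents independent = new PipelinedComponents();
        independent.connect("source-1", "map-1");
        independent.connect("source-2", "map-2");
        System.out.println(independent.tasksToRestart("map-1"));

        // All-to-all: every source feeds every reducer, so one component remains.
        PipelinedComponents allToAll = new PipelinedComponents();
        for (String source : Arrays.asList("source-1", "source-2")) {
            for (String reducer : Arrays.asList("reduce-1", "reduce-2")) {
                allToAll.connect(source, reducer);
            }
        }
        System.out.println(allToAll.tasksToRestart("reduce-1"));
    }
}
```

With independent pipelines the traversal stays inside the failed pipeline. With all-to-all connections it reaches every task, which matches the current full-restart behavior.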

[Figure: Version (1) with independent pipelines]


[Figure: Version (1) with all-to-all dependencies]


Version (2) - Limit pipelined connected component at intermediate results

To further reduce the number of tasks that need to be restarted, we can use certain types of data stream exchanges. In the runtime, they are called “intermediate result types”, because each data stream that is exchanged between operators denotes an intermediate result.

Caching Intermediate Result

This type of data stream caches all elements since the latest checkpoint, possibly spilling them to disk, if the data exceeds the memory capacity.

When a downstream operator restarts from that checkpoint, it can simply re-read that data stream without requiring the producing operator to restart. This is applicable to both batch (bounded) and streaming (unbounded) operations. When no checkpoints are used (batch), the result needs to cache all data.
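A minimal sketch of this behavior follows. The class `CachingResult` and its method names are hypothetical, spilling to disk is left out, and the sketch assumes that `onCheckpointComplete` is called exactly at the point in the stream that the checkpoint covers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code: keeps every element produced since the
// latest completed checkpoint so that a restarted consumer can re-read it.
// Spilling to disk is omitted; everything stays in one in-memory list.
class CachingResult<T> {
    private final List<T> sinceCheckpoint = new ArrayList<>();

    // Called by the producing operator for every element it sends downstream.
    void emit(T element) {
        sinceCheckpoint.add(element);
    }

    // Elements before the checkpoint are covered by the checkpointed state of
    // the consumer, so they are no longer needed for a replay.
    void onCheckpointComplete() {
        sinceCheckpoint.clear();
    }

    // What a restarted downstream operator reads instead of asking the
    // producer to restart. A copy is returned so callers cannot modify the cache.
    List<T> replay() {
        return new ArrayList<>(sinceCheckpoint);
    }
}
```

In a batch job `onCheckpointComplete` is never called, so the cache holds the complete result, as described above.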

Memory-only caching Intermediate Result

Similar to the caching intermediate result, but it discards the sent data once the memory buffering capacity is exceeded. It acts as a “best effort” helper for recovery, which bounds recovery when checkpoints are frequent enough for the data between two checkpoints to fit in memory. On the other hand, it comes for free: it simply uses memory that would otherwise not be used anyway.
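The best-effort behavior can be sketched as below. The class `MemoryOnlyCachingResult`, its element-count capacity, and its method names are assumptions made for illustration. A real implementation would presumably count memory buffers rather than elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch, not Flink code: a cache that gives up instead of spilling.
class MemoryOnlyCachingResult<T> {
    private final int capacity;            // assumed: capacity in elements
    private final List<T> buffer = new ArrayList<>();
    private boolean disposed = false;

    MemoryOnlyCachingResult(int capacity) {
        this.capacity = capacity;
    }

    void emit(T element) {
        if (disposed) {
            return; // already given up for this checkpoint interval
        }
        if (buffer.size() >= capacity) {
            // Capacity exceeded: drop everything, replay is no longer possible.
            buffer.clear();
            disposed = true;
        } else {
            buffer.add(element);
        }
    }

    // A completed checkpoint starts a fresh interval with an empty buffer.
    void onCheckpointComplete() {
        buffer.clear();
        disposed = false;
    }

    // Empty if the data was discarded, in which case the producer must restart.
    Optional<List<T>> replay() {
        if (disposed) {
            return Optional.empty();
        }
        return Optional.of(new ArrayList<>(buffer));
    }
}
```

An empty `Optional` tells the caller that this result cannot serve as a recovery point and that the producing operator has to be restarted as well.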

Blocking Intermediate Result

This is applicable only to bounded intermediate results (batch jobs). It means that the consuming operator starts only after the entire bounded result has been produced. This bounds the cancellations/restarts downstream in batch jobs.



Public Interfaces

This change will affect the way failures are logged and displayed in the web frontend, since a failure no longer sends the job as a whole into recovery.

The number-of-restarts parameter of the RestartStrategy needs to be interpreted differently, as either:

  • maximum-per-task-failures or
  • maximum-total-task-failures
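The difference between the two interpretations can be sketched with a small counter. The class `FailureBudget`, the enum `Scope`, and the method `recordFailure` are hypothetical names used only for this illustration and are not part of the RestartStrategy.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Flink code: one limit, two ways to count against it.
class FailureBudget {
    enum Scope { PER_TASK, TOTAL }

    private final Scope scope;
    private final int maxFailures;
    private final Map<String, Integer> failuresPerTask = new HashMap<>();
    private int totalFailures = 0;

    FailureBudget(Scope scope, int maxFailures) {
        this.scope = scope;
        this.maxFailures = maxFailures;
    }

    // Records a failure of the given task and returns true if another
    // recovery attempt is still allowed under the configured interpretation.
    boolean recordFailure(String task) {
        totalFailures++;
        int taskFailures = failuresPerTask.merge(task, 1, Integer::sum);
        int relevant = (scope == Scope.PER_TASK) ? taskFailures : totalFailures;
        return relevant <= maxFailures;
    }
}
```

With a limit of 1, failures of two different tasks are both tolerated under `PER_TASK`, while under `TOTAL` the second failure already exhausts the budget.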


Compatibility, Deprecation, and Migration Plan

  • In the first version, the feature should be selectively activated (StreamExecutionEnvironment.setRecoveryMode(JOB_LEVEL | TASK_LEVEL)).
  • Given the small impact on user job configuration (and the fact that most users go for infinite restarts for streaming jobs), good documentation of the change should be sufficient.

 

Implementation Plan

Version (2) builds strictly upon Version (1): it only adds the intermediate result types as backtracking barriers.

Version (1) - Task breakdown

  1.  Change ExecutionGraph to not go into “Failing” status upon task failure
  2.  Add Backtracking and Forward Cancellation. Only one global change (status update beyond a single task execution) may happen in the ExecutionGraph concurrently.
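One possible way to enforce the "only one global change at a time" rule is a compare-and-set guard. This is a sketch under assumptions: the class `GlobalChangeGuard`, its methods, and the use of numeric change ids are hypothetical, and the proposal does not prescribe this mechanism.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not Flink code: admits at most one global change
// (a status update that spans more than a single task execution) at a time.
class GlobalChangeGuard {
    private static final long NONE = 0L;

    // Id of the global change currently in progress, or NONE.
    private final AtomicLong current = new AtomicLong(NONE);

    // Tries to start a global change. Ids are assumed to be non-zero and unique.
    boolean tryBegin(long changeId) {
        return current.compareAndSet(NONE, changeId);
    }

    // Ends the change. Only the change that holds the guard can release it.
    boolean finish(long changeId) {
        return current.compareAndSet(changeId, NONE);
    }
}
```

A recovery that fails to acquire the guard would have to wait or be folded into the change that is already running. Which of the two is appropriate is left open here.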

Version (2) - Task breakdown

  1.  Extend backtracking to stop at Intermediate results that are available for the checkpoint to resume from.
  2. Implement “Caching Intermediate Result”
  3. Implement “Memory-only Caching Intermediate Result”
  4. Upon reaching a result that is not guaranteed to be there (like the “Memory-only Caching Intermediate Result”), the ExecutionGraph sends a message to the result (TaskManager holding it) to “pin” it, so it does not get released in the meantime.
    The response to the “pin” command is either “okay”, in which case the backtracking stops there, or “disposed”, in which case the backtracking continues.
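The backtracking in the steps above can be sketched as an upstream traversal that stops at available intermediate results. Everything here is illustrative: the class `Backtracking`, the enum `ResultType`, and the `pin` predicate (a stand-in for the message to the TaskManager, returning true for “okay” and false for “disposed”) are hypothetical. Forward cancellation of downstream tasks is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

// Hypothetical sketch, not Flink code: upstream backtracking only.
class Backtracking {
    enum ResultType { PIPELINED, CACHING, MEMORY_ONLY_CACHING }

    private static final class Input {
        final String producer;
        final ResultType type;

        Input(String producer, ResultType type) {
            this.producer = producer;
            this.type = type;
        }
    }

    private final Map<String, List<Input>> inputs = new HashMap<>();

    void addInput(String consumer, String producer, ResultType type) {
        inputs.computeIfAbsent(consumer, k -> new ArrayList<>()).add(new Input(producer, type));
    }

    // pin.test(producer) models the "pin" request for that producer's result:
    // true means "okay" (result is kept), false means "disposed".
    Set<String> tasksToRestart(String failedTask, Predicate<String> pin) {
        Set<String> restart = new TreeSet<>();
        Deque<String> stack = new ArrayDeque<>();
        restart.add(failedTask);
        stack.push(failedTask);
        while (!stack.isEmpty()) {
            String task = stack.pop();
            for (Input input : inputs.getOrDefault(task, Collections.<Input>emptyList())) {
                boolean available;
                switch (input.type) {
                    case CACHING:
                        available = true; // guaranteed to be there
                        break;
                    case MEMORY_ONLY_CACHING:
                        available = pin.test(input.producer); // best effort
                        break;
                    default:
                        available = false; // pipelined data cannot be re-read
                }
                if (!available && restart.add(input.producer)) {
                    stack.push(input.producer);
                }
            }
        }
        return restart;
    }
}
```

For a chain source → a → b → c with a caching result after the source, a pipelined exchange between a and b, and a memory-only result between b and c, a failure of c restarts only c when the pin succeeds, and restarts a, b, and c when the result was already disposed.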

Rejected Alternatives

(none yet)