Status
Current state: Under Discussion
Discussion thread:
JIRA:
Released:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Flink started as a pure streaming engine but has been extended over time to fit many different scenarios: batch, AI, event-driven applications, etc. Approximate task-local recovery is one attempt to serve these diverse scenarios by trading data consistency for fast failure recovery. More specifically, if a task fails, only the failed task restarts, without affecting the rest of the job. Approximate task-local recovery is similar to RestartPipelinedRegionFailoverStrategy, with two major differences:
- Instead of restarting a connected region[flip1], approximate task-local recovery restarts only the failed task(s). In the setup of a streaming job (tasks connected with pipelined result partition type), a connected region is equal to the entire job.
- RestartPipelinedRegionFailoverStrategy is exactly-once, while approximate task-local recovery expects data loss and a bit of data duplication when sources fail.
Approximate task-local recovery is useful in scenarios where a certain amount of data loss is tolerable, but a full pipeline restart is not affordable. A typical use case is online training. Online training jobs are usually complicated with all-to-all task connections, so a single task failure may result in a complete restart of the whole pipeline. Besides, the initialization is time-consuming, including the procedure of loading training models and starting Python subprocesses, etc. The initialization may take minutes to complete on average.
Goal
To ease the discussion, we divide the problem of approximate task-local recovery into three parts with each part only focusing on addressing a set of problems, as shown in the graph below.
- Milestone One: sink failure. Here, a sink task is a task that has no consumers reading data from it. In this scenario, if a sink vertex fails, the sink is restarted from the last successfully completed checkpoint, and data loss is expected. If a non-sink vertex fails, the regular regional failover strategy applies. In milestone one, we focus on issues related to task failure handling and upstream reconnection.
- Milestone Two: downstream failure -- a failed task causes itself and all tasks downstream of it to restart. In milestone two, we focus on issues related to missing checkpoint barriers.
- Milestone Three: single task failure -- only the failed task itself restarts. In milestone three, we focus on issues related to downstream reconnection and different types of failure detection/handling (TM, JM, network, etc.).
Milestone One
This is a design doc for the first milestone in approximate task-local recovery. More specifically,
- The sink task is restarted if an exception is caught from the task. The rest of the tasks keep running.
- After restarting, the sink task restores its state and reconnects to its upstream tasks and continues to process data.
- The data the sink processed between its last completed checkpoint and the moment it failed is lost.
Proposed Changes
To achieve the goal, the following issues are addressed in this scenario:
0. Failure Detection and Handling
To limit the discussion to the scope of sink failure, we only consider
1). Task failures detected by catching an exception inside a TM, and
2). Network failures between TM and JM that can potentially cause orphan tasks (details discussed in Section 2, Point 1).
The full set of failure detection and handling will be discussed in Milestone Three. More explicitly, network failures between TMs are only partially discussed within the scope of Milestone One (the part affecting producers in Section 2, Point 2).
1. Task Failure Handling
Currently, if a task fails, it transitions to the state ExecutionState.FAILED and calls notifyFinalState on the TaskExecutor. The TaskExecutor reports the new state to the JM by calling updateTaskExecutionState through JobMasterGateway. The JM switches the job state from JobStatus.RUNNING to JobStatus.RESTARTING. The Scheduler then handles the failure through ExecutionFailureHandler#getFailureHandlingResult, and the FailoverStrategy decides the set of task vertices to restart. The default failover strategy is RestartPipelinedRegionFailoverStrategy. For a streaming job, the region failover strategy restarts the entire job.
In the sink failure case, only the sink vertex is expected to restart. The proposed change is a new failover strategy that restarts only the failed task if that task has no consumers, and falls back to the regular regional failover otherwise. RestartPipelinedRegionFailoverStrategy itself is left untouched: the new strategy extends it, reuses most of its logic, and overrides only getTasksNeedingRestart.
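The decision rule above can be sketched in a few lines. This is a simplified model and not Flink code: the classes RegionFailoverSketch and SinkOnlyFailoverSketch, the string task ids, and the one-argument getTasksNeedingRestart signature are all assumptions made for illustration, and the topology and regions are passed in as plain maps.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the regional strategy; not Flink's actual class. */
class RegionFailoverSketch {
    // task id -> ids of the tasks consuming its output (assumed representation)
    protected final Map<String, List<String>> consumers;
    // task id -> ids of all tasks in the same pipelined region (assumed representation)
    protected final Map<String, Set<String>> regionOf;

    RegionFailoverSketch(Map<String, List<String>> consumers,
                         Map<String, Set<String>> regionOf) {
        this.consumers = consumers;
        this.regionOf = regionOf;
    }

    /** Regional failover: restart every task in the failed task's region. */
    Set<String> getTasksNeedingRestart(String failedTask) {
        return new HashSet<>(regionOf.get(failedTask));
    }
}

/** Hypothetical new strategy: only the overridden method differs. */
class SinkOnlyFailoverSketch extends RegionFailoverSketch {
    SinkOnlyFailoverSketch(Map<String, List<String>> consumers,
                           Map<String, Set<String>> regionOf) {
        super(consumers, regionOf);
    }

    @Override
    Set<String> getTasksNeedingRestart(String failedTask) {
        List<String> downstream = consumers.getOrDefault(failedTask, List.of());
        if (downstream.isEmpty()) {
            // No consumers: the failed task is a sink, so restart it alone.
            return Set.of(failedTask);
        }
        // Any other task: fall back to the regular regional failover.
        return super.getTasksNeedingRestart(failedTask);
    }
}
```

For a source -> map -> sink streaming job that forms a single region, this sketch returns only the sink when the sink fails and the whole region when the map fails.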
2. Reconnects to Upstream
Currently, upstream tasks stay alive after downstream tasks fail. However, if the produced result partition type is PIPELINED, an upstream task releases its produced partition once all of the partition's subpartitions have been consumed (ReleaseOnConsumptionResultPartition). The failed (downstream) task cleans up its network resources when exiting, including its InputChannels. Each channel maintains a ResultSubpartitionView, which notifies the corresponding ResultSubpartition of consumption when it releases its resources.
As a result, the restarted sink task cannot locate its input result partition, because the partition has already been released.
The requirements for the ResultSubpartition are as follows:
- The ResultSubpartition is kept when the consumer exits (task fails); the consumer only releases its view.
- The ResultSubpartition is released through ResultPartitionManager when:
  - consumption has finished;
  - the producer fails or is cancelled;
  - the job exits.
- Multiple views can be created on the same subpartition over time, but at most one view exists at any given time.
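The lifecycle these requirements describe can be modelled with a small state holder. The following is a minimal sketch under stated assumptions, not the proposed implementation: SubpartitionSketch, its View type, and the ReleaseReason enum are hypothetical names, and real buffer handling is left out entirely.

```java
/** Hypothetical model of a subpartition that outlives its consumer's view. */
class SubpartitionSketch {
    /** The only events that release the subpartition itself. */
    enum ReleaseReason { CONSUMPTION_FINISHED, PRODUCER_FAILED_OR_CANCELLED, JOB_EXITED }

    /** Placeholder for a reader handle; carries no data in this sketch. */
    static final class View { }

    private View currentView;   // at most one active view
    private boolean released;   // true once the subpartition is gone

    synchronized View createView() {
        if (released) {
            throw new IllegalStateException("subpartition already released");
        }
        if (currentView != null) {
            throw new IllegalStateException("another view is still active");
        }
        currentView = new View();
        return currentView;
    }

    /** Called when the consumer exits: drops the view, keeps the subpartition. */
    synchronized void releaseView(View view) {
        if (view == currentView) {
            currentView = null;
        }
    }

    /** Called by the partition manager; the reason is recorded here only as a parameter. */
    synchronized void release(ReleaseReason reason) {
        released = true;
        currentView = null;
    }

    synchronized boolean isReleased() {
        return released;
    }
}
```

In this model a failed consumer calls releaseView, after which a restarted consumer can call createView again on the same subpartition; only release makes the subpartition unavailable.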
There are four more points worth highlighting:
Point1: Orphan/Zombie Tasks
It is possible for a TM to disconnect from the JM while both the TM and the JM are still alive. In the sink failure case, the TM fails the sink task, and the JM redeploys the sink to a different TM slot requested from the RM.
However, the JM may deploy the new sink before the TM has successfully failed the old one. We have to make sure that only one view exists at any time, and that this view belongs to the new attemptId/attemptNumber. In theory, the old execution could try to connect after the new execution has already connected; this can happen, but only rarely. If only one view is allowed at a time, creating the ResultSubpartitionView for the newer execution attempt should prevent a later ResultSubpartitionView creation by the older execution attempt from succeeding.
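One way to picture this guard is to have the subpartition remember the highest attempt number it has seen. The sketch below is an assumption about how such a check could look, not the design settled in this FLIP: the class AttemptGuardedSubpartitionSketch and its methods are hypothetical, and it additionally assumes that a newer attempt is allowed to take over from an older view that has not been released yet.

```java
/** Hypothetical guard that lets only the newest execution attempt hold the view. */
class AttemptGuardedSubpartitionSketch {
    private int latestAttempt = -1;   // highest attempt number seen so far
    private boolean viewActive;       // whether latestAttempt currently holds a view

    /** Returns true if a view was created for the given attempt. */
    synchronized boolean tryCreateView(int attemptNumber) {
        if (attemptNumber < latestAttempt) {
            return false; // stale (orphan) attempt connecting after a newer one
        }
        if (attemptNumber == latestAttempt && viewActive) {
            return false; // this attempt already holds the single view
        }
        // A newer attempt takes over; the older view, if any, is superseded.
        latestAttempt = attemptNumber;
        viewActive = true;
        return true;
    }

    /** Releasing only has an effect for the attempt that owns the view. */
    synchronized void releaseView(int attemptNumber) {
        if (attemptNumber == latestAttempt) {
            viewActive = false;
        }
    }

    /** Attempt number holding the view, or -1 if no view is active. */
    synchronized int activeAttempt() {
        return viewActive ? latestAttempt : -1;
    }
}
```

With this rule, if attempt 1 connects before the orphaned attempt 0 retries, the retry from attempt 0 is rejected and a late release call from attempt 0 leaves the view of attempt 1 untouched.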
Compatibility, Deprecation, and Migration Plan
- What impact (if any) will there be on existing users?
- If we are changing behavior how will we phase out the older behavior?
- If we need special migration tools, describe them here.
- When will we remove the existing behavior?
Test Plan
Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.