Status
Current state: Accepted
Discussion thread: here
JIRA:
Released:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Flink was born as a pure streaming engine but has since been extended to fit many different scenarios: batch, AI, event-driven applications, etc. Approximate task-local recovery is one attempt to serve these diversified scenarios by trading data consistency for fast failure recovery. More specifically, if a task fails, only the failed task restarts without affecting the rest of the job. Approximate task-local recovery is similar to RestartPipelinedRegionFailoverStrategy, with two major differences:
- Instead of restarting a connected region (FLIP-1: Fine Grained Recovery from Task Failures), approximate task-local recovery restarts only the failed task(s). In a streaming job (tasks connected with the pipelined result partition type), a connected region is equal to the entire job in most cases.
- RestartPipelinedRegionFailoverStrategy is exactly-once, while approximate task-local recovery expects data loss and a bit of data duplication when sources fail.
Approximate task-local recovery is useful in scenarios where a certain amount of data loss is tolerable, but a full pipeline restart is not affordable. A typical use case is online training. Online training jobs are usually complicated, with all-to-all task connections, so a single task failure handled by RestartPipelinedRegionFailoverStrategy may result in a complete restart of the whole pipeline. Besides, initialization is time-consuming, including loading training models and starting Python subprocesses, and may take minutes to complete on average.
Goal
To ease the discussion, we divide the problem of approximate task-local recovery into three parts, each focusing on a set of problems, as shown in the graph below.
- Milestone One: sink recovery. Here a sink task refers to a task with no consumers reading data from it. In this scenario, if a sink vertex fails, the sink is restarted from the last successfully completed checkpoint and data loss is expected. If a non-sink vertex fails, the regional failover strategy takes place. In Milestone One, we focus on issues related to task failure handling and upstream reconnection.
- Milestone Two: downstream recovery -- a failed task causes all tasks rooted at it (its downstream tasks) to restart. In Milestone Two, we focus on issues related to missing checkpoint barriers.
- Milestone Three: single task recovery -- only the failed task itself restarts. In Milestone Three, we focus on issues related to downstream reconnection and the different types of failure detection/handling (TM, JM, network, etc.).
Milestone One
This is a design doc for the first milestone in approximate task-local recovery. More specifically,
- The sink task is restarted if an exception is caught from the task. The rest of the tasks keep running.
- After restarting, the sink task restores its state, reconnects to its upstream tasks, and continues to process data.
- The data that the failed sink processed between the last completed checkpoint and the point of failure is lost.
Proposed Changes
To achieve this goal, the following issues are addressed in this scenario:
0. Failure Detection and Handling
To limit the discussion within the scope of sink failure, we only consider
1). Task failures detected by catching an exception inside a TM, and
2). Network failures between a TM and the JM that can potentially cause orphan tasks (see Point1: Orphan/Zombie Tasks).
A full set of failure detection and handling will be discussed in Milestone Three. Network failures between TMs are only partially discussed within the scope of Milestone One (see Point2: Network connection errors between TMs).
1. Task Failure Handling
Currently, if a task fails, it transitions to ExecutionState.FAILED and notifies the TaskExecutor of its final state (notifyFinalState). The TaskExecutor then reports the state change to the JM through JobMasterGateway#updateTaskExecutionState. The JM switches the job state from JobStatus.RUNNING to JobStatus.RESTARTING, and the Scheduler handles the failure through ExecutionFailureHandler#getFailureHandlingResult. The FailoverStrategy decides the set of task vertices to restart; the default failover strategy is RestartPipelinedRegionFailoverStrategy. For a streaming job, the region failover strategy restarts the entire job if its JobGraph does not contain disconnected subgraphs.
In the sink failure case, only the sink vertex is expected to restart. The proposed change is to extend RestartPipelinedRegionFailoverStrategy so that a task is restarted alone if it does not have any consumers. If a non-sink vertex fails, the regional failover strategy takes place. Notice that RestartPipelinedRegionFailoverStrategy itself is untouched: the new strategy extends it, reuses most of its logic, and only overrides getTasksNeedingRestart.
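For illustration, a minimal sketch of such a strategy. The class name and the way the topology is obtained are assumptions; the Flink types referenced in the imports exist today, but their exact signatures should be double-checked against the implementation.

```java
import java.util.Collections;
import java.util.Set;

import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;

/** Illustrative sketch only; the real class name and wiring will be decided during implementation. */
public class ApproximateSinkFailoverStrategy extends RestartPipelinedRegionFailoverStrategy {

    private final SchedulingTopology topology;

    public ApproximateSinkFailoverStrategy(SchedulingTopology topology) {
        super(topology);
        // Keep our own reference, since the parent's field may not be accessible to subclasses.
        this.topology = topology;
    }

    @Override
    public Set<ExecutionVertexID> getTasksNeedingRestart(
            ExecutionVertexID executionVertexId, Throwable cause) {
        SchedulingExecutionVertex vertex = topology.getVertex(executionVertexId);
        // A sink vertex has no consumers, i.e. it produces no result partitions for anyone to read.
        if (!vertex.getProducedResults().iterator().hasNext()) {
            return Collections.singleton(executionVertexId);
        }
        // Non-sink failures fall back to the existing region failover behavior.
        return super.getTasksNeedingRestart(executionVertexId, cause);
    }
}
```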
2. Reconnects to Upstream
Currently, upstream tasks stay alive after downstream tasks fail. However, if the produced result partition type is PIPELINED, an upstream task releases its produced partition once all of its subpartitions have been consumed (ReleaseOnConsumptionResultPartition). The failed (downstream) task cleans up network resources when exiting, including its InputChannels. Each channel maintains a ResultSubpartitionView and, when releasing resources, notifies the corresponding ResultSubpartition that consumption has finished.
As a result, the restarted sink task cannot locate its input result partition, since the partition has already been released.
The requirements for the ResultSubpartition are as follows:
- The ResultSubpartition is kept when the consumer exits (task fails); the consumer only releases its view.
- The ResultSubpartition is released through the ResultPartitionManager when:
  - consumption has finished;
  - the producer fails or is cancelled;
  - the job exits.
- Multiple views can be created on the same subpartition over time, but only one view exists at any given time.
We plan to introduce a new `ResultPartitionType.PIPELINED_APPROXIMATE` and a corresponding ResultSubpartition implementation to minimize the impact on existing system behavior.
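A conceptual sketch of these view lifetime rules follows; all class and method names below are illustrative stand-ins, not existing Flink classes.

```java
/** Illustrative sketch of a subpartition that survives its consumer and allows at most one view. */
class ApproximateSubpartition {

    /** Minimal stand-in for the subpartition view handed to the consumer. */
    static final class View {
        private volatile boolean released;
        void release() { released = true; }
        boolean isReleased() { return released; }
    }

    private final Object lock = new Object();
    private View currentView; // at most one view at any time

    /** Called when the (possibly restarted) consumer connects. */
    View createReadView() {
        synchronized (lock) {
            // A view from an older execution attempt may still be around, e.g. if the release
            // notification was lost due to a TM-to-TM network error. Release it first so that
            // only one view exists at a time and it belongs to the newest attempt.
            if (currentView != null && !currentView.isReleased()) {
                currentView.release();
            }
            currentView = new View();
            return currentView;
        }
    }

    /** Called when the consumer fails or finishes: only the view is released;
     *  the subpartition and its buffered data are kept for the restarted consumer. */
    void releaseView() {
        synchronized (lock) {
            if (currentView != null) {
                currentView.release();
                currentView = null;
            }
            // Buffers are kept; a trailing partial record is pruned here (see "Partial Records").
        }
    }
}
```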
There are four more points worth highlighting here:
Point1: Orphan/Zombie Tasks
The TM may disconnect from the JM while both the TM and the JM are still alive. In the sink failure case, the TM fails the sink task, and the JM redeploys the sink to a different TM slot requested from the RM.
However, the JM may deploy the new sink before the TM has successfully failed the old one. We have to make sure that only one view exists at any time and that this view belongs to the new attemptId/attemptNumber. In theory, the old execution could also connect after the new execution; in practice this can happen, but rarely. Since only one view is allowed at a time, creating the ResultSubpartitionView for the newer execution attempt must prevent the older execution attempt from successfully creating its view.
Point2: Network connection errors between TMs
There are cases where a sink task is not able to notify its upstream tasks to release their views due to Netty connection errors between TMs (the notification channel is broken). Upon successful reconnection, the restarted sink will then find that the upstream subpartition still holds an old view that has not been released. Remember that only one view is allowed at a time, so the old view is released before a new one can be created.
Point3: Remote or Local Input Channel
Reconnection to the upstream task is not very different whether the downstream and upstream tasks are connected through a LocalInputChannel or a RemoteInputChannel. The information the downstream task needs to reconnect to its upstream is generated in the TaskDeploymentDescriptor when the failed task is redeployed; TaskDeploymentDescriptor#InputGateDeploymentDescriptor describes how to read data from the upstream task.
Point4: Credits
Upstream credits are reset to the initial state, since the downstream task has lost all of its input data after restarting.
3. Partial Records
A record can span multiple buffers. If a task reads a partial record and then fails, the already-read part of the record is lost after restarting, and the remaining part read from the upstream cannot be deserialized successfully (the deserializer cannot determine where the record ends).
The proposed change to eliminate partial records is to either
- clean up partial records when releasing the ResultSubpartitionView, or
- clean up partial records when recreating the ResultSubpartitionView after the failed sink reconnects.
Ideally, the view is released when the failed sink task releases its resources. However, in the case where a sink task fails to notify its upstream to release the view due to a network failure, as discussed in Network connection errors between TMs, the view is instead released just before recreation for the restarted sink task. In both cases, partial records are cleaned up when the view is released.
Option 1 is preferred since it prunes less data: it cleans up partial data earlier than Option 2 in most cases. With Option 2, all buffers emitted before the downstream side reconnects are ultimately discarded, because these buffers are chained together by partial records; we can only retain the buffers after a clean boundary with no partial record in front.
From the writer's side, the writer always serializes a complete record (RecordWriter#emit => SpanningRecordSerializer#serializeRecord) and copies it to the BufferBuilder (RecordWriter#copyFromSerializerToTargetChannel) until the end of the record, so there is no partial record during serialization. Partial records only appear from the reader's point of view, so only the buffers/resources behind the ResultSubpartitionView need to be cleaned.
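For illustration, a minimal sketch of Option 1, assuming each queued buffer records the offset of the first record boundary it contains (-1 if it only continues a previous record). All names below are hypothetical.

```java
import java.util.Arrays;
import java.util.Deque;

class PartialRecordCleanup {

    /** Stand-in for a network buffer plus the offset of the first full record starting inside it. */
    record QueuedBuffer(byte[] data, int firstRecordOffset) {}

    /**
     * Called when the view is released: drop the leading data that only carries the tail of a
     * record the failed consumer had already started reading, so the restarted consumer begins
     * deserialization at a clean record boundary.
     */
    static void pruneLeadingPartialRecord(Deque<QueuedBuffer> buffers) {
        // Drop buffers that contain no record boundary at all (pure continuation of an old record).
        while (!buffers.isEmpty() && buffers.peekFirst().firstRecordOffset() < 0) {
            buffers.pollFirst();
        }
        // Trim the head buffer so that it starts exactly at the first record boundary.
        if (!buffers.isEmpty() && buffers.peekFirst().firstRecordOffset() > 0) {
            QueuedBuffer head = buffers.pollFirst();
            byte[] trimmed = Arrays.copyOfRange(
                    head.data(), head.firstRecordOffset(), head.data().length);
            buffers.addFirst(new QueuedBuffer(trimmed, 0));
        }
    }
}
```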
4. Missing Events
The main issue is missing checkpoint barriers. It is possible that a checkpoint barrier is lost during recovery. This would cause the failed task (the sink) to wait for the lost barrier indefinitely, so its checkpoint can never complete. Upstream tasks of the sink are not affected by missing events, since events go missing at the failed task (note that the upstream subpartition has to reset isBlockedByCheckpoint if the failed task was blocked by checkpoint alignment before the failure).
The proposed solution is to
- attach the max checkpoint ID to the task deployment descriptor (in the JM) so that the sink can drop any barrier whose ID is no greater than that value. A newly triggered checkpoint (max checkpoint ID + 1) will still be valid. The ID can be fetched from the CheckpointIDCounter in the JM. Or,
- wait until a new checkpoint is triggered after restarting before dropping the old, not-yet-aligned barrier.
Option 1 is preferable because in most cases there is a maximum number of allowed pending checkpoints; with Option 2, the system could be blocked indefinitely until those pending checkpoints are aborted.
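For illustration, a minimal sketch of the barrier check in Option 1. How the maximum checkpoint ID is carried through the TaskDeploymentDescriptor is left to the implementation, and the class name is hypothetical.

```java
/** Illustrative sketch: decides whether a barrier belongs to a pre-restart checkpoint. */
class StaleBarrierFilter {

    private final long maxCheckpointIdAtRestart; // e.g. taken from the task deployment descriptor

    StaleBarrierFilter(long maxCheckpointIdAtRestart) {
        this.maxCheckpointIdAtRestart = maxCheckpointIdAtRestart;
    }

    /** Returns true if the barrier was triggered before the restart and must be dropped,
     *  since its counterpart barriers on other channels may have been lost. */
    boolean shouldDrop(long barrierCheckpointId) {
        return barrierCheckpointId <= maxCheckpointIdAtRestart;
    }
}
```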
5. Checkpoint Coordinator
Currently, all pending checkpoints are aborted after the job state transitions from RUNNING to RESTARTING; even a single restarting task causes this job state transition. The abort notification is sent through TaskExecutor#abortCheckpoint -> StreamTask#notifyCheckpointAbortAsync. The notification is not guaranteed to succeed, but this is acceptable as long as an incomplete checkpoint does not fail the job.
The CheckpointCoordinator remains unchanged.
6. State Restore
DefaultScheduler#restartTasks calls SchedulerBase#restoreState to restore state. With unaligned checkpoints enabled, the failing-over tasks have to do one of the following during recovery:
- drop all of the restored in-flight data (the easier option), or
- drop at least all of the partial records from the restored input/output in-flight data.
This is because the restarted task may never receive the remaining buffers needed to complete those partial records.
No special handling is needed when unaligned checkpoints are disabled. In the very first version, ignoring state recovery altogether is also an option.
Frequently Failing Tasks
There will be no changes in this regard; we will keep the existing behavior.
Public Interfaces
Milestone One is not provided as a public feature. Curious users can enable it through
ExecutionCheckpointingOptions#ENABLE_APPROXIMATE_LOCAL_RECOVERY
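For illustration, a usage sketch assuming the option is exposed as a boolean ConfigOption under the name above; the exact key and the way it is picked up may differ in the released version.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableApproximateLocalRecovery {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Turn the experimental feature on for this job (option name taken from this FLIP).
        conf.set(ExecutionCheckpointingOptions.ENABLE_APPROXIMATE_LOCAL_RECOVERY, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Approximate recovery restores the failed sink from the last completed checkpoint.
        env.enableCheckpointing(10_000);
        // ... build and execute the streaming job as usual
    }
}
```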