Status

Current state: Under Discussion

Discussion thread: 

JIRA: FLINK-18112

...


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink started as a pure streaming engine, but over time it has been extended to fit many different scenarios: batch, AI, event-driven applications, etc. Approximate task-local recovery is one of the attempts to serve these diversified scenarios by trading data consistency for fast failure recovery. More specifically, if a task fails, only the failed task restarts without affecting the rest of the job. Approximate task-local recovery is similar to RestartPipelinedRegionFailoverStrategy with two major differences:

...


Approximate task-local recovery is useful in scenarios where a certain amount of data loss is tolerable, but a full pipeline restart is not affordable. A typical use case is online training. Online training jobs are usually complicated, with all-to-all task connections, so a single task failure under RestartPipelinedRegionFailoverStrategy may result in a complete restart of the whole pipeline. Besides, initialization is time-consuming, including loading training models and starting Python subprocesses, and may take minutes to complete on average.

Goal

To ease the discussion, we divide the problem of approximate task-local recovery into three parts, each focusing on a specific set of problems, as shown in the graph below.

  • Milestone One: sink recovery. Here a sink task stands for a task with no consumers reading data from it. In this scenario, if a sink vertex fails, the sink is restarted from the last successfully completed checkpoint and data loss is expected. If a non-sink vertex fails, a regional failover strategy takes place. In milestone one, we focus on issues related to task failure handling and upstream reconnection.
  • Milestone Two: downstream recovery -- a failed task leads all tasks rooted at the failed task to restart. In milestone two, we focus on issues related to missing checkpoint barriers.
  • Milestone Three: single task recovery -- a failed task leads only itself to restart. In milestone three, we focus on issues related to downstream reconnection and different types of failure detection/handling (TM, JM, network, etc.).

Milestone One

This is a design doc for the first milestone in approximate task-local recovery. More specifically,

  1. The sink task is restarted if an exception is caught from the task. The rest of the tasks keep running.
  2. After restarting, the sink task restores its state, reconnects to its upstream tasks, and continues to process data.
  3. The data that takes the failed sink from the state of the last completed checkpoint to its state at the time of failure is lost.

Proposed Changes

To achieve this goal, the following issues need to be addressed in this scenario:

0. Failure Detection and Handling

To limit the discussion within the scope of sink failure, we only consider 

...

2). Network failures between TM and JM that can potentially cause orphan tasks (see Point 1 below).

A full set of failure detection and handling mechanisms will be discussed in Milestone Three. More explicitly, network failures between TMs are only partially discussed within the scope of Milestone One (see Point 2: Network connection errors between TMs).

1. Task Failure Handling

Currently, if a task fails, it transitions to ExecutionState.FAILED and notifies its final state to the TaskExecutor (notifyFinalState). The TaskExecutor updates the task execution state on the JM through JobMasterGateway (updateTaskExecutionState). The JM then switches the job state from JobStatus.RUNNING to JobStatus.RESTARTING, and the Scheduler handles the failure through ExecutionFailureHandler#getFailureHandlingResult. The FailoverStrategy decides the set of task vertices to restart. The default failover strategy is RestartPipelinedRegionFailoverStrategy. For a streaming job, the region failover strategy restarts the entire job if its jobGraph does not contain disconnected subgraphs.

In the sink failure case, only the sink vertex is expected to restart. The proposed change is to extend RestartPipelinedRegionFailoverStrategy so that it restarts a single task if the task does not have any consumers, and falls back to the regular regional failover otherwise. If a non-sink vertex fails, the regional failover strategy takes place. Notice that RestartPipelinedRegionFailoverStrategy itself is untouched: the new strategy extends RestartPipelinedRegionFailoverStrategy, reuses most of its logic, and only overrides getTasksNeedingRestart.
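A minimal sketch of what such an extension could look like is shown below. The subclass name, the constructor, and the way the topology is queried are illustrative assumptions against the scheduler internals at the time of writing; only RestartPipelinedRegionFailoverStrategy and getTasksNeedingRestart come from the proposal above.

import java.util.Collections;
import java.util.Set;

import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;

/**
 * Sketch only: restart just the failed vertex when it has no consumers (i.e. it is a sink),
 * and delegate to the inherited regional failover behavior otherwise.
 */
public class SinkLocalRecoveryFailoverStrategy extends RestartPipelinedRegionFailoverStrategy {

    private final SchedulingTopology topology;

    public SinkLocalRecoveryFailoverStrategy(SchedulingTopology topology) {
        super(topology);
        this.topology = topology;
    }

    @Override
    public Set<ExecutionVertexID> getTasksNeedingRestart(
            ExecutionVertexID failedVertex, Throwable cause) {
        SchedulingExecutionVertex vertex = topology.getVertex(failedVertex);
        // A vertex without produced result partitions has no downstream consumers,
        // so it is a sink in the sense used by this design.
        boolean isSink = !vertex.getProducedResults().iterator().hasNext();
        return isSink
                ? Collections.singleton(failedVertex)
                : super.getTasksNeedingRestart(failedVertex, cause);
    }
}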

2. Reconnects to Upstream

Currently, upstream tasks stay alive after downstream tasks fail. However, if the produced partition result type is PIPELINED, upstream tasks release their produced partitions once all of their subpartitions have been consumed (ReleaseOnConsumptionResultPartition). The failed (downstream) task cleans up network resources when exiting, including the InputChannel. Each channel maintains a ResultSubpartitionView and notifies the corresponding ResultSubpartition of consumption when releasing resources.

...

  1. The ResultSubpartition is kept when the consumer exits (the task fails); the consumer only releases the view.
  2. The ResultSubpartition is released through ResultPartitionManager when
    1. consumption is finished;
    2. the producer fails or is cancelled;
    3. the job exits.
  3. Multiple views can be created on the same subpartition over time, but only one view exists at any given time.

The proposed changes are either:

...

We plan to introduce a new `ResultPartitionType.PIPELINED_APPROXIMATE` and its corresponding ResultSubpartition to minimize the impact on existing system behavior.
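As a rough illustration of the intended one-view-at-a-time semantics under the new partition type, a self-contained sketch could look like the following; all names below are placeholders, not the actual ResultSubpartition API.

/**
 * Sketch only: a pipelined-approximate style subpartition that survives consumer failures.
 * It keeps its buffered data when the consumer exits and lets the restarted consumer create
 * a fresh view; any stale view left over from the failed attempt is released first, so at
 * most one view exists at any time.
 */
final class ApproximatePipelinedSubpartitionSketch {

    /** Placeholder for Flink's ResultSubpartitionView. */
    interface ReaderView {
        void release();
    }

    private ReaderView currentView; // the single live view, or null if none

    synchronized ReaderView createReadView(ReaderView newView) {
        if (currentView != null) {
            // The previous consumer failed (or could not notify us over the network):
            // release the stale view, including any partially consumed record state.
            currentView.release();
        }
        currentView = newView;
        return currentView;
    }

    synchronized void releaseView() {
        // Called when the consumer exits; the subpartition and its buffered data are kept.
        currentView = null;
    }
}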


There are four more points worth highlighting here:


Point1: Orphan/Zombie Tasks

A TM may disconnect from the JM while both the TM and the JM are still alive. In the sink failure case, the TM fails the sink task, and the JM redeploys the sink to a different TM slot requested from the RM.

However, the JM may deploy a new sink before the TM successfully fails the old one. We have to make sure only one view exists at any time, and that the view is associated with the new attemptId/attemptNumber. In theory, the old execution attempt can connect after the new one; in practice this can happen, but rarely. If only one view is allowed at a time, creating a ResultSubpartitionView for the newer execution attempt should prevent a later view-creation request from the older execution attempt from succeeding.
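The guard can be as simple as tracking the newest attempt number seen so far. The following is an illustrative sketch, not the actual partition-manager code.

/**
 * Sketch only: ensure that a stale execution attempt can never (re)create a view
 * after a newer attempt has already registered one.
 */
final class AttemptAwareViewGuard {

    private int newestAttemptNumber = -1;

    /**
     * Returns true if the request belongs to the newest attempt seen so far and the caller
     * may create a view; requests from older attempts are rejected.
     */
    synchronized boolean tryAcquireView(int attemptNumber) {
        if (attemptNumber < newestAttemptNumber) {
            return false; // stale request from an older execution attempt
        }
        newestAttemptNumber = attemptNumber;
        return true;
    }
}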


Point2: Network connection errors between TMs

There are cases where sink tasks are not able to notify upstream tasks to release views due to netty connection errors between TMs (the notification channel is broken). Upon successful reconnection, the restarted sink will find that the upstream subpartition still holds an old view that has not been released. Remember that only one view is allowed at a time, so the old view is released before a new one can be created.

Point3: Remote or Local Input Channel

Reconnection to the upstream task is not very different whether the downstream and the upstream are connected through a LocalInputChannel or a RemoteInputChannel. The information the downstream task needs to reconnect to its upstream is generated in the TaskDeploymentDescriptor when the failed task is redeployed. TaskDeploymentDescriptor#InputGateDeploymentDescriptor describes how to read data from the upstream task.

Point4: Credits

Upstream credits are reset to their initial value, since the downstream task has lost all in-flight input data after restarting.

3. Partial Records

A record can span multiple buffers. If a task reads a partial record and then fails, the partial record is lost after restarting, and the remaining part read from the upstream cannot be deserialized successfully (the record boundary cannot be determined).

...

Ideally, a view is released when the failed sink task releases its resources. However, in the case where a sink task fails to notify its upstream to release the view due to a network failure, as discussed in Point 2 of Section 2 (Network connection errors between TMs), the view is released right before it is recreated for the restarted sink task. In both cases, partial records are cleaned up when the view is released.

...

On the writer's side, the writer always serializes a complete record (RecordWriter#emit => SpanningRecordSerializer#serializeRecord) and copies it to the BufferBuilder (RecordWriter#copyFromSerializerToTargetChannel) until the end of the record, so there won't be a partial record during serialization. Partial records only occur from the reader's point of view, so the buffers/resources beyond the ResultSubpartitionView should be cleaned.
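One possible way to clean partial records when the view is released is sketched below. It assumes the producer side keeps a per-buffer flag saying whether the buffer starts at a record boundary; that bookkeeping is an assumption of this sketch, not part of the current RecordWriter/ResultSubpartition API.

import java.util.Deque;

/** Sketch only: drop the leftover tail of a half-consumed record before serving a new view. */
final class PartialRecordCleanupSketch {

    /** Placeholder for a buffered chunk queued in the subpartition. */
    static final class BufferedChunk {
        final boolean startsAtRecordBoundary; // assumed bookkeeping maintained by the writer

        BufferedChunk(boolean startsAtRecordBoundary) {
            this.startsAtRecordBoundary = startsAtRecordBoundary;
        }
    }

    /**
     * Discards queued chunks until the next chunk that begins a new record, so the restarted
     * consumer never receives the tail of a record whose head was consumed by the failed attempt.
     */
    static void dropPartialRecord(Deque<BufferedChunk> queue) {
        while (!queue.isEmpty() && !queue.peekFirst().startsAtRecordBoundary) {
            queue.pollFirst();
        }
    }
}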

4. Missing Events

The main issue is missing checkpoint barriers. It is possible that a checkpoint barrier is lost during recovery. This causes the checkpoint of the failed task (the sink) to wait for the lost barrier indefinitely, so the checkpoint can never complete. Upstream tasks of the sink are not affected by missing events, since events are only missing at the failed task. (Notice that the upstream subpartition has to reset isBlockedByCheckpoint if the failed task was blocked by checkpoint alignment before the failure.)

It is also possible that tasks downstream of the failed task miss barriers as well, but we will postpone that discussion until later.

The proposed solution is to

...

Option 1 is preferable because we have a maximum number of allowed pending checkpoints in most cases; the system could otherwise be blocked forever before the pending checkpoints are aborted.

5. Checkpoint Coordinator

Currently, all pending checkpoints are aborted after the job state transitions from RUNNING to RESTARTING; a single restarting task in the job will cause this job state transition. Notifications of the abort are sent via TaskExecutor#abortCheckpoint -> StreamTask#notifyCheckpointAbortAsync. The notification is not guaranteed to succeed. This should be fine even if the notification is unstable, as long as an incomplete checkpoint does not fail the job.

The Checkpoint coordinator remains the same.

6. State Restore

DefaultScheduler#restartTasks calls SchedulerBase#restoreState to restore state and abort pending checkpoints. With unaligned checkpoints enabled, failing-over tasks have to do the following during recovery:

...

This is because the restarted task may not receive the remaining buffers needed to complete the partial records.

No special handling is needed when unaligned checkpoints are disabled. In the very first version, ignoring state recovery can also be an option.

7. Frequently Failing Tasks

There will be no changes in this regard and we will keep the existing behavior.

Public Interfaces

Milestone One is not provided as a public feature. Curious users can enable it through

ExecutionCheckpointingOptions#ENABLE_APPROXIMATE_LOCAL_RECOVERY
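For example, assuming the option is wired through the regular configuration mechanism, enabling it for a job could look roughly like this (the exact option key and setter may differ in the final implementation):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class ApproximateRecoveryExample {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Opt in to approximate task-local recovery (Milestone One, experimental).
        conf.set(ExecutionCheckpointingOptions.ENABLE_APPROXIMATE_LOCAL_RECOVERY, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(10_000); // checkpoints are needed to restore the restarted sink

        // ... build and execute the streaming job as usual ...
    }
}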