You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 44 Next »

Status

Current state: ACCEPT

Discussion thread: https://lists.apache.org/thread.html/r2780b46267af6e98c7427cb98b36de8218f1499ae098044e1f24c527%40%3Cdev.flink.apache.org%3E

JIRA:  Unable to render Jira issues macro, execution error.

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As discussed in FLIP-131, Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem for the streaming mode is that currently Flink does not support checkpoints after some tasks finished, which causes some problems for bounded or mixed jobs:

  1. Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed before committed to external systems in streaming mode. If sources are bounded and checkpoints are disabled after some tasks are finished, the data sent after the last checkpoint would always not be able to be committed. This issue has already been reported some times in the user ML (Case 1, Case 2, Case 3) and is future brought up when working on FLIP-143: Unified Sink API.
  2. The jobs with both bounded and unbounded sources might have to replay a large amount of records after failover due to no periodic checkpoints are taken after the bounded sources finished.

Therefore, we need to also support checkpoints even after some tasks are finished. 

Overall Design

There are multiple options to achieve this target. In this section we compare different options.

Option 1. Prevent tasks from finishing

The first option is to prevent the tasks from finishing until the whole job is finished. Specially, tasks would never finish if the job also has unbounded sources. This option is not preferred due to that

  1. it causes zombie tasks and waste resources. 
  2. it also requires a new “pre-EndOfPartition” event to indicate all the records are sent. Otherwise if it directly sent EndOfPartition before tasks finished, the communication channel would be destroyed and it would also be weird to have checkpoint barriers after EndOfPartition. However, introducing the “pre-EndOfPartition” event would largely complicate the termination process. 

Option 2. Allow tasks to finish & Checkpoints contain the final states from finished tasks

Another option is allowing tasks to finish normally and checkpoints after tasks finished would only take snapshots for the running tasks. A core issue of this option is whether we need to include the final states collected when a tasks finish in the following checkpoints. Currently when failover happens after some tasks are finished, the job will fallback to a checkpoint taken when all the tasks are running. Including the final states of the finished tasks ensures the behavior unchanged compared with the current one since the finished tasks could be viewed as still running. However it also introduce some problems:

  1. Including the states from finished tasks in all the following checkpoints requires the states get managed in the master side, which causes additional overhead.
  2. Since the same states from the finished tasks would be used in multiple checkpoints, we need to introduce the reference count between checkpoints and snapshots. This complicates the checkpoint management, especially after we already have the reference count between snapshots and state items due to incremental checkpoints.
  3. Including the states from the finished tasks implies that even if all the tasks of an operator have finished, we still need to restart the tasks of this operators after failover. For the long run, it limits the possibility that we only resume the execution for the operators not fully finished. 

Figure 1. An illustration of the structure of the Checkpoint. One issue is that whether we need to keep the operator states collected from the finished tasks. 

Option 3. Allow tasks to finish & Checkpoints do not contain the final states from finished tasks (The Chosen One)

If we do not want to include the states from the finished tasks instead, we need to explore how it changes the current behavior. Although in a checkpoint the state is snapshotted in the unit of task, it is finally reorganized by the unit of operator since there might be topology changes or rescaling. In other words, we need to view the checkpoint as composed of the current working progress of each operator, and tasks are only stateless runtime units to execute the remaining work for the operators. If we do not include the state from the finished tasks, it is equivalent to some operator discarding a part of finished work’s state in the checkpoint. Let ΔR represents the state of running task and ΔF represents the final states of finished tasks when taking checkpoints, then the result (e.g., the records sent to the descendant operator) of the operator’s execution after failover is

g(I, ΔR+ ΔF)=g(IRΔR)+g(IFΔF)

where I is the input after failover, g represents the logic of this operator and the decomposition is due to the fact that the work could be distributed to the different subtasks. Ideally the result should be the same with the execution without the states from the finished tasks, namely g(I, ΔR+ ΔF)=g(IΔR), which is equivalent to 

g(IFΔF)=Ø

Namely there should be no records sent due to ΔF no matter whether we keep it or not. 

Source Operators

The source operator does not have input and the equation is further equivalent to g(ΔF)=Ø. The logic of the source operators could be modeled as reading each split from the external system and emitting the records to the pipeline. With legacy source API the source operators usually discover all the splits on startup and record the progress of each split in a union list state. Unfortunately with this pattern if we discard the state for the finished splits, we would re-read them after failover, which violates the condition if we do not keep ΔF. The new source API would overcome this issue since now the splits are discovered and recorded in the OperatorCoordinator, whose state is still kept after all the tasks are finished. 

In consideration that we would finally migrate to the new source API, we could temporarily avoid the repeat records issue of the legacy source issue by 

  1. Failing checkpoints if some source operators contain both finished and unfinished subtasks.
  2. Skipping the actual execution for legacy source task (namely StreamSourceTask) if all the subtasks have finished before failover. This requires we also bookkeep the finished operators in the checkpoints.

As a whole, the source operators could achieve the condition, and the temporary hack could be removed after we migrated to the new Source API.


Non-source Operators

The logic of a non-source operator could be split into processing the input records and the logic in initialize and endOfInput, namely the condition is equivalent to  

g(IFΔF)=gp(IFΔF)+gc(ΔF)=Ø

For the first part, if in a checkpoint some subtask of a non-source operator is finished, then

  1. All the source tasks of ALL_TO_ALL inputs are finished and IF. This is due to these precedent tasks sending EndOfPartition to the subtasks of the operator in discussion at the same time, thus the running subtasks should also have received all the records in the checkpoints. 
  2. For the source tasks of POINTWISE inputs, the precedent tasks of the finished subtasks must also be finished. This indicates all the remaining input records should not rely on ΔF, otherwise they could not be correctly computed even without failover. This implies all the remaining records belongs to Iand IF=Ø.

Thus, we should always have IF=Ø and thus gp(IFΔF)=Ø no matter whether we save ΔF

The situation of the second part is equivalent to the source operators. However, the non-source operators rarely have the similar logic as legacy source. Instead, the result related to ΔF is usually sent before finished and does not need to resent after failover. For example, the operators doing aggregation and sending the final result on endOfInput only need sending the remaining aggregation value after failover. Even if there are operators does not satisfy the condition, the operators could push the states could not be discarded to OperatorCoordinator instead, like the new Source API does. 


Based on the above discussion discarding the final states of the finish tasks would only change behavior for a very little fraction of the possible existing jobs whose non-source operators have special logic on initialization or endOfInput, and these jobs could also be modified to keep the current behaviors.  Therefore, we lean towards option 3, which simplify the implementation and leave more room for optimization in the future. 

Proposed Changes

This section details the proposed changes for the options 3 chosen in the last section.

Triggering Checkpoints After Tasks Finished

Currently CheckpointCoordinator only triggers the sources for new checkpoints and other tasks would receive barriers from the input channels. However, it would not work to only trigger the running sources after tasks finished. As exemplified in Figure 1, suppose Task A has finished if we only trigger B for a new checkpoint, then Task C would have no idea about the new checkpoint and would not report its snapshot.

Figure 2. An example execution graph with one task finished. 

It might be argued that we could wait till the Task C to finish during this checkpoint, then we could not trigger task C in the checkpoints. However, this does not work if C is a two-phase commit sink and requires one successful checkpoint before finish, which will cause deadlock.

To find the new "root" of the current execution graph,  we iterates all the tasks to find the tasks who are still running but have no running precedent tasks. A direct implementation would have O(n2) time complexity since it needs to check all the precedent tasks of each task. However, we could reduce the complexity to O(n) by exploiting the isomorphism of ALL_TO_ALL edges. The detail is described in Appendix 1. 

To avoid the case that the tasks finished during the computation, the computation is done in the JobMaster's main thread. However, there might still be inconsistency due to:

  1. For tasks running on different TaskManagers, the order of the reports of FINISHED status arrived at JobMaster is not guaranteed. That is to say some tasks might report FINISHED after its descendant tasks. 
  2. The tasks might finish between the computation and the tasks get triggered. 

In both cases, the checkpoint trigger would fail and the whole checkpoint would then fail due to timeout. Since checkpoint timeout would block the next checkpoint and cause failover by default, it need to detect the trigger failure as earlier as possible. The failure could be detected if a task finished before acknowledging the snapshot for this checkpoint. CheckpointCoordinator would listen to the event of task finish, when a task finish, it iterates all the pending checkpoints to do the detection.

Make CheckpointBarrierHandler Support EndOfPartition on TM Side





The basic algorithm to compute the tasks to trigger would be iterating over the ExecutionGraph to find the new root running tasks. However, the algorithm could be optimized by iterating over the JobGraph instead. The detailed algorithm is shown in Appendix. 


Figure 3. Different options to manage the (Stream)Task's Finished status.

A core divergence for this part is what kind of tasks are considered as FINISHED. As a whole, there are two possible options:

Option 1: Reuse Tasks' FINISH status

The first option is to reuse the tasks' FINISH status managed by the Scheduler. This option avoids additional overhead to maintain tasks' status. Since the tasks' status is modified in JobMaster's main thread, to avoid data inconsistencies we would need to proxy the task to compute tasks to trigger also to the main thread. 

This option might have one problem. During the time we compute the tasks to trigger and the trigger message reaching the TaskExecutor, the tasks might finish during this period, which make the trigger fail. If we do not deal with this situation, the checkpoint would fail after timeout since not all tasks report their snapshots. To deal with this problem, we would need to make CheckpointCoordinator listen to the FINISH status report of the tasks, and if one task finished before reporting snapshot, CheckpointCoordinator would re-compute the tasks to trigger and re-trigger the new sources after the tasks finished. Although this patch could solve this issue, if we met with the cascade cases unfortunately that the triggers keep fail due to tasks get finished, the JM might incurs high overhead. 

Option 2: Checkpoint Coordinator maintains the finish status separately

If triggering one task fails due to the task finished in advance, what we need to is to report an empty state for this task and re-trigger its descendant tasks. The first action is simple to achieve since the state is empty and the core is how to notify the descendant tasks. For option 1 we have to go via JobMaster since when the task report FINISH the channels to the descendant tasks have already been closed by emitting EndOfPartition.

Therefore, another option is to not reuse the tasks' FINISH status and introduce another report before emitting EndOfPartition. As shown in Figure 2, we could make StreamTask report to CheckpointCoordinator about its going to finish. CheckpointCoordinator would marked the task as finished immediately when received the report, and reply the tasks with the pending checkpoints that do not trigger it successfully. The StreamTask would then broadcast corresponding barriers to the descendant tasks as normal checkpoints. 

Compare to the first option, this option introduces a new RPC and asks CheckpointCoordinator to maintain a set of finished tasks. CheckpointCoordinator also need to remove the tasks from the set if it restarted. However, the maintained set could be discarded on JobMaster Failover, which decrease the complexity. Besides, this option avoid the cascade finish problem. 

For the first version we would go with the option 1 to avoid complex implementation involving new RPC and duplicate finished state.

Extend the task Barrier Alignment

On the task side, we also need to modify the task barrier alignment mechanism to support the cases that some precedent tasks are finished. In this case, the downstream task would receive the EndOfPartition messages from the input channels.

Figure 4. The tasks would either receive the checkpoint trigger and receive EndOfPartition from all the channels, or do not receive checkpoint trigger and receive barriers from some channels.

For one task, it might be notified of checkpoint via two ways:

  1. If all its precedent tasks have finished, it would receive the checkpoint trigger message via RPC.
  2. If some of its precedent tasks are still running, it would receive the checkpoint barrier from the corresponding channels.

For both cases, to avoid interference with the logic of checkpoint alignment, we would insert barriers in the channels right before EndOfPartition:

  1. Whenever an input channel enqueued EndOfPartition from network or local precedent result partition, it would know the corresponding precedent task is finished. It would then insert a prioritized message into the channel to notify the CheckpointBarrierHandler about it. 
  2. Whenever CheckpointBarrierHandle received a new barrier or checkpoint trigger for the first time, it would insert the corresponding barriers into all the input channels that have received EndOfPartition, right before the EndOfPartition. The inserted barrier would respect the options of the checkpoint, like enabled unaligned checkpoint or checkpoint alignment timeout.
  3. During step 1, when CheckpointBarrierHandler knows that an input channel has enqueued EndOfPartition, it would also check the pending checkpoints and insert barriers into this input channel. 
  4. After CheckpointBarrierHandler has actually processed EndOfPartition from one channel, it would stop watching this channel and decrease the total number of barriers required for each checkpoint.
  5. After CheckpointBarrierHandler has processed EndOfPartition from all the channels, it would only received checkpoint notification of case 1 and would directly trigger snapshot. 


One pending problem here is for unaligned checkpoint, the barriers are inserted only after the downstream tasks have received EndOfPartition. Compared with normal unaligned checkpoint, the checkpoint here could only skip the buffer cached in the input channels, but not the buffers in the upstream tasks' result partitions. This might enlarge the time required. We could not allows the barriers to skip the result partition of the upstream tasks because the upstream tasks should have finished and could not recover the snapshotted result partition data after failover. To avoid introducing slow checkpoint, we might make the upstream tasks to wait till all the buffers in the result partition are flushed into the network. Then during this period, the normal unaligned checkpoint could still be taken. But since it does not affect the correctness, we would consider this optimization in future version. 

Do Not Cancel Pending Checkpoint on Finish

Currently when a task finish, all the pending checkpoints would be canceled. Since now we would support checkpoints after tasks finished, tasks finish should not fail the checkpoints and we would change the behavior to waiting for all the pending checkpoints to finish before task get finished.

Failover and Recovery

Logically we could not restart the finished tasks on failover. We could only restart those tasks who have operators not fully finished. Specially, if all the source operators have been finished, the remaining operator would directly start the process of finishing. 

For the first version, we would still restart all the tasks, and the finished tasks would start finishing normally. This simplifies the implementation of the task side and scheduler. 

Rejected Alternatives


Compatibility, Deprecation and Migration Plan

As discussed in the second section, the FLIP might change the behavior after failover if some operators have logic in initialization or endOfInput that relies on the state from the finished subtasks. However, such jobs would be rare and need to manually modify the job. 

The FLIP would also modify the CompletedCheckpoint serialization format by also recording which operators are fully finished. However, this would not cause compatibility issues due to Java’s serialization mechanism supporting adding or deleting fields. 

Appendix

1.  Computing Tasks to Trigger

The target tasks to trigger are those who are still running but do not have any running precedent tasks. To avoid introducing large overhead to computing tasks to trigger for large jobs, we travel the JobGraph instead of traveling the ExecutionGraph directly. The algorithm is based on the following observation: 

  1. If JobVertex A is connected to JobVertex B and not all tasks of JobVertex A are running, then no tasks of JobVertex B might be triggered if A -> B is ALL_TO_ALL, and only tasks connected from finished tasks might be triggered if A -> B is POINTWISE.
  2. If one task is known to not need to trigger, it must be RUNNING since it is connected from some other running tasks. This indicates that the FINISHED tasks are always a subset of the tasks we need to check for triggering for each JobVertex.
  3. If one JobVertex is connected from multiple precedents JobVertices, then the tasks to check is the Union of the tasks decided by each president JobVertices.

Based on the above observation, we travel the JobVertices via topology order and for each job vertex, we could narrow down the tasks to check for all its descendant JobVertices. For the tasks to check, the not finished ones would be tasks to trigger. The pseudo-code of the algorithm would be

Initialize V.tasks_to_check for each JobVertex V to be all its tasks;

for each JobVertex V in topology order;do
    if V.filtered_tasks_to_check is empty;then
        continue;
    endif

    running, finished = split V.filtered_tasks_to_check
    add running tasks to the tasks to trigger

    for each descendant JobVertex SubV;do
        if finished < all_tasks;then
            if V -> SubV is ALL_TO_ALL;then
                filtered_tasks_to_check = {}
            elif V -> SubV is POINTWISE;then
                filtered_tasks_to_check={tasks connected from running tasks of V}
            endif

            SubV.tasks_to_check = SubV.tasks_to_check union filtered_tasks_to_check;
        endif
    done
done


  • No labels