You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 8 Next »

Status

Current state: Under Discussion

Discussion thread: <pending>

JIRA: <pending>

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As discussed in FLIP-131, Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, one long-standing problem for the streaming mode is that currently Flink does not support checkpoints after some tasks finished, which causes some problems for bounded or mixed jobs:

  1. Flink exactly-once sinks rely on checkpoints to ensure data won’t be replayed before committed to external systems in streaming mode. If sources are bounded and checkpoints are disabled after some tasks are finished, the data sent after the last checkpoint would always not be able to be committed. This issue has already been reported some times in the user ML (Case 1, Case 2, Case 3) and is future brought up when working on FLIP-143: Unified Sink API.
  2. The jobs with both bounded and unbounded sources might have to replay a large amount of records after failover due to no periodic checkpoints are taken after the bounded sources finished.

Therefore, we need to also support checkpoints even after some tasks are finished. 

Overall Design

There are multiple options to achieve this target. In this section we compare different options.

Option 1. Prevent tasks from finishing

The first option is to prevent the tasks from finishing until the whole job is finished. Specially, tasks would never finish if the job also has unbounded sources. This option is not preferred due to that

  1. it causes zombie tasks and waste resources. 
  2. it also requires a new “pre-EndOfPartition” event to indicate all the records are sent. Otherwise if it directly sent EndOfPartition before tasks finished, the communication channel would be destroyed and it would also be weird to have checkpoint barriers after EndOfPartition. However, introducing the “pre-EndOfPartition” event would largely complicate the termination process. 

Option 2. Allow tasks to finish & Keep Final Snapshots

Another option is allowing tasks to finish normally and checkpoints after tasks finished would only take snapshots for the running tasks. A core issue of this option is whether we need to keep the final snapshots of the finished tasks and also includes their final snapshots in the following checkpoints. Currently when failover happens after some tasks are finished, the job will fallback to a checkpoint taken when all the tasks are running. Including the final snapshots of the finished tasks ensures the behavior unchanged compared with the current one since the finished tasks could be viewed as still running. However it also introduce some problems:

  1. It causes additional overhead to taking and managing the final snapshots.
  2. Since the same final snapshots would be used in multiple checkpoints, we need to introduce the reference count between checkpoints and snapshots. This complicates the checkpoint management, especially after we already have the reference count between snapshots and state items due to incremental checkpoints.
  3. For the long run, it limits the possibility that we only restart the running tasks on failover. 

Option 3. Allow tasks to finish & Do not keep Final Snapshots

If we do not want to keep the final snapshots of the finished tasks instead, we need to explore how it changes the current behavior. Although in a checkpoint the state is snapshotted in the unit of task, it is finally reorganized by the unit of operator since there might be topology changes or rescaling. In other words, we need to view the checkpoint as composed of the current working progress of each operator, and tasks are only stateless runtime units to execute the remaining work for the operators. If we do not include the state from the finished tasks, it is equivalent to some operator discarding a part of finished work’s state in the checkpoint. Let ΔR represents the state of running task and ΔF represents the state of finished task when taking checkpoints, then the result of the operator’s execution after failover is

g(I, ΔR+ ΔF)=g(IRΔR)+g(IFΔF)

where I is the input after failover and the decomposition is due to the fact that the work could be distributed to the different subtasks. Ideally the result should be the same with the execution without the states from the finished tasks, namely g(I, ΔR+ ΔF)=g(IΔR), which is equivalent to 

g(IFΔF)=Ø

Namely there should be no records sent due to ΔF no matter whether we keep it or not. 

Source Operators

The source operator does not have input and the equation is further equivalent to g(ΔF)=Ø. The logic of the source operators could be modeled as reading each split from the external system and emitting the records to the pipeline. With legacy source API the source operators usually discover all the splits on startup and record the progress of each split in a union list state. Unfortunately with this pattern if we discard the state for the finished splits, we would re-read them after failover, which violates the condition if we do not keep ΔF. The new source API would overcome this issue since now the splits are discovered and recorded in the OperatorCoordinator, whose state is still kept after all the tasks are finished. 

In consideration that we would finally migrate to the new source API, we could temporarily avoid the repeat records issue of the legacy source issue by 

  1. Failing checkpoints if some source operators contain both finished and unfinished subtasks.
  2. Skipping the actual execution for legacy source task (namely StreamSourceTask) if all the subtasks have finished before failover. This requires we also bookkeep the finished operators in the checkpoints.

As a whole, the source operators could achieve the condition, and the temporary hack could be removed after we migrated to the new Source API.


Non-source Operators

The logic of a non-source operator could be split into processing the input records and the logic in initialize and endOfInput, namely the condition is equivalent to  

g(IFΔF)=gp(IFΔF)+gc(ΔF)=Ø

For the first part, if in a checkpoint some subtask of a non-source operator is finished, then

  1. All the source tasks of ALL_TO_ALL inputs are finished and IF. This is due to these precedent tasks sending EndOfPartition to the subtasks of the operator in discussion at the same time, thus the running subtasks should also have received all the records in the checkpoints. 
  2. For the source tasks of POINTWISE inputs, the precedent tasks of the finished subtasks must also be finished. This indicates all the remaining input records should not rely on ΔF, otherwise they could not be correctly computed even without failover. This implies all the remaining records belongs to Iand IF=Ø.

Thus, we should always have IF=Ø and thus gp(IFΔF)=Ø no matter whether we save ΔF

The situation of the second part is equivalent to the source operators. However, the non-source operators rarely have the similar logic as legacy source. Instead, the result related to ΔF is usually sent before finished and does not need to resent after failover. For example, the operators doing aggregation and sending the final result on endOfInput only need sending the remaining aggregation value after failover. Even if there are operators does not satisfy the condition, the operators could push the states could not be discarded to OperatorCoordinator instead, like the new Source API does. 


Based on the above discussion discarding the final states of the finish tasks would only change behavior for a very little fraction of the possible existing jobs whose non-source operators have special logic on initialization or endOfInput, and these jobs could also be modified to keep the current behaviors.  Therefore, we lean towards option 3, which simplify the implementation and leave more room for optimization in the future. 

Proposed Changes

This section details the proposed changes for the options 3 chosen in the last section.

Triggering Checkpoints After Tasks Finished

Currently CheckpointCoordinator only triggers the sources for new checkpoints and other tasks would receive barriers from the input channels. However, it would not work to only trigger the running sources after tasks finished. As exemplified in Figure 1, suppose Task A has finished if we only trigger B for a new checkpoint, then Task C would have no idea about the new checkpoint and would not report its snapshot.

Figure 1. An example execution graph with one task finished. 

It might be argued that we could wait till the Tasks C to finish processing the remaining records and report its final snapshots, then we could include the final snapshot directly in the checkpoints. However, this does not work if C is a two-phase commit sink and requires one successful checkpoint before finish, which will cause deadlock.

Therefore, for checkpoints after some tasks are finished we need to trigger all the leaf tasks whose precedent tasks are all finished. Currently CheckpointCoordinator computes tasks to trigger in a separate timer thread, which runs concurrently with JobMaster’s main thread. The mechanism works well currently since it only needs to pick the current execution attempt for each execution vertex. However, to find all the leaf tasks needs to iterate over the whole graph and if JM changes tasks’ state during computation, it would be hard to guarantee the rightness of the algorithm, especially the JM might restarted the finished tasks on failover. To simplify the implementation we would proxy the computation of tasks to trigger to the main thread. 

The basic algorithm to compute the tasks to trigger would be iterating over the ExecutionGraph to find the leaf running tasks. However, the algorithm could be optimized by iterating over the JobGraph instead. The detailed algorithm is shown in Appendix. 

There might also be cases that tasks get finished between when we decide to trigger them and actually trigger them. In this case, we need to reconsider triggering the descendant tasks of this task. 

Extend the task Barrier Alignment

On the task side, we also need to modify the task barrier alignment mechanism to support the cases that some precedent tasks are finished. In this case, the downstream task would receive the EndOfPartition messages from the input channels.

For non-source tasks, if it receives the trigger of checkpoint n, then all its preceding tasks must be finished before checkpoint n and won’t send it the corresponding barriers. Therefore, we would first bookkeep the checkpoint n and after we received EndOfPartition from all the channels, we then report the snapshot for the checkpoint n. If the task received more checkpoint triggers after receiving EndOfPartition from all the channels, then it would report the snapshots directly.

For non-source tasks that do not receive the direct trigger of the checkpoint, it will receive barriers from some channels, but may receive EndOfPartition from some other channels. In this case, the channel received by EndOfPartition would be viewed as receiving the corresponding barrier. Specifically, for aligned checkpoints, the channels received EndOfPartition would be viewed as aligned. For unaligned checkpoints, the buffers before the EndOfPartition message will be snapshotted

Figure 2. The tasks would either receive the checkpoint trigger and receive EndOfPartition from all the channels, or do not receive checkpoint trigger and receive barriers from some channels.

A summary of all the cases is listed in the following:


Aligned Checkpoint

Unaligned Checkpoint

Received Trigger + all EndOfPartition

Wait till processed all barriers, and then report the snapshot

Wait till processed all EndOfPartition, and then report the snapshot. In this case we would take the snapshot the same as the aligned method.

Do not receive Trigger + some barriers

Wait till received barrier or EndOfPartition from each channel, then report the snapshot.

When receiving the first barrier, start spilling the buffers before the barrier or EndOfParitition for each channel.

Tasks Waiting For Checkpoint Complete Before Finish

If some operators of a task have logic when notifying a checkpoint complete, the tasks would need to wait for a complete checkpoint before finished. For example, the Sink operator would need to wait for one complete checkpoint to commit all the pending transactions. 

To support these operators, the tasks would wait for a completed checkpoints if some operators or UDFS implemented the notifyCheckpointCompleted methods. 

Failover and Recovery

Logically we could not restart the finished tasks on failover. We could only restart those tasks who have operators not fully finished. Specially, if all the source operators have been finished, the remaining operator would directly start the process of finishing. 

For the first version, we would still restart all the tasks, and the finished tasks would start finishing normally. This simplifies the implementation of the task side and scheduler. 

Compatibility, Deprecation and Migration Plan

As discussed in the second section, the FLIP might change the behavior after failover if some operators have logic in initialization or endOfInput that relies on the state from the finished subtasks. However, such jobs would be rare and need to manually modify the job. 

The FLIP would also modify the CompletedCheckpoint serialization format by also recording which operators are fully finished. However, this would not cause compatibility issues due to Java’s serialization mechanism supporting adding or deleting fields. 

Appendix

Computing Tasks to Trigger

The target tasks to trigger are those who are still running but do not have any running precedent tasks. To avoid introducing large overhead to computing tasks to trigger for large jobs, we travel the JobGraph instead of traveling the ExecutionGraph directly. The algorithm is based on the following observation: 

  1. If JobVertex A is connected to JobVertex B and not all tasks of JobVertex A are running, then no tasks of JobVertex B might be triggered if A -> B is ALL_TO_ALL, and only tasks connected from finished tasks might be triggered if A -> B is POINTWISE.
  2. If one task is known to not need to trigger, it must be RUNNING since it is connected from some other running tasks. This indicates that the FINISHED tasks are always a subset of the tasks we need to check for triggering for each JobVertex.
  3. If one JobVertex is connected from multiple precedents JobVertices, then the tasks to check is the Union of the tasks decided by each president JobVertices.

Based on the above observation, we travel the JobVertices via topology order and for each job vertex, we could narrow down the tasks to check for all its descendant JobVertices. For the tasks to check, the not finished ones would be tasks to trigger. The pseudo-code of the algorithm would be

Initialize V.tasks_to_check for each JobVertex V to be all its tasks;

for each JobVertex V in topology order;do
    if V.filtered_tasks_to_check is empty;then
        continue;
    endif

    running, finished = split V.filtered_tasks_to_check
    add running tasks to the tasks to trigger

    for each descendant JobVertex SubV;do
        if finished < all_tasks;then
            if V -> SubV is ALL_TO_ALL;then
                filtered_tasks_to_check = {}
            elif V -> SubV is POINTWISE;then
                filtered_tasks_to_check={tasks connected from running tasks of V}
            endif

            SubV.tasks_to_check = SubV.tasks_to_check union filtered_tasks_to_check;
        endif
    done
done



  • No labels