Motivation

As discussed in FLIP-131, Flink will make DataStream the unified API for processing bounded and unbounded data in both streaming and blocking modes. However, a long-standing problem in streaming mode is that Flink currently does not support checkpoints after some tasks have finished, which causes problems for bounded or mixed jobs:

  1. Flink exactly-once sinks rely on checkpoints to ensure that data is not replayed before it is committed to external systems in streaming mode. If the sources are bounded and checkpoints are disabled after some tasks have finished, the data sent after the last checkpoint can never be committed. This issue has been reported several times on the user mailing list (Case 1, Case 2, Case 3) and was brought up again while working on FLIP-143: Unified Sink API.
  2. Jobs with both bounded and unbounded sources might have to replay a large number of records after failover, because no periodic checkpoints are taken after the bounded sources have finished.

Therefore, we also need to support checkpoints after some tasks have finished.

Overall Design

There are multiple options to achieve this target. In this section we compare different options.

Option 1. Prevent tasks from finishing

The first option is to prevent tasks from finishing until the whole job is finished. Specifically, tasks would never finish if the job also has unbounded sources. This option is not preferred because

  1. It causes zombie tasks and wastes resources.
  2. It also requires a new “pre-EndOfPartition” event to indicate that all records have been sent. Otherwise, if EndOfPartition were sent directly before the tasks finished, the communication channel would be destroyed, and it would also be odd to have checkpoint barriers after EndOfPartition. However, introducing the “pre-EndOfPartition” event would considerably complicate the termination process.

Option 2. Allow tasks to finish & Keep Final Snapshots

Another option is to allow tasks to finish normally, so that checkpoints taken after some tasks have finished only snapshot the running tasks. A core question for this option is whether we need to keep the final snapshots of the finished tasks and include them in the following checkpoints. Currently, when failover happens after some tasks have finished, the job falls back to a checkpoint taken when all tasks were running. Including the final snapshots of the finished tasks keeps the behavior unchanged compared with the current one, since the finished tasks could be viewed as still running. However, it also introduces some problems:

  1. It causes additional overhead for taking and managing the final snapshots.
  2. Since the same final snapshots would be used in multiple checkpoints, we need to introduce reference counting between checkpoints and snapshots. This complicates checkpoint management, especially since we already have reference counting between snapshots and state items due to incremental checkpoints.
  3. In the long run, it limits the possibility of restarting only the running tasks on failover.
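To make the second problem more concrete, the following is a minimal sketch of the extra bookkeeping that shared final snapshots would need. The class `FinalSnapshotRegistry` and its methods are hypothetical names chosen for illustration and are not Flink classes; the sketch only assumes that each final snapshot has a string id and may be deleted once no retained checkpoint references it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical registry (not a Flink class): counts how many retained checkpoints reference each final snapshot. */
final class FinalSnapshotRegistry {
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** A new checkpoint reuses the given final snapshots of finished tasks. */
    void registerCheckpoint(List<String> finalSnapshotIds) {
        for (String id : finalSnapshotIds) {
            refCounts.merge(id, 1, Integer::sum);
        }
    }

    /** A checkpoint is discarded; returns the final snapshots that no checkpoint references any more. */
    List<String> releaseCheckpoint(List<String> finalSnapshotIds) {
        List<String> deletable = new ArrayList<>();
        for (String id : finalSnapshotIds) {
            Integer count = refCounts.get(id);
            if (count == null) {
                continue; // unknown snapshot id: nothing to release
            }
            if (count == 1) {
                refCounts.remove(id);
                deletable.add(id);
            } else {
                refCounts.put(id, count - 1);
            }
        }
        return deletable;
    }
}
```

This counting would sit on top of the existing reference counting between snapshots and state items, which is the added complexity the list above refers to.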

Option 3. Allow tasks to finish & Do not keep Final Snapshots

If we instead do not keep the final snapshots of the finished tasks, we need to explore how this changes the current behavior. Although in a checkpoint the state is snapshotted per task, it is eventually reorganized per operator, since there might be topology changes or rescaling. In other words, we need to view the checkpoint as composed of the current working progress of each operator, and tasks are only stateless runtime units that execute the remaining work of the operators. Not including the state from the finished tasks is equivalent to an operator discarding the part of its state that belongs to finished work. Let $\Delta_R$ denote the state of the running tasks and $\Delta_F$ the state of the finished tasks when the checkpoint is taken. Then the result of the operator’s execution after failover is

$$g(I, \Delta_R + \Delta_F) = g(I_R, \Delta_R) + g(I_F, \Delta_F)$$

where $I$ is the input after failover, $I_R$ and $I_F$ are the parts of the input that rely on $\Delta_R$ and $\Delta_F$ respectively, and the decomposition is possible because the work can be distributed to different subtasks. Ideally, the result should be the same as that of an execution without the state from the finished tasks, namely $g(I, \Delta_R + \Delta_F) = g(I, \Delta_R)$, which is equivalent to

$$g(I_F, \Delta_F) = \emptyset$$

In other words, no records should be sent because of $\Delta_F$, regardless of whether we keep it or not.

Source Operators

The source operator has no input, so the condition reduces to $g(\Delta_F) = \emptyset$. The logic of a source operator can be modeled as reading each split from the external system and emitting the records to the pipeline. With the legacy source API, source operators usually discover all the splits on startup and record the progress of each split in a union list state. Unfortunately, with this pattern, if we discard the state of the finished splits, we would re-read them after failover, which violates the condition when we do not keep $\Delta_F$. The new source API overcomes this issue, since the splits are now discovered and recorded in the OperatorCoordinator, whose state is still kept after all the tasks have finished.
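The re-read problem of the legacy pattern can be illustrated with a small model. This is only a sketch under simplifying assumptions: splits are identified by strings, the restored state is reduced to the set of splits already fully read, and `LegacySplitRestoreSketch` is a hypothetical name rather than a Flink class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical model of a legacy source restore; none of these names are Flink APIs. */
final class LegacySplitRestoreSketch {

    /**
     * Models the legacy pattern: all splits are rediscovered on startup, and only
     * the restored state tells which of them were already fully read.
     */
    static List<String> splitsToRead(List<String> discoveredSplits, Set<String> restoredFinishedSplits) {
        List<String> toRead = new ArrayList<>();
        for (String split : discoveredSplits) {
            if (!restoredFinishedSplits.contains(split)) {
                toRead.add(split); // no record of this split being finished, so it is read (again)
            }
        }
        return toRead;
    }
}
```

If the restored state still marks a split as finished, the split is skipped. If that state was discarded together with the finished subtask, the same call returns the split again, so its records would be emitted a second time.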

Considering that we will eventually migrate to the new source API, we could temporarily avoid the repeated-records issue of the legacy source by

  1. Failing checkpoints if some source operators contain both finished and unfinished subtasks.
  2. Skipping the actual execution of the legacy source task (namely StreamSourceTask) if all the subtasks have finished before failover. This requires that we also bookkeep the finished operators in the checkpoints.

As a whole, the source operators can satisfy the condition, and the temporary hack can be removed once we have migrated to the new Source API.
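The two temporary rules for legacy sources can be sketched as simple predicates over the finished flags of one source operator's subtasks. The class and method names below are hypothetical and only illustrate the decision logic; how the checkpoint coordinator would actually obtain these flags is not specified here.

```java
/** Hypothetical helper illustrating the two temporary rules; not part of Flink. */
final class LegacySourceRulesSketch {

    /** Rule 1: the checkpoint must fail if the operator has both finished and unfinished subtasks. */
    static boolean mustFailCheckpoint(boolean[] subtaskFinished) {
        int finished = countFinished(subtaskFinished);
        return finished > 0 && finished < subtaskFinished.length;
    }

    /**
     * Rule 2: flag to be recorded with the checkpoint. When true, the execution
     * of the legacy source task would be skipped after restoring from it.
     */
    static boolean isFullyFinished(boolean[] subtaskFinished) {
        return subtaskFinished.length > 0 && countFinished(subtaskFinished) == subtaskFinished.length;
    }

    private static int countFinished(boolean[] subtaskFinished) {
        int finished = 0;
        for (boolean f : subtaskFinished) {
            if (f) {
                finished++;
            }
        }
        return finished;
    }
}
```

With these two rules, a checkpoint either contains the state of every subtask of a legacy source or of none of them, so the restored job never sees a partially discarded union list state.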


Non-source Operators

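The logic of a non-source operator can be split into processing the input records and the logic in initialization and endOfInput. Writing $g_p$ for the record-processing part and $g_c$ for the initialization and endOfInput part, the condition is equivalent to

$$g(I_F, \Delta_F) = g_p(I_F, \Delta_F) + g_c(\Delta_F) = \emptyset$$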

For the first part, if in a checkpoint some subtask of a non-source operator is finished, then

  1. All the upstream tasks of ALL_TO_ALL inputs are finished, so $I_F = \emptyset$. This is because these upstream tasks send EndOfPartition to all subtasks of the operator in question at the same time, so the running subtasks must also have received all the records by the time of the checkpoint.
  2. For the upstream tasks of POINTWISE inputs, the upstream tasks of the finished subtasks must also be finished. This indicates that none of the remaining input records rely on $\Delta_F$, otherwise they could not be computed correctly even without failover. This implies that all the remaining records belong to $I_R$ and $I_F = \emptyset$.

Thus, we always have $I_F = \emptyset$ and therefore $g_p(I_F, \Delta_F) = \emptyset$, regardless of whether we save $\Delta_F$.

The second part is in the same situation as the source operators. However, non-source operators rarely have logic similar to the legacy source. Instead, the result related to $\Delta_F$ is usually sent before the task finishes and does not need to be resent after failover. For example, operators that aggregate and send the final result on endOfInput only need to send the remaining aggregation value after failover. Even if some operators do not satisfy the condition, they could push the state that cannot be discarded to the OperatorCoordinator instead, as the new Source API does.
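The aggregation case can be sketched with a toy operator that sums its input and emits the sum on endOfInput. All names here are hypothetical and the class does not implement any Flink interface. The sketch assumes that the checkpoint maps each still-running subtask index to its partial sum, and that finished subtasks are simply absent from it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical toy operator (not a Flink interface): sums its input and emits the sum on endOfInput. */
final class SumOnEndOfInputSketch {
    private long sum;

    SumOnEndOfInputSketch(long restoredSum) {
        this.sum = restoredSum;
    }

    void processElement(long value) {
        sum += value;
    }

    /** Returns the value emitted when the input ends. */
    long endOfInput() {
        return sum;
    }

    /**
     * Restores only the subtasks present in the checkpoint, feeds them their
     * remaining input, and collects what they emit on endOfInput, ordered by
     * subtask index. Finished subtasks are absent, so they emit nothing again.
     */
    static List<Long> emittedAfterFailover(
            Map<Integer, Long> restoredSums, Map<Integer, List<Long>> remainingInput) {
        List<Long> emitted = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : new TreeMap<>(restoredSums).entrySet()) {
            SumOnEndOfInputSketch operator = new SumOnEndOfInputSketch(entry.getValue());
            for (long value : remainingInput.getOrDefault(entry.getKey(), List.of())) {
                operator.processElement(value);
            }
            emitted.add(operator.endOfInput());
        }
        return emitted;
    }
}
```

For instance, if subtask 0 had already finished and emitted its sum before the checkpoint while subtask 1 was restored with a partial sum of 5 and then receives 3 and 4, only subtask 1 emits a value after failover (12), and the result of subtask 0 is not sent a second time.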


Based on the above discussion, discarding the final state of the finished tasks would only change behavior for a very small fraction of existing jobs, namely those whose non-source operators have special logic in initialization or endOfInput, and these jobs could also be modified to keep the current behavior. Therefore, we lean towards Option 3, which simplifies the implementation and leaves more room for optimization in the future.
