...

The situation of the second part is equivalent to that of the source operators. However, non-source operators rarely have logic similar to the legacy source. Instead, the result related to ΔF is usually sent before the task finishes and does not need to be resent after failover. For example, operators that aggregate and send the final result on endOfInput only need to send the remaining aggregation value after failover. Even if some operators do not satisfy this condition, they could push the state that cannot be discarded to the OperatorCoordinator instead, as the new Source API does.

Special State Types

However, besides the normal operator list state and keyed state, Flink also supports other types of state, namely broadcast state and union list state.

Logically, the broadcast state works well even if some of the tasks of a vertex are finished, since the state of all the tasks is the same. Thus we could restore the operator subtasks' states from the state of any previous subtask, as long as the operator is not fully finished.
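The restore rule for broadcast state can be illustrated with a small toy model. This is only a sketch in plain Python, not Flink code: `restore_broadcast_state`, the `snapshots` list (with `None` standing for a subtask that finished before the checkpoint), and the choice to raise an error when every subtask is finished are all assumptions made for illustration.

```python
from typing import Dict, List, Optional


def restore_broadcast_state(
    snapshots: List[Optional[Dict[str, str]]],
    new_parallelism: int,
) -> List[Dict[str, str]]:
    """Restore every subtask from any surviving broadcast-state snapshot.

    snapshots[i] is the broadcast state of subtask i, or None if that
    subtask had already finished when the checkpoint was taken
    (hypothetical representation, not a Flink data structure).
    """
    available = [s for s in snapshots if s is not None]
    if not available:
        # Assumption: a fully finished operator has nothing to restore.
        raise ValueError("operator is fully finished; no state to restore")
    # All copies of broadcast state are identical, so any one will do.
    template = available[0]
    # Give each restored subtask its own copy of the state.
    return [dict(template) for _ in range(new_parallelism)]


# Subtask 0 finished before the checkpoint; subtasks 1 and 2 did not.
restored = restore_broadcast_state([None, {"rule": "v2"}, {"rule": "v2"}], 3)
```

Because every surviving snapshot holds the same content, the restored subtasks see the same state whether or not some of their peers had finished.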

The UnionListState is more complex. Unlike the normal operator list state, users might have special usages of union list state. For example, users may have only one subtask store the state, and initialize every subtask with the shared value after restart to implement a more flexible "broadcast" state. Besides, users may have complex initialization logic based on the whole state, such as subtracting the state from a global set of values, which is impossible with normal operator list state. If some of the subtasks are finished and we only keep the remaining state in the checkpoint, the subtasks would have a different initial state after restart. However, since the UnionListState is on the way to being deprecated and replaced by the OperatorCoordinator, we could first apply a temporary fix: ask the tasks of operators using UnionListState to wait for each other, so that the snapshot of the operator is always complete.
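The problem with partially kept union list state can be shown with a toy model of the "subtract from a global set" initialization mentioned above. This is a sketch in plain Python, not Flink code: `GLOBAL_VALUES`, `union_restore`, and `remaining_work` are hypothetical names, and `None` stands for a finished subtask whose state was dropped from the checkpoint.

```python
from typing import List, Optional, Set

# Hypothetical global set of values the job is responsible for.
GLOBAL_VALUES: Set[int] = {1, 2, 3, 4, 5, 6}


def union_restore(snapshots: List[Optional[List[int]]]) -> List[int]:
    """Merge all surviving per-subtask lists into the single list that
    every subtask receives on restore (union redistribution)."""
    merged: List[int] = []
    for state in snapshots:
        if state is not None:  # None: subtask finished, state dropped
            merged.extend(state)
    return merged


def remaining_work(all_values: Set[int], restored: List[int]) -> Set[int]:
    """Hypothetical user initialization: global set minus recorded values."""
    return all_values - set(restored)


complete = [[1, 2], [3, 4], [5]]  # every subtask contributed its state
partial = [[1, 2], None, [5]]     # subtask 1 finished; its state is missing

remaining_complete = remaining_work(GLOBAL_VALUES, union_restore(complete))
remaining_partial = remaining_work(GLOBAL_VALUES, union_restore(partial))
```

With the complete snapshot only value 6 remains, while with the partial snapshot values 3 and 4 reappear as unprocessed. This is the change in initialized state that waiting for a complete snapshot is meant to avoid.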

Based on the above discussion, discarding the final states of the finished tasks would only change behavior for a very small fraction of existing jobs, namely those whose non-source operators have special logic on initialization or endOfInput, and these jobs could also be modified to keep the current behavior. Therefore, we lean towards option 3, which simplifies the implementation and leaves more room for optimization in the future.

...